clickhouse-io
affaan-m
ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
bunx add-skill affaan-m/everything-claude-code -s clickhouse-io
ClickHouse-specific patterns for high-performance analytics and data engineering.
ClickHouse is a columnar database management system (DBMS) for online analytical processing (OLAP). It is optimized for fast analytical queries over large datasets.
Key characteristics: columnar storage, aggressive compression, vectorized query execution, and the MergeTree family of table engines. A typical MergeTree table for analytics:
CREATE TABLE markets_analytics (
date Date,
market_id String,
market_name String,
volume UInt64,
trades UInt32,
unique_traders UInt32,
avg_trade_size Float64,
created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;
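If old partitions should age out automatically, a TTL clause handles it server-side. A minimal sketch against the table above (the one-year retention window is an assumption; adjust to your needs):
-- Rows expire one year after their date value
ALTER TABLE markets_analytics
    MODIFY TTL date + INTERVAL 1 YEAR;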
-- For data that may contain duplicates (e.g., from multiple sources)
CREATE TABLE user_events (
event_id String,
user_id String,
event_type String,
timestamp DateTime,
properties String
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id, timestamp)
PRIMARY KEY (user_id, event_id);
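Note that ReplacingMergeTree deduplicates only when parts merge in the background, and rows count as duplicates only when their full ORDER BY key matches, so queries can still see duplicates. FINAL forces deduplication at read time, at some query cost; a minimal sketch (the user id is illustrative):
-- Deduplicated read; slower than a plain SELECT
SELECT *
FROM user_events FINAL
WHERE user_id = 'user-123';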
-- For maintaining aggregated metrics
CREATE TABLE market_stats_hourly (
hour DateTime,
market_id String,
total_volume AggregateFunction(sum, UInt64),
    total_trades AggregateFunction(count),
unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);
-- Query the aggregated data
SELECT
hour,
market_id,
sumMerge(total_volume) AS volume,
countMerge(total_trades) AS trades,
uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
-- ✅ Good: filter on indexed columns first
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
AND market_id = 'market-123'
AND volume > 1000
ORDER BY date DESC
LIMIT 100;
-- ❌ Bad: filtering on non-indexed columns first forces a full scan
SELECT *
FROM markets_analytics
WHERE volume > 1000
AND market_name LIKE '%election%'
AND date >= '2025-01-01';
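To confirm the primary key is actually being used, EXPLAIN with indexes = 1 shows how many parts and granules survive pruning. A sketch against the table above:
EXPLAIN indexes = 1
SELECT count()
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123';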
-- ✅ Good: use ClickHouse-specific aggregate functions
SELECT
toStartOfDay(created_at) AS day,
market_id,
sum(volume) AS total_volume,
count() AS total_trades,
uniq(trader_id) AS unique_traders,
avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;
-- ✅ Use quantile for percentiles (approximate by default; much cheaper than quantileExact)
SELECT
quantile(0.50)(trade_size) AS median,
quantile(0.95)(trade_size) AS p95,
quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
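When you need several percentiles at once, the quantiles variant computes them in a single pass over the data instead of three:
SELECT
    quantiles(0.50, 0.95, 0.99)(trade_size) AS p50_p95_p99  -- returns an array
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;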
-- Running totals with a window function
SELECT
date,
market_id,
volume,
sum(volume) OVER (
PARTITION BY market_id
ORDER BY date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
import { ClickHouse } from 'clickhouse'
const clickhouse = new ClickHouse({
url: process.env.CLICKHOUSE_URL,
port: 8123,
basicAuth: {
username: process.env.CLICKHOUSE_USER,
password: process.env.CLICKHOUSE_PASSWORD
}
})
// ✅ Bulk insert (efficient): one INSERT for many rows
// NOTE: interpolating values into SQL like this risks injection;
// escape values or pass rows as data for untrusted input
async function bulkInsertTrades(trades: Trade[]) {
const values = trades.map(trade => `(
'${trade.id}',
'${trade.market_id}',
'${trade.user_id}',
${trade.amount},
'${trade.timestamp.toISOString()}'
)`).join(',')
await clickhouse.query(`
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
VALUES ${values}
`).toPromise()
}
// ❌ Individual inserts (slow)
async function insertTrade(trade: Trade) {
  // Don't do this in a loop!
await clickhouse.query(`
INSERT INTO trades VALUES ('${trade.id}', ...)
`).toPromise()
}
// For continuous data ingestion (dataSource is any async iterable of row batches)
async function streamInserts() {
const stream = clickhouse.insert('trades').stream()
for await (const batch of dataSource) {
stream.write(batch)
}
await stream.end()
}
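If many small inserts are unavoidable, recent ClickHouse versions can also batch on the server side via async_insert; the server buffers small inserts and flushes them together. A sketch with illustrative values and per-query settings:
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
SETTINGS async_insert = 1, wait_for_async_insert = 1
VALUES ('t-1', 'market-123', 'user-1', 42.5, now());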
-- Materialized view maintaining the hourly stats table
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
toStartOfHour(timestamp) AS hour,
market_id,
sumState(amount) AS total_volume,
countState() AS total_trades,
uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;
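One caveat: a materialized view only processes rows inserted after it is created. Historical rows need a one-off backfill using the same -State combinators; a sketch (the cutoff date is a placeholder for the MV creation time):
INSERT INTO market_stats_hourly
SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
WHERE timestamp < '2025-01-01 00:00:00'  -- placeholder cutoff
GROUP BY hour, market_id;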
-- Query the materialized view's target table
SELECT
hour,
market_id,
sumMerge(total_volume) AS volume,
countMerge(total_trades) AS trades,
uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;
-- Inspect slow queries
SELECT
query_id,
user,
query,
query_duration_ms,
read_rows,
read_bytes,
memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
AND query_duration_ms > 1000
AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
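For queries that are still running, system.processes is the live counterpart, and KILL QUERY stops a runaway one:
-- Currently running queries
SELECT query_id, user, elapsed, read_rows, query
FROM system.processes
ORDER BY elapsed DESC;

-- Stop one by id
KILL QUERY WHERE query_id = 'query-id-from-above';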
-- Check table sizes
SELECT
database,
table,
formatReadableSize(sum(bytes)) AS size,
sum(rows) AS rows,
max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
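system.parts also exposes compressed and uncompressed sizes, which makes compression ratios easy to check:
SELECT
    database,
    table,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(data_compressed_bytes) DESC;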
-- Daily active users
SELECT
toDate(timestamp) AS date,
uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;
-- Retention analysis (a user's signup date = their first activity date)
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM (
    SELECT
        user_id,
        toDate(timestamp) AS activity_date,
        min(activity_date) OVER (PARTITION BY user_id) AS signup_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM events
    GROUP BY user_id, activity_date
)
GROUP BY signup_date
ORDER BY signup_date DESC;
-- Conversion funnel (overall event counts for today)
SELECT
    countIf(step = 'viewed_market') AS viewed,
    countIf(step = 'clicked_trade') AS clicked,
    countIf(step = 'completed_trade') AS completed,
    round(clicked / viewed * 100, 2) AS view_to_click_rate,
    round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM (
    SELECT
        session_id,
        event_type AS step
    FROM events
    WHERE event_date = today()
);
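ClickHouse also ships a dedicated funnel function: windowFunnel returns how many steps each user completed in order within a time window. A sketch using the same event names (the one-hour window is an assumption):
SELECT
    level,
    count() AS users
FROM (
    SELECT
        user_id,
        windowFunnel(3600)(
            timestamp,
            event_type = 'viewed_market',
            event_type = 'clicked_trade',
            event_type = 'completed_trade'
        ) AS level
    FROM events
    WHERE event_date = today()
    GROUP BY user_id
)
GROUP BY level
ORDER BY level;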
-- User cohorts by signup month
SELECT
toStartOfMonth(signup_date) AS cohort,
toStartOfMonth(activity_date) AS month,
dateDiff('month', cohort, month) AS months_since_signup,
count(DISTINCT user_id) AS active_users
FROM (
SELECT
user_id,
min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
toDate(timestamp) AS activity_date
FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;
// Extract, Transform, Load
async function etlPipeline() {
  // 1. Extract from the source
const rawData = await extractFromPostgres()
  // 2. Transform
const transformed = rawData.map(row => ({
date: new Date(row.created_at).toISOString().split('T')[0],
market_id: row.market_slug,
volume: parseFloat(row.total_volume),
trades: parseInt(row.trade_count)
}))
  // 3. Load into ClickHouse
await bulkInsertToClickHouse(transformed)
}
// Run on a schedule
setInterval(etlPipeline, 60 * 60 * 1000) // every hour
// Listen for PostgreSQL changes and sync them to ClickHouse
import { Client } from 'pg'

const pgClient = new Client({ connectionString: process.env.DATABASE_URL })
await pgClient.connect()
await pgClient.query('LISTEN market_updates')
pgClient.on('notification', async (msg) => {
const update = JSON.parse(msg.payload)
await clickhouse.insert('market_updates', [
{
market_id: update.id,
event_type: update.operation, // INSERT, UPDATE, DELETE
timestamp: new Date(),
data: JSON.stringify(update.new_data)
}
])
})
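The listener above assumes a landing table on the ClickHouse side. A possible schema matching the fields it inserts (hypothetical; adapt to your workload):
CREATE TABLE market_updates (
    market_id String,
    event_type String,   -- INSERT / UPDATE / DELETE
    timestamp DateTime,
    data String          -- JSON payload from PostgreSQL
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (market_id, timestamp);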
Remember: ClickHouse excels at analytical workloads. Design tables around your query patterns, insert in batches, and use materialized views for real-time aggregation.