概述
- 目标:通过Kafka引擎订阅主题并用物化视图写入MergeTree,实现稳定的实时摄取与查询。
- 适用:事件日志、订单变化、用户行为流。
核心与实战
- Kafka源表:
```
CREATE TABLE kafka_orders (
id UInt64,
status String,
amount Float64,
ts DateTime
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker1:9092,broker2:9092',
kafka_topic_list = 'appdb.public.orders',
kafka_group_name = 'ch_orders_consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 2;
```
- 目标MergeTree表:
```
CREATE TABLE orders_rt (
id UInt64,
status String,
amount Float64,
ts DateTime
) ENGINE = MergeTree
ORDER BY (id, ts)
PARTITION BY toDate(ts);
```
- 物化视图连接:
```
CREATE MATERIALIZED VIEW mv_orders_rt TO orders_rt AS
SELECT id, status, amount, ts FROM kafka_orders;
```
示例
- 启动消费与检查:
```
SYSTEM START DISTRIBUTED SENDS;
SELECT count() FROM orders_rt;
```
- 生产测试消息:
```
kafka-console-producer.sh --topic appdb.public.orders --bootstrap-server broker1:9092 <

发表评论 取消回复