概述 - 目标:以Debezium捕获数据库变更日志(INSERT/UPDATE/DELETE),写入Kafka主题,为实时计算与数据入湖提供可靠来源。 - 适用:PostgreSQL/MySQL等数据库的事务级变更同步。 核心与实战 - PostgreSQL连接器配置(JSON): ``` { "name": "postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "db", "database.port": "5432", "database.user": "repl", "database.password": "secret", "database.dbname": "app", "plugin.name": "pgoutput", "slot.name": "debezium_slot", "publication.autocreate.mode": "filtered", "tombstones.on.delete": "false", "include.schema.changes": "false", "table.include.list": "public.orders,public.users", "topic.prefix": "appdb", "snapshot.mode": "initial" } } ``` - MySQL连接器要点: ``` -- 需开启binlog与行格式 ROW;Debezium使用server.id与include list控制范围 ``` - Kafka主题治理: ``` kafka-topics.sh --create --topic appdb.public.orders --partitions 6 --replication-factor 3 --bootstrap-server broker:9092 kafka-configs.sh --alter --entity-type topics --entity-name appdb.public.orders --add-config min.insync.replicas=2 --bootstrap-server broker:9092 ``` 示例 - 提交连接器(Kafka Connect REST): ``` curl -X POST http://connect:8083/connectors -H 'Content-Type: application/json' -d @postgres-connector.json ``` - 验证消息: ``` kafka-console-consumer.sh --topic appdb.public.orders --bootstrap-server broker:9092 --from-beginning --property print.key=true ``` - 变更格式(示例): ``` {"before":null,"after":{"id":1,"status":"PAID"},"op":"c","ts_ms":1732617600000} ``` 验证与监控 - 连接器状态: ``` curl -s http://connect:8083/connectors/postgres-connector/status | jq ``` - 复制槽与发布: ``` SELECT * FROM pg_replication_slots; SELECT * FROM pg_publication_tables; ``` - 消费延迟与积压: - 监控`consumer lag`与分区数据量,确保下游处理能力匹配。 常见误区 - 未开启逻辑复制或binlog导致无变更;需配置数据库日志方式。 - 主题分区与ISR不足导致写入风险;需设置`min.insync.replicas`与`acks=all`。 - 删除生成墓碑消息影响下游;可设置`tombstones.on.delete=false`或在下游处理。 结语 - Debezium为数据库CDC提供稳定通道,结合Kafka主题治理与监控可支撑实时入湖与流式处理的核心链路。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部