---

title: Debezium CDC与Kafka入湖实践

keywords:

  • Debezium
  • CDC
  • Kafka
  • Postgres
  • MySQL
  • 变更数据捕获

description: 使用Debezium从数据库捕获变更并写入Kafka,提供可验证的连接器配置与主题治理方法,支撑入湖与实时处理。

date: 2025-11-26

tags:

  • CDC
  • Debezium
  • Kafka
  • MySQL
  • PostgreSQL
  • 变更数据捕获
  • 数据

categories:

  • 文章资讯
  • 技术教程

---

概述

  • 目标:以Debezium捕获数据库变更日志(INSERT/UPDATE/DELETE),写入Kafka主题,为实时计算与数据入湖提供可靠来源。
  • 适用:PostgreSQL/MySQL等数据库的事务级变更同步。

核心与实战

  • PostgreSQL连接器配置(JSON):
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "repl",
    "database.password": "secret",
    "database.dbname": "app",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.autocreate.mode": "filtered",
    "tombstones.on.delete": "false",
    "include.schema.changes": "false",
    "table.include.list": "public.orders,public.users",
    "topic.prefix": "appdb",
    "snapshot.mode": "initial"
  }
}
  • MySQL连接器要点:
-- 需开启binlog与行格式 ROW;Debezium使用server.id与include list控制范围
  • Kafka主题治理:
kafka-topics.sh --create --topic appdb.public.orders --partitions 6 --replication-factor 3 --bootstrap-server broker:9092
kafka-configs.sh --alter --entity-type topics --entity-name appdb.public.orders --add-config min.insync.replicas=2 --bootstrap-server broker:9092

示例

  • 提交连接器(Kafka Connect REST):
curl -X POST http://connect:8083/connectors -H 'Content-Type: application/json' -d @postgres-connector.json
  • 验证消息:
kafka-console-consumer.sh --topic appdb.public.orders --bootstrap-server broker:9092 --from-beginning --property print.key=true
  • 变更格式(示例):
{"before":null,"after":{"id":1,"status":"PAID"},"op":"c","ts_ms":1732617600000}

验证与监控

  • 连接器状态:
curl -s http://connect:8083/connectors/postgres-connector/status | jq
  • 复制槽与发布:
SELECT * FROM pg_replication_slots;
SELECT * FROM pg_publication_tables;
  • 消费延迟与积压:
  • 监控consumer lag与分区数据量,确保下游处理能力匹配。

常见误区

  • 未开启逻辑复制或binlog导致无变更;需配置数据库日志方式。
  • 主题分区与ISR不足导致写入风险;需设置min.insync.replicasacks=all
  • 删除生成墓碑消息影响下游;可设置tombstones.on.delete=false或在下游处理。

结语

  • Debezium为数据库CDC提供稳定通道,结合Kafka主题治理与监控可支撑实时入湖与流式处理的核心链路。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部