概述CDC通过变更日志捕获数据库写入并转发到消息系统。Outbox模式将业务事件写入专用表,避免直接监听业务表带来的耦合与隐藏字段问题,提升一致性与可追踪性。关键实践与参数连接器: `database.history.kafka.bootstrap.servers` 与 `table.include.list`Outbox表: 事件 `aggregate_type` `aggregate_id` `type` `payload`事务性写入: 业务写入与Outbox在同一事务路由转换: 使用 SMT 将Outbox事件路由至主题示例/配置/实现{ "name": "pg-outbox", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "db", "database.port": "5432", "database.user": "replicator", "database.password": "secret", "database.dbname": "shop", "table.include.list": "public.outbox", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.route.by.field": "aggregate_type", "transforms.outbox.route.topic.replacement": "events.${routedBy}" } } CREATE TABLE outbox ( id BIGSERIAL PRIMARY KEY, aggregate_type TEXT NOT NULL, aggregate_id BIGINT NOT NULL, type TEXT NOT NULL, payload JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); BEGIN; INSERT INTO orders(id, customer_id, status) VALUES (1001, 42, 'PAID'); INSERT INTO outbox(aggregate_type, aggregate_id, type, payload) VALUES ('order', 1001, 'OrderPaid', '{"id":1001,"status":"PAID"}'); COMMIT; 验证事务一致: 在事务失败时Outbox不写入事件路由正确: 事件进入 `events.order` 主题且载荷正确重放安全: 使用消费位点与幂等处理避免重复副作用审计: 保留Outbox与消费日志以追溯注意事项控制Outbox表体量与归档连接器权限与安全配置SMT路由规则与主题命名稳定下游处理幂等与告警

发表评论 取消回复