---
title: "Configuring the Kafka Connect S3 Sink Connector: Writing Topics to Object Storage"
keywords: Kafka Connect, S3 Sink, TimeBasedPartitioner, JsonFormat, flush.size, plugin.path, REST API
description: Configure and deploy the S3 Sink connector to write Kafka topics to object storage with time-based partitioning, controlling commit granularity and file rotation.
tags:
  - JsonFormat
  - Kafka
  - Kafka Connect
  - REST API
  - S3
  - S3 Sink
  - TimeBasedPartitioner
  - flush.size
  - plugin.path
  - Data streaming
categories:
  - Articles
  - Tutorials
---

Base Connect worker configuration:

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
plugin.path=/usr/share/java,/etc/kafka-connect/jars
rest.port=8083

# Since the connector is created through the REST API below, the worker is
# assumed to run in distributed mode, which additionally requires a group id
# and the three internal storage topics (topic names here are conventional).
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
```

S3 Sink connector configuration (s3-sink.json):

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "aws.access.key.id": "AKIA...",
    "aws.secret.access.key": "SECRET",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "flush.size": "1000",
    "rotate.interval.ms": "600000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "yyyy/MM/dd/HH",
    "timezone": "UTC",
    "locale": "en",
    "timestamp.extractor": "Record",
    "schema.compatibility": "NONE"
  }
}
```

A few notes on this configuration. `flush.size` and `rotate.interval.ms` together control when a file is committed: a file is closed and uploaded to S3 once it accumulates 1000 records, or once the time span covered by its records exceeds 10 minutes (with `timestamp.extractor=Record`, rotation is driven by record timestamps rather than wall-clock time). `TimeBasedPartitioner` also requires `partition.duration.ms`; it is set to one hour here to match the hourly granularity of `path.format`. The hardcoded AWS keys are shown for brevity only; in production, prefer the AWS default credentials chain or a Connect config provider over embedding secrets in connector configs.

Create the connector via the REST API and check its status:

```shell
curl -X POST -H "Content-Type: application/json" --data @s3-sink.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/s3-sink/status
```
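To see what the partitioner produces, the following sketch builds the S3 object key for an hourly partition. It assumes the default `topics.dir` (`topics`) and the default `<topic>+<partition>+<start-offset>` file name; the epoch timestamp, partition, and offset are illustrative values, not output from a real deployment:

```shell
# Sketch: how TimeBasedPartitioner's path.format (yyyy/MM/dd/HH, timezone=UTC)
# maps a record timestamp to an S3 object key. Uses GNU date (-d @epoch).
ts=$(date -u -d @1700000000 +%Y/%m/%d/%H)   # hypothetical record timestamp
echo "s3://my-bucket/topics/orders/${ts}/orders+0+0000001000.json"
```

Each task writes one such file per topic partition and time bucket, so concurrent tasks never contend for the same object key.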
