---
title: Kafka Connect S3 Sink Connector Configuration and Writing to Object Storage in Practice
keywords: Kafka Connect, S3 Sink, TimeBasedPartitioner, JsonFormat, flush.size, plugin.path, REST API
description: Configure and deploy the S3 Sink connector to write Kafka topics into object storage partitioned by time, controlling flush granularity and file rotation interval.
tags:
  - JsonFormat
  - Kafka
  - Kafka Connect
  - REST API
  - S3
  - S3 Sink
  - TimeBasedPartitioner
  - flush.size
  - plugin.path
  - data streaming
categories:
  - Articles
  - Tutorials
---

Basic Connect worker configuration:

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
plugin.path=/usr/share/java,/etc/kafka-connect/jars
rest.port=8083
```
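With `value.converter.schemas.enable=false`, the JsonConverter writes each record's raw JSON payload rather than wrapping it in a `{"schema": ..., "payload": ...}` envelope. A minimal Python sketch of the difference (the sample record is illustrative):

```python
import json

# Illustrative record value; field names are made up for this example.
payload = {"order_id": 42, "amount": 19.99}

# schemas.enable=true: JsonConverter wraps the value in a schema/payload envelope.
enveloped = json.dumps({
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "order_id", "type": "int64", "optional": False},
            {"field": "amount", "type": "double", "optional": False},
        ],
    },
    "payload": payload,
})

# schemas.enable=false: the payload is emitted as-is -- this is what lands in S3.
raw = json.dumps(payload)

print(raw)
```

Disabling the envelope keeps the S3 objects compact and directly consumable by downstream tools, at the cost of losing the inline schema.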

S3 Sink connector configuration (s3-sink.json). Note that `TimeBasedPartitioner` requires `partition.duration.ms`; here it is set to one hour to match the hourly `path.format`:

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "aws.access.key.id": "AKIA...",
    "aws.secret.access.key": "SECRET",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "flush.size": "1000",
    "rotate.interval.ms": "600000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "yyyy/MM/dd/HH",
    "timezone": "UTC",
    "locale": "en",
    "timestamp.extractor": "Record",
    "schema.compatibility": "NONE"
  }
}
```
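With this partitioner, objects land under `topics/<topic>/<path.format>/` (assuming the default `topics.dir=topics`), and each file is named `<topic>+<kafkaPartition>+<startOffset>.json` with the start offset zero-padded to ten digits by default. A Python sketch that previews the resulting object key (the helper function is ours, not part of the connector):

```python
from datetime import datetime, timezone

def s3_object_key(topic: str, kafka_partition: int, start_offset: int,
                  record_ts: datetime) -> str:
    """Preview the S3 object key the sink would produce, assuming the
    defaults topics.dir=topics and JsonFormat (.json extension)."""
    # path.format yyyy/MM/dd/HH (Java date format) maps to %Y/%m/%d/%H in strftime
    time_path = record_ts.astimezone(timezone.utc).strftime("%Y/%m/%d/%H")
    # Default filename.offset.zero.pad.width is 10
    filename = f"{topic}+{kafka_partition}+{start_offset:010d}.json"
    return f"topics/{topic}/{time_path}/{filename}"

key = s3_object_key("orders", 0, 1000,
                    datetime(2024, 5, 1, 13, 30, tzinfo=timezone.utc))
print(key)  # topics/orders/2024/05/01/13/orders+0+0000001000.json
```

Because `timestamp.extractor` is `Record`, the hour bucket comes from the record's Kafka timestamp, not the wall clock at write time.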

Create the connector via the REST API and check its status:

```shell
curl -X POST -H "Content-Type: application/json" --data @s3-sink.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/s3-sink/status
```
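The status endpoint returns JSON with the connector's overall state plus one entry per task. A small Python sketch that checks a status response for failed tasks (the sample payload below is illustrative of the response shape, not a real response):

```python
import json

# Example shape of GET /connectors/s3-sink/status (values are illustrative)
status_json = """
{
  "name": "s3-sink",
  "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "127.0.0.1:8083", "trace": "..."}
  ],
  "type": "sink"
}
"""

def all_running(status: dict) -> bool:
    """True only when the connector and every task report RUNNING."""
    return (status["connector"]["state"] == "RUNNING"
            and all(t["state"] == "RUNNING" for t in status["tasks"]))

status = json.loads(status_json)
print(all_running(status))  # False here: task 1 is FAILED
```

A failed task can be restarted with `POST /connectors/s3-sink/tasks/<id>/restart`; the `trace` field on the failed task carries the exception stack for debugging.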
