---

title: Kafka Producer 幂等与事务写入实践

keywords: enable.idempotence, transactional.id, acks, max.in.flight.requests.per.connection,

initTransactions

description: 配置 Kafka Producer 幂等与事务写入,保证端到端一致性与去重;示例展示事务开启、发送与提交流程。

tags:

  • Kafka
  • acks
  • enable.idempotence
  • initTransactions
  • max.in.flight.requests.per.connection
  • transactional.id
  • 数据流
  • 生产者

categories:

  • 文章资讯
  • 技术教程

---

Java Producer 属性与事务示例:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class TxProducer {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();

    try {
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("orders", "id-1", "{\"amount\":9.99}"));
      producer.send(new ProducerRecord<>("orders", "id-2", "{\"amount\":19.99}"));
      producer.commitTransaction();
    } catch (Exception e) {
      producer.abortTransaction();
    } finally {
      producer.close();
    }
  }
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部