---
title: Kafka Producer 幂等与事务写入实践
keywords: enable.idempotence, transactional.id, acks, max.in.flight.requests.per.connection,
initTransactions
description: 配置 Kafka Producer 幂等与事务写入,保证端到端一致性与去重;示例展示事务开启、发送与提交流程。
tags:
- Kafka
- acks
- enable.idempotence
- initTransactions
- max.in.flight.requests.per.connection
- transactional.id
- 数据流
- 生产者
categories:
- 文章资讯
- 技术教程
---
Java Producer 属性与事务示例:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class TxProducer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders", "id-1", "{\"amount\":9.99}"));
producer.send(new ProducerRecord<>("orders", "id-2", "{\"amount\":19.99}"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
} finally {
producer.close();
}
}
}

发表评论 取消回复