Java Producer 属性与事务示例:import org.apache.kafka.clients.producer.*; import java.util.Properties; public class TxProducer { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); producer.initTransactions(); try { producer.beginTransaction(); producer.send(new ProducerRecord<>("orders", "id-1", "{\"amount\":9.99}")); producer.send(new ProducerRecord<>("orders", "id-2", "{\"amount\":19.99}")); producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); } finally { producer.close(); } } }

发表评论 取消回复