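Application properties and topology:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Exactly-once processing; requires brokers on version 2.5 or newer.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Replication factor for internal topics; needs at least 3 brokers.
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), 60000);

StreamsBuilder builder = new StreamsBuilder();
// Set the serdes explicitly: no default serdes are configured in props.
KStream<String, String> src =
    builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
// Count records per key, backed by the "orders-count" state store.
KStream<String, Long> agg = src.groupByKey()
    .count(Materialized.as("orders-count"))
    .toStream();
agg.to("orders_count", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Close the application cleanly when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();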
