创建流与持久消费者(CLI):
nats stream add ORDERS --subjects orders.* --storage file --retention limits --max-age 72h --replicas 1
nats consumer add ORDERS worker --filter orders.created --pull --ack explicit --max-deliver 5
Node.js 订阅处理(手动确认):
import { connect } from 'nats'
const nc = await connect({ servers: 'nats://127.0.0.1:4222' })
const js = nc.jetstream()
const sub = await js.subscribe('orders.created', { durable: 'worker', manualAck: true })
for await (const m of sub) {
try {
const data = JSON.parse(m.string())
// 业务处理...
m.ack()
} catch (e) {
// 未 ack 将触发重投递,受 max-deliver 控制
}
}

发表评论 取消回复