## 概述 通过控制拉取批次与Ack策略、设置最大投递与等待时间,并结合客户端速率限制与队列回压,可在突发流量下维持稳定吞吐与一致性。 ## 关键实践与参数 - 拉取批次: `batch=32` `expires=2s` - Ack模式: 显式确认,最大投递 `max_deliver=5` - 并发与速率: 每消费者 `concurrency=4`,令牌桶限速 - 回压策略: 根据处理耗时动态调整批次与并发 ## 示例/配置/实现 ```bash nats stream add jobs --subjects "jobs.*" --retention limits --max-msgs 200000 --max-age 168h nats consumer add jobs worker --filter "jobs.image" --deliver all --ack explicit --max-deliver 5 --pull ``` ```go package main import ( "github.com/nats-io/nats.go" "time" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) js, _ := nc.JetStream() sub, _ := js.PullSubscribe("jobs.image", "worker") for { msgs, _ := sub.Fetch(32, nats.MaxWait(2*time.Second)) for _, m := range msgs { // 处理 m.Ack() } time.Sleep(200 * time.Millisecond) } } ``` ## 验证 - 速率稳定: 在高并发下记录处理速率与延迟,确认无积压 - 重试控制: 失败消息最大投递不超过设定值 - 回压效果: 增加处理耗时后批次与并发调整有效 - 一致性: 无重复处理与丢失消息 ## 注意事项 - 批次与等待时间需结合业务耗时调优 - 处理逻辑必须幂等 - 监测滞留与错误率设置告警 - 安全管理访问凭证与主题权限

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部