本文目标是在不依赖外部队列中间件的前提下,使用 Redis Streams 构建稳定的事件流处理。所有命令可在 Redis 6/7 上直接验证,配合 `XINFO`/`XPENDING` 进行观测。
## 初始化与生产消息
# 创建流并写入消息
XADD mystream * type order event create payload '{"orderId":123}'
XADD mystream * type order event pay payload '{"orderId":123,"amount":99.00}'
# 创建消费者组(从头消费)
XGROUP CREATE mystream groupA 0 MKSTREAM
说明:`XADD` 写入具备全局递增 ID;`MKSTREAM` 在首次创建时自动生成流。
## 拉取与确认(至少一次)
# 消费者 c1 拉取(阻塞 5 秒)
XREADGROUP GROUP groupA c1 BLOCK 5000 COUNT 10 STREAMS mystream >
# 处理成功后确认
XACK mystream groupA <message-id>
要点与验证:
- 未确认的消息进入 PEL(pending entries list),通过 `XPENDING` 可观测。
- “至少一次”交付:同一消息在超时或消费者故障时会被重新投递给同组其他消费者。
# 观测 PEL
XPENDING mystream groupA
# 领取超时消息(claim)
XCLAIM mystream groupA c2 <message-id> MINIDLE 60000
## 重试与死信队列策略
- 记录重试次数:将 `retries` 计数保存在消息值或外部存储中;超过阈值入死信队列。
- 死信队列(DLQ):使用单独的流 `mystream:dlq` 保存处理失败的消息与失败原因,供离线修复或人工介入。
示例:
# 处理失败,入 DLQ
XADD mystream:dlq * origin mystream id <message-id> reason "invalid state"
# 定期巡检 DLQ
XRANGE mystream:dlq - + COUNT 100
## 并发与性能建议
- 消费者并发:同一组下多个消费者横向扩展,配合 `BLOCK` 拉取降低空轮询。
- 分片:按业务键(如 `userId`/`orderId`)将流拆分为若干子流,减小热点与单点开销。
- 可观测性:
- `XINFO STREAM mystream` 观察长度与最后生成 ID。
- `XINFO GROUPS mystream` 查看组内消费者与挂起消息。
## 与事务/一致性注意事项
- 处理逻辑如需写数据库,使用业务幂等键确保重复投递下的安全(如 `ON CONFLICT DO NOTHING`)。
- 将外部写入与 `XACK` 解耦为最终一致流程:先持久化成功,再确认;异常回滚由重试机制覆盖。
## 示例脚本(redis-cli)
# 生产者循环(伪代码)
while true; do
XADD mystream * type event payload '{"ts":'$(date +%s)'}'
sleep 1
done
# 消费者(c1)
while true; do
RES=$(XREADGROUP GROUP groupA c1 BLOCK 5000 COUNT 1 STREAMS mystream >)
if [[ -n "$RES" ]]; then
# 处理并确认
XACK mystream groupA <parsed-id>
fi
done
## 结语
Redis Streams 的消费者组机制提供“至少一次”交付与可观测的挂起列表,通过重试与死信策略可在生产环境实现稳定的事件流消费。配合幂等与外部持久化,可将复杂一致性场景收敛为可控流程。

发表评论 取消回复