本文目标是在不依赖外部队列中间件的前提下,使用 Redis Streams 构建稳定的事件流处理。所有命令可在 Redis 6/7 上直接验证,配合 `XINFO`/`XPENDING` 进行观测。


## 初始化与生产消息


# 创建流并写入消息
XADD mystream * type order event create payload '{"orderId":123}'
XADD mystream * type order event pay    payload '{"orderId":123,"amount":99.00}'

# 创建消费者组(从头消费)
XGROUP CREATE mystream groupA 0 MKSTREAM

说明:`XADD` 写入具备全局递增 ID;`MKSTREAM` 在首次创建时自动生成流。


## 拉取与确认(至少一次)


# 消费者 c1 拉取(阻塞 5 秒)
XREADGROUP GROUP groupA c1 BLOCK 5000 COUNT 10 STREAMS mystream >

# 处理成功后确认
XACK mystream groupA <message-id>

要点与验证:


  • 未确认的消息进入 PEL(pending entries list),通过 `XPENDING` 可观测。
  • “至少一次”交付:同一消息在超时或消费者故障时会被重新投递给同组其他消费者。

# 观测 PEL
XPENDING mystream groupA

# 领取超时消息(claim)
XCLAIM mystream groupA c2 <message-id> MINIDLE 60000

## 重试与死信队列策略


  • 记录重试次数:将 `retries` 计数保存在消息值或外部存储中;超过阈值入死信队列。
  • 死信队列(DLQ):使用单独的流 `mystream:dlq` 保存处理失败的消息与失败原因,供离线修复或人工介入。

示例:


# 处理失败,入 DLQ
XADD mystream:dlq * origin mystream id <message-id> reason "invalid state"

# 定期巡检 DLQ
XRANGE mystream:dlq - + COUNT 100

## 并发与性能建议


  • 消费者并发:同一组下多个消费者横向扩展,配合 `BLOCK` 拉取降低空轮询。
  • 分片:按业务键(如 `userId`/`orderId`)将流拆分为若干子流,减小热点与单点开销。
  • 可观测性:
  • `XINFO STREAM mystream` 观察长度与最后生成 ID。
  • `XINFO GROUPS mystream` 查看组内消费者与挂起消息。

## 与事务/一致性注意事项


  • 处理逻辑如需写数据库,使用业务幂等键确保重复投递下的安全(如 `ON CONFLICT DO NOTHING`)。
  • 将外部写入与 `XACK` 解耦为最终一致流程:先持久化成功,再确认;异常回滚由重试机制覆盖。

## 示例脚本(redis-cli)


# 生产者循环(伪代码)
while true; do
  XADD mystream * type event payload '{"ts":'$(date +%s)'}'
  sleep 1
done

# 消费者(c1)
while true; do
  RES=$(XREADGROUP GROUP groupA c1 BLOCK 5000 COUNT 1 STREAMS mystream >)
  if [[ -n "$RES" ]]; then
    # 处理并确认
    XACK mystream groupA <parsed-id>
  fi
done

## 结语


Redis Streams 的消费者组机制提供“至少一次”交付与可观测的挂起列表,通过重试与死信策略可在生产环境实现稳定的事件流消费。配合幂等与外部持久化,可将复杂一致性场景收敛为可控流程。



点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部