全球市场消息队列完整指南
把主流消息队列做一次完整盘点,重点包括:
- 先横向对比 Kafka、RabbitMQ、RocketMQ、NATS、Pulsar 五个主流产品的定位、吞吐、延迟、事务、多租户和学习曲线
- 再分别拆解每个系统的 架构模型、核心角色、使用场景、示例代码和运维调优要点
- 补充一些更轻量或更窄场景的方案,比如 Redis Pub/Sub、Redis Streams、PostgreSQL LISTEN/NOTIFY、MySQL 轮询、ActiveMQ
- 最后把 AWS、Azure、Google Cloud 的托管消息服务和自建方案放在一起比较,给出选型和运维上的实际建议
全球市场消息队列完整指南
本文档深度分析全球市场最主流的 5 个消息队列系统:Kafka、RabbitMQ、RocketMQ、NATS、Pulsar
以及其他产品,三大云厂商产品
产品特点对比
| 特性 | Kafka | RabbitMQ | RocketMQ | NATS | Pulsar |
|---|---|---|---|---|---|
| 诞生公司 | VMware | Alibaba | Derek Collison | Yahoo | |
| 开源年份 | 2011 | 2007 | 2012 | 2010 | 2016 |
| GitHub Stars | ~28k | ~11k | ~26k | ~14k | ~13k |
| 核心优势 | 高吞吐、流处理 | 灵活路由、企业级 | 事务、低延迟 | 超轻量、极速 | 云原生、多租户 |
| 吞吐量 | 🏆 最高 | ⚠️ 中等 | ✅ 高 | ✅ 很高 | ⚠️ 中高 |
| 延迟 | 低 | 低 | 🏆 最低 | 🏆 极低 | 中 |
| 事务支持 | ✅ | ✅ | 🏆 最佳 | ❌ | ✅ |
| 消息追踪 | ⚠️ 需要外部 | ✅ | 🏆 内置 | ⚠️ 有限 | ✅ |
| 延迟消息 | ❌ | ✅ | 🏆 18级 | ❌ | ✅ 任意 |
| 多租户 | ❌ | ⚠️ 基本 | ⚠️ 基本 | ✅ | 🏆 最强 |
| 学习曲线 | 中 | 低 | 低 | 🏆 最低 | 高 |
1. Apache Kafka
产品定位
分布式事件流平台,日志聚合和实时流处理的事实标准
适用场景
- 日志收集与聚合
- 实时数据管道
- 事件溯源 (Event Sourcing)
- CDC (变更数据捕获)
- 与 Flink/Spark Streaming 集成的流处理
架构详解
flowchart TB
subgraph Producers["Producer Clients"]
direction LR
P[Producers send messages<br/>to topics/partitions]
end
subgraph ZK["ZooKeeper / KRaft — Metadata Management"]
direction LR
ZK1[ZK Node 1]
ZK2[ZK Node 2]
ZK3[ZK Node 3]
end
subgraph Cluster["Broker Cluster (Commit Log)"]
direction LR
subgraph B1["Broker 1"]
direction TB
B1P0["Topic-A P0<br/>Leader"]
B1P1["Topic-A P1<br/>Follower"]
B1P2["Topic-A P2<br/>Follower"]
end
subgraph B2["Broker 2"]
direction TB
B2P0["Topic-A P0<br/>Follower"]
B2P1["Topic-A P1<br/>Follower"]
B2P2["Topic-A P2<br/>Leader"]
end
subgraph B3["Broker 3"]
direction TB
B3P0["Topic-A P0<br/>Follower"]
B3P1["Topic-A P1<br/>Leader"]
B3P2["Topic-A P2<br/>Follower"]
end
end
subgraph Consumers["Consumer Groups (Parallel Consumption)"]
direction LR
C1["Consumer 1<br/>Group A"]
C2["Consumer 2<br/>Group A"]
C3["Consumer 3<br/>Group B"]
end
Producers --> Cluster
ZK -.协调 / 元数据 / Leader 选举.-> Cluster
Cluster --> Consumers
B1P0 -.副本同步.- B2P0
B2P2 -.副本同步.- B3P2
B3P1 -.副本同步.- B1P1
核心模块/角色详解
| 角色 | 职责 | 详细说明 |
|---|---|---|
| Producer | 消息生产者 | - 将消息发布到 Topic - 选择分区 (round-robin 或 key-hash) - 支持异步发送、同步发送、事务发送 |
| Broker | 消息存储节点 | - 存储分区数据 (commit log) - 处理读写请求 - Leader 处理读写,Follower 仅复制 - 可水平扩展 |
| ZooKeeper/KRaft | 元数据管理 | - 存储集群元数据 (topic/partition 配置) - Controller 选举 - 旧版本用 ZooKeeper,2.8+ 用 KRaft |
| Topic | 消息类别 | - 消息的逻辑分类 - 一个 Topic 包含多个 Partition |
| Partition | 并行单元 | - 有序、不可变的消息序列 - 每个 Partition 有一个 Leader,多个 Follower - 是并行消费的基本单位 |
| Consumer | 消息消费者 | - 从 Partition 读取消息 - 记录消费位置 (offset) - 支持自动/手动提交 offset |
| Consumer Group | 消费组 | - 一组 Consumer 协同消费 - 组内每个 Consumer 消费不同 Partition - 不同组消费完整数据副本 |
使用示例
|
|
运维与调优
部署建议
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| JVM 堆内存 | 6-8G | 不超过 32G (避免大堆 GC 问题) |
| OS 文件描述符 | 100,000+ | 调整 /etc/security/limits.conf |
| 磁盘类型 | NVMe SSD | 顺序写密集,需要高 IOPS |
| 网络 | 10Gbps+ | 副本同步和数据传输 |
关键配置参数 (server.properties)
# 日志保留策略 - 按大小或时间
log.retention.hours=168 # 保留7天
log.retention.bytes=107374182400 # 100GB
log.segment.bytes=1073741824 # 每个segment 1GB
# 副本配置
default.replication.factor=3 # 生产环境至少3副本
min.insync.replicas=2 # 至少2个同步副本
# 性能优化
num.network.threads=3 # 网络线程数 = CPU核数
num.io.threads=8 # IO线程数 = 2*CPU核数
socket.send.buffer.bytes=102400 # 100KB发送缓冲区
socket.receive.buffer.bytes=102400 # 100KB接收缓冲区
# 清理策略
log.cleanup.policy=delete # 或 compact (压缩)
log.cleaner.enable=true # 启用日志清理
生产者调优
| 参数 | 说明 | 推荐值 |
|---|---|---|
acks |
确认级别 | all (强一致) / 1 (平衡) / 0 (最大吞吐) |
retries |
重试次数 | 2147483647 (无限重试) |
batch.size |
批次大小 | 16384 (16KB) - 可调整到 32KB |
linger.ms |
等待时间 | 5 (毫秒) - 增加批次 |
buffer.memory |
缓冲区大小 | 33554432 (32MB) |
compression.type |
压缩 | lz4 / snappy (平衡速度和压缩率) |
消费者调优
| 参数 | 说明 | 推荐值 |
|---|---|---|
fetch.min.bytes |
最小拉取字节 | 1 (低延迟) / 4096 (高吞吐) |
fetch.max.wait.ms |
最大等待 | 500 (毫秒) |
max.poll.records |
单次拉取记录数 | 500 - 根据处理能力调整 |
enable.auto.commit |
自动提交 | false (精确控制) |
auto.offset.reset |
偏移重置 | earliest (从头) / latest (从新) |
监控指标
- JVM: GC 频率、堆内存使用、线程数
- Broker: Under Replicated Partitions, ISR 收缩率
- Topic: 消息流入/流出速率、消息大小
- Consumer: Lag (消费滞后)、消费速率
- 网络: 入站/出站流量
常见问题排查
| 问题 | 可能原因 | 解决方法 |
|---|---|---|
| 高延迟 | 批次太小、副本同步慢 | 增加 linger.ms、检查网络 |
| 数据丢失 | acks=0 或 1 |
改为 acks=all + min.insync.replicas=2 |
| 消费滞后 | 消费者处理慢 | 增加分区数、增加消费者 |
| Leader 选举频繁 | Broker 不稳定 | 检查网络、增加 zookeeper.session.timeout.ms |
运维工具
- 官方工具:
kafka-topics.sh,kafka-consumer-groups.sh,kafka-producer-perf-test.sh - 监控: Prometheus + Grafana, Confluent Control Center
- 管理: AKHQ, Kafka Manager, CMAK
- 日志收集: ELK Stack, Loki
2. RabbitMQ
产品定位
最灵活的开源消息代理,企业级应用和微服务通信的首选
适用场景
- 微服务间通信
- 任务队列分发
- RPC 调用模式
- 需要复杂路由规则的场景
- 企业遗留系统集成
架构详解
flowchart TB
P["Producer Clients<br/>(Publishes messages)"]
subgraph Broker["RabbitMQ Broker"]
subgraph EX["Exchange (Router)"]
direction LR
Direct[Direct]
Fanout[Fanout]
Topic[Topic]
Headers[Headers]
end
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
EX -->|Bindings| Q1
EX -->|Bindings| Q2
EX -->|Bindings| Q3
end
CA[Consumer A]
CB[Consumer B]
CC[Consumer C]
P --> EX
Q1 --> CA
Q2 --> CB
Q3 --> CC
Exchange Types(路由策略):
- Direct:Routing key 完全匹配
- Fanout:广播到所有绑定的 Queue
- Topic:通配符匹配(
*,#) - Headers:基于消息头匹配
核心模块/角色详解
| 角色 | 职责 | 详细说明 |
|---|---|---|
| Producer | 消息生产者 | - 发送消息到 Exchange - 指定 Routing Key - 可设置消息属性 (persistent, priority, etc.) |
| Exchange | 消息路由器 | - 接收生产者消息 - 根据类型和 Routing Key 路由到 Queue - 四种类型: Direct, Fanout, Topic, Headers |
| Binding | 绑定规则 | - 连接 Exchange 和 Queue - 定义 Routing Key 匹配规则 |
| Queue | 消息队列 | - 存储消息直到被消费 - FIFO 顺序 - 支持持久化、消息 TTL、死信 |
| Consumer | 消息消费者 | - 从 Queue 获取消息 - Push (订阅) 或 Pull 模式 - 手动/自动 ACK 确认 |
| Virtual Host (vhost) | 逻辑隔离 | - 类似数据库的 schema - 不同 vhost 完全隔离 - 用于多租户 |
| Connection/Channel | 连接管理 | - Connection: TCP 连接 - Channel: 轻量级会话,复用 Connection |
使用示例
|
|
运维与调优
部署建议
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| Erlang VM 内存 | 4-8G | RabbitMQ 是 Erlang 应用,注意 Erlang 和 JVM 内存分配平衡 |
| 文件描述符 | 50,000+ | ulimit -n 65536 |
| 磁盘类型 | SSD | 消息持久化需要快速 IO |
| 集群规模 | 3-7 节点 | 奇数节点便于脑裂投票 |
关键配置参数 (rabbitmq.conf)
|
|
性能调优
| 调优项 | 说明 |
|---|---|
| 队列类型 | 优先使用 Quorum Queues (3.8+) 替代 Classic Queues 实现高可用 |
| 惰性队列 | x-queue-mode=lazy - 消息尽快写入磁盘,减少内存使用 |
| 消息 TTL | 设置合理的消息过期时间,避免队列积压 |
| 死信队列 | 配置 DLQ 处理失败消息,避免无限重试 |
| prefetch_count | 消费者预取数,设为 10-100 平衡吞吐和公平 |
| 连接复用 | 复用 Connection,多 Channel 共享 |
集群与高可用
|
|
监控指标
- 队列指标: 消息堆积数、消息速率、消费者数量
- Erlang VM: 内存使用、调度器利用率、进程数
- 网络: 连接数、通道数、入站/出站流量
- 磁盘: I/O 等待、磁盘使用率
- 资源: 文件描述符使用、套接字缓存
常见问题排查
| 问题 | 可能原因 | 解决方法 |
|---|---|---|
| 内存告警 | 队列堆积、非惰性队列 | 开启惰性队列、清理堆积消息 |
| 高延迟 | 持久化配置、磁盘慢 | 调整持久化策略、升级 SSD |
| 连接断开 | 心跳超时、网络问题 | 增加 heartbeat 间隔 |
| 流控 | 生产太快消费太慢 | 增加消费者、限速生产者 |
运维工具
- 管理界面: RabbitMQ Management Plugin (Web UI)
- 命令行:
rabbitmqctl,rabbitmqadmin - 监控: Prometheus + Grafana (rabbitmq_prometheus plugin)
- 日志:
/var/log/rabbitmq/- 检查 scheduler、connection、queue 日志
3. Apache RocketMQ
产品定位
阿里开源的金融级消息中间件,专为电商和交易场景打造
适用场景
- 电商订单、支付交易
- 金融级可靠消息投递
- 需要消息追踪和查询
- 需要延迟消息的场景
- 分布式事务消息
架构详解
flowchart TB
P["Producer Clients<br/>(transactional / normal / delay messages)"]
subgraph NS["NameServer Cluster (stateless)"]
direction LR
NS1["NameServer 1<br/>Topic route<br/>Broker info"]
NS2["NameServer 2<br/>Topic route<br/>Broker info"]
NS3["NameServer 3<br/>Topic route<br/>Broker info"]
end
subgraph Brokers["Broker Cluster"]
direction LR
BM1["Broker 1 Master<br/>Topic-A Queue 0-3"]
BS1["Broker 1 Slave<br/>Topic-A Queue 0-3"]
BM2["Broker 2 Master<br/>Topic-B Queue 0-3"]
BM1 <-.Sync.-> BS1
end
subgraph Consumers["Consumer Groups"]
direction LR
C1["Consumer 1 (Push)"]
C2["Consumer 2 (Pull)"]
C3["Consumer 3 (Push)"]
end
P --> NS
NS -.路由.-> Brokers
Brokers --> Consumers
每个 Broker 内部有两个关键结构:Commit Log(所有消息顺序写盘的主日志)+ Consume Queue(按 Topic/Queue 维护的偏移量索引),提供高性能写入和快速消费。详见下方角色表。
核心模块/角色详解
| 角色 | 职责 | 详细说明 |
|---|---|---|
| NameServer | 路由注册中心 | - 无状态,可水平扩展 - 维护 Topic/Broker 路由信息 - Producer/Consumer 从这里获取路由 - 类似轻量级 ZooKeeper,但更简单 |
| Broker | 消息存储节点 | - Master-Slave 架构 - Master 处理读写,Slave 同步备份 - 一个 Topic 分为多个 Queue(分区) |
| Commit Log | 存储文件 | - 所有消息顺序写入 Commit Log - 固定大小文件(默认 1G) - 顺序写,高性能 |
| Consume Queue | 消费索引 | - 按 Topic/Queue 组织的索引 - 存储 Commit Log 的偏移量 - 加速消费查询 |
| Topic | 消息主题 | - 逻辑分类 - 由多个 Queue 组成(并行度) |
| Queue | 分区 | - Topic 内的并行单元 - 类似 Kafka Partition - 每个 Queue 在一个 Broker 上 |
| Producer | 消息生产者 | - 支持同步/异步/OneWay 发送 - 事务消息(2PC) - 延迟消息(18级) - 消息查询(按 Message ID/Key) |
| Consumer | 消息消费者 | - Push/Pull 两种模式 - 集群消费(负载均衡)/广播消费 - 消息重试、死信队列 |
延迟消息级别
| 级别 | 延迟时间 | 级别 | 延迟时间 |
|---|---|---|---|
| 1 | 1秒 | 10 | 10分钟 |
| 2 | 5秒 | 11 | 20分钟 |
| 3 | 10秒 | 12 | 30分钟 |
| 4 | 30秒 | 13 | 1小时 |
| 5 | 1分钟 | 14 | 2小时 |
| 6 | 2分钟 | … | … |
| 7 | 3分钟 | 18 | 2小时 |
| 8 | 4分钟 | ||
| 9 | 5分钟 |
使用示例
|
|
运维与调优
部署建议
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| JVM 堆内存 | 8-16G | NameServer 4G, Broker 8-16G |
| OS 调优 | 关闭 Swap | Swap 会严重影响性能 |
| 磁盘 | NVMe SSD | Commit Log 和 Consume Queue 需要高速 IO |
| 网络 | 10Gbps+ | Master-Slave 同步需要大带宽 |
| 架构 | 2n+1 NameServer | 建议 3 或 5 个 NameServer |
NameServer 配置 (namesrv.properties)
# 端口
listenPort=9876
# JVM 选项 (启动时设置)
# -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
# 线程配置
serverWorkerThreads=8
serverCallbackExecutorThreads=16
# 路由信息过期时间
routeInfoUnavailablePeriod=10000
Broker 配置 (broker.conf)
# 集群名称
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# NameServer 地址
namesrvAddr=nameserver1:9876;nameserver2:9876;nameserver3:9876
# 存储路径
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex=/data/rocketmq/store/index
# 刷盘策略 - 同步刷盘 (高可靠) 或 异步刷盘 (高性能)
flushDiskType=ASYNC_FLUSH # SYNC_FLUSH / ASYNC_FLUSH
flushCommitLogTimed=true
flushIntervalTimeToCommitLog=500 # 500ms 刷盘一次
# 副本同步策略
syncFlush=true # 从节点同步刷盘
syncMaster=true # 同步主从复制
# 文件大小
mapedFileSizeCommitLog=1073741824 # CommitLog 每个文件 1G
mapedFileSizeConsumeQueue=300000 # ConsumeQueue 文件大小
# 消息保留
fileReservedTime=72 # 保留 72 小时
deleteWhen=04 # 凌晨4点删除过期文件
# 性能调优
sendMessageThreadPoolNums=128 # 发送线程池
pullMessageThreadPoolNums=128 # 拉取线程池
rejectTransactionMessage=false
Master-Slave 部署架构
flowchart TB
NS["NameServer Cluster (3 nodes)"]
subgraph GroupA["Group A"]
direction TB
MA["Broker Master<br/>broker-a"]
SA["Broker Slave<br/>broker-a-s"]
MA -->|Sync| SA
end
subgraph GroupB["Group B"]
direction TB
MB["Broker Master<br/>broker-b"]
SB["Broker Slave<br/>broker-b-s"]
MB -->|Sync| SB
end
subgraph GroupC["Group C"]
direction TB
MC["Broker Master<br/>broker-c"]
SC["Broker Slave<br/>broker-c-s"]
MC -->|Sync| SC
end
NS --> GroupA
NS --> GroupB
NS --> GroupC
生产者调优
| 参数 | 说明 | 推荐值 |
|---|---|---|
sendMsgTimeout |
发送超时 | 3000ms |
retryTimesWhenSendFailed |
同步发送重试 | 2 次 |
retryAnotherBrokerWhenNotStoreOK |
失败时换 Broker | true |
compressMsgBodyOverHowmuch |
压缩阈值 | 4096 (4KB) |
maxMessageSize |
最大消息大小 | 4194304 (4MB) |
消费者调优
| 模式 | 配置 | 说明 |
|---|---|---|
| Push 模式 | consumeThreadMin`/consumeThreadMax` |
线程数 20-64 |
| Pull 模式 | pullBatchSize |
批量大小 32 |
| 重试策略 | retryTimesWhenConsumeFailed |
-1 (无限重试) |
| 消费模式 | CLUSTERING / BROADCASTING |
集群/广播 |
监控指标
- NameServer: 心跳注册数、路由信息
- Broker: TPS、消息堆积、刷盘耗时、HA 同步状态
- JVM: GC、堆内存、线程
- 存储: Commit Log 写入耗时、磁盘使用率
- 网络: 连接数、入站/出站流量
常见问题排查
| 问题 | 可能原因 | 解决方法 |
|---|---|---|
| 消息发送失败 | NameServer 不可达、Broker 繁忙 | 检查 NameServer 连接、增加 Broker 线程 |
| 消息丢失 | 异步刷盘 + Master 故障 | 改为同步刷盘 + 同步复制 |
| 消费堆积 | 消费慢、重试多 | 增加消费者、优化消费逻辑 |
| 高延迟 | 刷盘慢、GC 频繁 | 优化 GC、使用 SSD |
运维工具
- 官方工具:
mqadmin(命令行管理工具) - 控制台: RocketMQ Dashboard (Web UI)
- 监控: Prometheus + Grafana (rocketmq-exporter)
- 追踪: RocketMQ Message Trace (消息轨迹)
- 日志:
~/logs/rocketmqlogs/- broker.log, namesrv.log
4. NATS
产品定位
云原生、超轻量、极速的消息系统,CNCF 毕业项目
适用场景
- 云原生微服务通信
- 边缘计算场景
- IoT 设备消息
- 需要极低延迟的场景
- Kubernetes 原生部署
架构详解
flowchart TB
P["Publisher Clients<br/>(Publishes to subjects, wildcards supported)"]
subgraph Cluster["NATS Server Cluster"]
direction LR
N1["NATS Server 1<br/>Subjects: task.*, service.><br/>No storage (in-flight only)"]
N2["NATS Server 2<br/>Subjects: task.*, service.><br/>No storage (in-flight only)"]
N3["NATS Server 3<br/>Subjects: task.*, service.><br/>No storage (in-flight only)"]
N1 <-.gossip.-> N2
N2 <-.gossip.-> N3
end
S1["Subscriber 1<br/>task.running"]
S2["Subscriber 2<br/>task.success"]
S3["Subscriber 3<br/>service.>"]
P --> Cluster
Cluster --> S1
Cluster --> S2
Cluster --> S3
Subject Wildcards(主题通配符):
*— 匹配一个 token(例:task.*匹配task.running、task.success)>— 匹配所有后续(例:service.>匹配service.a、service.a.b)
核心模块/角色详解
| 角色 | 职责 | 详细说明 |
|---|---|---|
| NATS Server | 消息代理 | - 超轻量(二进制 < 20MB) - 无状态,无持久化存储(Core NATS) - 自动集群形成(Gossip 协议) - 支持 TLS、认证、授权 |
| Subject | 消息主题 | - 类似 Topic,但更轻量 - 支持通配符 * 和 >- 例如: task.running, service.> |
| Publisher | 消息发布者 | - 发布消息到 Subject - Fire-and-forget 模式 - 极低延迟(微秒级) |
| Subscriber | 消息订阅者 | - 订阅 Subject(支持通配符) - 接收推送的消息 - 自动负载均衡(队列组) |
| Queue Group | 队列组 | - 一组订阅者形成负载均衡 - 消息只发送给组内一个订阅者 - 类似 Kafka Consumer Group |
NATS 产品系列:
- Core NATS: 纯内存,极速,不持久化
- NATS JetStream: 持久化,Exactly-once,流处理
使用示例
|
|
运维与调优
部署建议
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| 内存 | 2-4GB | NATS Server 超轻量,2GB 足够 |
| CPU | 2-4 核 | 多连接场景可增加 |
| 网络 | 1Gbps+ | 优先低延迟网络 |
| 集群规模 | 3-5 节点 | Raft 共识需要奇数节点 |
NATS Server 配置 (nats-server.conf)
|
|
JetStream 调优 (持久化)
| 参数 | 说明 | 推荐值 |
|---|---|---|
store_dir |
存储目录 | SSD 挂载目录 |
max_memory_store |
内存存储限制 | 1-4GB |
max_file_store |
文件存储限制 | 10-100GB |
snapshot_threshold |
快照阈值 | 1000-10000 |
Stream 配置
|
|
监控指标
- Connections: 当前连接数、总连接数
- Subscriptions: 订阅数、主题数
- Messages: 消息速率、消息字节数
- JetStream: 流状态、消费者状态、持久化存储
- Server: 内存、CPU、Goroutine (Go 语言)
常见问题排查
| 问题 | 可能原因 | 解决方法 |
|---|---|---|
| 延迟增加 | 消息过大、连接过多 | 分批发送、优化连接池 |
| 消息丢失 | Core NATS 不持久化 | 改用 JetStream |
| 连接断开 | 心跳超时 | 调整 ping_interval |
| 吞吐量低 | 单连接瓶颈 | 使用多连接并发 |
运维工具
- 管理工具:
natsCLI,nats-top - 监控: NATS Exporter + Prometheus + Grafana
- Dashboard: NATS Surveyor
- K8s: Helm Chart, NATS Operator
5. Apache Pulsar
产品定位
云原生、分层架构的下一代消息系统,多租户和地理复制是核心优势
适用场景
- 多租户 SaaS 平台
- 需要地理复制的全球部署
- 数据存储与计算独立扩展
- 长期数据留存 + 实时访问
- 企业级隔离和安全需求
架构详解
flowchart TB
P["Producer Clients (multi-tenant aware)"]
subgraph Compute["Pulsar Brokers (无状态计算层)"]
direction LR
B1[Broker 1] ~~~ B2[Broker 2] ~~~ B3[Broker 3]
end
subgraph Storage["BookKeeper (存储层, 3 副本)"]
direction LR
BK1["Bookie 1<br/>本地 SSD"] ~~~ BK2["Bookie 2<br/>本地 SSD"] ~~~ BK3["Bookie 3<br/>本地 SSD"]
end
subgraph Aux["辅助组件"]
direction LR
ZK["ZooKeeper<br/>元数据"] ~~~ Tier["Tiered Storage<br/>S3 / GCS / OSS"] ~~~ Schema["Schema Registry"]
end
subgraph Consumers["Consumer Groups"]
direction LR
C1["Consumer Exclusive"] ~~~ C2["Consumer Shared"] ~~~ C3["Consumer Failover"]
end
P --> Compute
Compute <-.读写.-> Storage
Storage -.-> Aux
Compute --> Consumers
Pulsar 多租户模型:Tenant(租户) → Namespace(命名空间) → Topic(主题)。每个租户有独立的配额、认证、隔离。
核心模块/角色详解
| 角色 | 职责 | 详细说明 |
|---|---|---|
| Broker | 计算层 | - 无状态,不存储数据 - 接收/分发消息 - 处理消费者请求 - 可独立扩展(秒级扩缩容) |
| BookKeeper | 存储层 | - 持久化存储 Ledger(分片) - 每个 Ledger 多副本(Quorum) - 本地 SSD 存储 - 可独立扩展存储容量 |
| Ledger | 存储分片 | - 消息存储的基本单元 - 分片存储到多个 Bookie - 类似 Kafka Partition,但更小更灵活 |
| Tenant | 租户 | - 最高级别的隔离 - 独立的认证/授权 - 独立的资源配额 |
| Namespace | 命名空间 | - 租户下的二级隔离 - 配置策略(保留、复制等) |
| Topic | 主题 | - 持久/非持久 - 分区/非分区 - 全局有序/局部有序 |
| ZooKeeper | 元数据管理 | - 存储集群元数据 - 协调 Broker 和 Bookie |
| Tiered Storage | 分层存储 | - 旧数据自动卸载到 S3/GCS/OSS - 透明访问,对消费者无感知 - 大幅降低存储成本 |
| Producer | 消息生产者 | - 支持批次、压缩 - 自动分区路由 - Exactly-once 语义 |
| Consumer | 消息消费者 | - 四种订阅模式: Exclusive, Shared, Failover, Key_Shared - 消息确认 ACK - 游标 (Cursor) 记录消费位置 |
消费者订阅模式
| 模式 | 描述 | 适用场景 |
|---|---|---|
| Exclusive | 只有一个消费者能消费 | 需要严格顺序 |
| Shared | 多个消费者轮询消费 | 高吞吐,顺序不重要 |
| Failover | 主备切换 | 高可用 + 顺序 |
| Key_Shared | 按 Key 分发 | 需要 Key 级顺序 |
使用示例
|
|
运维与调优
部署建议
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| Broker JVM | 8-16G | 无状态计算层,中等内存即可 |
| BookKeeper JVM | 16-32G | 存储层需要大内存做缓存 |
| ZooKeeper | 4-8G | 元数据管理,稳定性优先 |
| 磁盘类型 | NVMe SSD | BookKeeper 需要高 IOPS |
| 网络 | 10Gbps+ | BookKeeper 数据复制需要大带宽 |
| 集群规模 | 3-5 Broker + 3-7 Bookies | 独立扩展计算和存储 |
Broker 配置 (broker.conf)
# 集群配置
clusterName=pulsar-cluster
zookeeperServers=zk1:2181,zk2:2181,zk3:2181
configurationStoreServers=zk1:2181,zk2:2181,zk3:2181
# 端口
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=8443
# 存储配置
managedLedgerDefaultEnsembleSize=3 # Bookies 集合大小
managedLedgerDefaultWriteQuorum=2 # 写入 Quorum
managedLedgerDefaultAckQuorum=2 # 确认 Quorum
# 性能调优
numIoThreads=8 # IO 线程数 = CPU 核数
numHttpServerThreads=16
maxConcurrentLookupRequests=50000
maxConcurrentProducerRequests=100000
# 消息保留
defaultRetentionTimeInMinutes=10080 # 7天
defaultRetentionSizeInMB=-1 # 无限制
# 负载均衡
loadBalancerSheddingIntervalMinutes=1
loadBalancerSheddingThreshold=0.7
BookKeeper 配置 (bookkeeper.conf)
# 存储路径
journalDirectory=/data/bookkeeper/journal
ledgerDirectories=/data/bookkeeper/ledgers
indexDirectories=/data/bookkeeper/index
# 存储配置
journalSyncData=true # 同步刷盘
ledgerStorageClass=org.apache.bookkeeper.bookie.InterleavedLedgerStorage
ensemblePlacementPolicy=RackawareEnsemblePlacementPolicy
# 性能调优
numJournalCallbackThreads=8
numAddWorkerThreads=8
numReadWorkerThreads=16
fileInfoFormatVersion=3
# 磁盘使用
diskUsageThreshold=0.95
diskUsageWarnThreshold=0.90
gcWaitTime=900
# 网络
nettyServerTcpNoDelay=true
nettyServerSoSndBuf=131072
nettyServerSoRcvBuf=131072
分层存储配置 (Tiered Storage)
# 启用分层存储
managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=pulsar-cold-storage
s3ManagedLedgerOffloadRegion=us-east-1
s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 # 64MB
# 触发卸载的阈值
managedLedgerOffloadThresholdInSeconds=86400 # 1天后卸载
managedLedgerOffloadDeletionLagInSeconds=604800 # 保留本地副本7天
生产者调优
| 参数 | 说明 | 推荐值 |
|---|---|---|
sendTimeoutMs |
发送超时 | 30000ms |
blockIfQueueFull |
队列满时阻塞 | true |
maxPendingMessages |
最大挂起消息 | 1000-10000 |
batchingEnabled |
启用批次 | true |
batchingMaxPublishDelayMicros |
批次延迟 | 1000 (1ms) |
compressionType |
压缩 | LZ4 / ZSTD |
消费者调优
| 参数 | 说明 | 推荐值 |
|---|---|---|
receiverQueueSize |
接收队列大小 | 1000 |
ackTimeout |
确认超时 | 60000ms |
negativeAckRedeliveryDelay |
重试延迟 | 60000ms |
subscriptionType |
订阅模式 | Shared (高吞吐) / Exclusive (顺序) |
监控指标
- Broker: 消息速率、生产/消费延迟、连接数、存储水位
- BookKeeper: 写入延迟、Quorum 大小、磁盘使用、缓存命中
- ZooKeeper: 会话数、请求延迟、znode 数
- Topic/Namespace: 消息速率、存储使用、订阅数
- JVM: GC 频率、堆内存、线程数
常见问题排查
| 问题 | 可能原因 | 解决方法 |
|---|---|---|
| 写入延迟高 | BookKeeper Quorum 慢 | 检查 Bookie 磁盘、增加 ackQuorum |
| 读延迟高 | 数据已卸载到 S3 | 调整 offload 阈值、增加 Bookie 缓存 |
| Ledger 无法关闭 | Bookie 故障 | 检查 Bookie 状态、修复或替换 Bookie |
| Backlog 增长 | 消费慢 | 增加消费者、调整订阅模式 |
运维工具
- 官方工具:
pulsar-admin,pulsar-client,pulsar-perf - 管理界面: Pulsar Manager, StreamNative Console
- 监控: Prometheus + Grafana (pulsar-metrics)
- K8s: Helm Chart, Pulsar Operator (StreamNative / DataStax)
- 日志: Broker 日志、Bookie 日志、ZooKeeper 日志
6. 小众方案 & 数据库消息队列
概述
除了主流的专业消息队列,还有一些轻量级或基于数据库的消息队列方案,适合特定场景。
6.1 Redis Pub/Sub
| 特性 | 说明 |
|---|---|
| 定位 | Redis 内置的简单发布订阅功能 |
| 持久化 | ❌ 不持久化,消息仅在内存中 |
| 适用场景 | 简单实时通知、缓存失效、轻量级事件 |
| 优点 | 零依赖、极低延迟、部署简单 |
| 缺点 | 消息可能丢失、无消费确认、无回溯 |
使用示例:
|
|
6.2 Redis Streams (Redis 5.0+)
| 特性 | 说明 |
|---|---|
| 定位 | Redis 的持久化事件流 |
| 持久化 | ✅ 支持 RDB/AOF 持久化 |
| 适用场景 | 事件溯源、轻量级消息队列 |
| 优点 | 持久化、消费组、消息回溯 |
| 缺点 | 容量受限于内存、无跨节点复制 |
使用示例:
|
|
6.3 PostgreSQL LISTEN/NOTIFY
| 特性 | 说明 |
|---|---|
| 定位 | PostgreSQL 内置的消息通知 |
| 持久化 | ⚠️ 通知本身不持久化,但可以结合表 |
| 适用场景 | 数据库变更通知、轻量级事件 |
| 优点 | 事务一致性、无额外依赖 |
| 缺点 | 不适合高吞吐、无消费确认 |
使用示例:
|
|
6.4 MySQL + 定时轮询
| 特性 | 说明 |
|---|---|
| 定位 | 用数据库表模拟消息队列 |
| 持久化 | ✅ 完全持久化 |
| 适用场景 | 简单任务队列、已有数据库的场景 |
| 优点 | 事务支持、可靠、无额外运维 |
| 缺点 | 性能有限、轮询有延迟 |
使用示例:
|
|
6.5 Apache ActiveMQ / ActiveMQ Artemis
| 特性 | 说明 |
|---|---|
| 定位 | 老牌 JMS 消息中间件 |
| 持久化 | ✅ 支持 |
| 适用场景 | Java 企业遗留系统、JMS 标准 |
| 优点 | JMS 兼容、多种协议、企业级 |
| 缺点 | 性能一般、社区活跃度下降 |
使用示例:
|
|
6.6 其他小众方案
| 方案 | 特点 | 适用场景 |
|---|---|---|
| Beanstalkd | 简单任务队列、延迟任务 | 任务调度、后台作业 |
| Gearman | 任务分发框架 | 分布式任务处理 |
| ZeroMQ | 消息传输库 | 自定义消息系统、高性能 |
| NSQ | 实时分布式消息平台 | 替代方案之一 |
| IBM MQ | 企业级商业 MQ | 传统企业集成 |
何时选择这些方案?
| 场景 | 推荐方案 |
|---|---|
| 已有 Redis,简单通知 | Redis Pub/Sub |
| 已有 Redis,需要持久化 | Redis Streams |
| 已有 PostgreSQL,数据库变更 | PostgreSQL LISTEN/NOTIFY |
| 已有 MySQL,简单任务 | MySQL 任务表 + 轮询 |
| Java 遗留系统 | ActiveMQ Artemis |
7. 云厂商消息队列服务
概述
三大云厂商都提供托管的消息队列服务,免去运维负担,提供高可用和弹性扩展。
7.1 AWS SQS + SNS
架构图
flowchart TB
subgraph Region["AWS Region"]
subgraph SNS["SNS (Simple Notification Service)"]
direction LR
TA[Topic A]
TB[Topic B]
end
end
SQS1["SQS Queue 1 (Standard)"]
SQS2["SQS Queue 2 (FIFO)"]
Lambda["Lambda (Serverless)"]
HTTP["HTTP/S Webhook"]
EC2["EC2 Consumer (Polling)"]
ECS["ECS Consumer (Polling)"]
TA --> SQS1
TA --> Lambda
TB --> SQS2
TB --> HTTP
SQS1 --> EC2
SQS2 --> ECS
SNS (Simple Notification Service) - 发布订阅
特点:
- Pub/Sub 消息推送
- 支持多种订阅端:SQS、Lambda、HTTP、Email、SMS
- 高可用、全托管
使用示例:
|
|
SQS (Simple Queue Service) - 消息队列
两种队列类型:
| 特性 | Standard Queue | FIFO Queue |
|---|---|---|
| 顺序 | 最佳努力 | 严格有序 |
| 去重 | 不保证 | 自动去重 |
| 吞吐量 | 几乎无限 | ~3000 msg/s |
| 延迟 | 低 | 略高 |
使用示例:
|
|
7.2 Azure Service Bus
架构图
flowchart TB
subgraph NS["Azure Service Bus Namespace"]
subgraph Topics["Topics — Pub/Sub"]
direction LR
subgraph TopicA["Topic A"]
S1A[Subscription 1]
S2A[Subscription 2]
end
subgraph TopicB["Topic B"]
S1B[Subscription 1]
S2B[Subscription 2]
end
end
subgraph Queues["Queues — Point-to-Point"]
direction LR
Q1["Queue 1<br/>(Standard)"]
Q2["Queue 2<br/>(Session)"]
end
end
Fn["Azure Functions<br/>(Serverless)"]
App["Azure App Service<br/>(WebJobs)"]
VM["VM / AKS<br/>(Worker Role)"]
NS --> Fn
NS --> App
NS --> VM
核心概念
| 概念 | 说明 |
|---|---|
| Namespace | 顶级容器,包含 Queue 和 Topic |
| Queue | 点对点,先入先出 |
| Topic + Subscription | 发布订阅,支持过滤 |
| Session | 会话状态,按顺序处理 |
| Dead Letter Queue | 死信队列,存放处理失败的消息 |
使用示例:
|
|
7.3 Google Cloud Pub/Sub
架构图
flowchart TB
subgraph Region["Google Cloud Region (e.g. us-central1)"]
subgraph PS["Google Cloud Pub/Sub<br/>(Globally distributed, fully managed)"]
direction LR
TA[Topic A]
TB[Topic B]
end
end
Sub1["Subscription 1 (Push)<br/>Acknowledgements<br/>Retry policy"]
Sub2["Subscription 2 (Pull)<br/>Acknowledgements<br/>Exactly-once"]
Sub3["Subscription 3 (Push)<br/>Dead letter topic<br/>Ordering key"]
CF["Cloud Functions<br/>(Serverless)"]
GKE["GKE (K8s)<br/>Consumer Pods"]
CR["Cloud Run<br/>(Containers)"]
PS --> Sub1
PS --> Sub2
PS --> Sub3
Sub1 --> CF
Sub2 --> GKE
Sub3 --> CR
Pub/Sub 核心特性:
- 全球复制:消息自动在多个 Region 复制
- 至少一次投递:保证消息至少被投递一次
- Exactly-once:订阅级别精确一次处理
- 无限扩展:自动扩展,无需预配置容量
- 消息保留:最多 7 天
产品特点
| 特性 | 说明 |
|---|---|
| 全球分布 | 跨 Region 自动复制,高可用 |
| 完全托管 | 无需管理任何服务器 |
| 弹性扩展 | 自动扩展,支持百万级 TPS |
| 消息保留 | 最多 7 天 |
| 最大消息大小 | 10MB |
| Exactly-once | 订阅级别精确一次处理 |
| 死信队列 | 自动创建和管理 |
| 排序 | 支持按排序键 (Ordering Key) 排序 |
使用示例 (Google Cloud SDK):
|
|
其他云厂商
- 阿里云 - 消息队列 RocketMQ、消息队列 MQ
- 腾讯云 - TDMQ for RocketMQ、TDMQ for Pulsar、CMQ
7.4 云厂商消息队列对比总结
| 特性 | AWS SQS/SNS | Azure Service Bus | Google Cloud Pub/Sub |
|---|---|---|---|
| 最大消息大小 | 256KB (SQS) / 256KB (SNS) | 256KB (Standard) / 100MB (Premium) | 10MB |
| 消息保留 | 最多 14 天 | 最多 90 天 | 最多 7 天 |
| 延迟消息 | ⚠️ (Visibility Timeout 模拟) | ✅ (Scheduled) | ❌ |
| 事务消息 | ❌ | ✅ (Sessions) | ❌ |
| 顺序保证 | FIFO 队列 | Sessions/Sequencing | Ordering Key |
| 死信队列 | ✅ | ✅ | ✅ |
| 消息追踪 | ⚠️ (CloudTrail) | ✅ | ✅ |
| Pub/Sub | SNS | Topics | Pub/Sub 原生 |
| Serverless 集成 | Lambda | Azure Functions | Cloud Functions |
| 全球部署 | ✅ 多 Region | ✅ 多 Region | ✅ 全球复制 |
| Exactly-once | ❌ | ⚠️ | ✅ |
何时选择云厂商托管服务?
| 情况 | 推荐 |
|---|---|
| 不想运维 MQ 集群 | 云厂商托管服务 |
| 已经在使用特定云厂商 | 该云厂商的 MQ 服务 |
| 需要快速上线 | 云厂商托管服务 |
| 全球多 Region 部署 | AWS / Azure / Google Cloud |
| 主要在中国运营 | 阿里云 / 腾讯云 |
| 有极高的定制化需求 | 自建开源 MQ |
运维与调优对比总结
部署难度对比
| 产品 | 组件数 | 部署难度 | 学习曲线 | 推荐方式 |
|---|---|---|---|---|
| Kafka | 2 (Broker + ZK/KRaft) | 中 | 中 | Kubernetes Operator / Confluent |
| RabbitMQ | 1 (Broker) | 🏆 低 | 🏆 低 | Docker / 包管理器 |
| RocketMQ | 2 (NameServer + Broker) | 中 | 中 | Docker / 官方脚本 |
| NATS | 1 (Server) | 🏆 极低 | 🏆 最低 | Docker / 单二进制 |
| Pulsar | 3 (Broker + BookKeeper + ZK) | 高 | 高 | Kubernetes Operator |
资源需求对比 (生产环境)
| 产品 | 内存 | CPU | 磁盘 | 网络 |
|---|---|---|---|---|
| Kafka | 8-16G | 4-8核 | SSD/NVMe | 10Gbps |
| RabbitMQ | 4-8G | 2-4核 | SSD | 1Gbps+ |
| RocketMQ | 8-16G | 4-8核 | NVMe SSD | 10Gbps |
| NATS | 2-4G | 2-4核 | 可选 (JetStream) | 1Gbps+ |
| Pulsar | Broker: 8-16G Bookie: 16-32G |
4-8核 | NVMe SSD | 10Gbps+ |
监控难度对比
| 产品 | 监控工具 | 指标完备度 | 告警配置 | 可视化 |
|---|---|---|---|---|
| Kafka | JMX Exporter | 高 | 中 | Grafana |
| RabbitMQ | Prometheus Plugin | 高 | 中 | Web UI + Grafana |
| RocketMQ | Exporter | 中 | 中 | Dashboard + Grafana |
| NATS | Exporter | 中 | 易 | Grafana |
| Pulsar | Prometheus Metrics | 🏆 极高 | 中 | Grafana |
调优复杂度对比
| 产品 | 参数数量 | 调优粒度 | 文档质量 | 社区支持 |
|---|---|---|---|---|
| Kafka | 多 | 细 | 🏆 极佳 | 🏆 最强 |
| RabbitMQ | 中 | 中 | 好 | 强 |
| RocketMQ | 中 | 中 | 中 (中文) | 中 |
| NATS | 🏆 少 | 粗 | 好 | 强 |
| Pulsar | 多 | 🏆 极细 | 中 | 增长中 |
日常运维工作对比
| 运维任务 | Kafka | RabbitMQ | RocketMQ | NATS | Pulsar |
|---|---|---|---|---|---|
| 容量规划 | 需规划分区 | 简单 | 需规划队列 | 🏆 极简单 | 独立扩展 |
| 扩容 | 重平衡分区 | 简单 | 简单 | 🏆 极简单 | 🏆 独立扩展 |
| 备份恢复 | 复制分区 | 复制队列 | 复制文件 | (JetStream) | Ledger 复制 |
| 故障排查 | 中 | 简单 | 中 | 🏆 最简单 | 复杂 |
| 升级 | 滚动升级 | 简单 | 滚动升级 | 🏆 极简单 | 分阶段升级 |
总结对比
| 维度 | Kafka | RabbitMQ | RocketMQ | NATS | Pulsar |
|---|---|---|---|---|---|
| 架构复杂度 | 中 | 低 | 中 | 🏆 最低 | 高 |
| 运维复杂度 | 中 | 低 | 中 | 🏆 最低 | 高 |
| 吞吐量 | 🏆 最高 | 低 | 高 | 很高 | 中高 |
| 延迟 | 低 | 低 | 🏆 最低 | 🏆 极低 | 中 |
| 多租户 | ❌ | ⚠️ | ⚠️ | ✅ | 🏆 最佳 |
| 事务 | ✅ | ✅ | 🏆 最佳 | ❌ | ✅ |
| 延迟消息 | ❌ | ✅ | 🏆 最佳 | ❌ | ✅ |
| 云原生 | ⚠️ | ⚠️ | ⚠️ | 🏆 最佳 | ✅ |
| 全球部署 | ⚠️ | ⚠️ | ⚠️ | ✅ | 🏆 最佳 |
选择建议
- 选 Kafka:日志聚合、流处理、大数据生态
- 选 RabbitMQ:企业应用、微服务通信、灵活路由
- 选 RocketMQ:电商交易、金融场景、需要事务/延迟消息
- 选 NATS:云原生微服务、边缘计算、极低延迟
- 选 Pulsar:多租户 SaaS、全球部署、分层存储需求
运维选择建议
| 团队情况 | 推荐产品 | 理由 |
|---|---|---|
| 小型团队,运维经验少 | RabbitMQ / NATS | 部署简单,运维成本低 |
| 已有 Kafka 经验 | Kafka | 生态成熟,人才多 |
| 中国团队,电商/金融 | RocketMQ | 中文文档,场景匹配 |
| 云原生/K8s 团队 | NATS / Pulsar | 云原生设计,Operator 支持 |
| 需要多租户/全球部署 | Pulsar | 架构最匹配 |
| 资源受限,边缘场景 | NATS | 超轻量,低资源消耗 |
| 维度 | Kafka | RabbitMQ | RocketMQ | NATS | Pulsar |
|---|---|---|---|---|---|
| 架构复杂度 | 中 | 低 | 中 | 🏆 最低 | 高 |
| 运维复杂度 | 中 | 低 | 中 | 🏆 最低 | 高 |
| 吞吐量 | 🏆 最高 | 低 | 高 | 很高 | 中高 |
| 延迟 | 低 | 低 | 🏆 最低 | 🏆 极低 | 中 |
| 多租户 | ❌ | ⚠️ | ⚠️ | ✅ | 🏆 最佳 |
| 事务 | ✅ | ✅ | 🏆 最佳 | ❌ | ✅ |
| 延迟消息 | ❌ | ✅ | 🏆 最佳 | ❌ | ✅ |
| 云原生 | ⚠️ | ⚠️ | ⚠️ | 🏆 最佳 | ✅ |
| 全球部署 | ⚠️ | ⚠️ | ⚠️ | ✅ | 🏆 最佳 |
选择建议
- 选 Kafka:日志聚合、流处理、大数据生态
- 选 RabbitMQ:企业应用、微服务通信、灵活路由
- 选 RocketMQ:电商交易、金融场景、需要事务/延迟消息
- 选 NATS:云原生微服务、边缘计算、极低延迟
- 选 Pulsar:多租户 SaaS、全球部署、分层存储需求