把主流消息队列做一次完整盘点,重点包括:

  • 先横向对比 Kafka、RabbitMQ、RocketMQ、NATS、Pulsar 五个主流产品的定位、吞吐、延迟、事务、多租户和学习曲线
  • 再分别拆解每个系统的 架构模型、核心角色、使用场景、示例代码和运维调优要点
  • 补充一些更轻量或更窄场景的方案,比如 Redis Pub/Sub、Redis Streams、PostgreSQL LISTEN/NOTIFY、MySQL 轮询、ActiveMQ
  • 最后把 AWS、Azure、Google Cloud 的托管消息服务和自建方案放在一起比较,给出选型和运维上的实际建议

全球市场消息队列完整指南

本文档深度分析全球市场最主流的 5 个消息队列系统:Kafka、RabbitMQ、RocketMQ、NATS、Pulsar
以及其他产品,三大云厂商产品


产品特点对比

特性 Kafka RabbitMQ RocketMQ NATS Pulsar
诞生公司 LinkedIn VMware Alibaba Derek Collison Yahoo
开源年份 2011 2007 2012 2010 2016
GitHub Stars ~28k ~11k ~26k ~14k ~13k
核心优势 高吞吐、流处理 灵活路由、企业级 事务、低延迟 超轻量、极速 云原生、多租户
吞吐量 🏆 最高 ⚠️ 中等 ✅ 高 ✅ 很高 ⚠️ 中高
延迟 🏆 最低 🏆 极低
事务支持 🏆 最佳
消息追踪 ⚠️ 需要外部 🏆 内置 ⚠️ 有限
延迟消息 🏆 18级 ✅ 任意
多租户 ⚠️ 基本 ⚠️ 基本 🏆 最强
学习曲线 🏆 最低


1. Apache Kafka

产品定位

分布式事件流平台,日志聚合和实时流处理的事实标准

适用场景

  • 日志收集与聚合
  • 实时数据管道
  • 事件溯源 (Event Sourcing)
  • CDC (变更数据捕获)
  • 与 Flink/Spark Streaming 集成的流处理

架构详解

flowchart TB
    subgraph Producers["Producer Clients"]
        direction LR
        P[Producers send messages<br/>to topics/partitions]
    end

    subgraph ZK["ZooKeeper / KRaft — Metadata Management"]
        direction LR
        ZK1[ZK Node 1]
        ZK2[ZK Node 2]
        ZK3[ZK Node 3]
    end

    subgraph Cluster["Broker Cluster (Commit Log)"]
        direction LR
        subgraph B1["Broker 1"]
            direction TB
            B1P0["Topic-A P0<br/>Leader"]
            B1P1["Topic-A P1<br/>Follower"]
            B1P2["Topic-A P2<br/>Follower"]
        end
        subgraph B2["Broker 2"]
            direction TB
            B2P0["Topic-A P0<br/>Follower"]
            B2P1["Topic-A P1<br/>Follower"]
            B2P2["Topic-A P2<br/>Leader"]
        end
        subgraph B3["Broker 3"]
            direction TB
            B3P0["Topic-A P0<br/>Follower"]
            B3P1["Topic-A P1<br/>Leader"]
            B3P2["Topic-A P2<br/>Follower"]
        end
    end

    subgraph Consumers["Consumer Groups (Parallel Consumption)"]
        direction LR
        C1["Consumer 1<br/>Group A"]
        C2["Consumer 2<br/>Group A"]
        C3["Consumer 3<br/>Group B"]
    end

    Producers --> Cluster
    ZK -.协调 / 元数据 / Leader 选举.-> Cluster
    Cluster --> Consumers

    B1P0 -.副本同步.- B2P0
    B2P2 -.副本同步.- B3P2
    B3P1 -.副本同步.- B1P1

核心模块/角色详解

角色 职责 详细说明
Producer 消息生产者 - 将消息发布到 Topic
- 选择分区 (round-robin 或 key-hash)
- 支持异步发送、同步发送、事务发送
Broker 消息存储节点 - 存储分区数据 (commit log)
- 处理读写请求
- Leader 处理读写,Follower 仅复制
- 可水平扩展
ZooKeeper/KRaft 元数据管理 - 存储集群元数据 (topic/partition 配置)
- Controller 选举
- 旧版本用 ZooKeeper,2.8+ 用 KRaft
Topic 消息类别 - 消息的逻辑分类
- 一个 Topic 包含多个 Partition
Partition 并行单元 - 有序、不可变的消息序列
- 每个 Partition 有一个 Leader,多个 Follower
- 是并行消费的基本单位
Consumer 消息消费者 - 从 Partition 读取消息
- 记录消费位置 (offset)
- 支持自动/手动提交 offset
Consumer Group 消费组 - 一组 Consumer 协同消费
- 组内每个 Consumer 消费不同 Partition
- 不同组消费完整数据副本

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ====== 生产者示例 ======
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");              // 强一致性
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

// 发送消息
producer.send(new ProducerRecord<>("task-status", 
    "job-001", 
    "{\"status\":\"RUNNING\",\"timestamp\":\"2025-04-20\"}"));
producer.close();

// ====== 消费者示例 ======
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "task-processor");
consumerProps.put("enable.auto.commit", "true");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("task-status"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " + record.value());
    }
}

运维与调优

部署建议

配置项 推荐值 说明
JVM 堆内存 6-8G 不超过 32G (避免大堆 GC 问题)
OS 文件描述符 100,000+ 调整 /etc/security/limits.conf
磁盘类型 NVMe SSD 顺序写密集,需要高 IOPS
网络 10Gbps+ 副本同步和数据传输

关键配置参数 (server.properties)

# 日志保留策略 - 按大小或时间
log.retention.hours=168                    # 保留7天
log.retention.bytes=107374182400          # 100GB
log.segment.bytes=1073741824              # 每个segment 1GB

# 副本配置
default.replication.factor=3               # 生产环境至少3副本
min.insync.replicas=2                       # 至少2个同步副本

# 性能优化
num.network.threads=3                       # 网络线程数 = CPU核数
num.io.threads=8                            # IO线程数 = 2*CPU核数
socket.send.buffer.bytes=102400            # 100KB发送缓冲区
socket.receive.buffer.bytes=102400         # 100KB接收缓冲区

# 清理策略
log.cleanup.policy=delete                   # 或 compact (压缩)
log.cleaner.enable=true                     # 启用日志清理

生产者调优

参数 说明 推荐值
acks 确认级别 all (强一致) / 1 (平衡) / 0 (最大吞吐)
retries 重试次数 2147483647 (无限重试)
batch.size 批次大小 16384 (16KB) - 可调整到 32KB
linger.ms 等待时间 5 (毫秒) - 增加批次
buffer.memory 缓冲区大小 33554432 (32MB)
compression.type 压缩 lz4 / snappy (平衡速度和压缩率)

消费者调优

参数 说明 推荐值
fetch.min.bytes 最小拉取字节 1 (低延迟) / 4096 (高吞吐)
fetch.max.wait.ms 最大等待 500 (毫秒)
max.poll.records 单次拉取记录数 500 - 根据处理能力调整
enable.auto.commit 自动提交 false (精确控制)
auto.offset.reset 偏移重置 earliest (从头) / latest (从新)

监控指标

  • JVM: GC 频率、堆内存使用、线程数
  • Broker: Under Replicated Partitions, ISR 收缩率
  • Topic: 消息流入/流出速率、消息大小
  • Consumer: Lag (消费滞后)、消费速率
  • 网络: 入站/出站流量

常见问题排查

问题 可能原因 解决方法
高延迟 批次太小、副本同步慢 增加 linger.ms、检查网络
数据丢失 acks=0 或 1 改为 acks=all + min.insync.replicas=2
消费滞后 消费者处理慢 增加分区数、增加消费者
Leader 选举频繁 Broker 不稳定 检查网络、增加 zookeeper.session.timeout.ms

运维工具

  • 官方工具: kafka-topics.sh, kafka-consumer-groups.sh, kafka-producer-perf-test.sh
  • 监控: Prometheus + Grafana, Confluent Control Center
  • 管理: AKHQ, Kafka Manager, CMAK
  • 日志收集: ELK Stack, Loki


2. RabbitMQ

产品定位

最灵活的开源消息代理,企业级应用和微服务通信的首选

适用场景

  • 微服务间通信
  • 任务队列分发
  • RPC 调用模式
  • 需要复杂路由规则的场景
  • 企业遗留系统集成

架构详解

flowchart TB
    P["Producer Clients<br/>(Publishes messages)"]

    subgraph Broker["RabbitMQ Broker"]
        subgraph EX["Exchange (Router)"]
            direction LR
            Direct[Direct]
            Fanout[Fanout]
            Topic[Topic]
            Headers[Headers]
        end
        Q1[Queue 1]
        Q2[Queue 2]
        Q3[Queue 3]
        EX -->|Bindings| Q1
        EX -->|Bindings| Q2
        EX -->|Bindings| Q3
    end

    CA[Consumer A]
    CB[Consumer B]
    CC[Consumer C]

    P --> EX
    Q1 --> CA
    Q2 --> CB
    Q3 --> CC

Exchange Types(路由策略)

  • Direct:Routing key 完全匹配
  • Fanout:广播到所有绑定的 Queue
  • Topic:通配符匹配(*, #
  • Headers:基于消息头匹配

核心模块/角色详解

角色 职责 详细说明
Producer 消息生产者 - 发送消息到 Exchange
- 指定 Routing Key
- 可设置消息属性 (persistent, priority, etc.)
Exchange 消息路由器 - 接收生产者消息
- 根据类型和 Routing Key 路由到 Queue
- 四种类型: Direct, Fanout, Topic, Headers
Binding 绑定规则 - 连接 Exchange 和 Queue
- 定义 Routing Key 匹配规则
Queue 消息队列 - 存储消息直到被消费
- FIFO 顺序
- 支持持久化、消息 TTL、死信
Consumer 消息消费者 - 从 Queue 获取消息
- Push (订阅) 或 Pull 模式
- 手动/自动 ACK 确认
Virtual Host (vhost) 逻辑隔离 - 类似数据库的 schema
- 不同 vhost 完全隔离
- 用于多租户
Connection/Channel 连接管理 - Connection: TCP 连接
- Channel: 轻量级会话,复用 Connection

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ====== 生产者示例 ======
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");

try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    // 声明 Exchange
    channel.exchangeDeclare("task-exchange", BuiltinExchangeType.DIRECT, true);

    // 声明 Queue
    channel.queueDeclare("task-queue", true, false, false, null);

    // 绑定
    channel.queueBind("task-queue", "task-exchange", "task.routing.key");

    // 发布消息
    String message = "{\"jobId\":\"job-001\",\"status\":\"RUNNING\"}";
    channel.basicPublish("task-exchange", "task.routing.key",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes(StandardCharsets.UTF_8));
}

// ====== 消费者示例 ======
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    channel.queueDeclare("task-queue", true, false, false, null);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received: " + message);

        // 手动确认
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };

    channel.basicConsume("task-queue", false, deliverCallback, consumerTag -> {});
}

运维与调优

部署建议

配置项 推荐值 说明
Erlang VM 内存 4-8G RabbitMQ 是 Erlang 应用,注意 Erlang 和 JVM 内存分配平衡
文件描述符 50,000+ ulimit -n 65536
磁盘类型 SSD 消息持久化需要快速 IO
集群规模 3-7 节点 奇数节点便于脑裂投票

关键配置参数 (rabbitmq.conf)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 内存相关
vm_memory_high_watermark.relative = 0.4       # 使用40%内存
vm_memory_high_watermark_paging_ratio = 0.5   # 达到50%时开始换页

# 磁盘相关
disk_free_limit.relative = 1.0                 # 保留1倍内存大小的磁盘空间
disk_free_limit.absolute = 2GB                 # 最小2GB

# 网络相关
listeners.tcp.default = 5672
num_acceptors.tcp = 10                          # 接受器线程数

# 队列镜像
queue.master_locator = min-masters              # 主队列位置策略

# 消息持久化
message_store.ignore_confirms = false           # 不忽略确认

# 集群配置
cluster_partition_handling = pause_minority     # 分区处理策略

性能调优

调优项 说明
队列类型 优先使用 Quorum Queues (3.8+) 替代 Classic Queues 实现高可用
惰性队列 x-queue-mode=lazy - 消息尽快写入磁盘,减少内存使用
消息 TTL 设置合理的消息过期时间,避免队列积压
死信队列 配置 DLQ 处理失败消息,避免无限重试
prefetch_count 消费者预取数,设为 10-100 平衡吞吐和公平
连接复用 复用 Connection,多 Channel 共享

集群与高可用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Quorum Queue (推荐)
quorum_queue.initial_group_size = 3           # 初始副本数

# Classic Queue 镜像 (旧方式)
policy definition:
{
  "ha-mode": "exactly",
  "ha-params": 2,
  "ha-sync-mode": "automatic"
}

监控指标

  • 队列指标: 消息堆积数、消息速率、消费者数量
  • Erlang VM: 内存使用、调度器利用率、进程数
  • 网络: 连接数、通道数、入站/出站流量
  • 磁盘: I/O 等待、磁盘使用率
  • 资源: 文件描述符使用、套接字缓存

常见问题排查

问题 可能原因 解决方法
内存告警 队列堆积、非惰性队列 开启惰性队列、清理堆积消息
高延迟 持久化配置、磁盘慢 调整持久化策略、升级 SSD
连接断开 心跳超时、网络问题 增加 heartbeat 间隔
流控 生产太快消费太慢 增加消费者、限速生产者

运维工具

  • 管理界面: RabbitMQ Management Plugin (Web UI)
  • 命令行: rabbitmqctl, rabbitmqadmin
  • 监控: Prometheus + Grafana (rabbitmq_prometheus plugin)
  • 日志: /var/log/rabbitmq/ - 检查 scheduler、connection、queue 日志


3. Apache RocketMQ

产品定位

阿里开源的金融级消息中间件,专为电商和交易场景打造

适用场景

  • 电商订单、支付交易
  • 金融级可靠消息投递
  • 需要消息追踪和查询
  • 需要延迟消息的场景
  • 分布式事务消息

架构详解

flowchart TB
    P["Producer Clients<br/>(transactional / normal / delay messages)"]

    subgraph NS["NameServer Cluster (stateless)"]
        direction LR
        NS1["NameServer 1<br/>Topic route<br/>Broker info"]
        NS2["NameServer 2<br/>Topic route<br/>Broker info"]
        NS3["NameServer 3<br/>Topic route<br/>Broker info"]
    end

    subgraph Brokers["Broker Cluster"]
        direction LR
        BM1["Broker 1 Master<br/>Topic-A Queue 0-3"]
        BS1["Broker 1 Slave<br/>Topic-A Queue 0-3"]
        BM2["Broker 2 Master<br/>Topic-B Queue 0-3"]
        BM1 <-.Sync.-> BS1
    end

    subgraph Consumers["Consumer Groups"]
        direction LR
        C1["Consumer 1 (Push)"]
        C2["Consumer 2 (Pull)"]
        C3["Consumer 3 (Push)"]
    end

    P --> NS
    NS -.路由.-> Brokers
    Brokers --> Consumers

每个 Broker 内部有两个关键结构:Commit Log(所有消息顺序写盘的主日志)+ Consume Queue(按 Topic/Queue 维护的偏移量索引),提供高性能写入和快速消费。详见下方角色表。

核心模块/角色详解

角色 职责 详细说明
NameServer 路由注册中心 - 无状态,可水平扩展
- 维护 Topic/Broker 路由信息
- Producer/Consumer 从这里获取路由
- 类似轻量级 ZooKeeper,但更简单
Broker 消息存储节点 - Master-Slave 架构
- Master 处理读写,Slave 同步备份
- 一个 Topic 分为多个 Queue(分区)
Commit Log 存储文件 - 所有消息顺序写入 Commit Log
- 固定大小文件(默认 1G)
- 顺序写,高性能
Consume Queue 消费索引 - 按 Topic/Queue 组织的索引
- 存储 Commit Log 的偏移量
- 加速消费查询
Topic 消息主题 - 逻辑分类
- 由多个 Queue 组成(并行度)
Queue 分区 - Topic 内的并行单元
- 类似 Kafka Partition
- 每个 Queue 在一个 Broker 上
Producer 消息生产者 - 支持同步/异步/OneWay 发送
- 事务消息(2PC)
- 延迟消息(18级)
- 消息查询(按 Message ID/Key)
Consumer 消息消费者 - Push/Pull 两种模式
- 集群消费(负载均衡)/广播消费
- 消息重试、死信队列

延迟消息级别

级别 延迟时间 级别 延迟时间
1 1秒 10 10分钟
2 5秒 11 20分钟
3 10秒 12 30分钟
4 30秒 13 1小时
5 1分钟 14 2小时
6 2分钟
7 3分钟 18 2小时
8 4分钟
9 5分钟

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// ====== 生产者示例 ======
DefaultMQProducer producer = new DefaultMQProducer("task-producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 普通消息
Message msg = new Message("task-status",
    "TAG_RUNNING",
    "job-001",
    "{\"jobId\":\"job-001\",\"status\":\"RUNNING\"}".getBytes());
SendResult result = producer.send(msg);

// 延迟消息 (级别 3 = 10秒)
msg.setDelayTimeLevel(3);
producer.send(msg);

// 事务消息
TransactionMQProducer txProducer = new TransactionMQProducer("tx-producer-group");
txProducer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 事务回查
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
txProducer.sendMessageInTransaction(msg, null);

// ====== 消费者示例 ======
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("task-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("task-status", "TAG_RUNNING || TAG_SUCCESS");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                   ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("Received: " + new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

运维与调优

部署建议

配置项 推荐值 说明
JVM 堆内存 8-16G NameServer 4G, Broker 8-16G
OS 调优 关闭 Swap Swap 会严重影响性能
磁盘 NVMe SSD Commit Log 和 Consume Queue 需要高速 IO
网络 10Gbps+ Master-Slave 同步需要大带宽
架构 2n+1 NameServer 建议 3 或 5 个 NameServer

NameServer 配置 (namesrv.properties)

# 端口
listenPort=9876

# JVM 选项 (启动时设置)
# -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

# 线程配置
serverWorkerThreads=8
serverCallbackExecutorThreads=16

# 路由信息过期时间
routeInfoUnavailablePeriod=10000

Broker 配置 (broker.conf)

# 集群名称
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

# NameServer 地址
namesrvAddr=nameserver1:9876;nameserver2:9876;nameserver3:9876

# 存储路径
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex=/data/rocketmq/store/index

# 刷盘策略 - 同步刷盘 (高可靠) 或 异步刷盘 (高性能)
flushDiskType=ASYNC_FLUSH          # SYNC_FLUSH / ASYNC_FLUSH
flushCommitLogTimed=true
flushIntervalTimeToCommitLog=500    # 500ms 刷盘一次

# 副本同步策略
syncFlush=true                        # 从节点同步刷盘
syncMaster=true                       # 同步主从复制

# 文件大小
mapedFileSizeCommitLog=1073741824    # CommitLog 每个文件 1G
mapedFileSizeConsumeQueue=300000      # ConsumeQueue 文件大小

# 消息保留
fileReservedTime=72                     # 保留 72 小时
deleteWhen=04                           # 凌晨4点删除过期文件

# 性能调优
sendMessageThreadPoolNums=128          # 发送线程池
pullMessageThreadPoolNums=128          # 拉取线程池
rejectTransactionMessage=false

Master-Slave 部署架构

flowchart TB
    NS["NameServer Cluster (3 nodes)"]

    subgraph GroupA["Group A"]
        direction TB
        MA["Broker Master<br/>broker-a"]
        SA["Broker Slave<br/>broker-a-s"]
        MA -->|Sync| SA
    end
    subgraph GroupB["Group B"]
        direction TB
        MB["Broker Master<br/>broker-b"]
        SB["Broker Slave<br/>broker-b-s"]
        MB -->|Sync| SB
    end
    subgraph GroupC["Group C"]
        direction TB
        MC["Broker Master<br/>broker-c"]
        SC["Broker Slave<br/>broker-c-s"]
        MC -->|Sync| SC
    end

    NS --> GroupA
    NS --> GroupB
    NS --> GroupC

生产者调优

参数 说明 推荐值
sendMsgTimeout 发送超时 3000ms
retryTimesWhenSendFailed 同步发送重试 2 次
retryAnotherBrokerWhenNotStoreOK 失败时换 Broker true
compressMsgBodyOverHowmuch 压缩阈值 4096 (4KB)
maxMessageSize 最大消息大小 4194304 (4MB)

消费者调优

模式 配置 说明
Push 模式 consumeThreadMin`/consumeThreadMax` 线程数 20-64
Pull 模式 pullBatchSize 批量大小 32
重试策略 retryTimesWhenConsumeFailed -1 (无限重试)
消费模式 CLUSTERING / BROADCASTING 集群/广播

监控指标

  • NameServer: 心跳注册数、路由信息
  • Broker: TPS、消息堆积、刷盘耗时、HA 同步状态
  • JVM: GC、堆内存、线程
  • 存储: Commit Log 写入耗时、磁盘使用率
  • 网络: 连接数、入站/出站流量

常见问题排查

问题 可能原因 解决方法
消息发送失败 NameServer 不可达、Broker 繁忙 检查 NameServer 连接、增加 Broker 线程
消息丢失 异步刷盘 + Master 故障 改为同步刷盘 + 同步复制
消费堆积 消费慢、重试多 增加消费者、优化消费逻辑
高延迟 刷盘慢、GC 频繁 优化 GC、使用 SSD

运维工具

  • 官方工具: mqadmin (命令行管理工具)
  • 控制台: RocketMQ Dashboard (Web UI)
  • 监控: Prometheus + Grafana (rocketmq-exporter)
  • 追踪: RocketMQ Message Trace (消息轨迹)
  • 日志: ~/logs/rocketmqlogs/ - broker.log, namesrv.log


4. NATS

产品定位

云原生、超轻量、极速的消息系统,CNCF 毕业项目

适用场景

  • 云原生微服务通信
  • 边缘计算场景
  • IoT 设备消息
  • 需要极低延迟的场景
  • Kubernetes 原生部署

架构详解

flowchart TB
    P["Publisher Clients<br/>(Publishes to subjects, wildcards supported)"]

    subgraph Cluster["NATS Server Cluster"]
        direction LR
        N1["NATS Server 1<br/>Subjects: task.*, service.><br/>No storage (in-flight only)"]
        N2["NATS Server 2<br/>Subjects: task.*, service.><br/>No storage (in-flight only)"]
        N3["NATS Server 3<br/>Subjects: task.*, service.><br/>No storage (in-flight only)"]
        N1 <-.gossip.-> N2
        N2 <-.gossip.-> N3
    end

    S1["Subscriber 1<br/>task.running"]
    S2["Subscriber 2<br/>task.success"]
    S3["Subscriber 3<br/>service.>"]

    P --> Cluster
    Cluster --> S1
    Cluster --> S2
    Cluster --> S3

Subject Wildcards(主题通配符)

  • * — 匹配一个 token(例:task.* 匹配 task.runningtask.success
  • > — 匹配所有后续(例:service.> 匹配 service.aservice.a.b

核心模块/角色详解

角色 职责 详细说明
NATS Server 消息代理 - 超轻量(二进制 < 20MB)
- 无状态,无持久化存储(Core NATS)
- 自动集群形成(Gossip 协议)
- 支持 TLS、认证、授权
Subject 消息主题 - 类似 Topic,但更轻量
- 支持通配符 *>
- 例如: task.running, service.>
Publisher 消息发布者 - 发布消息到 Subject
- Fire-and-forget 模式
- 极低延迟(微秒级)
Subscriber 消息订阅者 - 订阅 Subject(支持通配符)
- 接收推送的消息
- 自动负载均衡(队列组)
Queue Group 队列组 - 一组订阅者形成负载均衡
- 消息只发送给组内一个订阅者
- 类似 Kafka Consumer Group

NATS 产品系列:

  • Core NATS: 纯内存,极速,不持久化
  • NATS JetStream: 持久化,Exactly-once,流处理

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// ====== Go 语言示例 ======
import "github.com/nats-io/nats.go"

// 连接
nc, _ := nats.Connect(nats.DefaultURL)

// ====== 发布者 ======
nc.Publish("task.running", []byte(`{"jobId":"job-001"}`))

// 请求-响应模式
msg, _ := nc.Request("service.hello", []byte("hello"), time.Second)

// ====== 订阅者 ======
nc.Subscribe("task.running", func(m *nats.Msg) {
    fmt.Printf("Received: %s\n", string(m.Data))
})

// 队列组(负载均衡)
nc.QueueSubscribe("task.running", "worker-group", func(m *nats.Msg) {
    fmt.Printf("Worker received: %s\n", string(m.Data))
})

// ====== JetStream (持久化) ======
js, _ := nc.JetStream()

// 创建流
js.AddStream(&nats.StreamConfig{
    Name:     "TASKS",
    Subjects: []string{"task.>"},
    Storage:  nats.FileStorage,
})

// 发布持久化消息
js.Publish("task.running", []byte(`{"jobId":"job-001"}`))

// 订阅并确认
sub, _ := js.Subscribe("task.running", func(m *nats.Msg) {
    fmt.Printf("Received: %s\n", string(m.Data))
    m.Ack()  // 确认消息
})

运维与调优

部署建议

配置项 推荐值 说明
内存 2-4GB NATS Server 超轻量,2GB 足够
CPU 2-4 核 多连接场景可增加
网络 1Gbps+ 优先低延迟网络
集群规模 3-5 节点 Raft 共识需要奇数节点

NATS Server 配置 (nats-server.conf)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 监听端口
port: 4222
http: 8222          # 监控端点
websocket: 8080

# 集群配置
cluster {
  name: nats-cluster
  listen: 0.0.0.0:6222
  routes: [
    nats-route://nats-1:6222,
    nats-route://nats-2:6222,
    nats-route://nats-3:6222
  ]
}

# 日志
debug: false
trace: false
logtime: true

# 连接限制
max_connections: 65536
max_subscriptions: 0  # 0 = 无限制

# 认证
authorization {
  user: nats
  password: secret
  timeout: 1
}

# JetStream 配置 (如果启用持久化)
jetstream {
  store_dir: /data/jetstream
  max_memory_store: 1GB
  max_file_store: 10GB
}

JetStream 调优 (持久化)

参数 说明 推荐值
store_dir 存储目录 SSD 挂载目录
max_memory_store 内存存储限制 1-4GB
max_file_store 文件存储限制 10-100GB
snapshot_threshold 快照阈值 1000-10000

Stream 配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Stream 配置示例
name: TASKS
subjects: ["task.>"]
retention: limits    # limits / workqueue / interest
max_consumers: -1    # 无限制
max_msgs: -1         # 无限制
max_bytes: -1        # 无限制
max_age: 168h        # 7天
storage: file        # file / memory
discard: old         # 旧消息先丢弃
num_replicas: 3      # 副本数
duplicate_window: 2m # 去重窗口

监控指标

  • Connections: 当前连接数、总连接数
  • Subscriptions: 订阅数、主题数
  • Messages: 消息速率、消息字节数
  • JetStream: 流状态、消费者状态、持久化存储
  • Server: 内存、CPU、Goroutine (Go 语言)

常见问题排查

问题 可能原因 解决方法
延迟增加 消息过大、连接过多 分批发送、优化连接池
消息丢失 Core NATS 不持久化 改用 JetStream
连接断开 心跳超时 调整 ping_interval
吞吐量低 单连接瓶颈 使用多连接并发

运维工具

  • 管理工具: nats CLI, nats-top
  • 监控: NATS Exporter + Prometheus + Grafana
  • Dashboard: NATS Surveyor
  • K8s: Helm Chart, NATS Operator


5. Apache Pulsar

产品定位

云原生、分层架构的下一代消息系统,多租户和地理复制是核心优势

适用场景

  • 多租户 SaaS 平台
  • 需要地理复制的全球部署
  • 数据存储与计算独立扩展
  • 长期数据留存 + 实时访问
  • 企业级隔离和安全需求

架构详解

flowchart TB
    P["Producer Clients (multi-tenant aware)"]

    subgraph Compute["Pulsar Brokers (无状态计算层)"]
        direction LR
        B1[Broker 1] ~~~ B2[Broker 2] ~~~ B3[Broker 3]
    end

    subgraph Storage["BookKeeper (存储层, 3 副本)"]
        direction LR
        BK1["Bookie 1<br/>本地 SSD"] ~~~ BK2["Bookie 2<br/>本地 SSD"] ~~~ BK3["Bookie 3<br/>本地 SSD"]
    end

    subgraph Aux["辅助组件"]
        direction LR
        ZK["ZooKeeper<br/>元数据"] ~~~ Tier["Tiered Storage<br/>S3 / GCS / OSS"] ~~~ Schema["Schema Registry"]
    end

    subgraph Consumers["Consumer Groups"]
        direction LR
        C1["Consumer Exclusive"] ~~~ C2["Consumer Shared"] ~~~ C3["Consumer Failover"]
    end

    P --> Compute
    Compute <-.读写.-> Storage
    Storage -.-> Aux
    Compute --> Consumers

Pulsar 多租户模型Tenant(租户) → Namespace(命名空间) → Topic(主题)。每个租户有独立的配额、认证、隔离。

核心模块/角色详解

角色 职责 详细说明
Broker 计算层 - 无状态,不存储数据
- 接收/分发消息
- 处理消费者请求
- 可独立扩展(秒级扩缩容)
BookKeeper 存储层 - 持久化存储 Ledger(分片)
- 每个 Ledger 多副本(Quorum)
- 本地 SSD 存储
- 可独立扩展存储容量
Ledger 存储分片 - 消息存储的基本单元
- 分片存储到多个 Bookie
- 类似 Kafka Partition,但更小更灵活
Tenant 租户 - 最高级别的隔离
- 独立的认证/授权
- 独立的资源配额
Namespace 命名空间 - 租户下的二级隔离
- 配置策略(保留、复制等)
Topic 主题 - 持久/非持久
- 分区/非分区
- 全局有序/局部有序
ZooKeeper 元数据管理 - 存储集群元数据
- 协调 Broker 和 Bookie
Tiered Storage 分层存储 - 旧数据自动卸载到 S3/GCS/OSS
- 透明访问,对消费者无感知
- 大幅降低存储成本
Producer 消息生产者 - 支持批次、压缩
- 自动分区路由
- Exactly-once 语义
Consumer 消息消费者 - 四种订阅模式: Exclusive, Shared, Failover, Key_Shared
- 消息确认 ACK
- 游标 (Cursor) 记录消费位置

消费者订阅模式

模式 描述 适用场景
Exclusive 只有一个消费者能消费 需要严格顺序
Shared 多个消费者轮询消费 高吞吐,顺序不重要
Failover 主备切换 高可用 + 顺序
Key_Shared 按 Key 分发 需要 Key 级顺序

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// ====== Java 客户端示例 ======
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

// ====== 生产者 ======
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("persistent://my-tenant/my-namespace/task-status")
    .create();

MessageId msgId = producer.newMessage()
    .key("job-001")
    .value("{\"status\":\"RUNNING\"}")
    .send();

// ====== 消费者 ======
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("persistent://my-tenant/my-namespace/task-status")
    .subscriptionName("task-processor")
    .subscriptionType(SubscriptionType.Shared)  // 负载均衡
    .subscribe();

while (true) {
    Message<String> msg = consumer.receive();
    try {
        System.out.println("Received: " + msg.getValue());
        consumer.acknowledge(msg);  // 确认
    } catch (Exception e) {
        consumer.negativeAcknowledge(msg);  // 重试
    }
}

// ====== 地理复制 (Replication) ======
// 创建跨地域复制的 Topic
producer = client.newProducer(Schema.STRING)
    .topic("persistent://my-tenant/my-namespace/global-topic")
    .create();

// 消息自动复制到配置的集群
producer.send("Replicated message");

// ====== 分层存储配置 ======
// 旧数据自动卸载到 S3 (在 broker 配置中)
// managedLedgerOffloadThresholdInSeconds=86400  // 1天后卸载
// 消费者透明访问,无需关心数据在哪里

运维与调优

部署建议

配置项 推荐值 说明
Broker JVM 8-16G 无状态计算层,中等内存即可
BookKeeper JVM 16-32G 存储层需要大内存做缓存
ZooKeeper 4-8G 元数据管理,稳定性优先
磁盘类型 NVMe SSD BookKeeper 需要高 IOPS
网络 10Gbps+ BookKeeper 数据复制需要大带宽
集群规模 3-5 Broker + 3-7 Bookies 独立扩展计算和存储

Broker 配置 (broker.conf)

# 集群配置
clusterName=pulsar-cluster
zookeeperServers=zk1:2181,zk2:2181,zk3:2181
configurationStoreServers=zk1:2181,zk2:2181,zk3:2181

# 端口
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=8443

# 存储配置
managedLedgerDefaultEnsembleSize=3      # Bookies 集合大小
managedLedgerDefaultWriteQuorum=2       # 写入 Quorum
managedLedgerDefaultAckQuorum=2         # 确认 Quorum

# 性能调优
numIoThreads=8                           # IO 线程数 = CPU 核数
numHttpServerThreads=16
maxConcurrentLookupRequests=50000
maxConcurrentProducerRequests=100000

# 消息保留
defaultRetentionTimeInMinutes=10080      # 7天
defaultRetentionSizeInMB=-1              # 无限制

# 负载均衡
loadBalancerSheddingIntervalMinutes=1
loadBalancerSheddingThreshold=0.7

BookKeeper 配置 (bookkeeper.conf)

# 存储路径
journalDirectory=/data/bookkeeper/journal
ledgerDirectories=/data/bookkeeper/ledgers
indexDirectories=/data/bookkeeper/index

# 存储配置
journalSyncData=true                       # 同步刷盘
ledgerStorageClass=org.apache.bookkeeper.bookie.InterleavedLedgerStorage
ensemblePlacementPolicy=RackawareEnsemblePlacementPolicy

# 性能调优
numJournalCallbackThreads=8
numAddWorkerThreads=8
numReadWorkerThreads=16
fileInfoFormatVersion=3

# 磁盘使用
diskUsageThreshold=0.95
diskUsageWarnThreshold=0.90
gcWaitTime=900

# 网络
nettyServerTcpNoDelay=true
nettyServerSoSndBuf=131072
nettyServerSoRcvBuf=131072

分层存储配置 (Tiered Storage)

# 启用分层存储
managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=pulsar-cold-storage
s3ManagedLedgerOffloadRegion=us-east-1
s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864  # 64MB

# 触发卸载的阈值
managedLedgerOffloadThresholdInSeconds=86400         # 1天后卸载
managedLedgerOffloadDeletionLagInSeconds=604800      # 保留本地副本7天

生产者调优

参数 说明 推荐值
sendTimeoutMs 发送超时 30000ms
blockIfQueueFull 队列满时阻塞 true
maxPendingMessages 最大挂起消息 1000-10000
batchingEnabled 启用批次 true
batchingMaxPublishDelayMicros 批次延迟 1000 (1ms)
compressionType 压缩 LZ4 / ZSTD

消费者调优

参数 说明 推荐值
receiverQueueSize 接收队列大小 1000
ackTimeout 确认超时 60000ms
negativeAckRedeliveryDelay 重试延迟 60000ms
subscriptionType 订阅模式 Shared (高吞吐) / Exclusive (顺序)

监控指标

  • Broker: 消息速率、生产/消费延迟、连接数、存储水位
  • BookKeeper: 写入延迟、Quorum 大小、磁盘使用、缓存命中
  • ZooKeeper: 会话数、请求延迟、znode 数
  • Topic/Namespace: 消息速率、存储使用、订阅数
  • JVM: GC 频率、堆内存、线程数

常见问题排查

问题 可能原因 解决方法
写入延迟高 BookKeeper Quorum 慢 检查 Bookie 磁盘、增加 ackQuorum
读延迟高 数据已卸载到 S3 调整 offload 阈值、增加 Bookie 缓存
Ledger 无法关闭 Bookie 故障 检查 Bookie 状态、修复或替换 Bookie
Backlog 增长 消费慢 增加消费者、调整订阅模式

运维工具

  • 官方工具: pulsar-admin, pulsar-client, pulsar-perf
  • 管理界面: Pulsar Manager, StreamNative Console
  • 监控: Prometheus + Grafana (pulsar-metrics)
  • K8s: Helm Chart, Pulsar Operator (StreamNative / DataStax)
  • 日志: Broker 日志、Bookie 日志、ZooKeeper 日志


6. 小众方案 & 数据库消息队列

概述

除了主流的专业消息队列,还有一些轻量级或基于数据库的消息队列方案,适合特定场景。


6.1 Redis Pub/Sub

特性 说明
定位 Redis 内置的简单发布订阅功能
持久化 ❌ 不持久化,消息仅在内存中
适用场景 简单实时通知、缓存失效、轻量级事件
优点 零依赖、极低延迟、部署简单
缺点 消息可能丢失、无消费确认、无回溯

使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import redis

# 发布
r = redis.Redis()
r.publish('task-notifications', '{"jobId":"001","status":"RUNNING"}')

# 订阅
pubsub = r.pubsub()
pubsub.subscribe('task-notifications')
for message in pubsub.listen():
    if message['type'] == 'message':
        print(message['data'])

6.2 Redis Streams (Redis 5.0+)

特性 说明
定位 Redis 的持久化事件流
持久化 ✅ 支持 RDB/AOF 持久化
适用场景 事件溯源、轻量级消息队列
优点 持久化、消费组、消息回溯
缺点 容量受限于内存、无跨节点复制

使用示例:

1
2
3
4
5
6
# 添加消息
r.xadd('task-stream', {'jobId': '001', 'status': 'RUNNING'})

# 消费组消费
r.xgroup_create('task-stream', 'worker-group', id='0', mkstream=True)
messages = r.xreadgroup('worker-group', 'consumer-1', {'task-stream': '>'}, count=10)

6.3 PostgreSQL LISTEN/NOTIFY

特性 说明
定位 PostgreSQL 内置的消息通知
持久化 ⚠️ 通知本身不持久化,但可以结合表
适用场景 数据库变更通知、轻量级事件
优点 事务一致性、无额外依赖
缺点 不适合高吞吐、无消费确认

使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
-- 发送通知
NOTIFY task_channel, '{"jobId":"001","status":"RUNNING"}';

-- 或使用触发器自动发送
CREATE OR REPLACE FUNCTION notify_task_change()
RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('task_channel', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_status_trigger
    AFTER UPDATE ON tasks
    FOR EACH ROW EXECUTE FUNCTION notify_task_change();

6.4 MySQL + 定时轮询

特性 说明
定位 用数据库表模拟消息队列
持久化 ✅ 完全持久化
适用场景 简单任务队列、已有数据库的场景
优点 事务支持、可靠、无额外运维
缺点 性能有限、轮询有延迟

使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
CREATE TABLE task_queue (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    payload JSON NOT NULL,
    status ENUM('pending', 'processing', 'done', 'failed') DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    locked_at TIMESTAMP NULL,
    INDEX idx_status_created (status, created_at)
);

-- 领取任务 (使用 SELECT ... FOR UPDATE SKIP LOCKED)
START TRANSACTION;
SELECT * FROM task_queue 
WHERE status = 'pending' 
ORDER BY created_at 
LIMIT 1 FOR UPDATE SKIP LOCKED;
-- 更新状态为 processing...
COMMIT;

6.5 Apache ActiveMQ / ActiveMQ Artemis

特性 说明
定位 老牌 JMS 消息中间件
持久化 ✅ 支持
适用场景 Java 企业遗留系统、JMS 标准
优点 JMS 兼容、多种协议、企业级
缺点 性能一般、社区活跃度下降

使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// JMS API 使用
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("task-queue");

// 发送
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("{\"jobId\":\"001\"}");
producer.send(message);

// 接收
MessageConsumer consumer = session.createConsumer(queue);
TextMessage received = (TextMessage) consumer.receive();

6.6 其他小众方案

方案 特点 适用场景
Beanstalkd 简单任务队列、延迟任务 任务调度、后台作业
Gearman 任务分发框架 分布式任务处理
ZeroMQ 消息传输库 自定义消息系统、高性能
NSQ 实时分布式消息平台 替代方案之一
IBM MQ 企业级商业 MQ 传统企业集成

何时选择这些方案?

场景 推荐方案
已有 Redis,简单通知 Redis Pub/Sub
已有 Redis,需要持久化 Redis Streams
已有 PostgreSQL,数据库变更 PostgreSQL LISTEN/NOTIFY
已有 MySQL,简单任务 MySQL 任务表 + 轮询
Java 遗留系统 ActiveMQ Artemis


7. 云厂商消息队列服务

概述

三大云厂商都提供托管的消息队列服务,免去运维负担,提供高可用和弹性扩展。


7.1 AWS SQS + SNS

架构图

flowchart TB
    subgraph Region["AWS Region"]
        subgraph SNS["SNS (Simple Notification Service)"]
            direction LR
            TA[Topic A]
            TB[Topic B]
        end
    end

    SQS1["SQS Queue 1 (Standard)"]
    SQS2["SQS Queue 2 (FIFO)"]
    Lambda["Lambda (Serverless)"]
    HTTP["HTTP/S Webhook"]

    EC2["EC2 Consumer (Polling)"]
    ECS["ECS Consumer (Polling)"]

    TA --> SQS1
    TA --> Lambda
    TB --> SQS2
    TB --> HTTP

    SQS1 --> EC2
    SQS2 --> ECS

SNS (Simple Notification Service) - 发布订阅

特点:

  • Pub/Sub 消息推送
  • 支持多种订阅端:SQS、Lambda、HTTP、Email、SMS
  • 高可用、全托管

使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import boto3

sns = boto3.client('sns', region_name='us-east-1')

# 创建 Topic
topic_arn = sns.create_topic(Name='task-status')['TopicArn']

# 发布消息
sns.publish(
    TopicArn=topic_arn,
    Message='{"jobId":"001","status":"RUNNING"}',
    MessageAttributes={
        'jobId': {'DataType': 'String', 'StringValue': '001'}
    }
)

# 订阅 - SQS
sns.subscribe(
    TopicArn=topic_arn,
    Protocol='sqs',
    Endpoint=queue_arn
)

SQS (Simple Queue Service) - 消息队列

两种队列类型:

特性 Standard Queue FIFO Queue
顺序 最佳努力 严格有序
去重 不保证 自动去重
吞吐量 几乎无限 ~3000 msg/s
延迟 略高

使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sqs = boto3.client('sqs', region_name='us-east-1')

# 创建队列
queue_url = sqs.create_queue(
    QueueName='task-queue.fifo',  # .fifo 后缀表示 FIFO 队列
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true',
        'VisibilityTimeout': '300',  # 5分钟可见性超时
        'MessageRetentionPeriod': '86400'  # 保留1天
    }
)['QueueUrl']

# 发送消息
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='{"jobId":"001","status":"RUNNING"}',
    MessageGroupId='job-group',  # FIFO 需要
    MessageDeduplicationId='001'
)

# 接收消息
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20  # 长轮询
)

for msg in response.get('Messages', []):
    print(msg['Body'])
    # 删除消息
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=msg['ReceiptHandle']
    )

7.2 Azure Service Bus

架构图

flowchart TB
    subgraph NS["Azure Service Bus Namespace"]
        subgraph Topics["Topics — Pub/Sub"]
            direction LR
            subgraph TopicA["Topic A"]
                S1A[Subscription 1]
                S2A[Subscription 2]
            end
            subgraph TopicB["Topic B"]
                S1B[Subscription 1]
                S2B[Subscription 2]
            end
        end
        subgraph Queues["Queues — Point-to-Point"]
            direction LR
            Q1["Queue 1<br/>(Standard)"]
            Q2["Queue 2<br/>(Session)"]
        end
    end

    Fn["Azure Functions<br/>(Serverless)"]
    App["Azure App Service<br/>(WebJobs)"]
    VM["VM / AKS<br/>(Worker Role)"]

    NS --> Fn
    NS --> App
    NS --> VM

核心概念

概念 说明
Namespace 顶级容器,包含 Queue 和 Topic
Queue 点对点,先入先出
Topic + Subscription 发布订阅,支持过滤
Session 会话状态,按顺序处理
Dead Letter Queue 死信队列,存放处理失败的消息

使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.azure.messaging.servicebus.*;

// 客户端
ServiceBusProcessorClient processor = new ServiceBusClientBuilder()
    .connectionString("Endpoint=sb://xxx.servicebus.windows.net/;...")
    .processor()
    .queueName("task-queue")
    .processMessage(context -> {
        System.out.println("Received: " + context.getMessage().getBody().toString());
        context.complete();  // 确认消息
    })
    .processError(context -> {
        System.out.println("Error: " + context.getException());
    })
    .buildProcessorClient();

processor.start();

// 发送消息
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .sender()
    .queueName("task-queue")
    .buildSenderClient();

sender.sendMessage(new ServiceBusMessage("{\"jobId\":\"001\"}"));

7.3 Google Cloud Pub/Sub

架构图

flowchart TB
    subgraph Region["Google Cloud Region (e.g. us-central1)"]
        subgraph PS["Google Cloud Pub/Sub<br/>(Globally distributed, fully managed)"]
            direction LR
            TA[Topic A]
            TB[Topic B]
        end
    end

    Sub1["Subscription 1 (Push)<br/>Acknowledgements<br/>Retry policy"]
    Sub2["Subscription 2 (Pull)<br/>Acknowledgements<br/>Exactly-once"]
    Sub3["Subscription 3 (Push)<br/>Dead letter topic<br/>Ordering key"]

    CF["Cloud Functions<br/>(Serverless)"]
    GKE["GKE (K8s)<br/>Consumer Pods"]
    CR["Cloud Run<br/>(Containers)"]

    PS --> Sub1
    PS --> Sub2
    PS --> Sub3

    Sub1 --> CF
    Sub2 --> GKE
    Sub3 --> CR

Pub/Sub 核心特性

  • 全球复制:消息自动在多个 Region 复制
  • 至少一次投递:保证消息至少被投递一次
  • Exactly-once:订阅级别精确一次处理
  • 无限扩展:自动扩展,无需预配置容量
  • 消息保留:最多 7 天

产品特点

特性 说明
全球分布 跨 Region 自动复制,高可用
完全托管 无需管理任何服务器
弹性扩展 自动扩展,支持百万级 TPS
消息保留 最多 7 天
最大消息大小 10MB
Exactly-once 订阅级别精确一次处理
死信队列 自动创建和管理
排序 支持按排序键 (Ordering Key) 排序

使用示例 (Google Cloud SDK):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from google.cloud import pubsub_v1
import json

# ====== 生产者 ======
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project-id", "task-topic")

# 发布消息
message = {"jobId": "001", "status": "RUNNING"}
future = publisher.publish(
    topic_path,
    json.dumps(message).encode("utf-8"),
    jobId="001",  # 属性
    orderingKey="job-001"  # 排序键,保证有序
)
message_id = future.result()
print(f"Published message ID: {message_id}")

# ====== 消费者 (Push) ======
# 使用 Cloud Functions 或 Cloud Run 自动触发

# ====== 消费者 (Pull) ======
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    "your-project-id", "task-subscription"
)

def callback(message):
    print(f"Received: {message.data.decode()}")
    message.ack()  # 确认

streaming_pull_future = subscriber.subscribe(
    subscription_path, 
    callback=callback
)

print(f"Listening for messages on {subscription_path}")
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()

# ====== 配置 Exactly-once ======
# 创建支持精确一次的订阅
subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "enable_exactly_once_delivery": True,
        "ack_deadline_seconds": 60,
        "message_retention_duration": {"seconds": 604800}  # 7天
    }
)

# ====== 死信队列 ======
# 创建带死信队列的订阅
subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "dead_letter_policy": {
            "dead_letter_topic": dead_letter_topic_path,
            "max_delivery_attempts": 5
        }
    }
)

其他云厂商

  • 阿里云 - 消息队列 RocketMQ、消息队列 MQ
  • 腾讯云 - TDMQ for RocketMQ、TDMQ for Pulsar、CMQ

7.4 云厂商消息队列对比总结

特性 AWS SQS/SNS Azure Service Bus Google Cloud Pub/Sub
最大消息大小 256KB (SQS) / 256KB (SNS) 256KB (Standard) / 100MB (Premium) 10MB
消息保留 最多 14 天 最多 90 天 最多 7 天
延迟消息 ⚠️ (Visibility Timeout 模拟) ✅ (Scheduled)
事务消息 ✅ (Sessions)
顺序保证 FIFO 队列 Sessions/Sequencing Ordering Key
死信队列
消息追踪 ⚠️ (CloudTrail)
Pub/Sub SNS Topics Pub/Sub 原生
Serverless 集成 Lambda Azure Functions Cloud Functions
全球部署 ✅ 多 Region ✅ 多 Region ✅ 全球复制
Exactly-once ⚠️

何时选择云厂商托管服务?

情况 推荐
不想运维 MQ 集群 云厂商托管服务
已经在使用特定云厂商 该云厂商的 MQ 服务
需要快速上线 云厂商托管服务
全球多 Region 部署 AWS / Azure / Google Cloud
主要在中国运营 阿里云 / 腾讯云
有极高的定制化需求 自建开源 MQ


运维与调优对比总结

部署难度对比

产品 组件数 部署难度 学习曲线 推荐方式
Kafka 2 (Broker + ZK/KRaft) Kubernetes Operator / Confluent
RabbitMQ 1 (Broker) 🏆 低 🏆 低 Docker / 包管理器
RocketMQ 2 (NameServer + Broker) Docker / 官方脚本
NATS 1 (Server) 🏆 极低 🏆 最低 Docker / 单二进制
Pulsar 3 (Broker + BookKeeper + ZK) Kubernetes Operator

资源需求对比 (生产环境)

产品 内存 CPU 磁盘 网络
Kafka 8-16G 4-8核 SSD/NVMe 10Gbps
RabbitMQ 4-8G 2-4核 SSD 1Gbps+
RocketMQ 8-16G 4-8核 NVMe SSD 10Gbps
NATS 2-4G 2-4核 可选 (JetStream) 1Gbps+
Pulsar Broker: 8-16G
Bookie: 16-32G
4-8核 NVMe SSD 10Gbps+

监控难度对比

产品 监控工具 指标完备度 告警配置 可视化
Kafka JMX Exporter Grafana
RabbitMQ Prometheus Plugin Web UI + Grafana
RocketMQ Exporter Dashboard + Grafana
NATS Exporter Grafana
Pulsar Prometheus Metrics 🏆 极高 Grafana

调优复杂度对比

产品 参数数量 调优粒度 文档质量 社区支持
Kafka 🏆 极佳 🏆 最强
RabbitMQ
RocketMQ 中 (中文)
NATS 🏆 少
Pulsar 🏆 极细 增长中

日常运维工作对比

运维任务 Kafka RabbitMQ RocketMQ NATS Pulsar
容量规划 需规划分区 简单 需规划队列 🏆 极简单 独立扩展
扩容 重平衡分区 简单 简单 🏆 极简单 🏆 独立扩展
备份恢复 复制分区 复制队列 复制文件 (JetStream) Ledger 复制
故障排查 简单 🏆 最简单 复杂
升级 滚动升级 简单 滚动升级 🏆 极简单 分阶段升级

总结对比

维度 Kafka RabbitMQ RocketMQ NATS Pulsar
架构复杂度 🏆 最低
运维复杂度 🏆 最低
吞吐量 🏆 最高 很高 中高
延迟 🏆 最低 🏆 极低
多租户 ⚠️ ⚠️ 🏆 最佳
事务 🏆 最佳
延迟消息 🏆 最佳
云原生 ⚠️ ⚠️ ⚠️ 🏆 最佳
全球部署 ⚠️ ⚠️ ⚠️ 🏆 最佳

选择建议

  • 选 Kafka:日志聚合、流处理、大数据生态
  • 选 RabbitMQ:企业应用、微服务通信、灵活路由
  • 选 RocketMQ:电商交易、金融场景、需要事务/延迟消息
  • 选 NATS:云原生微服务、边缘计算、极低延迟
  • 选 Pulsar:多租户 SaaS、全球部署、分层存储需求

运维选择建议

团队情况 推荐产品 理由
小型团队,运维经验少 RabbitMQ / NATS 部署简单,运维成本低
已有 Kafka 经验 Kafka 生态成熟,人才多
中国团队,电商/金融 RocketMQ 中文文档,场景匹配
云原生/K8s 团队 NATS / Pulsar 云原生设计,Operator 支持
需要多租户/全球部署 Pulsar 架构最匹配
资源受限,边缘场景 NATS 超轻量,低资源消耗
维度 Kafka RabbitMQ RocketMQ NATS Pulsar
架构复杂度 🏆 最低
运维复杂度 🏆 最低
吞吐量 🏆 最高 很高 中高
延迟 🏆 最低 🏆 极低
多租户 ⚠️ ⚠️ 🏆 最佳
事务 🏆 最佳
延迟消息 🏆 最佳
云原生 ⚠️ ⚠️ ⚠️ 🏆 最佳
全球部署 ⚠️ ⚠️ ⚠️ 🏆 最佳

选择建议

  • 选 Kafka:日志聚合、流处理、大数据生态
  • 选 RabbitMQ:企业应用、微服务通信、灵活路由
  • 选 RocketMQ:电商交易、金融场景、需要事务/延迟消息
  • 选 NATS:云原生微服务、边缘计算、极低延迟
  • 选 Pulsar:多租户 SaaS、全球部署、分层存储需求