RocketMQ
2025/12/1大约 4 分钟
RocketMQ基本概念和功能
RocketMQ是阿里巴巴开源的分布式消息中间件,它提供了高吞吐量、低延迟的消息传递机制,支持事务消息、顺序消息、定时消息等多种特性。下面是RocketMQ的基本概念和功能:
- 发布/订阅消息系统:RocketMQ提供了发布/订阅消息系统功能,可以让生产者将消息发布到主题中,让消费者从主题中订阅消息。
- 事务消息:RocketMQ提供了事务消息功能,可以确保消息发送和本地事务的原子性,适用于分布式事务场景。
- 顺序消息:RocketMQ提供了顺序消息功能,可以确保消息按照发送顺序被消费,适用于需要严格顺序的场景。
- 定时消息:RocketMQ提供了定时消息功能,可以在指定时间后才会被消费,适用于延迟任务场景。
- 高可用性:RocketMQ支持主从架构和主从切换,可以保障RocketMQ的高可用性。
为什么要用RocketMQ?
使用RocketMQ有以下几个优点:
- 功能丰富:RocketMQ支持事务消息、顺序消息、定时消息等多种特性,可以满足不同的业务需求。
- 高吞吐量:RocketMQ可以处理每秒数十万条消息,适用于大规模的数据处理场景。
- 低延迟:RocketMQ可以在毫秒级别内传递消息,适用于实时数据处理场景。
- 高可用性:RocketMQ支持主从架构和主从切换,可以保障RocketMQ的高可用性。
- 可扩展性:RocketMQ支持水平扩展,可以通过增加Broker来扩展RocketMQ集群的容量。
- 社区活跃:RocketMQ是阿里巴巴开源的项目,有着庞大的社区支持和活跃的开发者社区,可以保证RocketMQ的更新和维护。
总之,使用RocketMQ可以帮助开发者构建高可用、高可靠的分布式消息系统,提高应用程序的可用性、灵活性和可维护性。
RocketMQ应用场景?
RocketMQ主要应用于异步处理、应用解耦、流量削峰、分布式事务、顺序消息、定时消息、日志收集和系统集成等场景,可以帮助开发者构建高可用、高可靠的分布式应用。
RocketMQ核心组件
RocketMQ的核心组件包括:
- Producer:生产者,用于发送消息到RocketMQ。
- Consumer:消费者,用于从RocketMQ接收消息。
- NameServer:命名服务,用于Broker的注册和发现,类似Zookeeper。
- Broker:RocketMQ服务器,用于存储消息和处理客户端请求。
- Topic:主题,用于分类存储消息。
- MessageQueue:消息队列,是Topic的物理分区,一个Topic可以有多个MessageQueue。
RocketMQ核心架构图
+------------------+ +------------------+ +------------------+
| Producer | | Producer | | Producer |
| | | | | |
+------------------+ +------------------+ +------------------+
| | |
| | |
| | |
+-------------------------------------------------------------+
| NameServer Cluster |
| +--------------+ +--------------+ +--------------+ |
| | NameServer 1 | | NameServer 2 | | NameServer 3 | |
| +--------------+ +--------------+ +--------------+ |
+-------------------------------------------------------------+
| | |
| | |
| | |
+-------------------------------------------------------------+
| Broker Cluster |
| +--------------+ +--------------+ +--------------+ |
| | Broker 1 | | Broker 2 | | Broker 3 | |
| | Master-Slave| | Master-Slave| | Master-Slave| |
| +--------------+ +--------------+ +--------------+ |
+-------------------------------------------------------------+
| | |
| | |
| | |
+------------------+ +------------------+ +------------------+
| Consumer | | Consumer | | Consumer |
| Group 1 | | Group 2 | | Group 3 |
+------------------+ +------------------+ +------------------+在上面的架构图中,分别展示了生产者通过NameServer发现Broker,然后将消息发送到Broker,Broker将消息存储在MessageQueue中,然后消费者从Broker中消费消息。
RocketMQ安装与测试
Windows安装
1、下载RocketMQ
从RocketMQ官网下载最新版本的RocketMQ。
2、解压文件
将下载的压缩包解压到本地目录,例如 D:\rocketmq。
3、设置环境变量
添加 ROCKETMQ_HOME 环境变量,指向RocketMQ的解压目录。
4、启动NameServer
cd D:\rocketmq\bin
start mqnamesrv.cmd5、启动Broker
cd D:\rocketmq\bin
start mqbroker.cmd -n localhost:98766、访问控制台
可以使用第三方控制台,如 rocketmq-console-ng:
java -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=localhost:9876访问 http://localhost:8080/
Linux安装
1、下载RocketMQ
wget https://downloads.apache.org/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip2、解压文件
unzip rocketmq-all-5.1.4-bin-release.zip
mv rocketmq-all-5.1.4-bin-release /opt/rocketmq3、设置环境变量
echo 'export ROCKETMQ_HOME=/opt/rocketmq' >> ~/.bashrc
echo 'export PATH=$PATH:$ROCKETMQ_HOME/bin' >> ~/.bashrc
source ~/.bashrc4、启动NameServer
cd /opt/rocketmq/bin
nohup sh mqnamesrv &5、启动Broker
cd /opt/rocketmq/bin
nohup sh mqbroker -n localhost:9876 &6、访问控制台
可以使用第三方控制台,如 rocketmq-console-ng:
java -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=localhost:9876访问 http://localhost:8080/
Docker安装
1、Docker 拉取镜像
docker pull apache/rocketmq:4.9.5
docker pull styletang/rocketmq-console-ng:latest2、启动NameServer
docker run -d \
--name rmqnamesrv \
-p 9876:9876 \
--restart=always \
apache/rocketmq:4.9.5 \
sh mqnamesrv3、启动Broker
docker run -d \
--name rmqbroker \
-p 10911:10911 \
-p 10909:10909 \
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
--link rmqnamesrv:rmqnamesrv \
--restart=always \
apache/rocketmq:4.9.5 \
sh mqbroker4、启动控制台
docker run -d \
--name rmqconsole \
-p 8080:8080 \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876" \
--link rmqnamesrv:rmqnamesrv \
--restart=always \
styletang/rocketmq-console-ng:latest5、访问控制台
http://localhost:8080/
Spring Boot 整合 RocketMQ
1、添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>2、配置RocketMQ
rocketmq:
name-server: localhost:9876
producer:
group: my-producer-group3、发送消息
@Service
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
}4、接收消息
@Service
@RocketMQMessageListener(
topic = "my-topic",
consumerGroup = "my-consumer-group"
)
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息:" + message);
}
}