17370845950

SprigBoot整合rocketmq-v5-client-spring-boot的示例详解
目录
  • 建议使用jdk11以上版本
    • ubuntu安装RocketMQ
  • 启动 namesrv
    • 启动 Broker 消息存储中心和 Proxy 代理
      • 测试消息收发
        • 可以通过 mqadmin 命令创建
        • 注意 TestTopic 是topic名称
      • 先用官方原生 rocketmq-client-java JDK
      • 整合SpringBoot rocketmq-v5-client-spring-boot
        • 先创建变量
          • 注意:
        • 编写工具类
          • 测试代码
            • yml配置
            • 发送消息
            • 普通消息(广播模式)
            • 日志 (注意consumerGroup 不同)
            • 负载均衡模式 把consumerGroup改为 RocketMQVariable.NORMAL_GROUP
            • 日志 (会自动选择一个消费者消费)
          • 顺序消费
            • 日志
          • 定时/延时任务消息 (可自定义时间)
            • 日志
          • 事务处理情况1
            • 如果在工具类里面提交了事务 transaction.commit();下面的就不会进入处理了
            • 而是直接消费了
            • 日志
          • 事务处理情况2
            • 使用官方事务处理机制处理事务
            • 事务提交后才会被消费
            • 日志
        1. 安装RocketMQ 服务端
          Apache RocketMQ官方网站
          5.X文档地址
          rocketmq-v5-client-spring-boot官方示例
          rocketmq-v5-client-spring-boot-starter maven仓库
          RocketMQ服务端 的安装包分为两种,二进制包和源码包。这里以5.3.2版本做示例 。 点击这里 下载 Apache RocketMQ 5.3.2的源码包。你也可以从这里 下载到二进制包。二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。

        本文整合的rocketmq-v5-client-spring-boot 版本2.3.3 , 内部引用的是rocketmq-client-java , 版本5.0.7,使用的是gRPC 协议 , 使用前建议先把官方文档与示例看一下,使用的Java环境是openjdk-17

        建议使用jdk11以上版本

        ubuntu安装RocketMQ

        查看jdk版本

        java -version

        如果没安装jdk的话,在root账号下执行以下命令

        apt-get upgrade
        apt-get update
        apt install openjdk-17-jdk

        创建文件夹

        cd /usr/local
        mkdir rocketmq
        cd rocketmq

        下载RocketMQ二进制包(我这里使用的是5.3.2版本,使用其他版本可前往官方下载)

        wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.2/rocketmq-all-5.3.2-bin-release.zip

        解压 没有unzip的解压命令 , ubuntu会提示,根据提示安装unzip插件

        unzip rocketmq-all-5.3.2-bin-release.zip

        进入bin目录
        调整runserver.sh的内存大小

        JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

        调整runbroker.sh内存大小

        JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

        修改/conf/broker.conf配置文件,这里主要是为了测试方便我们放开自动创建Topic的配置,加入以下配置(经过测试5.0以上版本不支持自动创建主题topic)

        # 开启自动创建 Topic  加不加都行
        autoCreateTopicEnable=true
        #内网ip  namesrvAddr:nameSrv地址 公网访问设置公网IP  内网访问设置内网IP   以下所有IP需一致
        namesrvAddr=192.168.3.86:9876   
        #brokerIP1:broker也需要一个ip  内网或公网
        brokerIP1=192.168.3.86

        配置 NameServer 的环境变量

        配置环境

         vim /etc/profile

        添加以下配置

        #MQ安装位置
        export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-5.3.2
        #MQ公网或内网ip  公网访问设置公网IP  内网访问设置内网IP
        export NAMESRV_ADDR=192.168.3.86:9876

        重新编译文件生效

        source /etc/profile

        修改完后,我们就可以启动 RocketMQ 的 NameServer 了

        启动 namesrv

        nohup sh bin/mqnamesrv &

        验证

        # 验证 namesrv 是否启动成功
        tail -f -n 500 mqnamesrv.log
        ...
        The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
        # 或者是
        tail -f ~/logs/rocketmqlogs/namesrv.log

        启动 Broker 消息存储中心和 Proxy 代理

        # 启动(不使用代理)
        nohup sh bin/mqbroker -n 192.168.3.86:9876 >mqbroker.log 2>&1 &
        # 启动 Broker+Proxy
        nohup sh bin/mqbroker -n 192.168.3.86:9876 --enable-proxy &
        # 推荐使用 指定配置文件启动(broker默认使用的端口是10911,我们也可以在配置文件修改端口)
        nohup sh bin/mqbroker -n 192.168.3.86:9876 -c conf/broker.conf --enable-proxy &
        # 验证是否启动成功
        tail -n 500 nohup.out
        tail -f ~/logs/rocketmqlogs/broker.log 
        tail -f ~/logs/rocketmqlogs/proxy.log 
        Wed May 14 12:41:41 CST 2025 rocketmq-proxy startup successfully

        使用tail -f ~/logs/rocketmqlogs/broker.log 查看日志如果提示以下

        The default acl dir /usr/local/rocketmq/rocketmq-all-5.3.2/conf/acl is not exist

        需要切换conf目录下 新建acl文件夹就行了

        mkdir acl

        由于v5可参考的文档太少,这个报错我也没找到为什么源码包里会少一个acl的文件夹,有知道的希望留言告知

        测试消息收发

        sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
        sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

        可以通过 mqadmin 命令创建

        Admin Tool官方命令工具

        注意 TestTopic 是topic名称

        sh bin/mqadmin updatetopic -n 192.168.3.86:9876 -t TestTopic -c DefaultCluster

        打印

        create topic to 192.168.3.86:10911 success.
        TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]

        安装RocketMQ Dashboard 可视化
        官方介绍
        按照官方安装运行就可以
        需要注意的点,IP需要跟上面设置的IP一致,防火墙开通8080,8081,10911,9876

        注意:默认端口为:8080,不修改会跟Proxy端口冲突,Proxy端口默认的也是8080
        我这里修改了RocketMQ Dashboard的默认端口,改成8082
        可以在本地运行,也可以打包运行,我是打包运行的,运行成功后访问:http://192.168.3.86:8082

        可以先添加主题,(研究了几天,没研究自动添加的方法 , 如果那位大佬研究出来了可以给我分享一下)

        点击提交就行了。研究了源码,这个添加跟更新是一个接口。提交之后就可以敲代码了(有研究出来能自动加载Topic的麻烦留言)

        如果启动报错,需添加 topic

        CODE: 17  DESC: No topic route info in name server for the topic: delay-topic

        整合rocketmq-client-java

        先用官方原生 rocketmq-client-java JDK

                <dependency>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client-java</artifactId>
                    <version>5.0.7</version>
                </dependency>

        官方示例代码可以去看看

        发送普通消息

        package com.example.mq.producer;
        import org.apache.rocketmq.client.apis.ClientConfiguration;
        import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
        import org.apache.rocketmq.client.apis.ClientException;
        import org.apache.rocketmq.client.apis.ClientServiceProvider;
        import org.apache.rocketmq.client.apis.message.Message;
        import org.apache.rocketmq.client.apis.producer.Producer;
        import org.apache.rocketmq.client.apis.producer.SendReceipt;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import java.time.Duration;
        public class ProducerExample {
            private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
            public static void main(String[] args) throws ClientException {
                // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
                String endpoint = "192.168.3.86:8081";
                // 消息发送的目标Topic名称,需要提前创建。
                String topic = "TestTopic";
                ClientServiceProvider provider = ClientServiceProvider.loadService();
                ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
                ClientConfiguration configuration = builder.build();
                // 初始化Producer时需要设置通信配置以及预绑定的Topic。
                Producer producer = provider.newProducerBuilder()
                        .setTopics(topic)
                        .setClientConfiguration(configuration)
                        .build();
                // 普通消息发送。
                Message message = provider.newMessageBuilder()
                        .setTopic(topic)
                        // 设置消息索引键,可根据关键字精确查找某条消息。
                        .setKeys("messageKey")
                        // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                        .setTag("messageTag")
                        // 消息体。
                        .setBody("你好,mq".getBytes())
                        .build();
                try {
                    // 发送消息,需要关注发送结果,并捕获失败等异常。
                    SendReceipt sendReceipt = producer.send(message);
                    logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
                } catch (ClientException e) {
                    logger.error("Failed to send message", e);
                }
                // producer.close();
            }
        }

        订阅

        package com.example.mq.consumer;
        import java.io.IOException;
        import java.time.Duration;
        import java.util.Collections;
        import org.apache.rocketmq.client.apis.ClientConfiguration;
        import org.apache.rocketmq.client.apis.ClientException;
        import org.apache.rocketmq.client.apis.ClientServiceProvider;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.consumer.FilterExpression;
        import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
        import org.apache.rocketmq.client.apis.consumer.PushConsumer;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        public class PushConsumerExample {
            private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
            private PushConsumerExample() {
            }
            public static void main(String[] args) throws ClientException, IOException, InterruptedException {
                final ClientServiceProvider provider = ClientServiceProvider.loadService();
                // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
                String endpoints = "192.168.3.86:8081";
                ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                        .setEndpoints(endpoints)
                        .build();
                // 订阅消息的过滤规则,表示订阅所有Tag的消息。
                String tag = "*";
                FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
                // 为消费者指定所属的消费者分组,Group需要提前创建。
                String consumerGroup = "YourConsumerGroup";
                // 指定需要订阅哪个目标Topic,Topic需要提前创建。
                String topic = "TestTopic";
                // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
                PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                        .setClientConfiguration(clientConfiguration)
                        // 设置消费者分组。
                        .setConsumerGroup(consumerGroup)
                        // 设置预绑定的订阅关系。
                        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                        // 设置消费监听器。
                        .setMessageListener(messageView -> {
                         // 处理消息并返回消费结果。
                            logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                            ByteBuffer body = messageView.getBody();
                            String message = StandardCharsets.UTF_8.decode(body).toString();
                            logger.info("Consume message successfully, body={}", message);
                            return ConsumeResult.SUCCESS;
                        })
                        .build();
                Thread.sleep(Long.MAX_VALUE);
                // 如果不需要再使用 PushConsumer,可关闭该实例。
                // pushConsumer.close();
            }
        }

        4.整合rocketmq-v5-client-spring-boot

        整合SpringBoot rocketmq-v5-client-spring-boot

        注意原生的rocketmq-client-java需要注释掉,rocketmq-v5-client-spring-boot已经引入了,不注释会jar包冲突

        引入maven

        <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-v5-client-spring-boot</artifactId>
           <version>2.3.3</version>
        </dependency>

        官方示例代码

        先创建变量

        public class RocketMQVariable {
            /**
             * 普通消息队列
             */
            public static final String NORMAL_TOPIC = "normal-topic";
            // 如果使用负载均衡模式 需要设置相同的消费组名
            public static final String NORMAL_GROUP = "normal-group";
            // 处理广播消费模式使用 
            public static final String NORMAL1_GROUP = "normal1-group";
            /**
             * 异步普通消息队列
             */
            public static final String ASYNC_NORMAL_TOPIC = "async-normal-topic";
            public static final String ASYNC_NORMAL_GROUP = "async-normal-group";
            /**
             * 顺序消息队列
             */
            public static final String FIFO_TOPIC = "fifo-topic";
            public static final String FIFO_GROUP = "fifo-group";
            /**
             * 定时/延时消息队列
             */
            public static final String DELAY_TOPIC = "delay-topic";
            public static final String DELAY_GROUP = "delay-group";
            /**
             * 事务消息队列
             */
            public static final String TRANSACTION_TOPIC = "transaction-topic";
            public static final String TRANSACTION_GROUP = "transaction-group";
        }

        注意:

        如果使用负载均衡模式 需设置相同的Topic 相同的group
        如果使用广播消费模式 需设置相同的Topic 不同的group

        编写工具类

        import com.alibaba.fastjson2.JSONObject;
        import jakarta.annotation.Resource;
        import org.apache.rocketmq.client.apis.ClientException;
        import org.apache.rocketmq.client.apis.producer.SendReceipt;
        import org.apache.rocketmq.client.apis.producer.Transaction;
        import org.apache.rocketmq.client.common.Pair;
        import org.apache.rocketmq.client.core.RocketMQClientTemplate;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.stereotype.Component;
        import java.time.Duration;
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        @Component
        public class RocketMQV2Service {
            private static final Logger log = LoggerFactory.getLogger(RocketMQV2Service.class);
            @Resource
            private RocketMQClientTemplate template;
            /**
             * 发送普通消息
             *
             * @param topic
             * @param message
             */
            public void syncSendNormalMessage(String topic, Object message) {
                SendReceipt sendReceipt = template.syncSendNormalMessage(topic, message);
                log.info("普通消息发送完成:topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
            }
            /**
             * 发送异步普通消息
             *
             * @param topic
             * @param message
             */
            public void asyncSendNormalMessage(String topic, Object message) {
                CompletableFuture<SendReceipt> future = new CompletableFuture<>();
                ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
                future.whenCompleteAsync((sendReceipt, throwable) -> {
                    if (null != throwable) {
                        log.error("发送消息失败", throwable);
                        return;
                    }
                    log.info("发送异步消息消费成功5, messageId={}", sendReceipt.getMessageId());
                }, sendCallbackExecutor);
                CompletableFuture<SendReceipt> completableFuture = template.asyncSendNormalMessage(topic, message, future);
                log.info("发送异步消息成功1, topic={},  message = {},  sendReceipt={}", topic, message, completableFuture);
            }
            /**
             * 发送顺序消息
             *
             * @param topic
             * @param message
             * @param messageGroup
             */
            public void syncSendFifoMessage(String topic, Object message, String messageGroup) {
                SendReceipt sendReceipt = template.syncSendFifoMessage(topic, message, messageGroup);
                log.info("顺序消息发送完成:topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
            }
            /**
             * 发送延时消息
             *
             * @param topic
             * @param message
             * @param delay   单位:秒
             */
            public void syncSendDelayMessage(String topic, Object message, Long delay) {
                SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, Duration.ofSeconds(delay));
                log.info("延时消息发送完成 :topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
            }
            /**
             * 发送延时消息
             *
             * @param topic
             * @param message
             * @param duration Duration.ofSeconds(秒)    Duration.ofMinutes(分钟)    Duration.ofHours(小时)
             */
            public void syncSendDelayMessage(String topic, Object message, Duration duration) {
                SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, duration);
                log.info("延时消息发送完成 :topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
            }
            /**
             * 发送事务消息
             *
             * @param topic
             * @param message
             * @throws ClientException
             */
            public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
                try {
                    Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
                    SendReceipt sendReceipt = pair.getSendReceipt();
                    Transaction transaction = pair.getTransaction();
                    log.info("事务消息发送完成   transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
                    log.info("消息id  : {}  ", sendReceipt.getMessageId());
                    //如果这里提交了事务 
                    if (doLocalTransaction(1)) {
                        log.info("本地事务执行成功");
                        transaction.commit();
                    } else {
                        log.info("本地事务执行失败");
                        transaction.rollback();
                    }
                    return pair;
                } catch (ClientException e) {
                    throw new RuntimeException(e);
                }
            }
            boolean doLocalTransaction(int number) {
                //  本地事务逻辑 数据库操作
                log.info("执行本地事务 : {}", number);
                return number > 5;
            }
        }

        测试代码

        yml配置

        rocketmq:
          producer:
            endpoints: 192.168.3.86:8081
            topic:
          push-consumer:
            endpoints: 192.168.3.86:8081
            access-key:
            secret-key:
            topic:
            tag: "*"

        发送消息

        import com.alibaba.fastjson2.JSONObject;
        import com.example.mq.util.RocketMQV2Service;
        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.apis.ClientException;
        import org.apache.rocketmq.client.apis.message.MessageId;
        import org.apache.rocketmq.client.apis.producer.SendReceipt;
        import org.apache.rocketmq.client.apis.producer.Transaction;
        import org.apache.rocketmq.client.common.Pair;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RestController;
        import java.time.Duration;
        @Slf4j
        @RestController
        @RequestMapping("/send")
        public class SendController {
            @Autowired
            private RocketMQV2Service rocketMQV2Service;
            /**
             * 普通消息
             *
             * @return
             */
            @GetMapping("/normal.message")
            public String normalMessage() {
                rocketMQV2Service.syncSendNormalMessage(RocketMQVariable.NORMAL_TOPIC, "hello RocketMQ  这是普通消息");
                return "发送成功";
            }
            /**
             * 异步普通消息
             *
             * @return
             */
            @GetMapping("/async.normal.message")
            public String asyncSendNormalMessageNormalMessage() {
                rocketMQV2Service.asyncSendNormalMessage(RocketMQVariable.ASYNC_NORMAL_TOPIC, "hello RocketMQ  这是异步普通消息");
                return "发送成功";
            }
            /**
             * 顺序消息
             *
             * @return
             */
            @GetMapping("/flfo.message")
            public String flfoMessage() {
                for (int i = 0; i < 20; i++) {
                    rocketMQV2Service.syncSendFifoMessage(RocketMQVariable.FIFO_TOPIC, "hello RocketMQ  这是顺序消息" + i, RocketMQVariable.FIFO_GROUP);
                }
                return "发送成功";
            }
            /**
             * 定时/延时消息
             *
             * @return
             */
            @GetMapping("/delay.message")
            public String delayMessage() {
                rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ  这是30秒定时消息", Duration.ofSeconds(30));
                rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ  这是10秒定时消息  ", 10l);
                rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ  这是1分钟定时消息", Duration.ofMinutes(1));
                return "发送成功";
            }
            /**
             * 事务消息
             *
             * @return
             */
            @GetMapping("/transaction.message")
            public String transactionMessage() throws ClientException {
                Pair<SendReceipt, Transaction> sendReceiptTransactionPair = rocketMQV2Service.sendMessageInTransaction(RocketMQVariable.TRANSACTION_TOPIC, "hello RocketMQ  这是事务消息");
                Transaction transaction = sendReceiptTransactionPair.getTransaction();
                SendReceipt sendReceipt = sendReceiptTransactionPair.getSendReceipt();
                MessageId messageId = sendReceipt.getMessageId();
                log.info("事务消息发送完成   messageId = {}", messageId);
                log.info("事务消息发送完成   transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
                    return "发送成功";
            }
        }

        普通消息(广播模式)

        import com.alibaba.fastjson2.JSONObject;
        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.core.RocketMQListener;
        import org.springframework.stereotype.Service;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Map;
        import java.util.Objects;
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
                consumerGroup = RocketMQVariable.NORMAL_GROUP)
        public class PushConsumerNormalService implements RocketMQListener {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                log.info("普通消息, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("普通消息, message={}", message);
                Map<String, String> properties = messageView.getProperties();
                log.info("普通消息, properties={}", JSONObject.toJSONString(properties));
                if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
                    log.info("普通消息, message={}", messageView);
                    return ConsumeResult.FAILURE;
                }
                return ConsumeResult.SUCCESS;
            }
        }
        import com.alibaba.fastjson2.JSONObject;
        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.core.RocketMQListener;
        import org.springframework.stereotype.Service;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Map;
        import java.util.Objects;
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
                consumerGroup = RocketMQVariable.NORMAL1_GROUP)
        public class PushConsumerNormal1Service implements RocketMQListener {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                log.info("普通消息1, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("普通消息1, message={}", message);
                Map<String, String> properties = messageView.getProperties();
                log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));
                if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
                    log.info("普通消息1, message={}", messageView);
                    return ConsumeResult.FAILURE;
                }
                return ConsumeResult.SUCCESS;
            }
        }

        日志 (注意consumerGroup 不同)

        负载均衡模式 把consumerGroup改为 RocketMQVariable.NORMAL_GROUP

        import com.alibaba.fastjson2.JSONObject;
        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.core.RocketMQListener;
        import org.springframework.stereotype.Service;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Map;
        import java.util.Objects;
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
                consumerGroup = RocketMQVariable.NORMAL_GROUP)
        public class PushConsumerNormal1Service implements RocketMQListener {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                log.info("普通消息1, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("普通消息1, message={}", message);
                Map<String, String> properties = messageView.getProperties();
                log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));
                if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
                    log.info("普通消息1, message={}", messageView);
                    return ConsumeResult.FAILURE;
                }
                return ConsumeResult.SUCCESS;
            }
        }

        日志 (会自动选择一个消费者消费)

        顺序消费

        import com.alibaba.fastjson2.JSONObject;
        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.core.RocketMQListener;
        import org.springframework.stereotype.Service;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Map;
        import java.util.Objects;
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = RocketMQVariable.FIFO_TOPIC,
                consumerGroup = RocketMQVariable.FIFO_GROUP)
        public class PushConsumerFifoService implements RocketMQListener {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                log.info("顺序消息, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("顺序消息, message={}", message);
                Map<String, String> properties = messageView.getProperties();
                log.info("顺序消息, properties={}", JSONObject.toJSONString(properties));
                if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
                    log.info("顺序消息, message={}", messageView);
                    return ConsumeResult.FAILURE;
                }
                log.info("rollback transaction");
                return ConsumeResult.SUCCESS;
            }
        }

        日志

        定时/延时任务消息 (可自定义时间)

        import com.alibaba.fastjson2.JSONObject;
        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.core.RocketMQListener;
        import org.springframework.stereotype.Service;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Map;
        import java.util.Objects;
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = RocketMQVariable.DELAY_TOPIC,
                consumerGroup = RocketMQVariable.DELAY_GROUP)
        public class PushConsumerDelayService implements RocketMQListener {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                log.info("定时/延时消息, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("定时/延时消息, message={}", message);
                Map<String, String> properties = messageView.getProperties();
                log.info("定时/延时消息, properties={}", JSONObject.toJSONString(properties));
                if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
                    log.info("定时/延时消息, message={}", messageView);
                    return ConsumeResult.FAILURE;
                }
                log.info("定时/延时消息 消费完成");
                return ConsumeResult.SUCCESS;
            }
        }

        日志

        事务处理情况1

        /**
             * 发送事务消息
             *
             * @param topic
             * @param message
             * @throws ClientException
             */
            public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
                try {
                    Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
                    SendReceipt sendReceipt = pair.getSendReceipt();
                    Transaction transaction = pair.getTransaction();
                    log.info("事务消息发送完成   transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
                    log.info("消息id  : {}  ", sendReceipt.getMessageId());
                    //如果这里提交了事务 
                    if (doLocalTransaction(1)) {
                        log.info("本地事务执行成功");
                        transaction.commit();
                    } else {
                        log.info("本地事务执行失败");
                        transaction.rollback();
                    }
                    return pair;
                } catch (ClientException e) {
                    throw new RuntimeException(e);
                }
            }
            boolean doLocalTransaction(int number) {
                //  本地事务逻辑 数据库操作
                log.info("执行本地事务 : {}", number);
                return number > 5;
            }

        如果在工具类里面提交了事务 transaction.commit();下面的就不会进入处理了

        @Slf4j
        @RocketMQTransactionListener
        public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {
            @Override
            public TransactionResolution check(MessageView messageView) {
                log.info("Receive transactional message check, message={}", messageView);
                return null;
            }
        }

        而是直接消费了

        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.core.RocketMQListener;
        import org.springframework.stereotype.Service;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Objects;
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
        public class PushConsumerTransactionService implements RocketMQListener {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                log.info("事务消息消费, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("事务消息消费, message={}", message);
                if (Objects.isNull(message)) {
                    log.info("事务消息 消费失败");
                    return ConsumeResult.FAILURE;
                }
                log.info("事务消息 消费成功");
                return ConsumeResult.SUCCESS;
            }
        }

        日志

        事务处理情况2

        /**
             * 发送事务消息 这里只发消息  不参与事务提交
             *
             * @param topic
             * @param message
             * @throws ClientException
             */
            public void sendMessageInTransaction(String topic, Object message) {
                try {
                  template.sendMessageInTransaction(topic, message);
                } catch (ClientException e) {
                    throw new RuntimeException(e);
                }
            }

        使用官方事务处理机制处理事务

        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQTransactionListener;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.apis.producer.TransactionResolution;
        import org.apache.rocketmq.client.core.RocketMQTransactionChecker;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Objects;
        @Slf4j
        @RocketMQTransactionListener
        public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {
            @Override
            public TransactionResolution check(MessageView messageView) {
                log.info("事务消息  事务操作, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("事务消息  事务操作, message={}", message);
                String messageId = messageView.getMessageId().toString();
                if (Objects.nonNull(messageId)) {
                    log.info("事务消息  事务操作, messageId={}", messageId);
                    return TransactionResolution.COMMIT;
                }
                log.info("事务消息消费失败");
                return TransactionResolution.ROLLBACK;
            }
        }

        事务提交后才会被消费

        import com.example.mq.variable.RocketMQVariable;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
        import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
        import org.apache.rocketmq.client.apis.message.MessageView;
        import org.apache.rocketmq.client.core.RocketMQListener;
        import org.springframework.stereotype.Service;
        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;
        import java.util.Objects;
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
        public class PushConsumerTransactionService implements RocketMQListener {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                log.info("事务消息消费, messageView={}", messageView);
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                log.info("事务消息消费, message={}", message);
                if (Objects.isNull(message)) {
                    log.info("事务消息 消费失败");
                    return ConsumeResult.FAILURE;
                }
                log.info("事务消息 消费成功");
                return ConsumeResult.SUCCESS;
            }
        }

        日志