SpringBoot集成RocketMQ

项目地址 (opens new window)

# 引入jar包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
1
2
3
4
5

# 配置文件

生产者消费者写在了一个项目中

rocketmq:
  name-server: 192.168.220.128:9876
  producer:
    group: producer-group
    access-key: rocketmq2
    secret-key: 12345678
    retry-times-when-send-failed: 2
  consumer:
    access-key: rocketmq2
    secret-key: 12345678

spring:
  application:
    name: rocketmq-demo
server:
  port: 8080
logging:
  level:
    root: info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 发送消息

# 同步发送

这种方式在消息到达broker,接收到broker的ack后返回结果。存在同步等待发送结果的时间代价。

SendResult sendResult = rocketMQTemplate.syncSend(topic+":"+tags, content);
1

这种方式内部自带重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 最后一次发送失败的话,不会报错,而是作为结果返回。

# 异步发送

消息发送后立即返回,当消息完成发送后会调用回调函数sendCallback来告知发送者本次发送成功还是失败。异步模式通常用于响应时间敏感的业务场景。

public void async() {
    rocketMQTemplate.asyncSend("topic:tags", "send async message!", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("send successful");
        }

        @Override
        public void onException(Throwable throwable) {
            log.info("send fail; {}", throwable.getMessage());
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13

与同步发送一样,异步发送内部也存在重试机制。

# 直接发送

发送端发送消息后直接返回,不会等来自broker的ack回告。这种方式吞吐量大,但是有丢消息风险,适用于不重要的消息发送,例如日志收集。

public void oneWay() {
    rocketMQTemplate.sendOneWay("topic:tags", "send one-way message");
}
1
2
3

# 消费消息

通过注解@RocketMQMessageListener声明消费者Bean

@Component
@RocketMQMessageListener(consumerGroup = "group1", topic = "springboot-rocketmq" ,selectorExpression = "test")
class ReceiverOne implements RocketMQListener<String>  {
  @Override
  public void onMessage(String msg) {
    System.out.println("group1-1 消费消息:"+msg);
  }
}
1
2
3
4
5
6
7
8

@RocketMQMessageListerer

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
    // 分组
    String consumerGroup();
    // topic
    String topic();
    // 选择类型 默认tag,另一种是SQL92表达式
    SelectorType selectorType() default SelectorType.TAG;
    // 全部匹配
    String selectorExpression() default "*";
    // 消费模式 默认均摊,可指定顺序消费
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    // 消息消费类型 默认集群,可指定广播
    MessageModel messageModel() default MessageModel.CLUSTERING;
    // 最大并发线程数
    int consumeThreadMax() default 64;
    // 超时时间
    long consumeTimeout() default 30000L;

    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    String secretKey() default SECRET_KEY_PLACEHOLDER;
    // 追踪trace开关
    boolean enableMsgTrace() default true;
    // 存储消息轨迹数据的topic
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    String nameServer() default NAME_SERVER_PLACEHOLDER;

    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 异常重试

RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1

默认重试18次,如不需要重试太多次,可以在代码中指定重试次数。

@Component
@RocketMQMessageListener(consumerGroup = "group2", topic = "springboot-rocketmq",selectorExpression = "test")
class ReceiverTwo implements RocketMQListener<MessageExt> ,RocketMQPushConsumerLifecycleListener{
  @Override
  public void onMessage(MessageExt msg) {
    System.out.println("group2 消费消息:"+ JSON.toJSONString(msg));
    int reconsumeTimes = msg.getReconsumeTimes();
    throw new RuntimeException("第"+reconsumeTimes+"次重试失败");
  }
  @Override
  public void prepareStart(DefaultMQPushConsumer consumer) {
    //设置消费次数
    consumer.setMaxReconsumeTimes(3);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
上次更新: 2023/04/09, 16:34:32
最近更新
01
docker-compose笔记
01-12
02
MySQL数据迁移
11-27
03
Docker部署服务,避免PID=1
11-27
更多文章>