SpringBoot集成RocketMQ
# 引入jar包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
1
2
3
4
5
2
3
4
5
# 配置文件
生产者消费者写在了一个项目中
rocketmq:
name-server: 192.168.220.128:9876
producer:
group: producer-group
access-key: rocketmq2
secret-key: 12345678
retry-times-when-send-failed: 2
consumer:
access-key: rocketmq2
secret-key: 12345678
spring:
application:
name: rocketmq-demo
server:
port: 8080
logging:
level:
root: info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 发送消息
# 同步发送
这种方式在消息到达broker,接收到broker的ack后返回结果。存在同步等待发送结果的时间代价。
SendResult sendResult = rocketMQTemplate.syncSend(topic+":"+tags, content);
1
这种方式内部自带重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 最后一次发送失败的话,不会报错,而是作为结果返回。
# 异步发送
消息发送后立即返回,当消息完成发送后会调用回调函数sendCallback来告知发送者本次发送成功还是失败。异步模式通常用于响应时间敏感的业务场景。
public void async() {
rocketMQTemplate.asyncSend("topic:tags", "send async message!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("send successful");
}
@Override
public void onException(Throwable throwable) {
log.info("send fail; {}", throwable.getMessage());
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
与同步发送一样,异步发送内部也存在重试机制。
# 直接发送
发送端发送消息后直接返回,不会等来自broker的ack回告。这种方式吞吐量大,但是有丢消息风险,适用于不重要的消息发送,例如日志收集。
public void oneWay() {
rocketMQTemplate.sendOneWay("topic:tags", "send one-way message");
}
1
2
3
2
3
# 消费消息
通过注解@RocketMQMessageListener
声明消费者Bean
@Component
@RocketMQMessageListener(consumerGroup = "group1", topic = "springboot-rocketmq" ,selectorExpression = "test")
class ReceiverOne implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("group1-1 消费消息:"+msg);
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
@RocketMQMessageListerer
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
// 分组
String consumerGroup();
// topic
String topic();
// 选择类型 默认tag,另一种是SQL92表达式
SelectorType selectorType() default SelectorType.TAG;
// 全部匹配
String selectorExpression() default "*";
// 消费模式 默认均摊,可指定顺序消费
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
// 消息消费类型 默认集群,可指定广播
MessageModel messageModel() default MessageModel.CLUSTERING;
// 最大并发线程数
int consumeThreadMax() default 64;
// 超时时间
long consumeTimeout() default 30000L;
String accessKey() default ACCESS_KEY_PLACEHOLDER;
String secretKey() default SECRET_KEY_PLACEHOLDER;
// 追踪trace开关
boolean enableMsgTrace() default true;
// 存储消息轨迹数据的topic
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
String nameServer() default NAME_SERVER_PLACEHOLDER;
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 异常重试
RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1
默认重试18次,如不需要重试太多次,可以在代码中指定重试次数。
@Component
@RocketMQMessageListener(consumerGroup = "group2", topic = "springboot-rocketmq",selectorExpression = "test")
class ReceiverTwo implements RocketMQListener<MessageExt> ,RocketMQPushConsumerLifecycleListener{
@Override
public void onMessage(MessageExt msg) {
System.out.println("group2 消费消息:"+ JSON.toJSONString(msg));
int reconsumeTimes = msg.getReconsumeTimes();
throw new RuntimeException("第"+reconsumeTimes+"次重试失败");
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设置消费次数
consumer.setMaxReconsumeTimes(3);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
编辑 (opens new window)
上次更新: 2023/04/09, 16:34:32