0%

Spring Boot整合ActiveMQ,分别实现P2P和Pub/Sub两种模式

本文介绍如何在Spring Boot中分别通过P2P模式和Pub/Sub模式整合ActiveMQ。
版本信息:
Spring Boot: v2.2.2.RELEASE
ActiveMQ: 5.15.9

安装ActiveMQ

这里是通过docker安装ActiveMQ的: Docker安装ActiveMQ
如果需要其它方式安装,还请自行搜索:。

P2P模式

效果图

实现效果

代码目录结构

目录结构

引入依赖

修改pom.xml:

1
2
3
4
5
6
7
8
<dependencies>
<!-- ActiveMQ start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- ActiveMQ end -->
</dependencies>

配置参数

修改application.yml

1
2
3
spring:
activemq:
broker-url: tcp://172.16.22.30:61616

创建常量文件

创建个常量文件定义队列名字:

1
2
3
public class Constants {
public static final String QUEUE_NAME = "segon-queue";
}

创建配置文件

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@EnableJms
public class ActiveMQConfig {

/**
* 点对点
*/
@Bean
public Queue queue() {
return new ActiveMQQueue(Constants.QUEUE_NAME);
}
}

创建消费服务

1
2
3
4
5
6
7
8
9
@Service
@Slf4j
public class QueueConsumerService {

@JmsListener(destination = Constants.QUEUE_NAME)
public void receiveMsg(String msg) {
log.info("receive: {}", msg);
}
}

创建定时任务模拟发消息

在Application上添加@EnableScheduling注解启用定时任务

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableScheduling
public class ActiveMQApplication {

public static void main(String[] args) {
SpringApplication.run(ActiveMQApplication.class, args);
}

}

创建定时任务用于发消息到Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
public class ProducerService {

/**
* 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;

private int i = 0;

@Scheduled(fixedRate = 1000)
public void sendMessage() {
String msg = "queue msg " + i++;
jmsMessagingTemplate.convertAndSend(queue, msg);
}
}

启动程序进行验证

程序启动后,我们可以看到console输出如下log:

1
2
3
4
5
6
2019-12-14 16:22:35.520  INFO 23256 --- [enerContainer-1] c.s.a.service.QueueConsumerService       : receive: queue msg 0
2019-12-14 16:22:36.411 INFO 23256 --- [enerContainer-1] c.s.a.service.QueueConsumerService : receive: queue msg 1
2019-12-14 16:22:37.411 INFO 23256 --- [enerContainer-1] c.s.a.service.QueueConsumerService : receive: queue msg 2
2019-12-14 16:22:38.412 INFO 23256 --- [enerContainer-1] c.s.a.service.QueueConsumerService : receive: queue msg 3
2019-12-14 16:22:39.412 INFO 23256 --- [enerContainer-1] c.s.a.service.QueueConsumerService : receive: queue msg 4
2019-12-14 16:22:40.413 INFO 23256 --- [enerContainer-1] c.s.a.service.QueueConsumerService : receive: queue msg 5

Pub/Sub模式

效果图

实现效果

代码目录结构

目录结构

引入依赖

修改pom.xml:

1
2
3
4
5
6
7
8
<dependencies>
<!-- ActiveMQ start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- ActiveMQ end -->
</dependencies>

配置参数

1
2
3
4
5
6
7
spring:
activemq:
broker-url: tcp://172.16.22.30:61616

# 默认情况下activemq提供的是P2P模式,若要使用Pub/Sub模式需要添加如下配置
jms:
pub-sub-domain: true

创建常量文件

创建个常量文件定义队列名字:

1
2
3
public class Constants {
public static final String TOPIC_NAME = "segon-topic";
}

创建配置文件

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@EnableJms
public class ActiveMQConfig {

/**
* 发布/订阅
*/
@Bean
public Topic topic() {
return new ActiveMQTopic(Constants.TOPIC_NAME);
}
}

创建消费服务

1
2
3
4
5
6
7
8
9
10
@Service
@Slf4j
public class TopicConsumerService {

@JmsListener(destination = Constants.TOPIC_NAME)
public void receiveMsg(String msg) {
log.info("receive: {}", msg);
}
}

创建定时任务模拟发消息

在Application上添加@EnableScheduling注解启用定时任务

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableScheduling
public class ActiveMQApplication {

public static void main(String[] args) {
SpringApplication.run(ActiveMQApplication.class, args);
}

}

创建定时任务用于发消息到Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class ProducerService {

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;

private int i = 0;

@Scheduled(fixedRate = 1000)
public void sendMessage() {
String msg = "topic msg " + i++;
jmsMessagingTemplate.convertAndSend(topic, msg);
}
}

启动程序进行验证

程序启动后,我们可以看到console输出如下log:

1
2
3
4
5
2019-12-14 16:59:26.761  INFO 37824 --- [enerContainer-1] c.s.a.service.TopicConsumerService       : receive: topic msg 0
2019-12-14 16:59:27.727 INFO 37824 --- [enerContainer-1] c.s.a.service.TopicConsumerService : receive: topic msg 1
2019-12-14 16:59:28.726 INFO 37824 --- [enerContainer-1] c.s.a.service.TopicConsumerService : receive: topic msg 2
2019-12-14 16:59:29.724 INFO 37824 --- [enerContainer-1] c.s.a.service.TopicConsumerService : receive: topic msg 3
2019-12-14 16:59:30.723 INFO 37824 --- [enerContainer-1] c.s.a.service.TopicConsumerService : receive: topic msg 4