springboot activemq 同时支持topic和queue模式 | 张扎瓦的博客

springboot activemq 同时支持topic和queue模式

springboot使用activemq时,同时支持topic和queue两种模式


pom.xml配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<dependencies>
    <!-- Spring MVC / embedded web server -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Lombok (provides the @Slf4j annotation used by the listener class) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test starter, with the JUnit 4 vintage engine excluded -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- fastjson: upgraded from 1.2.60, which is affected by known autoType
         deserialization vulnerabilities; 1.2.83 is the patched 1.2.x release -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>

    <!-- ActiveMQ starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!-- JMS connection pool used for ActiveMQ connections -->
    <dependency>
        <groupId>org.messaginghub</groupId>
        <artifactId>pooled-jms</artifactId>
    </dependency>

</dependencies>

application.properties配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Whether to use in-memory mode (MQ features without installing a broker); per the original note the default is true
spring.activemq.in-memory=false
# Whether the ActiveMQ connection pool is enabled
spring.activemq.pool.enabled=true
# Maximum number of connections in the ActiveMQ connection pool
spring.activemq.pool.max-connections=5
# Idle timeout of pooled connections; per the original note the default is 30 seconds
spring.activemq.pool.idle-timeout=30000
# Broker address
spring.activemq.broker-url=tcp://localhost:61616
# User name (original note: defaults to admin when left empty) - NOTE(review): confirm this default against the broker configuration
spring.activemq.user=
# Password (original note: defaults to admin when left empty) - NOTE(review): confirm this default against the broker configuration
spring.activemq.password=

新建配置类ActivemqConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;

/**
 * ActiveMQ/JMS configuration.
 *
 * <p>Registers a JSON message converter plus separate listener container
 * factories, so that {@code @JmsListener} methods can pick queue or topic
 * semantics through their {@code containerFactory} attribute.
 *
 * @author zhangxu
 * @date 2019/12/20 10:43
 */
@Configuration
public class ActivemqConfig {

    /**
     * Creates the message converter bean: a Jackson-based converter that
     * produces text messages and uses the {@code _type} property as its
     * type-id property name.
     *
     * @return the configured JSON message converter
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        final MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("_type");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Creates a listener container factory and lets Spring Boot's configurer
     * apply its settings to it.
     *
     * <p>Original author's note: this was added to address messages being
     * received but not consumed.
     *
     * @param connectionFactory the JMS connection factory to listen on
     * @param configurer        Spring Boot's listener container factory configurer
     * @return the configured listener container factory
     */
    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        configurer.configure(containerFactory, connectionFactory);
        return containerFactory;
    }

    /**
     * Listener container factory for topic (publish/subscribe) destinations.
     *
     * @param connectionFactory the JMS connection factory to listen on
     * @return a listener container factory with the pub/sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> topicModel(ConnectionFactory connectionFactory) {
        final DefaultJmsListenerContainerFactory topicFactory = newContainerFactory(connectionFactory);
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

    /**
     * Listener container factory for queue destinations.
     *
     * @param connectionFactory the JMS connection factory to listen on
     * @return a listener container factory that leaves the pub/sub flag unset
     */
    @Bean
    public JmsListenerContainerFactory<?> queueModel(ConnectionFactory connectionFactory) {
        return newContainerFactory(connectionFactory);
    }

    /**
     * Builds a bare listener container factory bound to the given connection
     * factory; shared by the queue and topic bean methods.
     */
    private static DefaultJmsListenerContainerFactory newContainerFactory(ConnectionFactory connectionFactory) {
        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

}

新建测试类 TestMq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Sample component that receives ActiveMQ messages from one queue and one
 * topic and logs each payload at debug level.
 *
 * @author zhangxu
 * @date 2019/12/26 10:39
 */
@Slf4j
@Component
public class TestMq {

    /** Destination name of the queue listened to. */
    private static final String QUEUE_DESTINATION = "a.queue";

    /** Destination name of the topic listened to. */
    private static final String TOPIC_DESTINATION = "a.topic";

    /** Name of the listener container factory bean used for the queue listener. */
    private static final String QUEUE_CONTAINER_FACTORY = "queueModel";

    /** Name of the listener container factory bean used for the topic listener. */
    private static final String TOPIC_CONTAINER_FACTORY = "topicModel";

    /**
     * Handles a message arriving on the queue destination.
     *
     * @param text the message payload
     */
    @JmsListener(destination = QUEUE_DESTINATION, containerFactory = QUEUE_CONTAINER_FACTORY)
    public void queue(final String text) {
        log.debug("获取到了queue消息:{}", text);
    }

    /**
     * Handles a message arriving on the topic destination.
     *
     * @param text the message payload
     */
    @JmsListener(destination = TOPIC_DESTINATION, containerFactory = TOPIC_CONTAINER_FACTORY)
    public void topic(final String text) {
        log.debug("获取到了topic消息:{}", text);
    }

}

开始测试

使用浏览器打开activemq管理页 http://localhost:8161/admin (默认用户名和密码均为admin)

发送一条queue消息

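此时控制台打印(注意:示例中使用 log.debug 输出,需要将 TestMq 所在包的日志级别设置为 debug,例如在 application.properties 中添加 logging.level.你的包名=debug,否则在默认的 info 级别下看不到输出)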

再次发送一条topic消息

此时控制台输出

如果我的文章对您有所帮助,不妨打赏一杯豆浆以资鼓励(○` 3′○)