SpringBoot application series articles
- Configuration center for SpringBoot applications
- Distributed session for SpringBoot application
- Distributed index for SpringBoot applications
- Distributed caching for SpringBoot applications
- Message queue for SpringBoot application
- ELK for SpringBoot application
Order
This article mainly talks about how to use rabbitmq to send and receive messages in Spring Boot.
Prepare rabbitmq cluster
Specific viewDocker Builds rabbitmq ClusterThis article.
New project
Configuration item
#http://segmentfault.com/a/1190000004309900
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
logging.level.org.springframework.amqp=ERROR
logging.level.com.demo=INFO
#spring.rabbitmq.dynamic 是否创建AmqpAdmin bean. 默认为: true
#spring.rabbitmq.listener.acknowledge-mode 指定Acknowledge的模式.
#spring.rabbitmq.listener.auto-startup 是否在启动时就启动mq,默认: true
#spring.rabbitmq.listener.concurrency 指定最小的消费者数量.
#spring.rabbitmq.listener.max-concurrency 指定最大的消费者数量.
#spring.rabbitmq.listener.prefetch 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
#spring.rabbitmq.listener.transaction-size 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
#spring.rabbitmq.requested-heartbeat 指定心跳超时,0为不指定.
#spring.rabbitmq.ssl.enabled 是否开始SSL,默认: false
#spring.rabbitmq.ssl.key-store 指定持有SSL certificate的key store的路径
#spring.rabbitmq.ssl.key-store-password 指定访问key store的密码.
#spring.rabbitmq.ssl.trust-store 指定持有SSL certificates的Trust store.
#spring.rabbitmq.ssl.trust-store-password 指定访问trust store的密码.
Producer allocation
@Configuration
public class ProducerConfig {
@Bean
RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
Queue queueFoo(RabbitAdmin rabbitAdmin) {
Queue queue = new Queue("queue.foo", true);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
Queue queueBar(RabbitAdmin rabbitAdmin) {
Queue queue = new Queue("queue.bar", true);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
TopicExchange exchange(RabbitAdmin rabbitAdmin) {
TopicExchange topicExchange = new TopicExchange("exchange");
rabbitAdmin.declareExchange(topicExchange);
return topicExchange;
}
@Bean
Binding bindingExchangeFoo(Queue queueFoo, TopicExchange exchange,RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(queueFoo).to(exchange).with("queue.foo");
rabbitAdmin.declareBinding(binding);
return binding;
}
@Bean
Binding bindingExchangeBar(Queue queueBar, TopicExchange exchange,RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(queueBar).to(exchange).with("queue.bar");
rabbitAdmin.declareBinding(binding);
return binding;
}
/**
* 生产者用
* @return
*/
@Bean
public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
rabbitMessagingTemplate.setMessageConverter(jackson2Converter());
rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
return rabbitMessagingTemplate;
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
}
}
Consumer configuration
@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {
@Autowired
ReceiverService receiverService;
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// factory.setPrefetchCount(5);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
}
Message sending and receiving
Sending service
@Component
public class SenderService {
@Autowired
private RabbitMessagingTemplate rabbitMessagingTemplate;
public void sendFoo2Rabbitmq(final Foo foo) {
this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.foo", foo);
}
public void sendBar2Rabbitmq(final Bar bar){
this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.bar", bar);
}
}
call
@SpringBootApplication
@ComponentScan(basePackages = "com.demo")
public class RabbitmqdemoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(RabbitmqdemoApplication.class, args);
}
@Autowired
SenderService senderService;
@Override
public void run(String... strings) throws Exception {
Random random = new Random();
while (true){
senderService.sendBar2Rabbitmq(new Bar(random.nextInt()));
senderService.sendFoo2Rabbitmq(new Foo(UUID.randomUUID().toString()));
}
}
}
Receive
@Component
public class ReceiverService {
@RabbitListener(queues = "queue.foo")
public void receiveFooQueue(Foo foo) {
System.out.println("Received Foo<" + foo.getName() + ">");
}
@RabbitListener(queues = "queue.bar")
public void receiveBarQueue(Bar bar) {
System.out.println("Received Bar<" + bar.getAge() + ">");
}
}
View output
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.3.2.RELEASE)
2016-02-02 02:26:07.099 INFO 2185 --- [ main] com.demo.RabbitmqdemoApplication : Starting RabbitmqdemoApplication on Jupiter.local with PID 2185 (/Users/codecraft/workspace/rabbitmqdemo/target/classes started by codecraft in /Users/codecraft/workspace/rabbitmqdemo)
2016-02-02 02:26:07.101 INFO 2185 --- [ main] com.demo.RabbitmqdemoApplication : No active profile set, falling back to default profiles: default
2016-02-02 02:26:07.166 INFO 2185 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@15b3e5b: startup date [Tue Feb 02 02:26:07 CST 2016]; root of context hierarchy
2016-02-02 02:26:08.004 INFO 2185 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$d94c0656] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-02-02 02:26:08.976 INFO 2185 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-02-02 02:26:08.981 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2016-02-02 02:26:08.981 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2016-02-02 02:26:09.029 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
Received Foo<062226b0-cc27-433c-8ba9-201379f2d7c4>
Received Bar<-1717186171>
Received Foo<b2ac3cb9-2dfd-4bf1-8975-06aaca270096>
Received Bar<1094787449>
Received Foo<d63160f5-f919-4122-a995-185edcb6c231>
Received Bar<-39441298>
Received Foo<235369f1-0b59-4c4f-9a51-2277f3179798>
Received Bar<-596340646>
Received Foo<ef6e596c-b088-4b83-8b5c-e78e7ceaabcd>
Received Bar<-915839285>
Received Foo<17fb113a-8845-473a-9f46-f850526f6f4d>
Received Bar<-75651721>
Received Foo<d796bb56-478a-41e8-a2ed-b5944ae01642>
Received Bar<-1210351662>
Received Foo<23700d4a-26f9-4280-836d-9f6d63c0a3a0>
Received Bar<-2096776841>
Received Foo<d10c28d7-2c75-4b7d-b51b-992b98101018>
Received Bar<-1986644405>
View rabbitmq interface
View queue
Here are two instances started, two receivers, plus prefecth is set to 5, so the waiting ack is kept at 2*2*5=20
View channel
View connections
Pit
Warning for MimeType
2016-02-02 02:19:04.444 WARN 2168 --- [ main] o.s.amqp.support.SimpleAmqpHeaderMapper : skipping header 'contentType' since it is not of expected type [org.springframework.util.MimeType]
Sources foundDefaultAmqpHeaderMapper should accept contentType headers containing org.springframework.util.MimeType
There is no solution. At present, the log of amqp is adjusted to error.
Declare matters
In order to facilitate automatic processing, you can go to declare queue, topic, exchange in the code, and finally pay attention to declare binding, otherwise you will not receive the message.