Message queue for SpringBoot application

  rabbitmq, springboot

SpringBoot application series articles

Preface

This article mainly talks about how to use rabbitmq to send and receive messages in Spring Boot.

Prepare rabbitmq cluster

For details, see the article "Docker Builds rabbitmq Cluster".

New project

图片描述

Configuration item

#http://segmentfault.com/a/1190000004309900
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

logging.level.org.springframework.amqp=ERROR
logging.level.com.demo=INFO

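#spring.rabbitmq.dynamic Whether to create an AmqpAdmin bean. Default: true
#spring.rabbitmq.listener.acknowledge-mode Acknowledge mode to use.
#spring.rabbitmq.listener.auto-startup Whether to start the listener automatically on startup. Default: true
#spring.rabbitmq.listener.concurrency Minimum number of consumers.
#spring.rabbitmq.listener.max-concurrency Maximum number of consumers.
#spring.rabbitmq.listener.prefetch Number of messages that can be handled in a single request. When transactions are used, it must be greater than or equal to the transaction size.
#spring.rabbitmq.listener.transaction-size Number of messages processed in one transaction. Ideally less than or equal to the prefetch count.
#spring.rabbitmq.requested-heartbeat Requested heartbeat timeout; 0 means none.
#spring.rabbitmq.ssl.enabled Whether to enable SSL. Default: false
#spring.rabbitmq.ssl.key-store Path to the key store that holds the SSL certificate.
#spring.rabbitmq.ssl.key-store-password Password used to access the key store.
#spring.rabbitmq.ssl.trust-store Trust store that holds the SSL certificates.
#spring.rabbitmq.ssl.trust-store-password Password used to access the trust store.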

Producer configuration

/**
 * Producer-side RabbitMQ wiring.
 *
 * <p>Declares the queues "queue.foo" and "queue.bar", the topic exchange "exchange"
 * and one binding per queue through a {@link RabbitAdmin}, and exposes the
 * {@link RabbitMessagingTemplate} used to publish messages.
 */
@Configuration
public class ProducerConfig {

    /** Creates the admin that the other bean methods use to declare broker resources. */
    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /** Declares and returns the durable queue "queue.foo". */
    @Bean
    Queue queueFoo(RabbitAdmin rabbitAdmin) {
        return declareDurableQueue(rabbitAdmin, "queue.foo");
    }

    /** Declares and returns the durable queue "queue.bar". */
    @Bean
    Queue queueBar(RabbitAdmin rabbitAdmin) {
        return declareDurableQueue(rabbitAdmin, "queue.bar");
    }

    /** Declares and returns the topic exchange named "exchange". */
    @Bean
    TopicExchange exchange(RabbitAdmin rabbitAdmin) {
        final TopicExchange declared = new TopicExchange("exchange");
        rabbitAdmin.declareExchange(declared);
        return declared;
    }

    /** Binds the foo queue to the exchange with routing key "queue.foo". */
    @Bean
    Binding bindingExchangeFoo(Queue queueFoo, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        return declareRouting(rabbitAdmin, queueFoo, exchange, "queue.foo");
    }

    /** Binds the bar queue to the exchange with routing key "queue.bar". */
    @Bean
    Binding bindingExchangeBar(Queue queueBar, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        return declareRouting(rabbitAdmin, queueBar, exchange, "queue.bar");
    }

    /**
     * Messaging template used by the producer.
     *
     * @param rabbitTemplate the underlying template the messaging template delegates to
     * @return a messaging template configured with the Jackson 2 message converter
     */
    @Bean
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        final RabbitMessagingTemplate template = new RabbitMessagingTemplate();
        template.setMessageConverter(jackson2Converter());
        template.setRabbitTemplate(rabbitTemplate);
        return template;
    }

    /** Message converter handed to the messaging template. */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }

    /** Creates a durable queue with the given name and declares it through the admin. */
    private static Queue declareDurableQueue(RabbitAdmin admin, String name) {
        final Queue durableQueue = new Queue(name, true);
        admin.declareQueue(durableQueue);
        return durableQueue;
    }

    /** Builds the queue-to-exchange binding for the routing key and declares it through the admin. */
    private static Binding declareRouting(RabbitAdmin admin, Queue queue, TopicExchange exchange, String routingKey) {
        final Binding routing = BindingBuilder.bind(queue).to(exchange).with(routingKey);
        admin.declareBinding(routing);
        return routing;
    }
}

Consumer configuration

/**
 * Consumer-side RabbitMQ wiring.
 *
 * <p>Enables {@code @RabbitListener} processing, supplies the listener container
 * factory, and registers a handler method factory that uses a Jackson 2 message converter.
 */
@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {

    // Injected but not referenced elsewhere in this class.
    @Autowired
    ReceiverService receiverService;

    /** Handler method factory whose payload converter is a Jackson 2 message converter. */
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        final DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.setMessageConverter(new MappingJackson2MessageConverter());
        return handlerFactory;
    }

    /**
     * Listener container factory bound to the given connection factory, with
     * acknowledge mode {@code AUTO}.
     *
     * @param connectionFactory the connection factory the listener containers use
     * @return the configured container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        // The prefetch count is not set here; the setPrefetchCount(5) call is disabled.
        // containerFactory.setPrefetchCount(5);
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return containerFactory;
    }

    /** Registers the handler method factory defined above with the listener registrar. */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

}

Message sending and receiving

Sending service

/**
 * Publishes {@code Foo} and {@code Bar} payloads to the exchange "exchange",
 * using the routing keys "queue.foo" and "queue.bar" respectively.
 */
@Component
public class SenderService {

    private static final String EXCHANGE_NAME = "exchange";
    private static final String FOO_ROUTING_KEY = "queue.foo";
    private static final String BAR_ROUTING_KEY = "queue.bar";

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;

    /**
     * Converts and sends a {@code Foo} with routing key "queue.foo".
     *
     * @param foo the payload to publish
     */
    public void sendFoo2Rabbitmq(final Foo foo) {
        rabbitMessagingTemplate.convertAndSend(EXCHANGE_NAME, FOO_ROUTING_KEY, foo);
    }

    /**
     * Converts and sends a {@code Bar} with routing key "queue.bar".
     *
     * @param bar the payload to publish
     */
    public void sendBar2Rabbitmq(final Bar bar) {
        rabbitMessagingTemplate.convertAndSend(EXCHANGE_NAME, BAR_ROUTING_KEY, bar);
    }
}

Calling the sender

/**
 * Demo entry point: boots the Spring application and then publishes messages
 * through {@link SenderService} in an endless loop.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.demo")
public class RabbitmqdemoApplication implements CommandLineRunner {

    @Autowired
    SenderService senderService;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqdemoApplication.class, args);
    }

    /**
     * Sends one {@code Bar} (random int) followed by one {@code Foo} (random UUID string)
     * per iteration. The loop has no exit condition, so this method never returns normally.
     *
     * @param strings command-line arguments; not used
     */
    @Override
    public void run(String... strings) throws Exception {
        final Random generator = new Random();
        for (;;) {
            final Bar nextBar = new Bar(generator.nextInt());
            senderService.sendBar2Rabbitmq(nextBar);

            final Foo nextFoo = new Foo(UUID.randomUUID().toString());
            senderService.sendFoo2Rabbitmq(nextFoo);
        }
    }
}

Receive

/**
 * Listens on the queues "queue.foo" and "queue.bar" and prints each received
 * payload to standard output.
 */
@Component
public class ReceiverService {

    /**
     * Handles a message from "queue.foo".
     *
     * @param foo the converted message payload
     */
    @RabbitListener(queues = "queue.foo")
    public void receiveFooQueue(Foo foo) {
        final String line = "Received Foo<" + foo.getName() + ">";
        System.out.println(line);
    }

    /**
     * Handles a message from "queue.bar".
     *
     * @param bar the converted message payload
     */
    @RabbitListener(queues = "queue.bar")
    public void receiveBarQueue(Bar bar) {
        final String line = "Received Bar<" + bar.getAge() + ">";
        System.out.println(line);
    }
}

View output

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.2.RELEASE)

2016-02-02 02:26:07.099  INFO 2185 --- [           main] com.demo.RabbitmqdemoApplication   : Starting RabbitmqdemoApplication on Jupiter.local with PID 2185 (/Users/codecraft/workspace/rabbitmqdemo/target/classes started by codecraft in /Users/codecraft/workspace/rabbitmqdemo)
2016-02-02 02:26:07.101  INFO 2185 --- [           main] com.demo.RabbitmqdemoApplication   : No active profile set, falling back to default profiles: default
2016-02-02 02:26:07.166  INFO 2185 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@15b3e5b: startup date [Tue Feb 02 02:26:07 CST 2016]; root of context hierarchy
2016-02-02 02:26:08.004  INFO 2185 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$d94c0656] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-02-02 02:26:08.976  INFO 2185 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2016-02-02 02:26:08.981  INFO 2185 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147482648
2016-02-02 02:26:08.981  INFO 2185 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2016-02-02 02:26:09.029  INFO 2185 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
Received Foo<062226b0-cc27-433c-8ba9-201379f2d7c4>
Received Bar<-1717186171>
Received Foo<b2ac3cb9-2dfd-4bf1-8975-06aaca270096>
Received Bar<1094787449>
Received Foo<d63160f5-f919-4122-a995-185edcb6c231>
Received Bar<-39441298>
Received Foo<235369f1-0b59-4c4f-9a51-2277f3179798>
Received Bar<-596340646>
Received Foo<ef6e596c-b088-4b83-8b5c-e78e7ceaabcd>
Received Bar<-915839285>
Received Foo<17fb113a-8845-473a-9f46-f850526f6f4d>
Received Bar<-75651721>
Received Foo<d796bb56-478a-41e8-a2ed-b5944ae01642>
Received Bar<-1210351662>
Received Foo<23700d4a-26f9-4280-836d-9f6d63c0a3a0>
Received Bar<-2096776841>
Received Foo<d10c28d7-2c75-4b7d-b51b-992b98101018>
Received Bar<-1986644405>

View rabbitmq interface

图片描述

View queue

图片描述
Here two instances are running, each with two receivers, and prefetch is set to 5, so the number of messages waiting for ack stays at 2*2*5=20.
图片描述

View channel

图片描述

View connections

图片描述

Pitfalls

Warning for MimeType

2016-02-02 02:19:04.444  WARN 2168 --- [           main] o.s.amqp.support.SimpleAmqpHeaderMapper  : skipping header 'contentType' since it is not of expected type [org.springframework.util.MimeType]

Source found: "DefaultAmqpHeaderMapper should accept contentType headers containing org.springframework.util.MimeType".

There is no fix yet. For now, the log level for org.springframework.amqp is set to ERROR to suppress the warning.

Notes on declaration

To make setup automatic, you can declare the queue and the topic exchange in code. Finally, remember to declare the binding as well; otherwise you will not receive any messages.

References