Reactor-rabbitmq

  rabbitmq, reactor

Preface

This article looks at how to use reactor-rabbitmq to publish and consume RabbitMQ messages reactively.

maven

        <dependency>
            <groupId>io.projectreactor.rabbitmq</groupId>
            <artifactId>reactor-rabbitmq</artifactId>
            <version>1.0.0.M2</version>
        </dependency>

rabbitmq

Example

    @Test
    public void testProducer() throws InterruptedException {
        int count = 100;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.useNio();
        connectionFactory.setUsername("myuser");
        connectionFactory.setPassword("mypass");
        SenderOptions senderOptions =  new SenderOptions()
                .connectionFactory(connectionFactory)
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)},
                        "reactive-sender"))
                .resourceCreationScheduler(Schedulers.elastic());
        Sender sender = ReactorRabbitMq.createSender(senderOptions);
        Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count)
                .map(i -> new OutboundMessage("", QUEUE, ("Message_" + i).getBytes())));

        CountDownLatch latch = new CountDownLatch(count);

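        // Declare the queue first, then start publishing; confirmations arrive asynchronously.
        sender.declareQueue(QueueSpecification.queue(QUEUE))
                .thenMany(confirmations)
                .doOnError(e -> LOGGER.error("Send failed", e))
                .subscribe(r -> {
                    String body = new String(r.getOutboundMessage().getBody());
                    if (r.isAck()) {
                        LOGGER.info("Message {} sent successfully", body);
                        latch.countDown();
                    } else {
                        // A nack means the broker did not accept the message; the latch is not counted down.
                        LOGGER.warn("Message {} was nacked by the broker", body);
                    }
                });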

        latch.await(10, TimeUnit.SECONDS);
        sender.close();
    }

    @Test
    public void testConsumer() throws InterruptedException {
        int count = 100;
        CountDownLatch latch = new CountDownLatch(count);

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.useNio();
        connectionFactory.setUsername("myuser");
        connectionFactory.setPassword("mypass");
        SenderOptions senderOptions =  new SenderOptions()
                .connectionFactory(connectionFactory)
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)},
                        "reactive-sender"))
                .resourceCreationScheduler(Schedulers.elastic());

        Sender sender = ReactorRabbitMq.createSender(senderOptions);
        Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));

        ReceiverOptions receiverOptions = new ReceiverOptions()
                .connectionFactory(connectionFactory)
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)},
                        "reactive-receiver"))
                .connectionSubscriptionScheduler(Schedulers.elastic());
        Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions);
        Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
        Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> {
            LOGGER.info("Received message {}", new String(m.getBody()));
            latch.countDown();
        });

        latch.await(10, TimeUnit.SECONDS);

        disposable.dispose();
        sender.close();
        receiver.close();
    }
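  • The broker requires authentication, so the username and password are set on the ConnectionFactory.
  • Because a RabbitMQ cluster is used, the connectionSupplier passes the addresses of all three nodes to newConnection, together with a connection name ("reactive-sender" or "reactive-receiver").
  • Both the producer and the consumer declare the queue first with sender.declareQueue and chain the actual work after it with thenMany, so publishing and consuming only start once the queue exists.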
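
Both tests call latch.await(10, TimeUnit.SECONDS) but ignore its boolean result, so a test can finish without error even when fewer than count messages were confirmed or received. The following is a minimal sketch of checking that result, using only the JDK. The class name LatchAwaitSketch, the method allArrived and the single-thread executor that stands in for the asynchronous callbacks are hypothetical and not part of reactor-rabbitmq.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchAwaitSketch {

    // Simulates `delivered` asynchronous callbacks out of `expected` and reports
    // whether every expected callback arrived before the timeout elapsed.
    static boolean allArrived(int expected, int delivered, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        // Hypothetical stand-in for the thread that delivers confirmations or messages.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < delivered; i++) {
                pool.execute(latch::countDown);
            }
            // await returns false if the count did not reach zero in time.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(allArrived(100, 100, 2000)); // true: all callbacks arrived
        System.out.println(allArrived(100, 99, 200));   // false: one callback is missing
    }
}
```

In the tests above, the same idea would amount to asserting on the value returned by latch.await instead of discarding it.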

Summary

reactor-rabbitmq wraps the RabbitMQ Java client API in a Reactive Streams interface, which provides non-blocking back-pressure and makes an end-to-end reactive pipeline possible.
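
To make "non-blocking back-pressure" more concrete, here is a minimal sketch of demand-driven delivery. It deliberately does not use reactor-rabbitmq: it uses java.util.concurrent.Flow (assuming Java 9 or later), whose interfaces mirror the Reactive Streams contract that Reactor's Flux follows. The names BackpressureSketch, OneAtATimeSubscriber and publishAndCollect are hypothetical. The subscriber requests one item at a time, so the publisher never delivers more than it asked for.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackpressureSketch {

    // Subscriber that signals demand for exactly one item at a time.
    static class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: one item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes `count` messages and returns what the subscriber received, in order.
    static List<String> publishAndCollect(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                // submit blocks the caller only if the subscriber's buffer is full.
                publisher.submit("Message_" + i);
            }
        } // close() issues onComplete once the buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(3));
    }
}
```

In the tests above, the same demand signalling happens inside the Flux pipeline, so the subscriber lambdas never call request themselves.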

doc