Preface
This article looks at how to use reactor-rabbitmq.
maven
<dependency>
    <groupId>io.projectreactor.rabbitmq</groupId>
    <artifactId>reactor-rabbitmq</artifactId>
    <version>1.0.0.M2</version>
</dependency>
rabbitmq
- For the cluster setup, refer to "Docker Builds rabbitmq Cluster".
- The image used here is bijurunjammen/rabbimq-server:3.7.0, and the username/password configured in the docker-compose file is myuser/mypass.
- Open http://192.168.99.100:15672 to view the management interface.
Example
@Test
public void testProducer() throws InterruptedException {
    int count = 100;

    // Credentials must match the account configured in the docker-compose file
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    connectionFactory.setUsername("myuser");
    connectionFactory.setPassword("mypass");

    // List every cluster node so the client can try each address in turn
    SenderOptions senderOptions = new SenderOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(
                    new Address[] {
                            new Address("192.168.99.100", 5672),
                            new Address("192.168.99.100", 5673),
                            new Address("192.168.99.100", 5674)
                    },
                    "reactive-sender"))
            .resourceCreationScheduler(Schedulers.elastic());
    Sender sender = ReactorRabbitMq.createSender(senderOptions);

    // Nothing is published until this Flux is subscribed to
    Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count)
            .map(i -> new OutboundMessage("", QUEUE, ("Message_" + i).getBytes())));

    CountDownLatch latch = new CountDownLatch(count);

    // Declare the queue first, then send and count the broker acknowledgements
    sender.declareQueue(QueueSpecification.queue(QUEUE))
            .thenMany(confirmations)
            .doOnError(e -> LOGGER.error("Send failed", e))
            .subscribe(r -> {
                if (r.isAck()) {
                    LOGGER.info("Message {} sent successfully", new String(r.getOutboundMessage().getBody()));
                    latch.countDown();
                }
            });

    latch.await(10, TimeUnit.SECONDS);
    sender.close();
}

@Test
public void testConsumer() throws InterruptedException {
    int count = 100;
    CountDownLatch latch = new CountDownLatch(count);

    // Credentials must match the account configured in the docker-compose file
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    connectionFactory.setUsername("myuser");
    connectionFactory.setPassword("mypass");

    // The sender is only used here to declare the queue
    SenderOptions senderOptions = new SenderOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(
                    new Address[] {
                            new Address("192.168.99.100", 5672),
                            new Address("192.168.99.100", 5673),
                            new Address("192.168.99.100", 5674)
                    },
                    "reactive-sender"))
            .resourceCreationScheduler(Schedulers.elastic());
    Sender sender = ReactorRabbitMq.createSender(senderOptions);
    Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));

    // The receiver opens its own connection to the same cluster nodes
    ReceiverOptions receiverOptions = new ReceiverOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(
                    new Address[] {
                            new Address("192.168.99.100", 5672),
                            new Address("192.168.99.100", 5673),
                            new Address("192.168.99.100", 5674)
                    },
                    "reactive-receiver"))
            .connectionSubscriptionScheduler(Schedulers.elastic());
    Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions);

    // Declare the queue first, then consume from it with automatic acknowledgement
    Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
    Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> {
        LOGGER.info("Received message {}", new String(m.getBody()));
        latch.countDown();
    });

    latch.await(10, TimeUnit.SECONDS);
    disposable.dispose();
    sender.close();
    receiver.close();
}
- Because the broker is configured with a username and password, they must be set on the ConnectionFactory.
- Because a rabbitmq cluster is used, the addresses of all nodes to connect to are passed to newConnection inside the connectionSupplier.
- Both the producer and the consumer declare the queue first (sender.declareQueue) and chain the send or consume step after it with thenMany.
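One detail worth noting about both tests: they call latch.await(10, TimeUnit.SECONDS) and ignore the result. CountDownLatch.await with a timeout returns false when the count has not reached zero in time, so a test written this way still passes if some messages never arrive. The following is a minimal sketch of checking that result, using only the JDK; the class LatchCheck and its method awaitAll are hypothetical names introduced for illustration and are not part of reactor-rabbitmq.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of reactor-rabbitmq or the tests above. */
final class LatchCheck {

    /**
     * Waits for the latch and reports whether it reached zero within the timeout.
     * Returns false on timeout or if the waiting thread is interrupted.
     */
    static boolean awaitAll(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            return false;
        }
    }

    public static void main(String[] args) {
        // Every expected "message" arrived: the latch is already at zero.
        CountDownLatch complete = new CountDownLatch(2);
        complete.countDown();
        complete.countDown();
        System.out.println(awaitAll(complete, 100, TimeUnit.MILLISECONDS));

        // One "message" never arrived: the wait times out.
        CountDownLatch incomplete = new CountDownLatch(2);
        incomplete.countDown();
        System.out.println(awaitAll(incomplete, 100, TimeUnit.MILLISECONDS));
    }
}
```

In a JUnit test, the same idea could be expressed by asserting on the return value, for example assertTrue(latch.await(10, TimeUnit.SECONDS)), so that a timeout fails the test instead of passing silently.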
Summary
reactor-rabbitmq wraps the RabbitMQ Java client API in a Reactive Streams API, providing non-blocking back-pressure and end-to-end reactive pipelines.
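To make "back-pressure" concrete: in Reactive Streams, the subscriber signals how many items it is ready for, and the publisher sends no more than that. The sketch below illustrates the idea with the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than with Reactor or reactor-rabbitmq. All class names are hypothetical, and the publisher is a simplified single-threaded, single-subscriber demo, not a fully specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical demo classes built on the JDK Flow API; not Reactor or reactor-rabbitmq code. */
final class BackpressureSketch {

    /** Publishes the integers 1..count, but only as many as the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int emitted;      // items delivered so far
                private boolean finished; // set after completion or cancellation

                @Override
                public void request(long n) {
                    // Emit at most n items: the subscriber's demand bounds the publisher.
                    for (long i = 0; i < n && !finished && emitted < count; i++) {
                        subscriber.onNext(Integer.valueOf(++emitted));
                    }
                    if (!finished && emitted == count) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    /** Collects items and requests more only when requestMore is called. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        Throwable error;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // no demand is signalled yet
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            error = throwable;
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        /** Signals demand for n more items. */
        void requestMore(long n) {
            subscription.request(n);
        }
    }

    /** Records how many items have arrived after each step of demand. */
    static List<Integer> receivedCounts() {
        RangePublisher publisher = new RangePublisher(5);
        BatchSubscriber subscriber = new BatchSubscriber();
        publisher.subscribe(subscriber);

        List<Integer> sizes = new ArrayList<>();
        sizes.add(subscriber.received.size()); // nothing requested yet
        subscriber.requestMore(2);
        sizes.add(subscriber.received.size()); // only the two requested items
        subscriber.requestMore(10);
        sizes.add(subscriber.received.size()); // the remaining items, capped at 5
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(receivedCounts()); // prints [0, 2, 5]
    }
}
```

The publisher holds its data until demand arrives, which is the behaviour the term back-pressure refers to; how reactor-rabbitmq maps this onto broker-level mechanisms is not shown here.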