RabbitMQ Message Broker Patterns

RabbitMQ supports a variety of asynchronous architectural patterns for decoupling applications. Here we’ll cover the round-robin, next available worker, and publish/subscribe models, along with features such as routing, pattern filtering, acknowledgement, and durability [1].

 Message Broker

exchange.png
[2]

The patterns we’ll cover fall into two dispatch models, depending on whether a message is delivered to one consumer or to multiple consumers.

 One-Message-to-One-Consumer Model

Work queues distribute resource-consuming tasks among multiple workers, where each task is delivered to exactly one worker. This model is useful when a task is too complex to handle during a short HTTP request window, such as resizing images or rendering PDF files.

 Round-Robin

By default, RabbitMQ dispatches (or pre-assigns) each message to the next consumer in sequence as soon as it enters the queue. On average, every consumer gets the same number of messages.

A shortcoming of this approach shows up when messages need uneven amounts of work. With two workers, if all odd-numbered messages are heavy and all even-numbered messages are light, one worker is constantly busy while the other does little work.
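To make the imbalance concrete, here is a minimal sketch that models round-robin dispatch in plain Java. It is a toy model, not RabbitMQ client code: the class name RoundRobinSketch and the cost values (10 units for a heavy message, 1 unit for a light one) are assumptions made for illustration.

```java
import java.util.Arrays;

// Toy model of round-robin dispatch; this is not RabbitMQ client code.
final class RoundRobinSketch {

    // Assigns message i to worker (i mod workerCount) and returns the total cost per worker.
    static int[] dispatch(int[] messageCosts, int workerCount) {
        int[] load = new int[workerCount];
        for (int i = 0; i < messageCosts.length; i++) {
            load[i % workerCount] += messageCosts[i];
        }
        return load;
    }

    public static void main(String[] args) {
        // Assumed costs: messages alternate between heavy (10) and light (1).
        int[] costs = {10, 1, 10, 1, 10, 1};
        System.out.println(Arrays.toString(dispatch(costs, 2))); // prints [30, 3]
    }
}
```

Both workers receive three messages, yet the first ends up with ten times the work of the second.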

 Next Available Worker

RabbitMQ’s answer to uneven workloads is the basicQos method with the prefetchCount = 1 setting (channel.basicQos(1) in the Java client), combined with consumers that acknowledge their messages.

With this setting RabbitMQ won’t give a worker more than one message at a time. In other words, it doesn’t dispatch a new message to a worker until that worker has processed and acknowledged the previous one. Instead, it dispatches the message to the next worker that is not busy.
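The effect of this policy can be sketched with another toy model in plain Java. It assumes that all messages are already waiting in the queue and that each message goes to whichever worker becomes free first; the class name FairDispatchSketch and the cost values are hypothetical. It does not call the RabbitMQ client.

```java
import java.util.Arrays;

// Toy model of "next available worker" dispatch; this is not RabbitMQ client code.
final class FairDispatchSketch {

    // Gives each message to the worker that becomes free earliest
    // (ties go to the lowest index) and returns the total cost per worker.
    static int[] dispatch(int[] messageCosts, int workerCount) {
        int[] busyUntil = new int[workerCount];
        for (int cost : messageCosts) {
            int next = 0;
            for (int w = 1; w < workerCount; w++) {
                if (busyUntil[w] < busyUntil[next]) {
                    next = w;
                }
            }
            busyUntil[next] += cost;
        }
        return busyUntil;
    }

    public static void main(String[] args) {
        // Same assumed costs as a heavy/light alternation: 10, 1, 10, 1, 10, 1.
        int[] costs = {10, 1, 10, 1, 10, 1};
        System.out.println(Arrays.toString(dispatch(costs, 2))); // prints [21, 12]
    }
}
```

For the same alternating workload, plain round-robin would leave the two workers with 30 and 3 units of work, so the load here is noticeably more even.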

Let’s look at some code.

ProducerTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerTask {

  private final static String QUEUE_NAME = "fifo";

  public static void main(String[] argv) throws Exception {
    // Connect to a broker running on the local machine
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // Declare a non-durable, non-exclusive queue that is not auto-deleted
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Do Stuff";
    // Publish through the default exchange (""), which routes by queue name
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

    channel.close();
    connection.close();
  }
}

Let’s look closely.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

The connection abstracts the socket connection and takes care of protocol version negotiation, authentication, and so on for us. Here we connect to a broker on the local machine, hence the localhost. If we wanted to connect to a broker on a different machine, we’d simply specify its name or IP address here. Next we create a channel, which is where most of the API for getting things done resides.

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = "Do Stuff";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

To send, we must first declare a queue to send to; then we can publish a message to the queue.

Declaring a queue is idempotent - it will only be created if it doesn’t exist already. The message content is a byte array, so you can encode whatever you like there.

channel.close();
connection.close();

Lastly, we close the channel and the connection.

ConsumerWorker.java

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

// Named ConsumerWorker so it does not clash with the imported Consumer interface
public class ConsumerWorker {

  private final static String QUEUE_NAME = "fifo";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // The arguments must match the producer's declaration (non-durable here)
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        // ...
      }
    };
    // The second argument is autoAck: messages are acknowledged automatically
    channel.basicConsume(QUEUE_NAME, true, consumer);
  }
}

Note that we declare the queue here, as well. Because we might start the consumer before the publisher, we want to make sure the queue exists before we try to consume messages from it.

The basicConsume call tells the server to deliver us the messages from the queue. Since the server pushes messages to us asynchronously, we provide a callback in the form of an object: a DefaultConsumer subclass whose handleDelivery method is invoked for each message.

 One-Message-to-Multiple-Consumers Model

In the more complicated patterns, an exchange dispatches one message to multiple consumers. In RabbitMQ, the exchange type drives these patterns.

To list all exchanges on the server:

sudo rabbitmqctl list_exchanges

 Publish/subscribe

A classic pattern in which a producer publishes a message to a set of subscribed consumers.

https://www.rabbitmq.com/img/tutorials/python-three-overall.png

 Routing

Here we send messages to a direct exchange and supply the log severity as the routing key. That way the receiving program can select the severities it wants to receive. A message goes to the queues whose binding key exactly matches the routing key of the message.

Queues bound to a direct exchange with selected binding keys:

https://www.rabbitmq.com/img/tutorials/direct-exchange.png

You can also bind multiple queues to the same exchange with the same binding key (https://www.rabbitmq.com/img/tutorials/direct-exchange-multiple.png). In that case the direct exchange behaves like a fanout exchange for that key and delivers the message to every matching queue.

https://www.rabbitmq.com/img/tutorials/python-four.png
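The exact-match rule of a direct exchange can be sketched as a small in-memory model. This is plain Java rather than RabbitMQ client code, and the class name DirectExchangeSketch and the queue names are hypothetical. With the real client, the corresponding calls are exchangeDeclare with the "direct" type, queueBind with a binding key, and basicPublish with a routing key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of direct-exchange routing; this is not RabbitMQ client code.
final class DirectExchangeSketch {

    // Binding key -> queues bound with that key, in binding order.
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues whose binding key exactly matches the routing key.
    List<String> route(String routingKey) {
        List<String> matches = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        return new ArrayList<>(matches);
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("errors_to_disk", "error"); // hypothetical queue names
        exchange.bind("console", "info");
        exchange.bind("console", "warning");
        exchange.bind("console", "error");

        System.out.println(exchange.route("error")); // prints [errors_to_disk, console]
        System.out.println(exchange.route("info"));  // prints [console]
        System.out.println(exchange.route("debug")); // prints []
    }
}
```

A message whose routing key matches no binding is routed to no queue, which is why the last call returns an empty list.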

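The following producer and consumer implement the publish/subscribe pattern with a fanout exchange. A direct exchange differs only in the exchange type and in the routing and binding keys.

Producer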

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class ProducerPublisher {

    private static final String EXCHANGE_NAME = "logs";

    // newConnection() can throw IOException and TimeoutException
    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // A fanout exchange copies each message to every bound queue
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        // The routing key is ignored by fanout exchanges, so it is left empty
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        channel.close();
        connection.close();
    }
    //...
}

Consumer

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerSubscriber {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // Server-named temporary queue, bound to the exchange
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        // ...
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

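In the producer, instead of queueDeclare we use

channel.exchangeDeclare("logs", "fanout");

whose parameters are the exchange name and the exchange type. The consumer calls queueDeclare() without arguments to get a server-named temporary queue and binds it to the exchange with queueBind.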

 Acknowledgement

With automatic acknowledgement (autoAck = true in basicConsume, as in the consumers above), RabbitMQ marks a message for deletion as soon as it delivers it to the consumer. If the consumer crashes before it finishes processing, the message is lost.

When manual acknowledgement is turned on, the consumer sends back an ack(nowledgement) to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it. If a consumer crashes (its channel is closed, its connection is closed, or the TCP connection is lost) without sending an ack, RabbitMQ understands that the message wasn’t processed fully and re-queues it.
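The re-queue behaviour can be sketched with a small in-memory model of a queue with a single consumer. This is plain Java, not RabbitMQ client code: the class AckQueueSketch and its methods are hypothetical, and re-queued messages are simply put back at the front of the queue. With the real Java client, manual acknowledgement means passing autoAck = false to basicConsume and calling channel.basicAck(envelope.getDeliveryTag(), false) once processing is done.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of manual acknowledgement; this is not RabbitMQ client code.
final class AckQueueSketch {

    private final Deque<String> ready = new ArrayDeque<>();
    // Delivered but not yet acknowledged messages, keyed by delivery tag.
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next ready message and returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // The consumer confirms that the message was processed; it can now be forgotten.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer went away: every unacknowledged message returns to the queue.
    void consumerLost() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    String messageFor(long tag) {
        return unacked.get(tag);
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckQueueSketch queue = new AckQueueSketch();
        queue.publish("task A");
        queue.publish("task B");

        long first = queue.deliver();
        queue.deliver();           // "task B" is delivered but never acknowledged
        queue.ack(first);          // "task A" is done
        queue.consumerLost();      // the consumer crashes

        System.out.println(queue.readyCount()); // prints 1
        System.out.println(queue.messageFor(queue.deliver())); // prints task B
    }
}
```

Only the acknowledged task is gone for good; the unacknowledged one is available for redelivery.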

 Durability

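Thanks to acknowledgement, the task isn’t lost if our consumer stops.
Durability prevents queues and messages from being lost when the RabbitMQ process itself stops. We need to mark both the queue and the messages as durable. In the Java client, messages are marked persistent by publishing them with MessageProperties.PERSISTENT_TEXT_PLAIN. RabbitMQ does not allow an existing queue to be redeclared with different parameters, so a queue that was already declared as non-durable has to be deleted or given a new name first.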

boolean durable = true;
channel.queueDeclare("fifo", durable, false, false, null);

In this short tutorial we covered basic architectural patterns with RabbitMQ such as round-robin, next available worker, and publish/subscribe. I hope it was useful to you.

References:

  1. RabbitMQ Tutorials https://www.rabbitmq.com/tutorials

  2. RabbitMQ Tutorials, Producer Exchange Queue, Consumer https://www.rabbitmq.com/tutorials/tutorial-four-java.html

 