Sunday, 22 March 2015

Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (XML based configuration)


In this example we will create a single Java application that sends messages to and receives messages from RabbitMQ using Spring AMQP.

Design:
  1. Exchange type: Fanout exchange
  2. Input argument: String
  3. Queue argument: default arguments
  4. Configuration type: xml based configuration
Note about design: In this example we start with a basic design and will improve on it in the coming examples.
  • We can pass either a String or a Message (org.springframework.amqp.core.Message) as the input argument. We will see the advantages of each type in the coming posts.
  • A queue is created with a few default values, such as durable set to true and auto-delete set to false. We will look at what each value means, configure optional parameters such as TTL and a dead letter exchange, and observe the resulting behavior.
  • We will also look at how to configure the same setup using annotations in the coming tutorials.

In this tutorial, we will create two queues (myQueue1 and myQueue2) and bind them to a fanout exchange (myExchange). In our main class, we send a message to the exchange and then retrieve it from both queues bound to this exchange.
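
Before touching the broker, the topology described above can be pictured with a small in-memory model. This is only a sketch in plain Java, not RabbitMQ or Spring AMQP code: the class FanoutModel and its bind and publish methods are hypothetical names used for illustration, and the queues are ordinary java.util collections.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory model of a fanout exchange; not RabbitMQ or Spring AMQP code.
class FanoutModel {
    // Queues currently bound to the exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Bind a queue to the exchange; a fanout binding needs no routing key.
    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // Publish: every bound queue receives its own copy of the message.
    void publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        // Stand-ins for myQueue1, myQueue2 and myExchange from the tutorial.
        Queue<String> myQueue1 = new ArrayDeque<>();
        Queue<String> myQueue2 = new ArrayDeque<>();
        FanoutModel myExchange = new FanoutModel();
        myExchange.bind(myQueue1);
        myExchange.bind(myQueue2);

        myExchange.publish("Message");

        // Each queue holds one copy, so both lines print the same text.
        System.out.println(myQueue1.poll());
        System.out.println(myQueue2.poll());
    }
}
```

The point of the model is that the publisher sends once and never names a queue; the bindings alone decide which queues get a copy.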



Prerequisites:
  1. RabbitMQ is installed locally
  2. Maven is configured

Step 1: Update the pom.xml with the spring-rabbit, spring-core and spring-context dependencies
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>3.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.2.RELEASE</version>
        </dependency>

Step 2: Update the Spring context XML (rabbit-context.xml) with the rabbit connection, exchange and queue details
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 
<rabbit:connection-factory id="connectionFactory"
        host="localhost" username="guest" password="guest" />

    <rabbit:template id="rabbitTemplate"
        connection-factory="connectionFactory" exchange="myExchange" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="myQueue1" />
    <rabbit:queue name="myQueue2" />

    <rabbit:fanout-exchange name="myExchange">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue1" />
            <rabbit:binding queue="myQueue2" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>
</beans>
In the XML file, we declare the connection factory, the rabbit template, the admin, the queues and the exchange with its bindings. The admin is what declares the queues, the exchange and the bindings on the broker. Some of the attributes are optional, depending on the design or development approach of your application.

Step 3: Create main class to send and receive messages using RabbitTemplate
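     // Load the Spring context defined in Step 2
     AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
     RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
     // Publish to the template's default exchange (myExchange)
     rabbitTemplate.convertAndSend("Message");

     // Fetch one message from each bound queue (null if the queue is empty)
     String msg1  = (String) rabbitTemplate.receiveAndConvert("myQueue1");
     System.out.println(msg1);
     String msg2  = (String) rabbitTemplate.receiveAndConvert("myQueue2");
     System.out.println(msg2);

     // Close the context to release the RabbitMQ connection
     context.close();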

When we run the above class, rabbitTemplate sends the message string to the exchange configured in rabbit-context.xml. Because myExchange is a fanout exchange, each bound queue gets a copy of the message, and each receiveAndConvert call then fetches the message from the queue named in its argument.

In the next tutorial, we will look at how to add a listener class so that messages are processed automatically.

RabbitMQ basic concepts

In this post we will try to understand the basic concepts of RabbitMQ, the components involved in the transmission of messages, and the basic APIs required in spring-amqp.

RabbitMQ: RabbitMQ is open source message broker software that implements the Advanced Message Queuing Protocol (AMQP).

AMQP: The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for sending and receiving messages between distributed systems.

Message Brokers: Message brokers receive messages from the publishers and route them to consumers.

Bindings: A binding is the relationship between an exchange and a queue.

How RabbitMQ works:
  1. Messages are published to an exchange.
  2. The exchange distributes copies of the message to queues according to the binding rules.
  3. The consumer application receives the messages. In the AMQP model there are two ways for applications to consume:
    1. Have messages delivered to the consumer application (push API)
    2. Fetch messages as needed (pull API)
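
The difference between the push and pull styles can be sketched with a plain Java model. This is an illustration only, not RabbitMQ or Spring AMQP code: ConsumeModel, subscribe, enqueue and fetch are hypothetical names, and the model allows a single subscriber for simplicity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// In-memory model of the two consumption styles; not RabbitMQ or Spring AMQP code.
class ConsumeModel {
    private final Queue<String> messages = new ArrayDeque<>();
    private Consumer<String> subscriber; // push-style consumer, if one is registered

    // Push API: register a callback; waiting and future messages are delivered to it.
    void subscribe(Consumer<String> consumer) {
        subscriber = consumer;
        while (!messages.isEmpty()) {
            subscriber.accept(messages.poll());
        }
    }

    // Called when a message is routed to this queue.
    void enqueue(String message) {
        if (subscriber != null) {
            subscriber.accept(message); // delivered immediately
        } else {
            messages.add(message);      // waits until someone fetches it
        }
    }

    // Pull API: fetch one message on demand, or null if the queue is empty.
    String fetch() {
        return messages.poll();
    }

    public static void main(String[] args) {
        ConsumeModel pulled = new ConsumeModel();
        pulled.enqueue("first");
        System.out.println("pulled: " + pulled.fetch());
        System.out.println("pulled: " + pulled.fetch()); // queue is empty now

        ConsumeModel pushed = new ConsumeModel();
        List<String> received = new ArrayList<>();
        pushed.subscribe(received::add);
        pushed.enqueue("second");
        System.out.println("pushed: " + received);
    }
}
```

In Spring AMQP terms, a call such as RabbitTemplate.receiveAndConvert is pull-style, while a listener registered with a listener container is push-style.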


Exchange types: Exchanges take a message and route it into zero or more queues. The routing algorithm used depends on the exchange type and the bindings.
There are 4 types of exchanges:
  1. Direct exchange: A direct exchange delivers a message to the queues whose binding key exactly matches the message routing key.
  2. Fanout exchange: A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored.
  3. Topic exchange: Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange.
  4. Headers exchange: A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key.
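
The matching rules for the first three exchange types can be sketched as a single function. This is a simplified model in plain Java, not RabbitMQ's implementation: RoutingModel and matches are hypothetical names, and headers exchanges are left out. For topic exchanges the sketch relies on the standard pattern rules, which are not spelled out above: keys are dot-separated words, '*' stands for exactly one word and '#' for zero or more words.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of binding-key matching per exchange type; not RabbitMQ's implementation.
class RoutingModel {
    enum ExchangeType { DIRECT, FANOUT, TOPIC }

    // Returns true if a message with routingKey should reach a queue bound with bindingKey.
    static boolean matches(ExchangeType type, String bindingKey, String routingKey) {
        switch (type) {
            case FANOUT:
                return true;                          // routing key is ignored
            case DIRECT:
                return bindingKey.equals(routingKey); // exact match only
            case TOPIC:
                return topicMatch(Arrays.asList(bindingKey.split("\\.")),
                                  Arrays.asList(routingKey.split("\\.")));
            default:
                return false;
        }
    }

    // Topic patterns: '*' matches exactly one word, '#' matches zero or more words.
    private static boolean topicMatch(List<String> pattern, List<String> words) {
        if (pattern.isEmpty()) {
            return words.isEmpty();
        }
        String head = pattern.get(0);
        List<String> rest = pattern.subList(1, pattern.size());
        if (head.equals("#")) {
            // Try letting '#' absorb 0, 1, 2, ... words.
            for (int i = 0; i <= words.size(); i++) {
                if (topicMatch(rest, words.subList(i, words.size()))) {
                    return true;
                }
            }
            return false;
        }
        if (words.isEmpty()) {
            return false;
        }
        return (head.equals("*") || head.equals(words.get(0)))
                && topicMatch(rest, words.subList(1, words.size()));
    }

    public static void main(String[] args) {
        System.out.println(matches(ExchangeType.FANOUT, "", "anything"));      // true
        System.out.println(matches(ExchangeType.DIRECT, "orders", "orders.eu")); // false
        System.out.println(matches(ExchangeType.TOPIC, "orders.*", "orders.eu")); // true
        System.out.println(matches(ExchangeType.TOPIC, "orders.#", "orders"));    // true
    }
}
```

The routing keys used in main ("orders", "orders.eu") are made-up examples; any dot-separated keys behave the same way.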

Message acknowledgements
Consumer applications may fail to process messages for various reasons, such as a network issue or a system failure.
The AMQP specification therefore offers two choices for when the broker removes a message from the queue:
  1. Automatic acknowledgement model: the message is deleted from the queue as soon as the broker sends it to the application (using either the basic.deliver or basic.get-ok AMQP method)
  2. Explicit acknowledgement model: the message is deleted from the queue only when the application sends back an acknowledgement (using the basic.ack AMQP method)
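
The practical difference between the two models shows up when a consumer fails after receiving a message. The following plain Java model is a sketch under stated assumptions, not RabbitMQ or Spring AMQP code: AckModel and all of its method names are hypothetical, and the delivery tag is modelled as a simple counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// In-memory model of the two acknowledgement models; not RabbitMQ or Spring AMQP code.
class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();    // messages waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>(); // delivered, awaiting an ack
    private long nextTag = 1;                                  // hypothetical delivery tag counter

    void enqueue(String message) {
        ready.addLast(message);
    }

    // Automatic model: the message is gone from the queue as soon as it is delivered.
    String deliverAutoAck() {
        return ready.pollFirst();
    }

    // Explicit model: the broker keeps the message until the consumer acknowledges it.
    // Returns the delivery tag, or -1 if the queue is empty.
    long deliverExplicitAck() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        unacked.put(nextTag, message);
        return nextTag++;
    }

    // Body of a delivered but not yet acknowledged message.
    String body(long tag) {
        return unacked.get(tag);
    }

    // Consumer confirmed processing: only now is the message deleted.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // Consumer failed before acknowledging: the message goes back on the queue.
    void consumerFailed(long tag) {
        String message = unacked.remove(tag);
        if (message != null) {
            ready.addFirst(message);
        }
    }

    // Messages the broker still holds (ready plus unacknowledged).
    int size() {
        return ready.size() + unacked.size();
    }

    public static void main(String[] args) {
        AckModel queue = new AckModel();
        queue.enqueue("order-1");
        long tag = queue.deliverExplicitAck();
        queue.consumerFailed(tag);                  // message is requeued, not lost
        System.out.println(queue.deliverAutoAck()); // prints order-1
    }
}
```

With the automatic model the same failure would lose the message, because the broker has already deleted it by the time the consumer starts processing.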

Important Spring AMQP APIs for basic operations:

spring-amqp is the base abstraction and spring-rabbit is the RabbitMQ implementation.
There are three basic classes we need to know before starting development:
  1. CachingConnectionFactory: Used for creating connections
  2. RabbitAdmin: Used for declaring exchanges, queues and bindings
  3. RabbitTemplate: Provides the basic set of AMQP operations, such as sending and receiving messages
In the next post, we will see how to configure a fanout exchange using Spring AMQP and exchange messages between producer and consumer applications.