Tuesday, 30 June 2015

Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (XML based configuration with listener)

In a real-world scenario, we would not run a Java class by hand every time we need to retrieve messages from a queue. Instead, we would configure a listener that retrieves the messages for us automatically.

In this example we will create a Java application (the producer) that sends messages to the exchange and a web application (the consumer) that receives messages from RabbitMQ using a configured listener.


Design:
  1. Exchange type: Fanout exchange
  2. Input argument: String
  3. Queue argument: default arguments
  4. Configuration type: xml based configuration

In this tutorial, we will create two queues (myQueue1 and myQueue2) and bind them to a fanout exchange (myExchange). The main class of the producer application sends messages to the exchange, and each consumer application retrieves them from its own queue.
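
To make the fanout behaviour concrete, here is a minimal in-memory sketch in plain Java. It is not RabbitMQ or Spring AMQP code; the class FanoutSketch and its methods are hypothetical names used only for illustration. It models the one property that matters here: a fanout exchange copies every published message to all queues bound to it.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory model of fanout routing; illustration only, no broker involved. */
public class FanoutSketch {

    // Queues bound to this exchange, keyed by queue name.
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue with the given name to the exchange (no-op if already bound). */
    public void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Publishes a message: every bound queue receives its own copy. */
    public void publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    /** Removes and returns the oldest message of a queue, or null if none is waiting. */
    public String receive(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        FanoutSketch myExchange = new FanoutSketch();
        myExchange.bind("myQueue1");
        myExchange.bind("myQueue2");

        myExchange.publish("Message");

        System.out.println(myExchange.receive("myQueue1")); // prints "Message"
        System.out.println(myExchange.receive("myQueue2")); // prints "Message"
    }
}
```

Both queues see the same message, which is why each consumer application in this tutorial can listen on its own queue and still receive everything the producer sends.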



Prerequisites:
  1. RabbitMQ is installed locally
  2. Maven is configured
  3. Tomcat server is configured

Producer application:

If you have gone through my previous post, you already know how to create a Java application that sends messages to the queue. The only change needed here is that the main class just sends the message (in the previous example we sent and received messages in the same class).

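     // Load the producer's Spring context, which declares the RabbitTemplate bean
     AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
     RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
     // Send to the exchange configured on the template; a fanout exchange ignores the routing key
     rabbitTemplate.convertAndSend("Message");
     context.close();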


Consumer application:
As shown in the above diagram, each consumer application is bound to a single queue. Below you will find the code needed to receive messages from one of the queues. To create the second consumer application, we just need to update the queue name in step 4.

Step 1: Update the pom.xml with the spring-rabbit (Spring AMQP) and Spring core, context and web dependencies
 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>


Step 2: Update the web.xml file to read the rabbit configuration XML during server startup
<servlet>
 <servlet-name>RabbitMqConsumerService</servlet-name>
 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
 <init-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:rabbit-context.xml</param-value>
 </init-param>
 <load-on-startup>1</load-on-startup>
</servlet>

<servlet-mapping>
 <servlet-name>RabbitMqConsumerService</servlet-name>
 <url-pattern>/</url-pattern>
</servlet-mapping>

<listener>
 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<context-param>
 <param-name>contextConfigLocation</param-name>
 <param-value>classpath:rabbit-context.xml</param-value>
</context-param>

Step 3: Update the spring context xml with rabbit connection, exchange and queue details
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
 <property name="location">
  <value>classpath:RabbitConfig.properties</value>
 </property>
</bean>

<rabbit:connection-factory id="rabbitConnectionFactory"
 host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
 port="${rabbit.port}"/>

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:template id="rabbitTemplate"
 connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

<rabbit:queue name="${rabbit.queue}" />

<rabbit:fanout-exchange name="${rabbit.exchange}">
 <rabbit:bindings>
  <rabbit:binding queue="${rabbit.queue}" />
 </rabbit:bindings>
</rabbit:fanout-exchange>

<bean id="messageListener" class="com.spring.amqp.FanoutExchangeConsumer" />

<rabbit:listener-container
 connection-factory="rabbitConnectionFactory">
 <rabbit:listener queues="${rabbit.queue}" ref="messageListener"
  method="processMessage" />
</rabbit:listener-container>
</beans>

In the XML file, we declare the connection factory, rabbit template, queue, exchange binding and listener. A few of these elements are optional, depending on the design of your application; for example, a consumer that only listens does not need the rabbit template. To understand more about each attribute, refer to the Spring AMQP reference documentation.

Step 4: Create RabbitConfig.properties file
rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue1
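
The second consumer differs from the first only in the value of rabbit.queue, because every ${...} placeholder in the context file is filled in from this properties file. The following plain-Java sketch illustrates that substitution idea. It is not Spring's PropertyPlaceholderConfigurer; PlaceholderSketch, parse and resolve are hypothetical names, and unlike Spring's default behaviour this sketch silently leaves unknown placeholders untouched.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified ${key} substitution; illustration only, not Spring's implementation. */
public class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Parses key=value lines, as found in RabbitConfig.properties. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Replaces each ${key} with its property value; unknown keys stay as they are. */
    public static String resolve(String template, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = props.getProperty(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed properties for the second consumer: only the queue name changes.
        Properties consumer2 = parse("rabbit.exchange=myExchange\nrabbit.queue=myQueue2\n");
        System.out.println(resolve("<rabbit:queue name=\"${rabbit.queue}\" />", consumer2));
        // prints: <rabbit:queue name="myQueue2" />
    }
}
```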

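Step 5: Create the listener class (com.spring.amqp.FanoutExchangeConsumer) that processes the received messages
 package com.spring.amqp;
 public class FanoutExchangeConsumer {
  public void processMessage(String msg) { // called by the listener container for each message
   System.out.println(msg);
  }
 }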

When the queue receives a message, the consumer application processes it automatically through the configured listener.
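
The ref and method attributes of the listener tell the container which object and which method to call. Conceptually, the message body is converted to a String and the named method is invoked on the bean. The sketch below shows that idea with plain Java reflection. It is a simplified illustration, not Spring AMQP's actual code; ListenerDispatchSketch, RecordingConsumer and dispatch are hypothetical names, and the body is assumed to be UTF-8 text.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Reflection-based dispatch to a named listener method; illustration only. */
public class ListenerDispatchSketch {

    /** Stand-in for the tutorial's consumer; records messages instead of printing them. */
    public static class RecordingConsumer {
        public final List<String> received = new ArrayList<>();

        public void processMessage(String msg) {
            received.add(msg);
        }
    }

    /** Decodes the body as UTF-8 text and calls delegate.methodName(String) reflectively. */
    public static void dispatch(Object delegate, String methodName, byte[] body) {
        try {
            Method method = delegate.getClass().getMethod(methodName, String.class);
            method.invoke(delegate, new String(body, StandardCharsets.UTF_8));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer();
        dispatch(consumer, "processMessage", "Message".getBytes(StandardCharsets.UTF_8));
        System.out.println(consumer.received); // prints [Message]
    }
}
```

This is why the consumer class needs no Spring or RabbitMQ imports: it only has to expose a public method whose name matches the method attribute.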

I have attached the sample producer and consumer (Consumer1 and Consumer2) applications for reference. Deploy both consumer applications on the Tomcat server and send a sample message using the producer application.

In the next tutorial, we will look at how to configure a fanout exchange that accepts attributes such as content type and content length along with the message data.