Sunday, 5 July 2015

Understanding xml attributes in rabbit config


In enterprise applications, we will not be creating the queues. We will have Rabbit admin who will create the queues with the required attributes and we will be using them.

In the rabbit configuration xml file, we declare connection factory, rabbit template, queues, bindings, listener etc, in order to transfer messages from producer to consumer application. Among these few attributes are optional depending on the design or development approach of your application. Lets discuss each of them in detail here

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
 <property name="location">
  <value>classpath:RabbitConfig.properties</value>
 </property>
</bean>

<rabbit:connection-factory id="rabbitConnectionFactory"
 host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
 port="${rabbit.port}"/>

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:template id="rabbitTemplate"
 connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

<rabbit:queue name="${rabbit.queue}" />

<rabbit:fanout-exchange name="${rabbit.exchange}">
 <rabbit:bindings>
  <rabbit:binding queue="${rabbit.queue}" />
 </rabbit:bindings>
</rabbit:fanout-exchange>

No matter what the approach the following are mandatory:
  1. rabbit connection factory
  2. rabbit template
  3. rabbit queue declaration
Rabbit Connection factory: Creates an instance of rabbit CachingConnectionFactory (org.springframework.amqp.rabbit.connection)

Rabbit Template: Creates a rabbit template (org.springframework.amqp.rabbit.core) for convenient access to the message broker.

Rabbit Queue: Creates a queue. Uses an existing queue with the same name if it exists on the broker, or else declares a new one

Note that in order for rabbit queue attribute to create a new queue, we would need rabbit admin attribute
Rabbit Admin: Creates an instance of rabbit RabbitAdmin (org.springframework.amqp.rabbit.core)
So if we already have queues configured, then we need not have to rabbit admin attribute. Our application will work just fine. But if we add this attribute then we need to very careful while creating queues, because if we enter wrong queue, a new queue will be created which may cause errors


Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (XML based configuration with AMQP Message object as input)

In the previous example, we understood how a message is transferred from publisher to the consumer application. We used String as the data format for the same. But generally when two applications interact with each other the data exchange happens via xml or json.


Since we can send messages in many format we need a way to identify the type of message that was sent and any other properties, Spring AMQP defines a message class where we can send these additional properties and actual message in a single instance.


In this example we will create 2 java applications(Producers) which will send xml and json messages to the queue and a web application(Consumer) which will receive messages and based on the content type in the message header process them accordingly.



Design:

  1. Exchange type: Fanout exchange
  2. Input argument: Message (org.springframework.amqp.core.Message)
  3. Queue argument: default arguments
  4. Configuration type: xml based configuration

In this tutorial, we will create 1 queue (myQueue) and bind it to fanout exchange (myExchange) created. In our main class of our producer applications, we send the messages to the exchange and retrieve message from the queue in our consumer application.



Prerequisites:

  1. RabbitMQ is installed locally
  2. Maven is configured
  3. Tomcat server is configured

Producer application 1 (sends xml data):

Step 1: Update the pom.xml with spring-amqp, spring-core and xml dependencies

  <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>javax.xml.bind</groupId>
  <artifactId>jaxb-api</artifactId>
  <version>2.0</version>
 </dependency>

Step 2: Update the spring context xml with rabbit connection, exchange and queue details

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

        <bean
  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location">
   <value>classpath:RabbitConfig.properties</value>
  </property>
 </bean>

 <rabbit:connection-factory id="rabbitConnectionFactory"
  host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
  port="${rabbit.port}" />

 <rabbit:admin connection-factory="rabbitConnectionFactory" />

 <rabbit:template id="rabbitTemplate"
  connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

 <rabbit:queue name="${rabbit.queue}" />

 <rabbit:fanout-exchange name="${rabbit.exchange}">
  <rabbit:bindings>
   <rabbit:binding queue="${rabbit.queue}" />
  </rabbit:bindings>
 </rabbit:fanout-exchange>


 <bean id="jaxb2Marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
  <property name="classesToBeBound">
   <list>
    <value>com.spring.amqp.model.Order</value>
   </list>
  </property>
 </bean>
In the xml file, we declare connection factory, rabbit template, queues, bindings, listener etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 3: Create RabbitConfig.properties file
rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue

Step 4: Create a POJO class
@XmlRootElement
public class Order {

 private long orderNumber;
 private long custNumber;
 private Date orderDate;
 private String shippingNumber;

        // Create getter and setter methods
        // Override toString method
}

Step 5: Create main class(com.spring.amqp.main.TestFanoutProducer) to send and receive messages using RabbitTemplate

 AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
 Marshaller converter = (Marshaller) context.getBean("jaxb2Marshaller");
 
 MessageProperties messageProperties = new MessageProperties();
 messageProperties.setTimestamp(new Date());
 messageProperties.setContentType("application/xml");
 
 Order order = new Order();
 order.setOrderNumber(12345);
 order.setCustNumber(65839);
 order.setOrderDate(new Date());
 order.setShippingNumber("ABCD1234");
 
 // Object to XML
 StringWriter writer = new StringWriter();
 converter.marshal(order, new StreamResult(writer));
 String output = writer.toString();
 
 Message message = new Message(output.getBytes(), messageProperties);
 
 rabbitTemplate.convertAndSend(message);



Producer application 2 (sends json data):
Step 1: Update the pom.xml with spring-amqp, spring-core and json dependencies

 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.5.4</version>
 </dependency>


Step 2: Update the spring context xml with rabbit connection, exchange and queue details

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 

 <bean
  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location">
   <value>classpath:RabbitConfig.properties</value>
  </property>
 </bean>

 <rabbit:connection-factory id="rabbitConnectionFactory"
  host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
  port="${rabbit.port}" />

 <rabbit:admin connection-factory="rabbitConnectionFactory" />

 <rabbit:template id="rabbitTemplate"
  connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

 <rabbit:queue name="${rabbit.queue}" />

 <rabbit:fanout-exchange name="${rabbit.exchange}">
  <rabbit:bindings>
   <rabbit:binding queue="${rabbit.queue}" />
  </rabbit:bindings>
 </rabbit:fanout-exchange>
In the xml file, we declare connection factory, rabbit template, queues, bindings, listener etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 3: Create RabbitConfig.properties file

rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue

Step 4: Create a POJO class
public class Order {

 private long orderNumber;
 private long custNumber;
 private Date orderDate;
 private String shippingNumber;

        // Create getter and setter methods
        // Override toString method
}

Step 5: Create main class(com.spring.amqp.main.TestFanoutProducer) to send and receive messages using RabbitTemplate

 AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
 
 MessageProperties messageProperties = new MessageProperties();
 messageProperties.setTimestamp(new Date());
 messageProperties.setContentType("application/json");
 
 Order order = new Order();
 order.setOrderNumber(12345);
 order.setCustNumber(65839);
 order.setOrderDate(new Date());
 order.setShippingNumber("ABCD1234");
 
 ObjectMapper mapper = new ObjectMapper();
 Message message = new Message(mapper.writeValueAsBytes(order), messageProperties);
 
 rabbitTemplate.convertAndSend(message);







Consumer Application:
Step 1: Update the pom.xml with spring-amqp, spring-core, xml and json dependencies

  <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>javax.xml.bind</groupId>
  <artifactId>jaxb-api</artifactId>
  <version>2.0</version>
 </dependency>
 <dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.5.4</version>
 </dependency>

Step 2: Update the spring context xml with rabbit connection, exchange and queue details

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 

 <context:component-scan base-package="com.spring.amqp.main" />

 <bean
  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location">
   <value>classpath:RabbitConfig.properties</value>
  </property>
 </bean>

 <rabbit:connection-factory id="rabbitConnectionFactory"
  host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
  port="${rabbit.port}" />

 <rabbit:admin connection-factory="rabbitConnectionFactory" />

 <rabbit:template id="rabbitTemplate"
  connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

 <rabbit:queue name="${rabbit.queue}" />

 <rabbit:fanout-exchange name="${rabbit.exchange}">
  <rabbit:bindings>
   <rabbit:binding queue="${rabbit.queue}" />
  </rabbit:bindings>
 </rabbit:fanout-exchange>


 <bean id="jaxb2Marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
  <property name="classesToBeBound">
   <list>
    <value>com.spring.amqp.model.Order</value>
   </list>
  </property>
 </bean>

 <bean id="messageListener" class="com.spring.amqp.main.FanoutExchangeConsumer" />

 <rabbit:listener-container
  connection-factory="rabbitConnectionFactory">
  <rabbit:listener queues="${rabbit.queue}" ref="messageListener" />
 </rabbit:listener-container>
In the xml file, we declare connection factory, rabbit template, queues, bindings, listener etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 3: Create RabbitConfig.properties file

rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue

Step 4: Create a POJO class
@XmlRootElement
public class Order {

 private long orderNumber;
 private long custNumber;
 private Date orderDate;
 private String shippingNumber;

        // Create getter and setter methods
        // Override toString method
}


Step 5: Create main class(com.spring.amqp.main.FanoutExchangeConsumer) to send and receive messages using RabbitTemplate

 @Autowired
 Jaxb2Marshaller unmarshaller;

 public void onMessage(Message message){
  Order order = null;
  System.out.println("Message received from queue at " + message.getMessageProperties().getTimestamp() + " is : " + new String (message.getBody()) + " with content type as : "+ message.getMessageProperties().getContentType());

  if(message.getMessageProperties().getContentType().equals("application/xml")){
   InputStream stream = new ByteArrayInputStream(message.getBody());
   Source source = new StreamSource(stream);
   try {
    order = (Order) unmarshaller.unmarshal(source);
   } catch (XmlMappingException e) {
    e.printStackTrace();
   }
   System.out.println(order);
  }else if (message.getMessageProperties().getContentType().equals("application/json")){
   ObjectMapper mapper = new ObjectMapper();
   try {
    order = (Order) mapper.readValue(message.getBody(), Order.class);
   } catch (JsonParseException e) {
    e.printStackTrace();
   } catch (JsonMappingException e) {
    e.printStackTrace();
   } catch (IOException e) {
    e.printStackTrace();
   }
   System.out.println(order);
  }
 }

When the queue receives a message, the consumer application will automatically process them based on the content type value present in the message properties.

I have attached the sample Producer applications(xml and json producers) and consumer application for reference.

In the next tutorial, we will look how to configure fanout exchange using annotations

Friday, 3 July 2015

Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (Annotation based configuration)

In this post we will look at how to develop a consumer application which will receive messages from the RabbitMQ using annotations. If you are not sure about how to develop a producer application to send messages to the queue, please refer to my earlier post. I have also attached the sample application below for quick reference.

Design:
  1. Exchange type: Fanout exchange
  2. Input argument: String
  3. Queue argument: default arguments
  4. Configuration type: annotations based configuration 
In this tutorial, we will create 2 queues (myQueue1 and myQueue2) and bind them to fanout exchange (myExchange) created. In our main class of our producer application, we send the messages to the exchange and retrieve message from the queues in our consumer applications. 


Prerequisites:
  1. RabbitMQ is installed locally
  2. Maven is configured
  3. Tomcat server is configured 

Consumer application:
As shown in the above diagram, each consumer application is bound to single queue. Below you will find the code needed to receive message from one of the queue. In order to create second consumer application, we just need to update the queue name in step 4

Step 1: Update the pom.xml with spring-amqp and spring-core dependencies
 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-beans</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>

Step 2: Update the web.xml file to read the rabbit configuration xml during server start up
  <servlet>
   <servlet-name>RabbitMqConsumerService</servlet-name>
   <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
   <init-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:rabbit-context.xml</param-value>
   </init-param>
   <load-on-startup>1</load-on-startup>
  </servlet>
  
  <servlet-mapping>
   <servlet-name>RabbitMqConsumerService</servlet-name>
   <url-pattern>/</url-pattern>
  </servlet-mapping>
  
  <listener>
   <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
  
  <context-param>
   <param-name>contextConfigLocation</param-name>
   <param-value>classpath:rabbit-context.xml</param-value>
  </context-param>

Step 3: Update the spring context xml with rabbit connection, exchange and queue details
<context:component-scan base-package="com.spring.amqp" />

Step 4: Create RabbitConfiguration.java file
@Configuration
@ComponentScan("com.spring.amqp") 
@EnableRabbit
public class RabbitConfiguration {

 @Bean
 public ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
  connectionFactory.setUsername("guest");
  connectionFactory.setPassword("guest");
  connectionFactory.setPort(5672);
  return connectionFactory;
 }

 @Bean
 public AmqpAdmin amqpAdmin() {
  return new RabbitAdmin(connectionFactory());
 }

 @Bean
 public RabbitTemplate rabbitTemplate() {
  return new RabbitTemplate(connectionFactory());
 }

 @Bean
 public Queue myQueue() {
  return new Queue("myQueue1", true);
 }

 @Bean
 public FanoutExchange exchange() {
  FanoutExchange exchange = new FanoutExchange("myExchange");
  return exchange;
 }

 @Bean
 public Binding binding() {
  return BindingBuilder.bind(myQueue()).to(exchange());
 }
 
 @Bean(name="rabbitListenerContainerFactory")
 public SimpleRabbitListenerContainerFactory listenerFactory(){
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory());
  return factory;
 }

}

Step 5: Create main class(com.spring.amqp.FanoutExchangeConsumer) to send and receive messages using RabbitTemplate
@Service
public class FanoutExchangeConsumer{

 @RabbitListener(queues="myQueue")
 public void processMessage(String msg) {
  System.out.println(msg);
 }

}

When the queue receives a message, the consumer application will automatically process them.

I have attached the sample Producer and consumer application for reference. Deploy the consumer application on the tomcat server and send a sample message using the producer application. 

Tuesday, 30 June 2015

Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (XML based configuration with listener)

In a real world scenario, we won't run a java class every time we need to retrieve messages from the queue. Ideally we would configure a listener which would retrieve the messages automatically for us.

In this example we will create a java application(Producer) which will send the messages to the queue and a web application(Consumer) which will receive messages from RabbitMQ using the listener configured.


Design:
  1. Exchange type: Fanout exchange
  2. Input argument: String
  3. Queue argument: default arguments
  4. Configuration type: xml based configuration

In this tutorial, we will create 2 queues (myQueue1 and myQueue2) and bind them to fanout exchange (myExchange) created. In our main class of our producer application, we send the messages to the exchange and retrieve message from the queues in our consumer applications. 



Prerequisites:
  1. RabbitMQ is installed locally
  2. Maven is configured
  3. Tomcat server is configured

Producer application:

If you have gone through my previous post, then you already know how to create a java application to send the messages to the queue. Only change you need to make here is, in the main class we just need to send the message (in the previous example we were sending and receiving messages in the same class).

     AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
     RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
     rabbitTemplate.convertAndSend("Message");


Consumer application:
As shown in the above diagram, each consumer application is bound to single queue. Below you will find the code needed to receive message from one of the queue. In order to create second consumer application, we just need to update the queue name in step 4

Step 1: Update the pom.xml with spring-amqp and spring-core dependencies
 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>


Step 2: Update the web.xml file to read the rabbit configuration xml during server start up
<servlet>
 <servlet-name>RabbitMqConsumerService</servlet-name>
 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
 <init-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:rabbit-context.xml</param-value>
 </init-param>
 <load-on-startup>1</load-on-startup>
</servlet>

<servlet-mapping>
 <servlet-name>RabbitMqConsumerService</servlet-name>
 <url-pattern>/</url-pattern>
</servlet-mapping>

<listener>
 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<context-param>
 <param-name>contextConfigLocation</param-name>
 <param-value>classpath:rabbit-context.xml</param-value>
</context-param>

Step 3: Update the spring context xml with rabbit connection, exchange and queue details
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
 <property name="location">
  <value>classpath:RabbitConfig.properties</value>
 </property>
</bean>

<rabbit:connection-factory id="rabbitConnectionFactory"
 host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
 port="${rabbit.port}"/>

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:template id="rabbitTemplate"
 connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

<rabbit:queue name="${rabbit.queue}" />

<rabbit:fanout-exchange name="${rabbit.exchange}">
 <rabbit:bindings>
  <rabbit:binding queue="${rabbit.queue}" />
 </rabbit:bindings>
</rabbit:fanout-exchange>

<bean id="messageListener" class="com.spring.amqp.FanoutExchangeConsumer" />

<rabbit:listener-container
 connection-factory="rabbitConnectionFactory">
 <rabbit:listener queues="${rabbit.queue}" ref="messageListener"
  method="processMessage" />
</rabbit:listener-container>
In the xml file, we declare connection factory, rabbit template, queues, bindings, listener etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 4: Create RabbitConfig.properties file
rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue1

Step 5: Create main class(com.spring.amqp.FanoutExchangeConsumer) to send and receive messages using RabbitTemplate
 public void processMessage(String msg) {
  System.out.println(msg);
 }

When the queue receives a message, the consumer application will automatically process them because of the listener configured.

I have attached the sample Producer and consumer (Consumer1 and Consumer2) applications for reference. Deploy both the consumer applications on the tomcat server and send a sample message using the producer application.

In the next tutorial, we will look how to configure fanout exchange which accepts attributes like content type, content length etc along with the message data

Sunday, 22 March 2015

Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (XML based configuration)


In this example we will create a single java application which will send and receive messages from RabbitMQ using spring amqp.

Design:
  1. Exchange type: Fanout exchange
  2. Input argument: String
  3. Queue argument: default arguments
  4. Configuration type: xml based configuration
Note about design: In this example we will start with the basic design and then try to improve on this in the coming examples
  • We can pass String or Message(org.springframework.amqp.core.Message) as input argument. We will see what are the advantages of using each type in the coming posts
  • When we create a queue, it is created with few default values like durable as true, auto delete as false etc, we will try to understand what each value means and also configure other optional parameters like TTL, Dead letter exchange etc and understand the behavior
  • We will also look how to configure using annotations in the comings tutorials

In this tutorial, we will create 2 queues (myQueue1 and myQueue2) and bind them to fanout exchange (myExchange) created. In our main class, we send the messages to the exchange and retrieve message both the queues which are bound to this exchange.



Prerequisites:
  1. RabbitMQ is installed locally
  2. Maven is configured

Step 1: Update the pom.xml with spring-amqp and spring-core dependencies
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>3.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.2.RELEASE</version>
        </dependency>

Step 2: Update the spring context xml with rabbit connection, exchange and queue details
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 
<rabbit:connection-factory id="connectionFactory"
        host="localhost" username="guest" password="guest" />

    <rabbit:template id="rabbitTemplate"
        connection-factory="connectionFactory" exchange="myExchange" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="myQueue1" />
    <rabbit:queue name="myQueue2" />

    <rabbit:fanout-exchange name="myExchange">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue1" />
            <rabbit:binding queue="myQueue2" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>
In the xml file, we declare connection factory, rabbit template, queues, bindings etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 3: Create main class to send and receive messages using RabbitTemplate
     AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
     RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
     rabbitTemplate.convertAndSend("Message");
        
     String msg1  = (String) rabbitTemplate.receiveAndConvert("myQueue1");
     System.out.println(msg1);
     String msg2  = (String) rabbitTemplate.receiveAndConvert("myQueue2");
     System.out.println(msg2);

When we run the above class, rabbitTemplate sends the message string to the exchange which is configured in the rabbit-context.xml and then receive the message from the one of the queues specified in the argument of the receive method.

In the next tutorial, we will look at how to add a listener class so that we process the messages automatically

Rabbit MQ basic concepts

In this post we will try to understand the basic concepts of Rabbit MQ, components involved in transmission of messages, basic api's required in spring-amqp and so on...

RabbitMQ: RabbitMQ is open source message broker software that implements the Advanced Message Queuing Protocol (AMQP).

AMQP: The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for sending and receiving messages between distributed systems.

Message Brokers: Message brokers receive messages from the publishers and route them to consumers.

Bindings: Relationship between the exchange and the queue is called binding

Rabbit MQ working:
  1. Messages are published to the exchange
  2. Exchange distributes the message copies to queues using binding rules
  3. Consumer application receives the messages. In AMQP model there are two ways for applications to consume
    1. Have message delivered to the consumer application(push API)
    2. Fetch messages as needed(pull API)


Exchange types: Exchanges take a message and route it into zero or more queues. The routing algorithm used depends on the exchange type and bindings
There are 4 types of exchanges:
  1. Direct exchange: A direct exchange delivers messages to queues based on the message routing key.
  2. Fanout exchange: A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored.
  3. Topic exchange: Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange.
  4. Headers exchange: A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key.
Message acknowledgements
Consumer applications may fail to process the messages due various reasons like network issue or system failure etc.
AMQP specification proposes two choices for this:
  1. Delete the message from the queue, when broker sends the message to the application (automatic acknowledgement model: using either basic.deliver or basic.get-ok AMQP method)
  2. Delete the message from the queue, when the application sends back the acknowledgement (explicit acknowledgment model: using basic.ack AMQP method)
Important Spring AMQP API’s for basic operations:

spring-amqp is the base abstraction and spring-rabbit is the RabbitMQ implementation
There are mainly 3 important basic classes we need to know before starting the development:
  1. CachingConnectionFactory: Used for creating connections
  2. RabbitAdmin: Used for declaring exchanges, queues, bindings
  3. RabbitTemplate: Specifies basic set of AMQP operations
In the next post, we will see how to configure fanout exchange using spring amqp and exchange messages between producer and consumer applications