Sunday, 5 July 2015

Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (XML based configuration with AMQP Message object as input)

In the previous example, we understood how a message is transferred from publisher to the consumer application. We used String as the data format for the same. But generally when two applications interact with each other the data exchange happens via xml or json.


Since we can send messages in many format we need a way to identify the type of message that was sent and any other properties, Spring AMQP defines a message class where we can send these additional properties and actual message in a single instance.


In this example we will create 2 java applications(Producers) which will send xml and json messages to the queue and a web application(Consumer) which will receive messages and based on the content type in the message header process them accordingly.



Design:

  1. Exchange type: Fanout exchange
  2. Input argument: Message (org.springframework.amqp.core.Message)
  3. Queue argument: default arguments
  4. Configuration type: xml based configuration

In this tutorial, we will create 1 queue (myQueue) and bind it to fanout exchange (myExchange) created. In our main class of our producer applications, we send the messages to the exchange and retrieve message from the queue in our consumer application.



Prerequisites:

  1. RabbitMQ is installed locally
  2. Maven is configured
  3. Tomcat server is configured

Producer application 1 (sends xml data):

Step 1: Update the pom.xml with spring-amqp, spring-core and xml dependencies

  <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>javax.xml.bind</groupId>
  <artifactId>jaxb-api</artifactId>
  <version>2.0</version>
 </dependency>

Step 2: Update the spring context xml with rabbit connection, exchange and queue details

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

        <bean
  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location">
   <value>classpath:RabbitConfig.properties</value>
  </property>
 </bean>

 <rabbit:connection-factory id="rabbitConnectionFactory"
  host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
  port="${rabbit.port}" />

 <rabbit:admin connection-factory="rabbitConnectionFactory" />

 <rabbit:template id="rabbitTemplate"
  connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

 <rabbit:queue name="${rabbit.queue}" />

 <rabbit:fanout-exchange name="${rabbit.exchange}">
  <rabbit:bindings>
   <rabbit:binding queue="${rabbit.queue}" />
  </rabbit:bindings>
 </rabbit:fanout-exchange>


 <bean id="jaxb2Marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
  <property name="classesToBeBound">
   <list>
    <value>com.spring.amqp.model.Order</value>
   </list>
  </property>
 </bean>
In the xml file, we declare connection factory, rabbit template, queues, bindings, listener etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 3: Create RabbitConfig.properties file
rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue

Step 4: Create a POJO class
@XmlRootElement
public class Order {

 private long orderNumber;
 private long custNumber;
 private Date orderDate;
 private String shippingNumber;

        // Create getter and setter methods
        // Override toString method
}

Step 5: Create main class(com.spring.amqp.main.TestFanoutProducer) to send and receive messages using RabbitTemplate

 AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
 Marshaller converter = (Marshaller) context.getBean("jaxb2Marshaller");
 
 MessageProperties messageProperties = new MessageProperties();
 messageProperties.setTimestamp(new Date());
 messageProperties.setContentType("application/xml");
 
 Order order = new Order();
 order.setOrderNumber(12345);
 order.setCustNumber(65839);
 order.setOrderDate(new Date());
 order.setShippingNumber("ABCD1234");
 
 // Object to XML
 StringWriter writer = new StringWriter();
 converter.marshal(order, new StreamResult(writer));
 String output = writer.toString();
 
 Message message = new Message(output.getBytes(), messageProperties);
 
 rabbitTemplate.convertAndSend(message);



Producer application 2 (sends json data):
Step 1: Update the pom.xml with spring-amqp, spring-core and json dependencies

 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.5.4</version>
 </dependency>


Step 2: Update the spring context xml with rabbit connection, exchange and queue details

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 

 <bean
  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location">
   <value>classpath:RabbitConfig.properties</value>
  </property>
 </bean>

 <rabbit:connection-factory id="rabbitConnectionFactory"
  host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
  port="${rabbit.port}" />

 <rabbit:admin connection-factory="rabbitConnectionFactory" />

 <rabbit:template id="rabbitTemplate"
  connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

 <rabbit:queue name="${rabbit.queue}" />

 <rabbit:fanout-exchange name="${rabbit.exchange}">
  <rabbit:bindings>
   <rabbit:binding queue="${rabbit.queue}" />
  </rabbit:bindings>
 </rabbit:fanout-exchange>
In the xml file, we declare connection factory, rabbit template, queues, bindings, listener etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 3: Create RabbitConfig.properties file

rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue

Step 4: Create a POJO class
public class Order {

 private long orderNumber;
 private long custNumber;
 private Date orderDate;
 private String shippingNumber;

        // Create getter and setter methods
        // Override toString method
}

Step 5: Create main class(com.spring.amqp.main.TestFanoutProducer) to send and receive messages using RabbitTemplate

 AbstractApplicationContext context = new ClassPathXmlApplicationContext("rabbit-context.xml");
 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
 
 MessageProperties messageProperties = new MessageProperties();
 messageProperties.setTimestamp(new Date());
 messageProperties.setContentType("application/json");
 
 Order order = new Order();
 order.setOrderNumber(12345);
 order.setCustNumber(65839);
 order.setOrderDate(new Date());
 order.setShippingNumber("ABCD1234");
 
 ObjectMapper mapper = new ObjectMapper();
 Message message = new Message(mapper.writeValueAsBytes(order), messageProperties);
 
 rabbitTemplate.convertAndSend(message);







Consumer Application:
Step 1: Update the pom.xml with spring-amqp, spring-core, xml and json dependencies

  <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
  <version>3.2.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>javax.xml.bind</groupId>
  <artifactId>jaxb-api</artifactId>
  <version>2.0</version>
 </dependency>
 <dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.5.4</version>
 </dependency>

Step 2: Update the spring context xml with rabbit connection, exchange and queue details

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> 

 <context:component-scan base-package="com.spring.amqp.main" />

 <bean
  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location">
   <value>classpath:RabbitConfig.properties</value>
  </property>
 </bean>

 <rabbit:connection-factory id="rabbitConnectionFactory"
  host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
  port="${rabbit.port}" />

 <rabbit:admin connection-factory="rabbitConnectionFactory" />

 <rabbit:template id="rabbitTemplate"
  connection-factory="rabbitConnectionFactory" exchange="${rabbit.exchange}" />

 <rabbit:queue name="${rabbit.queue}" />

 <rabbit:fanout-exchange name="${rabbit.exchange}">
  <rabbit:bindings>
   <rabbit:binding queue="${rabbit.queue}" />
  </rabbit:bindings>
 </rabbit:fanout-exchange>


 <bean id="jaxb2Marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
  <property name="classesToBeBound">
   <list>
    <value>com.spring.amqp.model.Order</value>
   </list>
  </property>
 </bean>

 <bean id="messageListener" class="com.spring.amqp.main.FanoutExchangeConsumer" />

 <rabbit:listener-container
  connection-factory="rabbitConnectionFactory">
  <rabbit:listener queues="${rabbit.queue}" ref="messageListener" />
 </rabbit:listener-container>
In the xml file, we declare connection factory, rabbit template, queues, bindings, listener etc. Among these few attributes are optional depending on the design or development approach of your application. In order to understand more about each attribute please click here

Step 3: Create RabbitConfig.properties file

rabbit.host=localhost
rabbit.username=guest
rabbit.password=guest
rabbit.port=5672
rabbit.exchange=myExchange
rabbit.queue=myQueue

Step 4: Create a POJO class
@XmlRootElement
public class Order {

 private long orderNumber;
 private long custNumber;
 private Date orderDate;
 private String shippingNumber;

        // Create getter and setter methods
        // Override toString method
}


Step 5: Create main class(com.spring.amqp.main.FanoutExchangeConsumer) to send and receive messages using RabbitTemplate

 @Autowired
 Jaxb2Marshaller unmarshaller;

 public void onMessage(Message message){
  Order order = null;
  System.out.println("Message received from queue at " + message.getMessageProperties().getTimestamp() + " is : " + new String (message.getBody()) + " with content type as : "+ message.getMessageProperties().getContentType());

  if(message.getMessageProperties().getContentType().equals("application/xml")){
   InputStream stream = new ByteArrayInputStream(message.getBody());
   Source source = new StreamSource(stream);
   try {
    order = (Order) unmarshaller.unmarshal(source);
   } catch (XmlMappingException e) {
    e.printStackTrace();
   }
   System.out.println(order);
  }else if (message.getMessageProperties().getContentType().equals("application/json")){
   ObjectMapper mapper = new ObjectMapper();
   try {
    order = (Order) mapper.readValue(message.getBody(), Order.class);
   } catch (JsonParseException e) {
    e.printStackTrace();
   } catch (JsonMappingException e) {
    e.printStackTrace();
   } catch (IOException e) {
    e.printStackTrace();
   }
   System.out.println(order);
  }
 }

When the queue receives a message, the consumer application will automatically process them based on the content type value present in the message properties.

I have attached the sample Producer applications(xml and json producers) and consumer application for reference.

In the next tutorial, we will look how to configure fanout exchange using annotations

No comments:

Post a Comment