Friday, 3 July 2015

Implementing AMQP based messaging solutions using Spring concepts for RabbitMQ Fanout Exchange (Annotation based configuration)

In this post we will look at how to develop a consumer application which will receive messages from the RabbitMQ using annotations. If you are not sure about how to develop a producer application to send messages to the queue, please refer to my earlier post. I have also attached the sample application below for quick reference.

Design:
  1. Exchange type: Fanout exchange
  2. Input argument: String
  3. Queue argument: default arguments
  4. Configuration type: annotations based configuration 
In this tutorial, we will create 2 queues (myQueue1 and myQueue2) and bind them to fanout exchange (myExchange) created. In our main class of our producer application, we send the messages to the exchange and retrieve message from the queues in our consumer applications. 


Prerequisites:
  1. RabbitMQ is installed locally
  2. Maven is configured
  3. Tomcat server is configured 

Consumer application:
As shown in the above diagram, each consumer application is bound to single queue. Below you will find the code needed to receive message from one of the queue. In order to create second consumer application, we just need to update the queue name in step 4

Step 1: Update the pom.xml with spring-amqp and spring-core dependencies
 <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.4.2.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-beans</artifactId>
  <version>4.1.0.RELEASE</version>
 </dependency>

Step 2: Update the web.xml file to read the rabbit configuration xml during server start up
  <servlet>
   <servlet-name>RabbitMqConsumerService</servlet-name>
   <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
   <init-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:rabbit-context.xml</param-value>
   </init-param>
   <load-on-startup>1</load-on-startup>
  </servlet>
  
  <servlet-mapping>
   <servlet-name>RabbitMqConsumerService</servlet-name>
   <url-pattern>/</url-pattern>
  </servlet-mapping>
  
  <listener>
   <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
  
  <context-param>
   <param-name>contextConfigLocation</param-name>
   <param-value>classpath:rabbit-context.xml</param-value>
  </context-param>

Step 3: Update the spring context xml with rabbit connection, exchange and queue details
<context:component-scan base-package="com.spring.amqp" />

Step 4: Create RabbitConfiguration.java file
@Configuration
@ComponentScan("com.spring.amqp") 
@EnableRabbit
public class RabbitConfiguration {

 @Bean
 public ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
  connectionFactory.setUsername("guest");
  connectionFactory.setPassword("guest");
  connectionFactory.setPort(5672);
  return connectionFactory;
 }

 @Bean
 public AmqpAdmin amqpAdmin() {
  return new RabbitAdmin(connectionFactory());
 }

 @Bean
 public RabbitTemplate rabbitTemplate() {
  return new RabbitTemplate(connectionFactory());
 }

 @Bean
 public Queue myQueue() {
  return new Queue("myQueue1", true);
 }

 @Bean
 public FanoutExchange exchange() {
  FanoutExchange exchange = new FanoutExchange("myExchange");
  return exchange;
 }

 @Bean
 public Binding binding() {
  return BindingBuilder.bind(myQueue()).to(exchange());
 }
 
 @Bean(name="rabbitListenerContainerFactory")
 public SimpleRabbitListenerContainerFactory listenerFactory(){
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory());
  return factory;
 }

}

Step 5: Create main class(com.spring.amqp.FanoutExchangeConsumer) to send and receive messages using RabbitTemplate
@Service
public class FanoutExchangeConsumer{

 @RabbitListener(queues="myQueue")
 public void processMessage(String msg) {
  System.out.println(msg);
 }

}

When the queue receives a message, the consumer application will automatically process them.

I have attached the sample Producer and consumer application for reference. Deploy the consumer application on the tomcat server and send a sample message using the producer application. 

No comments:

Post a Comment