4 Consuming Messages - Reference Documentation

Authors: Jeff Brown, Peter Ledbrook

Version: 1.0.0

4 Consuming Messages

The plugin provides two simple ways of consuming messages:
  1. from a named queue
  2. by subscribing to an exchange (the traditional pub/sub model)

Which approach you take depends on whether you want to implement the pub/sub messaging model and how much control you need.

4.1 Pub-Sub

One of the most common messaging models people use involves a producer broadcasting messages to all registered listeners (or more accurately, consumers). This is known as the publish/subscribe model, or pub/sub for short. There are two steps to getting this set up in Grails:
  1. create the exchange you're going to publish messages to
  2. create some consumers that subscribe to that exchange

The first step can be done either outside of the Grails application or in the plugin's configuration. If the Grails application is the publisher, then it makes sense to declare the exchange in grails-app/conf/Config.groovy.
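As a rough sketch, declaring the exchange in grails-app/conf/Config.groovy might look like the following. It assumes the queues DSL described in the configuration section. The exchange name shares matches the examples in this section, and the connection settings are placeholder values for a local broker, not recommendations.

```groovy
// grails-app/conf/Config.groovy
rabbitmq {
    connectionfactory {
        // Placeholder credentials for a local broker (assumed values)
        username = 'guest'
        password = 'guest'
        hostname = 'localhost'
    }
    queues = {
        // Declare a fanout exchange named 'shares' for pub/sub consumers
        exchange name: 'shares', type: fanout
    }
}
```

If the exchange is created outside the application instead, this block is not needed; the consumers only require that an exchange with that name exists on the broker.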

The second step is dead easy with the plugin: create a service with a static rabbitSubscribe property and a handleMessage() method. Here's an example:

package org.example

class SharesService {
    static rabbitSubscribe = 'shares'

    void handleMessage(message) {
        // handle message…
    }
}

As long as the broker contains an exchange with the name shares, the SharesService will receive all messages sent to that exchange. Every time a message is received from the broker, the service's handleMessage() method is called with the message as its argument. We'll talk more about messages shortly.

The rabbitSubscribe option only makes sense when applied to fanout and topic exchanges.

In the case of a topic exchange, you can filter messages based on the routing key. By default your service will receive all messages, but you can override this with an alternative syntax for rabbitSubscribe:

package org.example

class SharesService {
    static rabbitSubscribe = [
            name: 'shares',
            routingKey: 'NYSE.GE'
    ]
    …
}

In this example, the service will only receive messages that have a routing key of 'NYSE.GE'. You can also use the standard AMQP wildcards: '*' matches exactly one word and '#' matches zero or more words, so 'NYSE.#' matches every message whose routing key starts with 'NYSE.'.
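To make the wildcard rules concrete, here is a small standalone Groovy sketch of topic matching. It is not the broker's implementation and is not part of the plugin; the function names topicMatches and matchWords and the 'bid' suffix are made up for illustration. It only mirrors the AMQP convention that '*' stands for exactly one dot-separated word and '#' for zero or more words.

```groovy
// Returns true if an AMQP-style topic binding key matches a routing key.
// '*' stands for exactly one word, '#' for zero or more words.
boolean topicMatches(String bindingKey, String routingKey) {
    matchWords(bindingKey.tokenize('.'), routingKey.tokenize('.'))
}

boolean matchWords(List<String> pattern, List<String> words) {
    if (pattern.isEmpty()) {
        return words.isEmpty()
    }
    def head = pattern.head()
    def rest = pattern.tail()
    if (head == '#') {
        // Let '#' absorb 0..n words and try to match the remainder each time
        return (0..words.size()).any { n -> matchWords(rest, words.drop(n)) }
    }
    if (words.isEmpty()) {
        return false
    }
    (head == '*' || head == words.head()) && matchWords(rest, words.tail())
}

assert topicMatches('NYSE.GE', 'NYSE.GE')
assert !topicMatches('NYSE.GE', 'NYSE.IBM')
assert topicMatches('NYSE.*', 'NYSE.GE')
assert !topicMatches('NYSE.*', 'NYSE.GE.bid')   // '*' covers one word only
assert topicMatches('NYSE.#', 'NYSE.GE.bid')    // '#' covers any number of words
```

In a real deployment the broker performs this matching when it routes a message to the service's queue; the sketch is only a way to reason about which routing keys a given rabbitSubscribe value would select.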

Under the hood, the plugin creates a temporary, exclusive queue for your service which is removed from the broker when your application shuts down. There is no way for you to control the name of the queue or attach another listener to it, but then that's the point in this case. If you do need more control, then you must manage the queues and their bindings yourself.

The map syntax also allows you to customise the properties of the Spring message listener container and the corresponding listener adapter (see the section on advanced configuration for more details on these). For example,

static rabbitSubscribe = [
        name: 'shares',
        routingKey: 'NYSE.GE',
        encoding: "ISO-8859-1",
        prefetchCount: 1]

will set the encoding and prefetch count for just this service listener. The same technique also works for straight queue listeners.

4.2 Manual queue management

The plugin provides a convention-based mechanism for associating a listener with a queue. Any Grails service may declare that it wants to receive messages on a specific queue by defining a static property named rabbitQueue and assigning it a string containing the name of the queue.

package org.grails.rabbitmq.test

class DemoService {
    static rabbitQueue = 'someQueueName'

    void handleMessage(message) {
        // handle message…
    }
}

As with the pub/sub model, messages are delivered to the service by invoking the handleMessage() method. That's all there is to it! The real trick is to configure your exchanges and queues with appropriate bindings, as we described in the configuration section.
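For orientation, a binding for the queue used above might be declared along these lines. This is a sketch that assumes the queues DSL from the configuration section; the exchange name, durability settings, and binding key are illustrative choices rather than requirements.

```groovy
// grails-app/conf/Config.groovy
rabbitmq {
    queues = {
        // Topic exchange with one durable queue bound to it (illustrative values)
        exchange name: 'shares', type: topic, durable: true, {
            // Queue name matches the rabbitQueue value used by DemoService
            someQueueName durable: true, binding: 'NYSE.#'
        }
    }
}
```

With a binding like this, DemoService would receive the messages published to the shares exchange whose routing key matches 'NYSE.#', and other listeners could attach to the same named queue.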

If you want more say in the configuration of the underlying listener, then you can also specify a map:

static rabbitQueue = [queues: "someQueueName", channelTransacted: true]

The "queues" option can either be a simple queue name or a list of queue names. Again, have a look at the advanced configuration section for information about the extra properties you can set here.
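As an illustration of the list form, a service listening on two queues could be sketched like this. The second queue name, anotherQueueName, is hypothetical, and both queues are assumed to exist on the broker already.

```groovy
package org.grails.rabbitmq.test

class DemoService {
    // 'queues' given as a list; 'anotherQueueName' is a made-up example name
    static rabbitQueue = [
            queues: ['someQueueName', 'anotherQueueName'],
            channelTransacted: true
    ]

    void handleMessage(message) {
        // Called for messages arriving on either queue
    }
}
```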

One last subject to discuss is the form that the messages take.

4.3 Messages

What is a message? In the examples you've seen in this section, the message has been some arbitrary object, but we haven't discussed what the type of that object might be. That's because it can be pretty much anything! Within the messaging system, the content of a message is simply a byte array; it's up to the producer and consumer to interpret or convert that raw data.

Fortunately the plugin (via Spring AMQP) automatically handles messages whose content is in familiar forms, including:

  • strings
  • byte arrays
  • maps
  • other serializable types

One manifestation of this support is that different message types may be handled with overloaded versions of handleMessage():

package org.grails.rabbitmq.test

class DemoService {
    static rabbitQueue = 'someQueueName'

    void handleMessage(String textMessage) {
        // handle String message…
    }

    void handleMessage(Map mapMessage) {
        // handle Map message…
    }

    void handleMessage(byte[] byteMessage) {
        // handle byte array message…
    }
}

This is a great convenience, but be aware that using serializable Java objects limits the types of client you can interact with. If all the clients you're interested in are using Spring AMQP, then you should be fine, but don't expect Ruby or Python clients to handle Map messages! For production systems, we recommend you use strings and byte arrays.
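One way to follow that recommendation while still exchanging structured data is to encode it as a JSON string, which non-JVM clients can read. The sketch below is plain Groovy using the groovy.json classes; the helper names encodeQuote and decodeQuote and the sample fields are invented for illustration and are not part of the plugin.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Producer side: turn a map into a JSON string instead of sending the Map itself
String encodeQuote(Map quote) {
    JsonOutput.toJson(quote)
}

// Consumer side: roughly what a handleMessage(String) body might do with the text
Map decodeQuote(String textMessage) {
    new JsonSlurper().parseText(textMessage) as Map
}

def json = encodeQuote([symbol: 'GE', price: 12.5])
def quote = decodeQuote(json)
assert quote.symbol == 'GE'
assert quote.price == 12.5
```

The consumer then only needs the handleMessage(String) overload, and a Ruby or Python client can produce or consume the same messages with its own JSON library.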

Sometimes you want access to the raw message, particularly if you want to look at the message headers. If so, just change the signature of the handleMessage() method and add an extra option to your rabbitQueue or rabbitSubscribe property:

package org.grails.rabbitmq.test

import org.springframework.amqp.core.Message

class DemoService {
    static rabbitQueue = [queues: 'someQueueName', messageConverterBean: '']

    void handleMessage(Message msg) {
        // Do something with the message headers; in Spring AMQP they are
        // exposed through the message's MessageProperties
        def props = msg.messageProperties
        println "Received message with content type ${props.contentType};${props.contentEncoding}"
        …
    }
}

As you can see, all you have to do is accept an argument of type Message and add the messageConverterBean option with an empty string as its value. This disables the automatic message conversion, allowing you to interrogate the raw message as required.