4 Consuming Messages - Reference Documentation
Authors: Jeff Brown, Peter Ledbrook
Version: 1.0.0
4 Consuming Messages
The plugin provides two simple ways of consuming messages:
- from a named Queue
- by subscribing to an exchange (the traditional pub/sub model)
4.1 Pub-Sub
One of the most common messaging models people use involves a producer broadcasting messages to all registered listeners (or more accurately, consumers). This is known as the publish/subscribe model, or pub/sub for short. There are two steps to getting this set up in Grails:
- create the exchange you're going to publish messages to
- create some consumers that subscribe to that exchange
The first step is handled through the configuration in grails-app/conf/Config.groovy, as described in the configuration section.

The second step is dead easy with the plugin: create a service with a static rabbitSubscribe property and a handleMessage() method. Here's an example:

    package org.example

    class SharesService {
        static rabbitSubscribe = 'shares'

        void handleMessage(message) {
            // handle message…
        }
    }

As long as an exchange named shares exists, the SharesService will receive all messages sent to that exchange. Every time a message is received from the broker, the service's handleMessage() method is called with the message as its argument. We'll talk more about messages shortly.
The rabbitSubscribe option only makes sense when applied to fanout and topic exchanges.
In the case of a topic exchange, you can filter messages based on the routing key. By default your service will receive all messages, but you can override this with an alternative, map-based syntax for rabbitSubscribe:

    package org.example

    class SharesService {
        static rabbitSubscribe = [
            name: 'shares',
            routingKey: 'NYSE.GE'
        ]
        …
    }

The map can carry further options, such as encoding and prefetchCount:

    static rabbitSubscribe = [
        name: 'shares',
        routingKey: 'NYSE.GE',
        encoding: "ISO-8859-1",
        prefetchCount: 1
    ]
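To make routing-key filtering more concrete, here is a minimal sketch in plain Groovy of how an AMQP topic exchange compares a binding pattern with a message's routing key: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. The `topicMatches` and `matchWords` helpers are hypothetical names used only for illustration; they are not part of the plugin, and the sketch assumes the routingKey value is used as the binding pattern. The real matching happens inside the broker.

```groovy
// Hypothetical helpers (not plugin API): they mimic the matching rules a
// topic exchange applies between a binding pattern and a routing key.
boolean topicMatches(String pattern, String routingKey) {
    // Both sides are sequences of dot-separated words
    matchWords(pattern.tokenize('.'), 0, routingKey.tokenize('.'), 0)
}

boolean matchWords(List<String> p, int i, List<String> k, int j) {
    // Pattern exhausted: it matches only if the key is exhausted too
    if (i == p.size()) return j == k.size()
    if (p[i] == '#') {
        // '#' absorbs zero or more words, so try every possible split
        for (int n = j; n <= k.size(); n++) {
            if (matchWords(p, i + 1, k, n)) return true
        }
        return false
    }
    // Any other pattern word needs a key word to compare against
    if (j == k.size()) return false
    // '*' accepts any single word; otherwise the words must be equal
    if (p[i] == '*' || p[i] == k[j]) return matchWords(p, i + 1, k, j + 1)
    return false
}

assert topicMatches('NYSE.GE', 'NYSE.GE')        // exact key
assert !topicMatches('NYSE.GE', 'NYSE.IBM')      // different share
assert topicMatches('NYSE.*', 'NYSE.IBM')        // any single NYSE share
assert topicMatches('NYSE.#', 'NYSE.IBM.bid')    // anything under NYSE
```

Under these rules, a service bound with 'NYSE.GE' would only see messages published with exactly that routing key.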
4.2 Manual queue management
The plugin provides a convention-based mechanism for associating a listener with a queue. Any Grails service may express that it wants to receive messages on a specific queue by defining a static property named rabbitQueue and assigning the property a string which represents the name of a queue.

    package org.grails.rabbitmq.test

    class DemoService {
        static rabbitQueue = 'someQueueName'

        void handleMessage(message) {
            // handle message…
        }
    }

Messages are delivered to the service by invoking its handleMessage() method. That's all there is to it! The real trick is to configure your exchanges and queues with appropriate bindings, as we described in the configuration section.

If you want more say in the configuration of the underlying listener, then you can also specify a map:

    static rabbitQueue = [queues: "someQueueName", channelTransacted: true]
4.3 Messages
What is a message? In the examples you've seen in this section, the message has been some arbitrary object, but we haven't discussed what the type of that object might be. That's because it can be pretty much anything! Within the messaging system, the content of a message is simply a byte array. It's up to the producer and consumer to interpret and convert that raw data.

Fortunately the plugin (via Spring AMQP) automatically handles messages whose content is in familiar forms, including:
- strings
- byte arrays
- maps
- other serializable types
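Because the content of a message is just a byte array, producer and consumer have to agree on how text is encoded. The following plain Groovy sketch uses no plugin API and only illustrates the idea: the same string produces different bytes under different charsets, and decoding with the wrong charset garbles it. The sample text and variable names are made up for the example.

```groovy
// 'café', written with a Unicode escape so the source file encoding is irrelevant
String original = 'caf\u00e9'

// Producer side: text becomes bytes under an agreed charset
byte[] utf8Body = original.getBytes('UTF-8')
byte[] latin1Body = original.getBytes('ISO-8859-1')

// The same text has a different byte representation per charset
assert utf8Body.length == 5      // the accented letter takes two bytes in UTF-8
assert latin1Body.length == 4    // and a single byte in ISO-8859-1

// Consumer side: decoding with the matching charset recovers the text
assert new String(utf8Body, 'UTF-8') == original
assert new String(latin1Body, 'ISO-8859-1') == original

// Decoding with a mismatched charset does not
assert new String(utf8Body, 'ISO-8859-1') != original
```

This is why an option such as encoding matters when string messages come from a producer that does not use the consumer's default charset.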
You can also overload handleMessage() to handle each of these types separately:

    package org.grails.rabbitmq.test

    class DemoService {
        static rabbitQueue = 'someQueueName'

        void handleMessage(String textMessage) {
            // handle String message…
        }

        void handleMessage(Map mapMessage) {
            // handle Map message…
        }

        void handleMessage(byte[] byteMessage) {
            // handle byte array message…
        }
    }
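The effect of overloading can be tried out in plain Groovy, without a broker. The sketch below is not the plugin's dispatch code; it only demonstrates that Groovy picks an overload based on the runtime type of the argument. DispatchDemo is a hypothetical class, and its methods return a description instead of void so that the outcome can be checked.

```groovy
// Hypothetical stand-in for a service with overloaded handlers
class DispatchDemo {
    String handleMessage(String textMessage) { "String: ${textMessage}" }
    String handleMessage(Map mapMessage) { "Map with ${mapMessage.size()} entries" }
    String handleMessage(byte[] byteMessage) { "byte[] of length ${byteMessage.length}" }
}

def demo = new DispatchDemo()

// Three payloads whose static type is just Object
List<Object> incoming = [
    'GE 23.50',
    [symbol: 'GE', price: 23.50],
    'GE'.getBytes('UTF-8')
]

// Groovy selects the overload from each payload's runtime type
def results = incoming.collect { Object message -> demo.handleMessage(message) }

assert results[0] == 'String: GE 23.50'
assert results[1] == 'Map with 2 entries'
assert results[2] == 'byte[] of length 2'
```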
Take care with Map messages! For production systems, we recommend you use strings and byte arrays.

Sometimes you want access to the raw message, particularly if you want to look at the message headers. If so, just change the signature of the handleMessage() method and add an extra option to your rabbitQueue or rabbitSubscribe property:

    package org.grails.rabbitmq.test

    import org.springframework.amqp.core.Message

    class DemoService {
        static rabbitQueue = [queues: 'someQueueName', messageConverterBean: '']

        void handleMessage(Message msg) {
            // Do something with the message headers
            def props = msg.messageProperties
            println "Received message with content type ${props.contentType};${props.contentEncoding}"
            …
        }
    }

In other words, declare the handleMessage() argument as type Message and add the messageConverterBean option with an empty string as its value. This disables the automatic message conversion, allowing you to interrogate the raw message as required.