5 Events Bus API - Reference Documentation
Authors: Marc Palmer (marc@grailsrocks.com), Stéphane Maldini (smaldini@vmware.com)
Version: 1.0.0
5 Events Bus API
Why an events bus? Today's applications rely more and more on non-blocking processing, elasticity and modularity. An events bus loosely couples modules, letting different code bases and frameworks work together, so that the "right tool for the right job" approach becomes practical. The bus may also support the publish/subscribe pattern, distributing the same message across the handling modules, which makes it easier to deploy the same application to a cluster or a cloud.
Within the bus, an event is often as simple as a "callback" with no parameters, but usually some extra state is passed as an event object. An event can be sent to multiple listeners, and any result returned by a listener is passed back to the original sender of the event. An event belongs to a "topic" and often has a "subject". The topic is like a channel that identifies the kind of event. The optional subject is the object that the event "happened to". For example, a simple "app started" notification has no subject but may have the topic "grails", whereas a "user logged in" event may have the topic "security" and its subject set to the user principal supplied by the security plugin you are using.
The Platform-Core plugin provides a number of features and artifacts that let you manage an events bus simply while keeping maximum flexibility when required:
- Event-sending methods injected into your Domains, Controllers and Services
- @Listener annotations for your Service methods.
- Events mapping DSL artifact to select and control event topics
- Spring beans with access to the underlying API
- More cool stuff with Events Spring Integration and Events Push
- Simple config keys
- Send events:
- Write listeners (or event handlers, or event reactors, or whatever you call them):
5.1 Sending Events
Sending an event is simple. You only need to remember one method name and two signatures:
- event(topic, [data, params, callbackClosure])
- event(Map args, [callbackClosure])
The arguments are:
- The topic argument is a String naming the channel that listeners subscribe to.
- The optional data argument is an Object, preferably Serializable for IO facilities, which represents the subject of your event, such as a domain class instance.
- The optional params argument is a Map that controls sending behavior, including the namespace.
- The optional callbackClosure argument is a Closure triggered after the event completes.
- The Map notation accepts the same keys as params, plus topic for the topic, data for the data and for (a shortcut for namespace). If the map contains a params entry, it is used as the params argument; otherwise the top-level map itself is used as params.
The params argument supports the following keys:
Key | Type | Default | Description |
---|---|---|---|
fork | Boolean | true | Set to false to force the event to reuse the caller thread, executing listeners synchronously and propagating any errors. |
namespace / for | String | 'app' | Target a dedicated topic namespace. To avoid overlapping topic names, the events bus supports a scoping concept called namespace. For example, 'gorm' is used by GORM events and 'browser' is used for JavaScript listeners in the events-push plugin. |
onReply | Closure{EventReply reply} | | Same behavior as the callbackClosure argument; overrides it if both are defined. |
onError | Closure{List<Exception> errors} | | Triggered if listeners have raised exceptions. If undefined, exceptions are propagated through EventReply.getValue(s). |
gormSession | Boolean | true | Opens a GORM session for the new thread that carries the event execution. |
timeout | Long | | Maximum time in milliseconds for the event execution. |
headers | Map<String, Serializable> | | Additional headers for the event message envelope. |
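The rules for the Map notation can be made concrete with a small sketch. The helper below is hypothetical: resolveEventArgs is not part of the plugin, and it only models how the documented keys (topic, data, for/namespace, params) might be normalized before an event is sent. The real implementation may differ.

```groovy
// Hypothetical helper, NOT part of the plugin: models the documented Map notation.
Map resolveEventArgs(Map args) {
    // An explicit 'params' entry wins; otherwise the top-level map doubles as params.
    Map params = (args.params instanceof Map) ? args.params : args
    // 'for' is a shortcut for 'namespace'; 'app' is the documented default.
    String namespace = args['for'] ?: args.namespace ?: 'app'
    return [topic: args.topic, data: args.data, namespace: namespace, params: params]
}

// Flat form: sending options such as fork sit next to topic and data.
def flat = resolveEventArgs('for': 'browser', topic: 'logout', data: 'alice', fork: false)
// Nested form: sending options are grouped under an explicit params entry.
def nested = resolveEventArgs(topic: 'logout', params: [timeout: 2000L])

println flat.namespace    // browser
println nested.namespace  // app
```

In the flat form the whole map is treated as params, so flat.params.fork is false; in the nested form only the inner map is used.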
The event method returns an EventReply, which provides the following methods:
- List<Object> getValues(): returns one value per listener that replied.
- Object getValue(): returns the first element of getValues().
- int size(): returns the number of invoked listeners.
- List<Throwable> getErrors(): returns the available errors.
- boolean hasErrors(): tells whether any error occurred.
- EventReply waitFor(): blocks the current thread and returns this reply.
- EventReply waitFor(long time): blocks the current thread for at most the given number of milliseconds and returns this reply.
Events workflow
Events can be sent from domain, service and controller artefacts using EventReply event(String topic, Object data). By default, the Platform-Core events bus sends events in a non-blocking way; however, the following EventReply methods block until the reply is available:
- size
- waitFor
- get
- getValues
- getValue
Non forked events
If you want to reuse the current thread and force synchronous processing, set the fork param to false. Be aware that every exception is then propagated directly to the caller, even without using blocking methods, unless the onError parameter is used.
{docx}
class SomeController {
    def logout() {
        // fork:false runs the listeners on the current thread, so this call blocks
        def reply = event('logout', session.user, [fork: false])
        // no need to wait for the reply: it is already populated when event() returns
        render reply.value
    }
}
{docx}
Assigning a namespace
All listeners have a namespace property, which prevents topic naming collisions and undesired events. By default, they are all assigned to app. This is the same default used when you send an event. To reach listeners in another namespace, such as the 'browser' ones used by the events-push plugin, use the namespace argument, or the for key if you use the Map notation.
{docx}
class SomeController {
    def logout() {
        // we use the Map form; the namespace argument is identified by the 'for' key
        event for: 'browser', topic: 'logout', data: session.user
    }
}
{docx}
It's mandatory to declare a namespace when using the events bus from a plugin, in order to avoid any conflicts.
Wildcard support
It's possible to call multiple topics/namespaces in a single shot using a wildcard (*) as the last character.
{docx}
class SomeController {
    def logout() {
        /* We send to every listener whose topic starts with "chat-", in every namespace starting with "role-" */
        event for: 'role-*', topic: 'chat-*', data: session.user
        // here we trigger every listener in the default namespace 'app'
        event '*'
    }
}
{docx}
This feature will probably evolve into a smarter implementation that behaves like UrlMappings and allows substring captures.
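As an illustration of the trailing-wildcard rule, here is a minimal sketch in plain Groovy. The functions matchesPattern and reaches are hypothetical names introduced for this example; they are not the plugin's matcher, and the sketch assumes that a pattern ending in * matches any value sharing the prefix before it.

```groovy
// Hypothetical helper, NOT the plugin's matcher: a trailing '*' matches any suffix.
boolean matchesPattern(String pattern, String value) {
    if (pattern.endsWith('*')) {
        // Compare everything before the trailing '*' as a prefix.
        String prefix = pattern.substring(0, pattern.length() - 1)
        return value.startsWith(prefix)
    }
    return pattern == value
}

// A listener is reached only if both its namespace and its topic match.
boolean reaches(String nsPattern, String topicPattern, String listenerNs, String listenerTopic) {
    return matchesPattern(nsPattern, listenerNs) && matchesPattern(topicPattern, listenerTopic)
}

println reaches('role-*', 'chat-*', 'role-admin', 'chat-room1') // true
println reaches('role-*', 'chat-*', 'app', 'chat-room1')        // false
println reaches('app', '*', 'app', 'logout')                    // true
```

A lone * has an empty prefix, so it matches every topic in the targeted namespace.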
5.2 Listening Events
Listening for events simply requires registering the method that should receive the event notifications. There are a few ways to register listeners.
Defining listeners at compile time
Within Grails services you can use the @Listener annotation. It takes a topic string, but you can omit it, in which case the method name is used as the topic to listen for:
{docx}
class SomeService {
    @grails.events.Listener(topic = 'userLogged')
    def myMethod(User user) {
    }

    // uses 'mailSent' as the topic name
    @grails.events.Listener
    def mailSent() {
    }
}
{docx}
Event methods can define a single argument, whose value is the object sent with the event. Usually this is the "subject" of the event. However, an event is carried by an envelope called EventMessage, which contains useful metadata such as additional headers and the current topic:
{docx}
class SomeService {
    @grails.events.Listener(topic = 'userLogged')
    def myMethod(org.grails.plugin.platform.events.EventMessage userMessage) {
        println userMessage.headers // displays the optional headers
        println userMessage.event   // displays the current topic
        println userMessage.data    // displays the data
    }
}
{docx}
If the event data cannot be assigned to a listener's argument type, the event silently skips that listener. To catch every event type, use the Object type or, if the argument is not necessary, do not declare it.
Filtering on the EventMessage<D> generic type doesn't work: EventMessage<Book> won't prevent EventMessage<Author> invocation. For such fine-grained control, you can rely on the Events artifact.
Namespacing
Listeners you declare belong to the app namespace, unless you change it using the namespace argument or the Events DSL introduced later.
{docx}
class SomeService {
    @grails.events.Listener(topic = 'userLogged', namespace = 'security')
    def myMethod(User user) {
    }

    // subscribes this method to the topic 'afterInsert' in the namespace 'gorm'
    @grails.events.Listener(namespace = 'gorm')
    def afterInsert(User user) {
    }
}
{docx}
Remember that you will need to specify the namespace when triggering events if you customize it with a value other than app:
{docx}
class SomeController {
    def myAction() {
        event for: 'security', topic: 'userLogged', data: session.user
    }
}
{docx}
It's mandatory to declare a namespace when using the events bus from a plugin, in order to avoid any conflicts.
Proxy (AOP) support
By default, listeners try to call the original method (on the unproxied bean). Use proxySupport to change this setting:
{docx}
class SomeService {
    static transactional = true

    // will invoke the transactional logic, as calling someService.myMethod() would
    @grails.events.Listener(proxySupport = true)
    def myMethod(User user) {
    }
}
{docx}
Dynamic listeners
Some edge cases need runtime registration. In that case, use the injected on method:
{docx}
class SomeController {
    def testInlineListener = {
        // register on the 'logout' topic in the default 'app' namespace
        def listener = on("logout") { User user ->
            println "test $user"
        }
        render "$listener registered"
    }

    def testInlineListener2 = {
        // register a 'gorm' namespaced handler on the 'afterInsert' topic
        def listener = on("gorm", "afterInsert") { Book book ->
            println "test $book"
        }
        render "$listener registered"
    }
}
{docx}
Wildcard support
Capturing a wider group of events can be useful, especially for monitoring purposes. It's possible to listen for multiple topics/namespaces in a single shot using a wildcard (*) as the last character.
{docx}
class SomeService {
    @grails.events.Listener(namespace = 'role-*', topic = 'chat-*')
    def myMethod(org.grails.plugin.platform.events.EventMessage userMessage) {
        println userMessage.namespace
        println userMessage.event
    }
}
{docx}
This feature will probably evolve into a smarter implementation that behaves like UrlMappings and allows substring captures.
Listener ID
Registered listeners generate a unique id (ListenerId) following this pattern:
[namespace://]topic[:package.Class][#method][@hashcode]
The square brackets mark each optional part of the id, which makes it possible to target a group of listeners depending on the known arguments: namespace, class, method, hashcode.
This pattern is useful when using countListeners, removeListeners or extensions. For instance, overriding a generated channel with the events-si plugin requires using namespace://topic if the namespace is different from "app".
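To make the optional parts of the pattern easier to see, the following sketch splits an id into its components with a regular expression. It is a hypothetical parser written for this illustration (parseListenerId is not a plugin method and this is not the plugin's ListenerId class); it assumes that the separators ://, :, # and @ never appear inside a component.

```groovy
// Hypothetical parser, NOT the plugin's ListenerId class.
Map parseListenerId(String id) {
    // Groups: 1 = namespace, 2 = topic, 3 = class name, 4 = method, 5 = hashcode
    def pattern = ~'^(?:([^:/]+)://)?([^:#@]+)(?::([^#@]+))?(?:#([^@]+))?(?:@(.+))?$'
    def m = pattern.matcher(id)
    if (!m.matches()) {
        return null
    }
    // Parts that are absent from the id come back as null.
    return [namespace: m.group(1), topic: m.group(2), className: m.group(3),
            method: m.group(4), hash: m.group(5)]
}

println parseListenerId('gorm://*')
println parseListenerId('mytopic:my.TestService')
println parseListenerId('security://userLogged:my.SecurityService#onLogin@1f2e')
```

Only the topic is mandatory, which is why an id such as gorm://* can address every listener in a namespace.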
Another example, counting and removing listeners:
{docx}
// count every listener subscribed to 'mytopic' inside TestService
countListeners("mytopic:my.TestService")
// count every listener using the gorm namespace
countListeners("gorm://*")
// remove every listener in TestService
removeListeners("*:my.TestService")
{docx}
Reloading in Development mode
It works.
5.3 Replying from Listeners
Usually, an event is fire-and-forget. In some cases, you may expect an answer, turning your messaging architecture into a controlled flow. For instance, a negative reply can be used in GORM events to veto a database write for the current subject. Another common example is aggregating the results of multiple workers.
Simple reply
Replying is a simple matter of returning an object from the listener method:
{docx}
class SomeService {
    @grails.events.Listener
    def logout(User user) {
        Date disconnectDate = new Date()
        // do something with user
        return disconnectDate
    }
}
{docx}
If listeners return non-null objects, the caller can access them through the EventReply envelope returned immediately after calling the event method. The other option is to use a reply handler:
{docx}
import java.util.concurrent.TimeUnit

class SomeController {
    def logout() {
        def reply = event topic: "logout", data: session.user, fork: false
        render reply.value // display the value

        // using a callback closure
        def replyHandler = { EventReply r -> }
        event topic: "logout", data: session.user, onReply: replyHandler

        // or as the last argument
        event(topic: "logout", data: session.user) { EventReply r -> }

        // the EventReply object is a Future implementation
        def replyFuture = event topic: "logout", data: session.user
        render replyFuture.get(30, TimeUnit.SECONDS)
    }
}
{docx}
Whenever an event is triggered, a task is submitted to the events bus thread pool and a Future is returned, wrapped in an EventReply. Full support for the reply-address pattern (replyTo parameter) is also planned for a future version; it would bring interesting features out of the box: non-blocking responses, streaming handler responses one by one, forwarding using a topic name instead of a closure…
Multiple replies
Multiple listeners can return values for the same topic/namespace. In this case, EventReply waits for all handlers before returning any value. Remember that a valid result is a non-null value: if 3 handlers reacted but only 2 returned something, you will only see 2 values in EventReply.values.
{docx}
class SomeController {
    def logout() {
        def reply = event topic: "sendMails", data: session.user
        // wait for all listeners, then display the first value from the aggregated results
        render reply.value
        // display all results as a List
        render reply.values
    }
}
{docx}
Exceptions
Because no code is perfect, exceptions can occur during event processing for three reasons:
- RuntimeException in one or more handlers
- InterruptedException if the process has been cancelled
- TimeoutException if the maximum process time has been reached (timeout parameter)
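Since EventReply is described as a Future implementation, the three cases above can be illustrated with plain java.util.concurrent. The sketch below does not use the plugin at all: the fixed-size pool is only a stand-in for the events bus thread pool, outcome is a helper name made up for this example, and the way EventReply itself reports failures (for instance through getErrors() or onError) may differ.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Waits on a Future and reports which failure mode occurred, if any.
String outcome(Future future, long timeoutMillis) {
    try {
        future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        return 'ok'
    } catch (ExecutionException e) {
        // A handler threw: the original exception is available as the cause.
        return 'handler failed: ' + e.cause.class.simpleName
    } catch (TimeoutException e) {
        future.cancel(true) // give up on the slow handler
        return 'timeout'
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt() // restore the interrupt flag
        return 'interrupted'
    }
}

// Stand-in for the events bus thread pool (assumption: a plain fixed-size pool).
ExecutorService pool = Executors.newFixedThreadPool(3)
def results = [
    outcome(pool.submit({ 'done' } as Callable), 1000),
    outcome(pool.submit({ throw new IllegalStateException('boom') } as Callable), 1000),
    outcome(pool.submit({ Thread.sleep(5000); 'late' } as Callable), 100)
]
pool.shutdownNow()
println results
```

The third task sleeps longer than the 100 ms allowed, so waiting on it ends with a timeout rather than a value.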
Waiting replies
In domain, service and controller artefacts you can wait for events using "EventReply waitFor(EventReply… eventReplies)". It accepts as many event replies as you want and returns the same array, for a functional programming style. EventReply also has a waitFor method for one-line waiting.
{docx}
import java.util.concurrent.TimeUnit

class SomeController {
    def logout() {
        def reply = event('logout', session.user)
        def reply2 = event('logout', session.user)
        def reply3 = event('logout', session.user)

        // the closure parameter is named r to avoid clashing with the local variable reply
        waitFor(reply, reply2, reply3).each { EventReply r ->
            render r.value + '<br/>'
        }

        // same, with a 20 second timeout on each reply
        waitFor(20, TimeUnit.SECONDS, reply, reply2, reply3).each { EventReply r ->
            render r.value
        }

        // other style:
        event('logout', session.user).waitFor()     // blocks until the event completes
        event('logout', session.user).waitFor(2000) // blocks for at most 2 seconds
    }
}
{docx}
5.4 Routing configuration -- The XxxEvents Artifact
An extensible Events DSL is available in grails-app/conf for routing configuration. This artifact details the behavior of the event method by selecting the topics and namespaces to which the following apply:
- Filtering
- Disabling
- Sending behaviors
- Extensions
- Security
- Declarations
The DSL is intended to evolve. One of the most wanted features is topic/namespace declaration: assigning a definition to a property would generate an injectable bean of the same name with streaming methods.
The DSL requires assigning a closure to an events variable. Each call is a definition: the method name is parsed as a topic name and the key/value arguments are the definition attributes. Wildcard topics/namespaces are supported as well.
An Events artifact is a script with some bound variables:
Variable | Description |
---|---|
grailsApplication | Grails application object, retrieves artifacts, context etc. |
ctx | Spring context, useful for beans access, e.g. ctx.myService.method() |
config | Configuration object |
Each definition accepts the following attributes:
Attribute name | Type | Default | Description |
---|---|---|---|
namespace | String | "app" | Namespace the current definition is bound to |
filter | Closure(Object), Closure(EventMessage) or Class | | If a closure is passed, its return value is used as the condition for event propagation. If a class is passed, the subject data type must match. |
disabled | boolean | false | Disable event propagation |
fork | boolean | true | Set to false to use the current thread for event processing (blocking call) |
onError | Closure(List<Throwable>) | | Default onError handler for the current topic(s) |
onReply | Closure(EventReply) | | Default onReply handler for the current topic(s) |
timeout | Long | | Default timeout for the execution time; when it is reached, a TimeoutException is thrown and the handlers are called |
* | * | | Any other attribute can be set for use by plugins through EventDefinition.othersAttributes |
roadmap
{docx}
events = {
    // not yet implemented: assigning and merging definitions
    // myTopic = testTopic4(filter: {i > 2})
    // myTopic = testTopic4(filter: {i < 4})

    // not yet implemented: enabling security context for target listeners
    // testTopic5 secured: true

    // not yet implemented: topic forwarding
    // testTopic6 to: 'anotherTopic'

    // not yet implemented: topic handlers
    // testTopic9 onError: 'anotherTopicErrors', onReply: 'anotherTopicReplies'
}
{docx}
Reloading in Development mode
It works.
5.5 Listening GORM events
Starting from Grails 2, the Events Bus supports GORM events.
GORM Listeners
To listen for GORM events, simply declare listeners on the gorm namespace using the topics from the following table:
Event Type | Target Topic |
---|---|
PreInsertEvent | beforeInsert |
PreUpdateEvent | beforeUpdate |
PreDeleteEvent | beforeDelete |
ValidationEvent | beforeValidate |
PostInsertEvent | afterInsert |
PostUpdateEvent | afterUpdate |
PostDeleteEvent | afterDelete |
SaveOrUpdateEvent | onSaveOrUpdate |
Filtering with Events Artifact
Setting a filter through an Events artifact allows finer control and more efficient selection, since it prevents events from being propagated at all:
{docx}
events = {
    'afterInsert' namespace: 'gorm', filter: Book
    'afterDelete' namespace: 'gorm', filter: { it.id > 5 }
    'afterUpdate' namespace: 'gorm', filter: { it in Book || it in Author }
    'beforeDelete' namespace: 'gorm', disabled: true
}
{docx}
GORM may generate tons of events. Use it wisely and combine it with routing filters.
You can also disable the GORM bridge entirely by using the events.gorm.disabled configuration key.
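The two kinds of filter (a class or a closure) can be pictured with a short sketch. Everything in it is hypothetical: passesFilter is a helper made up for this example, the Book class is a stand-in for a domain class, and only filtering on the event data is modelled, not closures that take an EventMessage.

```groovy
// Hypothetical domain class, used only for this illustration.
class Book {
    Long id
}

// Hypothetical helper, NOT the plugin's implementation.
boolean passesFilter(Object filter, Object data) {
    if (filter == null) {
        return true                          // no filter: always propagate
    }
    if (filter instanceof Class) {
        return filter.isInstance(data)       // class filter: the data type must match
    }
    if (filter instanceof Closure) {
        return filter.call(data) as boolean  // closure filter: Groovy truth of the result
    }
    throw new IllegalArgumentException("Unsupported filter: ${filter}")
}

println passesFilter(Book, new Book(id: 1L))               // true
println passesFilter(Book, 'not a book')                   // false
println passesFilter({ it.id > 5 }, new Book(id: 6L))      // true
println passesFilter({ it.id > 5 }, new Book(id: 2L))      // false
```

When the filter rejects an event, no listener is invoked, which is what makes filtering cheaper than checking the data inside each listener.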
Threading behaviors
GORM listeners are executed in the same thread as the caller in order to reuse the currently open session, if any. Avoid blocking logic if possible, or use the listener body to trigger another event.
Vetoing changes
If a listener handles one of the before* topics and returns a boolean value, it becomes part of the vetoing chain:
- Returning false cancels the current database write
- Returning true simply lets the chain continue
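The vetoing rules can be sketched as a simple loop over listener results. This is a hypothetical model, not the plugin's implementation: writeAllowed is a name made up for this example, listeners are represented as plain closures, and the sketch assumes the chain stops at the first listener that returns false.

```groovy
// Hypothetical model of the vetoing chain, NOT the plugin's implementation.
// Assumption: the chain stops at the first listener that returns false.
boolean writeAllowed(List<Closure> listeners, Object entity) {
    for (Closure listener : listeners) {
        def result = listener.call(entity)
        // Only a boolean result takes part in the chain; anything else is ignored.
        if (result instanceof Boolean && !result) {
            return false
        }
    }
    return true
}

def audit = { entity -> 'logged' }   // non-boolean result: not part of the chain
def approve = { entity -> true }     // lets the chain continue
def rejectDrafts = { entity -> !entity.toString().startsWith('draft') }

println writeAllowed([audit, approve, rejectDrafts], 'final report')  // true
println writeAllowed([audit, approve, rejectDrafts], 'draft report')  // false
```

A listener that returns nothing, or a non-boolean value, neither approves nor vetoes the write.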
5.6 Spring Beans
Plugin developers and any crazy tweakers may need to override one or more Events Bus beans, as the Spring Integration plugin does. The grailsEvents bean is also useful for injecting events methods into artifacts that are not handled by default (anything other than domain, service, controller).
Bean Name | Type | Default Implementation | Description |
---|---|---|---|
grailsEvents | org.grails.plugin.platform.events.Events | org.grails.plugin.platform.events.EventsImpl | Main events gateway, which contains the methods injected into artifacts |
grailsEventsPublisher | org.grails.plugin.platform.events.publisher.EventsPublisher | org.grails.plugin.platform.events.publisher.DefaultsEventsPublisher | Publisher bean, triggers events. To be implemented by extensions if required (e.g. events-si) |
grailsEventsRegistry | org.grails.plugin.platform.events.registry.EventsRegistry | org.grails.plugin.platform.events.registry.DefaultsEventsRegistry | Registry bean, stores listeners and routes events. To be implemented by extensions if required (e.g. events-si) |
gormTopicSupport | org.grails.plugin.platform.events.dispatcher.GormTopicSupport | org.grails.plugin.platform.events.dispatcher.GormTopicSupport2X | Translates GORM events to topic names and processes vetoes. |
grailsEventsGormBridge | org.grails.plugin.platform.events.publisher.GormBridgePublisher | | Listens for GORM events and publishes them to the right bus using gormTopicSupport. |
grailsTopicExecutor | org.springframework.core.task.TaskExecutor | org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor | Carries out events execution. |
5.7 Securing events
To be implemented. In the meantime, you can use headers or data to pass a security context, for instance. The M4 release will bring a platform-security abstraction ready for events.
5.8 Extensions
Writing extensions is one of the greatest habits of Grails developers. The Groovy and Grails communities are like that: pragmatic and pleasant.
The beans referenced in the previous chapter should already give you ideas for improvements or customizations. Two example extensions are available:
- events-si : This plugin overrides the publisher and registry beans in order to replace the default mechanisms with the much
- events-push : This plugin registers new Listeners from your cool browsers using javascript, authorized through the
5.9 Configuration properties
Based on the Platform-Core configuration mechanism, the plugin provides the following Events Bus related keys:
Configuration Key | Type | Default | Description |
---|---|---|---|
grails.plugin.platform.events.disabled | Boolean | false | Fully disable Events Bus mechanism, no events methods will be injected |
grails.plugin.platform.events.poolSize | Integer | 10 | Allow X concurrent workers to process events |
grails.plugin.platform.events.gorm.disabled | Boolean | false | Disable GORM bridge, stopping GORM events from being published |
grails.plugin.platform.events.catchFlushException | Boolean | true | Catch any GORM flushing exceptions, which can be noisy, especially when vetoing changes |
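As an illustration, the keys from the table could be set as follows. This fragment assumes they are written directly in grails-app/conf/Config.groovy using the names listed above; the values are arbitrary examples, not recommendations.

```groovy
// grails-app/conf/Config.groovy (assumed location; values are illustrative only)
grails.plugin.platform.events.poolSize = 20               // allow 20 concurrent workers instead of the default 10
grails.plugin.platform.events.gorm.disabled = true        // stop GORM events from being published on the bus
grails.plugin.platform.events.catchFlushException = true  // keep the default: catch noisy GORM flush exceptions
```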