1. Introduction
RxJava is a popular library for composing asynchronous and event-based programs by using observable sequences.
RxJava helps you build reactive applications, and an increasing number of libraries adopt it as the de facto standard for doing so.
GORM 6.0 introduces RxGORM, a new implementation of GORM that builds on RxJava and helps you write reactive data access logic using the familiar GORM API combined with RxJava.
This plugin integrates RxJava with the controller layer of Grails, completing the picture and enabling end-to-end integration of RxJava with Grails.
2. Getting Started
To get started you first need to declare a dependency on the RxJava plugin for Grails:
dependencies {
    ...
    compile 'org.grails.plugins:rxjava'
}
If you wish to use RxJava 2.x, you can declare the rxjava2 dependency:
dependencies {
    ...
    compile 'org.grails.plugins:rxjava2:2.0.0'
}
RxGORM does not currently support RxJava 2.x because the MongoDB driver only supports RxJava 1.x. However, the remaining features, such as server sent events, work with both RxJava 1.x and RxJava 2.x.
You may then want to include an implementation of RxGORM. The examples in this documentation use RxGORM for MongoDB:
dependencies {
    ...
    compile 'org.grails.plugins:rx-mongodb'
}
The plugin includes a generate-rx-controller command for the Grails command line that can help you generate a controller for a domain class that implements the RxEntity trait:
$ grails generate-rx-controller example.Book
3. Using RxGORM with Controllers
The RxJava plugin for Grails itself has no knowledge of RxGORM; it simply provides an API to help compose controller responses from rx.Observable objects.
The grails.rx.web.Rx class provides static methods for mapping observables to Grails responses.
To use the class you can add a static import to any controller:
import static grails.rx.web.Rx.*
import static rx.Observable.*
Alternatively you can implement the RxController trait and use the rx property, which is an instance of RxHelper:
import grails.rx.web.*
import static rx.Observable.*
class MyController implements RxController {
}
Also notice that you typically statically import the rx.Observable factory methods.
Generally, when writing controllers that work with RxJava you must return an rx.Observable from the controller action.
The RxJava plugin for Grails will then automatically create an asynchronous, non-blocking request that subscribes to the rx.Observable and processes the emitted results.
When working with RxGORM this means that it is possible to write simple logic that returns an observable. For example:
def show() {
    Book.get(params.id?.toString())
}
The above example uses the RxGORM get(id) method, which returns an rx.Observable.
The framework will subscribe to the rx.Observable and, if it emits a result, will automatically call the respond method.
This is the simplest case and doesn’t require any of the methods of the grails.rx.web.Rx class.
However, you may want to customize the response, and this is where the API provided by RxController comes in. In the following sections we will go through some examples of how to use it to formulate responses.
3.1. Retrieving an Object Reactively
The following example shows how you can retrieve an entity reactively using RxGORM and the RxJava plugin:
def show() {
    Book.get(params.id?.toString()) (1)
        .map { Book book ->
            rx.respond(book) (2)
        }
        .switchIfEmpty( (3)
            just( rx.render(status: NOT_FOUND) ) (4)
        )
}
1 | First we invoke the RxGORM get(id) method which returns an rx.Observable |
2 | If the rx.Observable emits a result, map it to the respond method using the Observable.map method. |
3 | If the rx.Observable does not emit a result, use switchIfEmpty to return a different rx.Observable |
4 | The rx.Observable.just(..) method is used to render a 404 status |
What is important to understand is that the closure passed to the map method in step (2) does not have access to the web request, because it is executed in a completely different thread.
This means that you cannot use the regular respond method defined by the controller class and must instead use the respond method statically imported from grails.rx.web.Rx or, as in this example, the one exposed by the rx property.
Generally, accessing any object related to the request within a closure passed to the methods of an rx.Observable will lead to an exception.
It is therefore important to ensure that any data you need from the request, such as parameters, is obtained beforehand. For example:
def show() {
    String author = params.author (1)
    Book.get(params.id?.toString())
        .map { Book book ->
            rx.render view:"book", model:[book:book, author:author] (2)
        }
}
1 | The author parameter is retrieved in the controller action, not in the body of the map method |
2 | This allows us to pass the parameter to the model |
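The reason for reading request data before going asynchronous can be illustrated outside Grails. The following is a minimal sketch in plain Java, assuming request state behaves like a value bound to the request thread; the `AUTHOR_PARAM` thread-local and the class name are hypothetical stand-ins, not part of Grails or the plugin, and `CompletableFuture` is used in place of an rx.Observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class RequestCaptureSketch {

    // Hypothetical stand-in for request state bound to the request thread; not a Grails API.
    static final ThreadLocal<String> AUTHOR_PARAM = new ThreadLocal<>();

    // Mistake: reads the thread-bound value inside the callback, which runs on the worker thread.
    static String readInsideCallback(ExecutorService worker) {
        return CompletableFuture.supplyAsync(() -> AUTHOR_PARAM.get(), worker).join();
    }

    // Fix: copies the value on the calling thread, then lets the callback use the copy.
    static String readBeforeCallback(ExecutorService worker) {
        final String author = AUTHOR_PARAM.get();
        return CompletableFuture.supplyAsync(() -> author, worker).join();
    }

    // Runs both variants with the value set only on the calling thread.
    static String[] demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            AUTHOR_PARAM.set("example-author");
            return new String[] { readInsideCallback(worker), readBeforeCallback(worker) };
        } finally {
            AUTHOR_PARAM.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println("read inside callback: " + results[0]);
        System.out.println("captured beforehand: " + results[1]);
    }
}
```

In this sketch the first variant simply sees no value on the worker thread, whereas in a controller the equivalent access is described above as leading to an exception. Either way, the remedy is the same: copy what you need into a local variable in the action body.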
3.2. Listing Objects Reactively
Implementing the index action to list objects is an interesting example and introduces some important concepts.
Below you can see an example implementation:
def index(Integer max) {
    params.max = Math.min(max ?: 10, 100)
    zip( Book.list(params), (1)
         Book.count() ) { List bookList, Number count -> (2)
        rx.render view:"index", (3)
                  model:[bookList: bookList, bookCount: count]
    }
}
1 | In this case the zip method returns an Observable that combines the list of books and the count of the number of books obtained from the Book.count() method, which also returns an rx.Observable |
2 | The closure receives the values combined by the zip method: the list of books and the count |
3 | Finally, the RxHelper.render(..) method is used to indicate that the index view should be rendered with the appropriate model |
The critical thing to understand here is the use of the zip method. The zip method allows you to combine multiple rx.Observable queries and formulate a result from them. The queries are executed asynchronously in a non-blocking manner and the final result is passed to the view for rendering.
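For two sources that each produce a single value, the combining behaviour of zip resembles `CompletableFuture.thenCombine` from the JDK. The sketch below uses that analogy, not RxJava itself, so that it runs without extra dependencies; `listBooks` and `countBooks` are hypothetical stand-ins for the list and count queries, and the sample titles are made up.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class ZipSketch {

    // Hypothetical stand-in for an asynchronous list query.
    static CompletableFuture<List<String>> listBooks() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("Book A", "Book B"));
    }

    // Hypothetical stand-in for an asynchronous count query.
    static CompletableFuture<Long> countBooks() {
        return CompletableFuture.supplyAsync(() -> 2L);
    }

    // Combines both results into one model once both futures have completed.
    static CompletableFuture<Map<String, Object>> indexModel() {
        return listBooks().thenCombine(countBooks(), (bookList, bookCount) -> {
            Map<String, Object> model = new LinkedHashMap<>();
            model.put("bookList", bookList);
            model.put("bookCount", bookCount);
            return model;
        });
    }

    public static void main(String[] args) {
        System.out.println(indexModel().join());
    }
}
```

Neither query waits for the other to start; the combining function runs only when both values are available, which is the property the index action relies on.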
3.3. Saving Objects Reactively
Saving RxGORM objects presents a different set of challenges. One challenge is that you typically need to bind request data to the object in a non-blocking manner.
With this in mind, the Rx.bindData(..) method provides the ability to bind an incoming JSON body to an object.
The bindData method returns an rx.Observable that emits the object once binding is complete.
The following example is a full implementation of saving an object:
def save() {
    rx.bindData(new Book(), request) (1)
        .switchMap { Book book -> (2)
            if(book.hasErrors()) {
                just(
                    rx.respond( book.errors, view:'create') (3)
                )
            }
            else {
                book.save(flush:true) (4)
                    .map { Book b ->
                        rx.respond b, [status: CREATED, view:"show"] (5)
                    }
                    .onErrorReturn { Throwable e -> (6)
                        if(e instanceof ValidationException) {
                            rx.respond e.errors, view:'create'
                        }
                        else {
                            log.error("Error saving entity: $e.message", e)
                            return INTERNAL_SERVER_ERROR
                        }
                    }
            }
        }
}
1 | The bindData method is used to bind the incoming request to a new object |
2 | The switchMap method is used to return another rx.Observable to be processed from the result of the original bindData observable. |
3 | If the object has errors then the errors are rendered using the respond method. |
4 | Otherwise the object is saved using the RxGORM save() method which returns an rx.Observable . |
5 | If the object was successfully saved then the respond method is used to render the object |
6 | Otherwise if an error occurred it is handled appropriately |
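The shape of this flow (start a second asynchronous step from the first result, map its success, and substitute a fallback on failure) can be sketched with the JDK alone. The example below is an analogy under stated assumptions, not the plugin's implementation: `thenCompose`, `thenApply` and `exceptionally` stand in for switchMap, map and onErrorReturn respectively, and `saveAsync`, `InvalidTitleException` and the response strings are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class SaveFlowSketch {

    // Hypothetical failure type standing in for a validation error raised while saving.
    static final class InvalidTitleException extends RuntimeException {
        InvalidTitleException(String message) {
            super(message);
        }
    }

    // Hypothetical asynchronous save: rejects blank titles, otherwise returns the saved title.
    static CompletableFuture<String> saveAsync(String title) {
        return CompletableFuture.supplyAsync(() -> {
            if (title.trim().isEmpty()) {
                throw new InvalidTitleException("title must not be blank");
            }
            return title;
        });
    }

    // Chains binding -> save -> response, recovering from failures with a fallback response.
    static CompletableFuture<String> save(CompletableFuture<String> boundTitle) {
        return boundTitle.thenCompose(title ->                    // second async step from the first result
                saveAsync(title)
                        .thenApply(saved -> "created: " + saved)  // success: map the saved value to a response
                        .exceptionally(SaveFlowSketch::toErrorResponse)); // failure: substitute a response
    }

    // Unwraps the CompletionException added by CompletableFuture and picks a fallback response.
    static String toErrorResponse(Throwable error) {
        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
        if (cause instanceof InvalidTitleException) {
            return "errors: " + cause.getMessage();
        }
        return "server error";
    }

    public static void main(String[] args) {
        System.out.println(save(CompletableFuture.completedFuture("Book A")).join());
        System.out.println(save(CompletableFuture.completedFuture(" ")).join());
    }
}
```

Attaching the recovery step to the inner chain, as the controller example also does, keeps the error handling scoped to the save operation rather than to the binding step.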
4. Server Sent Events
You can write controllers that support Server Sent Events using the RxJava plugin for Grails.
Using the stream method you can return an observable that streams events to the browser.
For example:
def index() {
    rx.stream { Subscriber subscriber -> (1)
        for(i in (0..5)) {
            if(i % 2 == 0) {
                subscriber.onNext(
                    rx.render("Tick") (2)
                )
            }
            else {
                subscriber.onNext(
                    rx.render("Tock")
                )
            }
            sleep 1000 (3)
        }
        subscriber.onCompleted() (4)
    }
}
1 | Call the stream method passing a closure that accepts an rx.Subscriber to start sending events |
2 | Emit one or more items using onNext |
3 | Call sleep to simulate a slow request |
4 | Call onCompleted to complete the request |
The above example simulates a slow process using the sleep method, alternating the words "Tick" and "Tock".
On the client side you can register a JavaScript function that uses the controller as an event source:
function tickTock() {
    var eventSource = new EventSource("ticktock");
    eventSource.onmessage = function(event) {
        console.log("data: "+event.data)
        document.getElementById('message').innerHTML = event.data;
    };
}
tickTock()
The EventSource object is supported in most modern browsers, except Internet Explorer. For IE you can use a JavaScript polyfill as a replacement.
In the above example the onmessage handler will be invoked every time an item is emitted.
If you wish to send a particular named event you can use the name argument of the stream method:
stream "ticktock", { Subscriber subscriber ->
And then attach an event listener for only that event on the client:
function tickTock() {
    var eventSource = new EventSource("ticktock");
    eventSource.addEventListener('ticktock', function(event) {
        console.log("data: "+event.data)
        document.getElementById('message').innerHTML = event.data;
    }, false);
}
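The difference between the onmessage handler and a named listener comes down to the text each event carries on the wire. The sketch below formats events in the event stream format defined by the HTML standard: an optional `event:` line, one `data:` line per line of payload, and a blank line to end the event. It illustrates the format only; the `SseFrameSketch` class and its `frame` method are hypothetical, and how the plugin writes its output is not shown here.

```java
final class SseFrameSketch {

    // Formats one event. Pass null as eventName for an unnamed event,
    // which browsers dispatch to the onmessage handler.
    static String frame(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // Each line of the payload gets its own "data:" field.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(frame(null, "Tick"));
        System.out.print(frame("ticktock", "Tock"));
    }
}
```

An event that carries an `event: ticktock` line is delivered only to listeners registered for `ticktock`, which is why the named variant uses addEventListener rather than onmessage.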