1. Introduction

RxJava is a popular library for composing asynchronous and event-based programs by using observable sequences.

RxJava helps you build reactive applications, and an increasing number of libraries use it as the de facto standard for doing so.

GORM 6.0 introduced RxGORM, a new implementation of GORM that builds on RxJava to help you write reactive data access logic using the familiar GORM API.

This plugin integrates RxJava with the controller layer of Grails, completing the picture and enabling end-to-end integration of RxJava with Grails.

2. Getting Started

To get started you first need to declare a dependency on the RxJava plugin for Grails:

build.gradle
dependencies {
    ...
    compile 'org.grails.plugins:rxjava'
}

If you wish to use RxJava 2.x then you can declare the rxjava2 dependency:

build.gradle
dependencies {
    ...
    compile 'org.grails.plugins:rxjava2:2.0.0'
}

RxGORM does not currently support RxJava 2.x because the MongoDB driver only supports RxJava 1.x. However, the remaining features, such as server sent events, work with both RxJava 1.x and RxJava 2.x.

You may then want to include an implementation of RxGORM. The examples in this documentation use RxGORM for MongoDB:

build.gradle
dependencies {
    ...
    compile 'org.grails.plugins:rx-mongodb'
}

The plugin includes a generate-rx-controller command for the Grails command line that can help you generate a controller for a domain class that implements the RxEntity trait:

$ grails generate-rx-controller example.Book

3. Using RxGORM with Controllers

The RxJava plugin for Grails itself has no knowledge of RxGORM. It simply provides an API to help compose controller responses from rx.Observable objects.

The grails.rx.web.Rx class provides static methods for mapping observables to Grails responses.

To use the class you can add a static import to any controller:

import static grails.rx.web.Rx.*
import static rx.Observable.*

Alternatively you can implement the RxController trait and use the rx property which is an instance of RxHelper:

import grails.rx.web.*
import static rx.Observable.*

class MyController implements RxController {

}

Note that you typically also statically import the rx.Observable factory methods.

Generally, when writing controllers that work with RxJava, you must return an rx.Observable from the controller action.

The RxJava plugin for Grails will then automatically create an asynchronous, non-blocking request that subscribes to the rx.Observable and processes the emitted results.

When working with RxGORM this means that it is possible to write simple logic that returns an observable. For example:

def show() {
    Book.get(params.id?.toString())
}

The above example uses the RxGORM get(id) method that returns an rx.Observable.

The framework will subscribe to the rx.Observable and, if it emits a result, automatically call the respond method.

This is the simplest case and doesn't require using any of the methods of the grails.rx.web.Rx class.

However, you may want to customize the response, and this is where the API provided by RxController comes in. In the following sections we will go through some examples of how to use it to formulate responses.

3.1. Retrieving an Object Reactively

The following example shows how you can retrieve an entity reactively using RxGORM and the RxJava plugin:

def show() {
    Book.get(params.id?.toString()) (1)
        .map { Book book ->
            rx.respond(book) (2)
        }
        .switchIfEmpty( (3)
            just( rx.render(status: NOT_FOUND) ) (4)
        )
}
1 First we invoke the RxGORM get(id) method, which returns an rx.Observable
2 If the rx.Observable emits a result, map it to the respond method using the Observable.map method
3 If the rx.Observable does not emit a result, use switchIfEmpty to return a different rx.Observable
4 The rx.Observable.just(..) method wraps the rendered 404 (NOT_FOUND) response in an rx.Observable

What is important to understand is that the closure passed to the map method in step 2 does not have access to the web request, as it is executed in a completely different thread.

This means that you cannot use the regular respond method defined by the controller class. You must instead use the respond method provided by the rx property, or the one statically imported from grails.rx.web.Rx.

Generally, if you try to access any objects related to the request within a closure passed to the methods of an rx.Observable, this will lead to an exception.

It is therefore important to ensure that any data you need from the request, such as parameters, is obtained beforehand. For example:

def show() {
    String author = params.author (1)
    Book.get(params.id?.toString())
        .map { Book book ->
            rx.render view:"book", model:[book:book, author:author] (2)
        }
}
1 The author parameter is retrieved in the controller action, not in the body of the map method
2 This allows us to pass the parameter to the model

3.2. Listing Objects Reactively

Implementing the index action to list objects is an interesting example and introduces some important concepts.

Below you can see an example implementation:

def index(Integer max) {
    params.max = Math.min(max ?: 10, 100)
    zip( Book.list(params), (1)
         Book.count() ) { List bookList, Number count -> (2)
        rx.render view:"index", (3)
                  model:[bookList: bookList, bookCount: count]
    }
}
1 The zip method returns an rx.Observable that combines the list of books from Book.list(params) with the total number of books from Book.count(); both methods return an rx.Observable
2 The closure is passed the result of the zip method which is the list of books and the count
3 Finally, the RxHelper.render(..) method is used to indicate that the index view should be rendered with the appropriate model

The critical thing to understand here is the usage of the zip method. The zip method allows you to combine multiple rx.Observable queries and formulate a result from them. The queries are executed asynchronously in a non-blocking manner and the final result passed to the view for rendering.
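
To make the pairing behaviour of zip concrete, here is a minimal sketch in plain JavaScript that uses arrays as stand-ins for the two observables. It is an analogy only, not RxJava or plugin code: the zip function below is a hypothetical helper that combines the n-th item of each source, which is what happens to the single list and the single count emitted in the index action.

```javascript
// Hypothetical stand-in for Observable.zip: arrays play the role of the
// observable sequences, and each element is one emitted item.
function zip(first, second, combine) {
    // zip stops as soon as the shorter source runs out of items
    var length = Math.min(first.length, second.length);
    var results = [];
    for (var i = 0; i < length; i++) {
        results.push(combine(first[i], second[i]));
    }
    return results;
}

// Each "query" emits exactly one item: a list of books and a count.
var listResult = [["Book A", "Book B"]];
var countResult = [2];

var models = zip(listResult, countResult, function (bookList, count) {
    return { bookList: bookList, bookCount: count };
});
```

Because both sources emit a single item, the combined sequence also contains a single item: the model handed to the view.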

3.3. Saving Objects Reactively

Saving RxGORM objects presents a different set of challenges. One challenge is that you typically need to bind request data to the object in a non-blocking manner.

With this in mind the Rx.bindData(..) method provides the ability to bind an incoming JSON body to an object.

The bindData method returns an rx.Observable that emits the object once binding is complete.

The following example is a full implementation of saving an object:

def save() {
    rx.bindData(new Book(), request) (1)
        .switchMap { Book book -> (2)
            if(book.hasErrors()) {
                just(
                    rx.respond( book.errors, view:'create') (3)
                )
            }
            else {
                book.save(flush:true) (4)
                    .map { Book b ->
                        rx.respond b, [status: CREATED, view:"show"] (5)
                    }
                    .onErrorReturn { Throwable e -> (6)
                        if(e instanceof ValidationException) {
                            rx.respond e.errors, view:'create'
                        }
                        else {
                            log.error("Error saving entity: $e.message", e)
                            return INTERNAL_SERVER_ERROR
                        }
                    }
            }
        }
}
1 The bindData method is used to bind the incoming request to a new object
2 The switchMap method is used to return another rx.Observable to be processed from the result of the original bindData observable.
3 If the object has errors then the errors are rendered using the respond method.
4 Otherwise the object is saved using the RxGORM save() method which returns an rx.Observable.
5 If the object was successfully saved then the respond method is used to render the object
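6 If saving fails, onErrorReturn handles the error: validation errors are rendered with the create view, and any other error is logged and an INTERNAL_SERVER_ERROR status is returned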

4. Server Sent Events

You can write controllers that support Server Sent Events using the RxJava plugin for Grails.

Using the stream method you can return an observable that streams events to the browser.

For example:

def index() {
    rx.stream { Subscriber subscriber -> (1)
        for(i in (0..5)) {
            if(i % 2 == 0) {
                subscriber.onNext(
                    rx.render("Tick") (2)
                )
            }
            else {
                subscriber.onNext(
                    rx.render("Tock")
                )
            }
            sleep 1000 (3)
        }
        subscriber.onCompleted() (4)
    }
}
1 Call the stream method passing a closure that accepts an rx.Subscriber to start sending events
2 Emit one or more items using onNext
3 Call sleep to simulate a slow request
4 Call onCompleted to complete the request

The above example simulates a slow process using the sleep method, emitting the words "Tick" and "Tock" alternately, one per second.
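
For context, Server Sent Events travel to the browser as plain text in the text/event-stream format defined by the HTML standard: each event consists of one or more data: lines, optionally preceded by an event: line carrying the event name, and is terminated by a blank line. The sketch below shows how an emitted item such as "Tick" could be framed. formatSseEvent is a hypothetical helper written for illustration, not part of the plugin, and the plugin's actual output may contain additional fields.

```javascript
// Hypothetical helper: formats one event in the text/event-stream wire format.
// Not part of the RxJava plugin for Grails.
function formatSseEvent(data, name) {
    var lines = [];
    if (name) {
        // Optional event name; without it the browser treats the event as "message"
        lines.push("event: " + name);
    }
    // A multi-line payload is sent as one data: line per line of text
    String(data).split("\n").forEach(function (line) {
        lines.push("data: " + line);
    });
    // A blank line terminates the event
    return lines.join("\n") + "\n\n";
}

var tickFrame = formatSseEvent("Tick");
var namedFrame = formatSseEvent("Tock", "ticktock");
```

Here tickFrame has no event: line and only carries the data, while namedFrame additionally carries the event name.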

On the client side you can register a JavaScript function that uses the controller as an event source:

function tickTock() {
    var eventSource = new EventSource("ticktock");
    eventSource.onmessage = function(event) {
        console.log("data: " + event.data);
        document.getElementById('message').innerHTML = event.data;
    };
}
tickTock();

The EventSource object is supported in most modern browsers, except Internet Explorer. For IE you can use a JavaScript polyfill as a replacement.

In the above example the onmessage handler will be invoked every time an item is emitted.
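
Since EventSource is not available in every browser, client code can check for it before connecting. The following is a minimal sketch of such a check; supportsEventSource and connectIfSupported are hypothetical helper names, and the global object is passed in as a parameter (in a browser this would be window) so that the functions can be exercised outside a browser.

```javascript
// Returns true when the given global object (e.g. window) provides EventSource.
function supportsEventSource(globalObject) {
    return typeof globalObject.EventSource !== "undefined";
}

// Hypothetical wrapper: connects to the URL only when EventSource is available.
function connectIfSupported(globalObject, url, onMessage) {
    if (!supportsEventSource(globalObject)) {
        // The caller can load a polyfill or fall back to another mechanism
        return null;
    }
    var source = new globalObject.EventSource(url);
    source.onmessage = onMessage;
    return source;
}
```

In a page, this might be called as connectIfSupported(window, "ticktock", handler), where handler is the same function that would otherwise be assigned to onmessage.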

If you wish to send a particular named event you can use the name argument of the stream method:

stream "ticktock", { Subscriber subscriber ->

And then attach an event listener for only that event on the client:

function tickTock() {
    var eventSource = new EventSource("ticktock");
    eventSource.addEventListener('ticktock', function(event) {
        console.log("data: " + event.data);
        document.getElementById('message').innerHTML = event.data;
    }, false);
}
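
The reason a dedicated listener is needed is that, under the text/event-stream format, an event without an event: field has the default type "message" (which is what onmessage receives), whereas an event with an event: field is dispatched under that name instead. The sketch below illustrates this with a deliberately simplified parser for a single event; parseSseFrame is a hypothetical function, and real browsers additionally handle id:, retry:, comment lines and other line endings.

```javascript
// Simplified, illustrative parser for one text/event-stream event.
function parseSseFrame(frame) {
    var type = "message"; // default event type when no event: field is present
    var data = [];
    frame.split("\n").forEach(function (line) {
        if (line.indexOf("event:") === 0) {
            // Strip the field name and a single leading space
            type = line.slice(6).replace(/^ /, "");
        } else if (line.indexOf("data:") === 0) {
            data.push(line.slice(5).replace(/^ /, ""));
        }
    });
    return { type: type, data: data.join("\n") };
}

var unnamed = parseSseFrame("data: Tick\n\n");
var named = parseSseFrame("event: ticktock\ndata: Tock\n\n");
```

The unnamed event resolves to the type "message", while the named one resolves to "ticktock" and is therefore only seen by listeners registered for that name.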