B.

Using Riak as Events Storage - Part 4

We saw in the previous parts of this blog series how to gather, aggregate and store events in Riak, and how to fetch them for external processing. In this post, we'll see how to use post-commit hooks to apply transformations to the events data stored in Riak, without using MapReduce jobs.

If you missed Part 3...

We strongly recommend that you read Using Riak as Events Storage - Part 3 of this blog series, to understand why MapReduce doesn't fit our needs. The previous parts explains how Booking.com collects and stores events from its backend into a central storage, and how we use it to do events analysis.

Strategy and Features

The previous parts introduced the need for data processing of the events blobs that are stored in Riak in real-time, and the strategy of bringing the code to the data:

instead of bringing the data to the processing code, let's bring the code to the data

Using MapReduce for computing on-demand data processing worked fine but didn't scale to many users (see part 3).

Finding an alternative to MapReduce for server-side real-time data processing requires listing the required features of the system and the compromises that can be made:

Real-time isolated data transformation

As seen in the previous parts of this blog series, we need to be able to perform transformation on the incoming events, with as little delay as possible. We don't want any lag induced by a large batch processing. Luckily, these transformations are usually small and fast. Moreover, they are isolated: the real-time processing may involve multiple types and subtypes of events data, but should not depend on previous events knowledge. Cross-epoch data processing can be implemented by reusing the MapReduce concept, computing a Map-like transformation on each events blobs by computing them independently, but leaving the Reduce phase up to the consumer.

Performance and scalability

The data processing should have a very limited bandwidth usage and reasonable CPU usage. However, we also need the CPU usage not to be affected by the number of clients using the processed data. This is where the previous attempt using MapReduce showed its limits. Of course, horizontal scalability has to be ensured, to be able to scale with the Riak cluster.

One way of achieving this is to perform the data processing continuously for every datum that reach Riak, upfront. That way, client requests are actually only querying the results of the processing, and not triggering computation at query time.

No back-processing

The data processing will have to be performed on real-time data, but no back-processing will be done. When a data processing implementation changes, it will be effective on future events only. If old data is changed or added (usually as a result of reprocessing), data processing will be applied, but using the latest version of processing jobs. We don't want to maintain any history of data processing, nor any migration of processed data.

Only fast transformations

To avoid putting too much pressure on the Riak cluster, we only allow data transformation that produces a small result (to limit storage and bandwidth footprint), and that runs quickly, with a strong timeout on execution time. Back-pressure management is very important, and we have a specific strategy to handle it (see "Back-pressure management strategy" below)

The solution: Substreams

Substreams, a simplified overview

With these features and compromises listed, it is now possible to describe the data processing layer that we ended up implementing at Booking.com.

This system is called Substreams. Every seconds, the list of keys of the data that has just been stored is sent to a companion app - a home-made daemon - running on every Riak node. This fetches the data, decompresses it, runs a list of data transformation code on it, and stores the results back into Riak, using the same key name but with a different namespace. Users can now fetch the processed data.

A data transformation code is called a substream because most of the time the data transformation is more about cherry-picking exactly the needed fields and values out of the full stream, rather than performing complex operations.

The companion app is actually a simple pre-forking daemon with a Rest API. It's installed on all nodes of the cluster, with around 10 forks. The Rest API is used to send it the list of keys, and wait for the process completion. The events data doesn't transit via this API; the daemon is fetching itself the key values from Riak, and stores the substreams (results of data transformation) back into Riak.

The main purpose of this system is to drastically reduce the size of data transferred to the end user by enabling the cherry-picking of specific branches or leaves of the events structures, and also to perform preliminary data processing on the events. Usually, clients are fetching these substreams to perform more complex and broader aggregations and computations (for instance as a data source for Machine Learning).

Unlike MapReduce, this system has multiple benefits:

Data decompressed only once

Deserialisation and decompression is done once, for many data processing jobs

A given binary blob of events (at mot 500K of compressed data) is handled by one instance of the companion app, which will decompress it once, then run all the data processing jobs on the decompressed data structure in RAM. This is a big improvement compared to MapReduce, the most CPU intensive task is actually to decompress and deserialise the data, not to transform it. Here we have the guarantee that data is decompressed only once in its lifetime.

Transformation at write time, not at query time

Data is created once and for all

Unlike MapReduce, once a transformation code is setup and enabled, it'll be computed for every epoch, even if nobody uses the result. However, the computation will happen only once, even if multiple users request it later on. Data transformation is already done when users want to fetch the result. That way, the cluster is protected against simultaneous requests of a big number of users. It's also easier to predict the performance of the substreams creations.

Hard timeout - open platform

Data decompression and transformation by the companion app is performed under a global timeout that would kill the processing if it takes too long. It's easy to come up with a realistic timeout value given the average size of event blobs, the number of companion instances, and the total number of nodes. The hard timeout makes sure that data processing is not using too many resources, ensuring that Riak KV works smoothly.

This mechanism allows the cluster to be an open platform: any developer in the company can create a new substream transformation and quickly get it up and running on the cluster on its own without asking for permission. There is no critical risk for the business as substreams runs are capped by a global timeout. This approach is a good illustration of the flexible and agile spirit in IT that we have at Booking.com.

Implementation using a Riak commit hook

detailed picture with the commit hook

In this diagram we can see where the Riak commit hook kicks in. We can also see that when the companion requests data from the Riak service, there is a high chance that the data is not on the current node and Riak has to get it from other nodes. This is done transparently by Riak, but it consumes bandwidth. In the next section we'll see how to reduce this bandwidth usage and have full data locality. But for now, let's focus on the commit hook.

Commit hooks are a feature of Riak that allow the Riak cluster to execute a provided callback just before or just after a value is written, using respectively pre-commit and post-commit hooks. The commit hook is executed on the node that coordinated the write.

We set up a post-commit hook on the metadata bucket (the epochs bucket). We implemented the commit hook callback, which is executed each time a key is stored to that metadata bucket. In part 2 of this series, we explained that the metadata is stored in the following way: - the key is <epoch>-<datacenter_id>, for example: 1413813813-1 - the value is the list of data keys (for instance 1413813813:2:type3::0)

The post-commit hook callback is quite simple: for each metadata key, it gets the value (the list of data keys), and sends it over HTTP in async mode to the companion app. Proper timeouts are set so that the execution of the callback is capped and can't impact the Riak cluster performance.

Hook implementation

First, let's write the post commit hook code:

metadata_stored_hook(RiakObject) ->
    Key = riak_object:key(RiakObject),
    Bucket = riak_object:bucket(RiakObject),
    [ Epoch, DC ] = binary:split(Key, <<"-">>),
    MetaData = riak_object:get_value(RiakObject),
    DataKeys = binary:split(MetaData, <<"|">>, [ global ]),
    send_to_REST(Epoch, Hostname, DataKeys),
    ok.

send_to_REST(Epoch, Hostname, DataKeys) ->
    Method = post,
    URL = "https://" ++ binary_to_list(Hostname)
       ++ ":5000?epoch=" ++ binary_to_list(Epoch),
    HTTPOptions = [ { timeout, 4000 } ],
    Options = [ { body_format, string },
                     { sync, false },
                      { receiver, fun(ReplyInfo) -> ok end }
              ],
    Body = iolist_to_binary(mochijson2:encode( DataKeys )),
    httpc:request(Method,
                  {URL, [], "application/json", Body},
                  HTTPOptions, Options),
    ok.

These two Erlang functions (here they are simplified and would probably not compile), are the main part of the hook. The function metadata_stored_hook is going to be the entry point of the commit hook, when a metadata key is stored. It receives the key and value that was stored, via the RiakObject, uses its value to extract the list of data keys. This list is then sent to the companion damone over Http using send_to_REST.

The second step is to get the code compiled and Riak setup to be able to use it is properly. This is described in the documentation about custom code.

Enabling the Hook

Finally, the commit hook has to be added to a Riak bucket-type:

riak-admin bucket-type create metadata_with_post_commit \
'{"props":{"postcommit":["metadata_stored_hook"]}'

Then the type is activated:

riak-admin bucket-type activate metadata_with_post_commit

Now, anything sent to Riak to be stored with a key within a bucket whose bucket-type is metadata_with_post_commit will trigger our callback metadata_stored_hook.

The hook is executed on the coordinator node, that is, the node that received the write request from the client. It's not necessary the node where this metadata will be stored.

The companion app

The companion app is a Rest service, running on all Riak nodes, listening on port 5000, ready to receive a json blob, which is the list of data keys that Riak has just stored. The daemon will fetch these keys from Riak, decompress their values, deserialise them and run the data transformation code on them. The results are then stored back to Riak.

There is little point showing the code of this piece of software here, as it's trivial to write. We implemented it in Perl using a PSGI preforking web server (Starman). Using a Perl based web server allowed us to also have the data transformation code in Perl, making it easy for anyone in the IT department to write some of their own.

Optimising intra-cluster network usage

As seen saw earlier, if the commit hook simply sends the request to the local companion app on the same Riak node, additional bandwidth usage is consumed to fetch data from other Riak nodes. As the full stream of events is quite big (around 150 MB per second), this bandwidth usage is significant.

In an effort to optimise the network usage, we have changed the post-commit hook callback to group the keys by the node that is responsible for their values. The keys are then sent to the companion apps running on the associated nodes. That way, a companion app will always receive event keys for which data are on the node they are running on. Hence, fetching events value will not use any network bandwidth. We have effectively implemented 100% data locality when computing substreams.

Better implementation where metadata is sent to the Riak node that contains the data

This optimisation is implemented by using Riak's internal API that gives the list of primary nodes responsible for storing the value of a given key. More precisely, Riak's Core application API provides the preflist() function: (see the API here) that is used to map the result of the hashed key to its primary nodes.

The result is a dramatic reduction of network usage. Data processing is optimised by taking place on one of the nodes that store the given data. Only the metadata (very small footprint) and the results (a tiny fraction of the data) travel on the wire. Network usage is greatly reduced.

Back-pressure management strategy

For a fun and easy-to-read description of what back-pressure is and how to react to it, you can read this great post by Fred Hebert (@mononcqc): Queues Don't Fix Overload.

What if there are too many substreams, or one substream is buggy and performs very costly computations (especially as we allow developers to easily write their own substream), or all of a sudden the events fullstream change, one type becomes huge and a previously working substream now takes 10 times more to compute?

One way of dealing with that is to allow back-pressure: the substream creation system will inform the stream storage (Riak) that it cannot keep up, and that it should reduce the pace at which it stores events. This is however not practical here. Doing back-pressure that way will lead to the storage slowing down, and transmitting the back-pressure upward the pipeline. However, events can't be "slowed down". Applications send events at a given pace and if the pipeline can't keep up, events are simply lost. So propagating back-pressure upstream will actually lead to load-shedding of events.

The other typical alternative is applied here: doing load-shedding straight away. If a substream computation is too costly in CPU time, wallclock time, disk IO or space, the data processing is simply aborted. This protects the Riak cluster from slowing down events storage - which after all, is its main and critical job.

That leaves the substream consumers downstream with missing data. Substreams creation is not guaranteed anymore. However, we used a trick to mitigate the issue. We implemented a dedicated feature in the common consumer library code; when a substream is unavailable, the full stream is fetched instead, and the data transformation is performed on the client side.

It effectively pushes the overloading issue down to the consumer, who can react appropriately, depending on the guarantees they have to fulfill, and their properties.

  • Some consumers are part of a cluster of hosts that are capable of sustaining the added bandwidth and CPU usage for some time.
  • Some other systems are fine with delivering their results later on, so the consumers will simply be very slow and lag behind real-time.
  • Finally, some less critical consumers will be rendered useless because they cannot catch up with real-time.

However, this multitude of ways of dealing with the absence of substreams, concentrated at the end of the pipeline, is a very safe yet flexible approach. In practice, it is not so rare that a substream result for one epoch is missing (one blob every couple of days), and such blips have no incidence on the consumers, allowing for a very conservative behaviour of the Riak cluster regarding substreams: "when in doubt, stop processing substreams".

Conclusion

This data processing mechanism proved to be very reliable and well-suited for our needs. The implementation required surprisingly small amount of code, leveraging features of Riak that proved to be flexible and easy to hack on.

This blog post ends the series about using Riak for event storing and processing at Booking.com. We hope you liked it !

comments powered by Disqus