In this post, we'll see how to apply transformations to the events data stored in Riak without the data leaving the cluster. In the previous parts, we saw how to gather, aggregate and store events in Riak, and how to fetch them for external processing. Now we'll see how to reduce bandwidth usage by transforming the events inside the cluster instead of moving them out of it.
If you missed Part 2
We strongly recommend that you read Part 2 of this blog series. The previous parts explain how Booking.com collects events from its backend and stores them in a central storage, and how we use that storage to analyse events.
The reasoning is simple. The final goal is to process, in real time, the event blobs stored in Riak. Data processing usually produces a very small result, so it seems wasteful of network bandwidth to fetch the data out of Riak and analyse it on consumer clusters, as in this example:
This diagram is equivalent to:
So instead of bringing the data to the processing code, let's bring the code to the data:
This is a typical use case for MapReduce. We're going to see how to use MapReduce on our dataset in Riak, and why it is not a viable solution for us.
For the rest of this post, we need a reference unit: all the events stored for a time period of exactly one second. Because we already store our events grouped by second (and call each second an “epoch”), this unit is the practical choice, and we'll refer to one second's worth of events as an epoch-data.
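As a minimal illustration of the epoch concept, the sketch below groups events into one-second epochs by truncating a Unix timestamp to whole seconds. The event format (a dict with a float `ts` field) and the function name `group_by_epoch` are hypothetical; the actual storage layout is the one described in Part 2.

```python
from collections import defaultdict


def group_by_epoch(events):
    """Group events into one-second buckets keyed by integer Unix epoch.

    Each event is assumed (hypothetically) to be a dict whose 'ts'
    field holds a Unix timestamp in seconds, as a float.
    """
    buckets = defaultdict(list)
    for event in events:
        epoch = int(event["ts"])  # truncate to the containing second
        buckets[epoch].append(event)
    return dict(buckets)


events = [
    {"ts": 1400000000.12, "type": "click"},
    {"ts": 1400000000.98, "type": "view"},
    {"ts": 1400000001.05, "type": "click"},
]
epoch_data = group_by_epoch(events)
print({epoch: len(evts) for epoch, evts in epoch_data.items()})
```

Everything that lands in the same bucket forms one epoch-data in the sense used throughout this post.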
A first attempt: MapReduce
MapReduce is a well-known (if somewhat outdated) way of distributing data processing by bringing the code close to the data. There are excellent papers explaining this approach if you want more background.
To process one epoch-data on Riak, the MapReduce job would consist of the phases listed below. The concepts of metadata keys and data keys are explained in Part 2 of the blog series. Here are the MapReduce phases:
- Given a list of epochs and DCs, the input is the list of metadata keys, with the processing code to apply passed as an additional parameter.
- A first Map phase reads the metadata values and returns a list of data keys.
- A second Map phase reads the data values, deserialises them, applies the processing code and returns the list of results.
- A Reduce phase aggregates the results together.
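The phases above can be sketched as a local simulation in plain Python. This is not Riak's MapReduce API (in our setup the phases would be Erlang functions running on the nodes): a dict stands in for the Riak buckets, the key names (`meta:<epoch>:<dc>`, `data:<epoch>:<dc>:<n>`) are hypothetical, and JSON plus zlib stands in for the real compressed serialisation format.

```python
import json
import zlib
from collections import Counter

# Hypothetical stand-in for the Riak buckets: a metadata key lists the data
# keys of one epoch, and data values are compressed, serialised events.
STORE = {
    "meta:1400000000:dc1": ["data:1400000000:dc1:0", "data:1400000000:dc1:1"],
    "data:1400000000:dc1:0": zlib.compress(
        json.dumps([{"type": "click"}, {"type": "view"}]).encode()),
    "data:1400000000:dc1:1": zlib.compress(
        json.dumps([{"type": "click"}]).encode()),
}


def map_metadata(meta_key):
    """First map phase: read a metadata value, return its data keys."""
    return STORE[meta_key]


def map_data(data_key, process):
    """Second map phase: read, decompress, deserialise, then process."""
    events = json.loads(zlib.decompress(STORE[data_key]))
    return [process(events)]


def reduce_results(results):
    """Reduce phase: merge the partial results together."""
    total = Counter()
    for partial in results:
        total.update(partial)  # Counter.update adds counts
    return total


def run_job(epochs, dcs, process):
    """Chain the phases for every (epoch, DC) pair."""
    meta_keys = [f"meta:{epoch}:{dc}" for epoch in epochs for dc in dcs]
    data_keys = [k for mk in meta_keys for k in map_metadata(mk)]
    results = [r for dk in data_keys for r in map_data(dk, process)]
    return reduce_results(results)


def count_types(events):
    """Example processing code: count events per type."""
    return Counter(e["type"] for e in events)


print(run_job([1400000000], ["dc1"], count_types))
```

Note that the expensive step, decompressing and deserialising, happens inside the second map phase, once per job.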
This works just fine. For one epoch-data, the processing code is mapped to the events, and the data is deserialised and processed in around 0.1 second (on our initial 12-node cluster). This is an important result by itself: it takes less than one second to fully process one second's worth of events. Riak makes it possible to implement a real-time MapReduce processing system.
Should we just use MapReduce and be done with it? Not really, because our use case involves multiple consumers doing different data processing at the same time. Let's see why this is an issue.
To be able to test the MapReduce solution, we need a use case and some metrics to measure.
The use case is the following: every second, multiple consumers (say 20) each need the result of one of several data processing jobs (say 10) applied to the previous second's data.
We'll consider that an epoch-data is roughly 70MB and that each data processing result is around 10KB. We'll also consider that the Riak cluster is a 30-node ring with 10 real CPUs available for data processing on each node.
The first metric we can measure is the external network bandwidth usage: the bandwidth used to transfer data between the cluster as a whole and the outside world. This is the first factor that encouraged us to move away from fetching the events out of Riak to process them externally.
The second metric is the internal network bandwidth usage. This represents the network used between the nodes, inside of the Riak cluster.
The third metric is the time (more precisely, the CPU time) it takes to deserialise the data. Because our data is heavily compressed, decompressing and deserialising one epoch-data takes roughly 5 sec of CPU time.
The fourth metric is the CPU time it takes to process the deserialised data, analyse it and produce a result. This is very fast compared to deserialisation; let's assume 0.01 sec at most.
Note: we are not taking into account the impact of storing the data in the cluster (remember that events blobs are stored every second), because it affects the system in the same way with both external processing and MapReduce.
Metrics when doing external processing
When doing standard data processing as seen in the previous part of this blog series, one epoch-data is fetched out of Riak, then deserialised and processed outside of Riak.
External bandwidth usage
The external bandwidth usage is high. For each query, the whole epoch-data is transferred, so that's 20 queries per second times 70MB = 1400 MB/s. This load is spread evenly across all the nodes, but that's still roughly 1400 / 30 ≈ 47 MB/s per node. That, however, covers only the data processing. There is a small overhead from the clustered nature of the system and from gossiping, so let's round that number up to 50 MB/s of external output bandwidth per node.
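The external bandwidth arithmetic can be written out as a small helper, using the figures of the use case above (20 consumers, 70MB per epoch-data, 30 nodes). The function name is hypothetical, and the gossip and clustering overhead that rounds the result up to 50 MB/s is deliberately not modelled.

```python
# Figures taken from the use case described above.
NODES = 30
CONSUMERS = 20       # queries per second, one per consumer
EPOCH_DATA_MB = 70   # size of one epoch-data, in MB


def external_bandwidth(consumers, epoch_mb, nodes):
    """Return (cluster total, per-node) external bandwidth in MB/s.

    Every consumer fetches a full epoch-data out of Riak each second.
    Cluster overhead (gossip, bookkeeping) is not modelled here.
    """
    total = consumers * epoch_mb
    return total, total / nodes


total, per_node = external_bandwidth(CONSUMERS, EPOCH_DATA_MB, NODES)
print(f"total: {total} MB/s, per node: {per_node:.1f} MB/s")
```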
Internal bandwidth usage
The internal bandwidth usage is very high. Each time a key's value is requested, Riak checks its 3 replicas and returns the value. So that's 3 x 20 x 70MB/s = 4200 MB/s, or 4200 MB/s / 30 = 140 MB/s per node.
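The same calculation for internal traffic only adds the replica factor. This sketch follows the text's assumption that every read pulls the value from all 3 replicas; the helper name and the `REPLICAS` constant are illustrative.

```python
NODES = 30
CONSUMERS = 20
EPOCH_DATA_MB = 70
REPLICAS = 3  # replicas checked for each requested key, as described above


def internal_bandwidth(consumers, epoch_mb, replicas, nodes):
    """Return (cluster total, per-node) internal bandwidth in MB/s.

    Assumes, as in the text, that each replica sends its copy of the
    value whenever a key is read, so every epoch-data fetch moves
    `replicas` copies across the internal network.
    """
    total = replicas * consumers * epoch_mb
    return total, total / nodes


total, per_node = internal_bandwidth(CONSUMERS, EPOCH_DATA_MB, REPLICAS, NODES)
print(f"total: {total} MB/s, per node: {per_node:.0f} MB/s")
```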
Deserialise time is zero: the data is deserialised outside of Riak.
Processing time is zero: the data is processed outside of Riak.
Metrics when using MapReduce
When using MapReduce, the data processing code is sent to Riak as part of an ad hoc MapReduce job, which is executed on the Riak cluster by dispatching it to the nodes where the data chunks of the epoch-data are stored.
External bandwidth usage
When using MapReduce to perform data processing jobs, there is certainly a huge gain in network bandwidth usage. For each query, only the results are transferred, so 20 x 10KB/s = 200 KB/s.
Internal bandwidth usage
The internal usage is also very low: it's only used to spread the MapReduce jobs, transfer the results, and do bookkeeping. It's hard to put a precise number on it because of the way jobs and data are spread across the cluster, but overall it's a couple of MB/s at most.
Deserialise time is high: for each query, the data is deserialised again, so that's 20 x 5 = 100 sec of CPU time for the whole cluster. With 10 CPUs available for deserialisation on each of the 30 nodes, the cluster has 300 CPUs, so deserialising one second's worth of data takes 100 / 300 = 0.33 sec. This is clearly an issue: one third of all our CPU power is already spent deserialising the same data over and over, once per MapReduce job. That's a big waste of CPU time.
Processing time is 20 x 0.01 = 0.2s for the whole cluster. This is really low compared to the deserialise time.
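To see how this scales, the sketch below computes the share of the cluster's CPU capacity consumed each second as a function of the number of consumers, using the figures above (5 CPU-seconds to deserialise and 0.01 CPU-second to process one epoch-data, 300 CPUs in total). It is a simple linear model that ignores scheduling and the CPU needed to store incoming events; the function name is hypothetical.

```python
NODES = 30
CPUS_PER_NODE = 10
DESERIALISE_CPU_S = 5.0  # CPU-seconds to decompress + deserialise one epoch-data
PROCESS_CPU_S = 0.01     # CPU-seconds to process one deserialised epoch-data


def mapreduce_cpu_share(consumers):
    """Fraction of the cluster's CPU capacity used each second.

    Every MapReduce job deserialises the epoch-data on its own, so the
    cost grows linearly with the number of consumers.
    """
    total_cpus = NODES * CPUS_PER_NODE
    cpu_seconds = consumers * (DESERIALISE_CPU_S + PROCESS_CPU_S)
    return cpu_seconds / total_cpus


for consumers in (10, 20, 40, 60):
    print(f"{consumers} consumers: {mapreduce_cpu_share(consumers):.0%} of cluster CPU")
```

Under this model, around 60 consumers would already exceed the whole cluster's CPU capacity, with nothing left for storing incoming events.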
Limitations of MapReduce
As we've seen, using MapReduce has its advantages: it's a well-known standard, and it allows us to create real-time processing jobs. However, it doesn't scale: because MapReduce jobs are isolated, they can't share the deserialised data, so CPU time is wasted and it's not possible to run more than one or two dozen real-time data processing jobs at the same time.
It's possible to overcome this difficulty by caching the deserialised data in memory, within the Erlang VM, on each node. CPU time would still be 3 times higher than needed (because a map job can run on any of the 3 replicas that contain the targeted data), but at least it wouldn't grow with the number of parallel jobs.
Another issue is that writing MapReduce jobs is not that easy, especially because, in this case, knowing Erlang is a prerequisite.
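The caching idea can be illustrated with simple memoisation. In Riak the cache would live in the Erlang VM on each node; this Python sketch only shows the principle, using `functools.lru_cache` so that 20 jobs reading the same data key pay for deserialisation once. The key name, the JSON plus zlib format and the toy jobs are hypothetical, and the 3-replica factor is not modelled.

```python
import json
import zlib
from functools import lru_cache

# Hypothetical compressed value for one data key, standing in for a Riak
# object. JSON + zlib is only an illustrative serialisation format.
BLOBS = {
    "data:1400000000:dc1:0": zlib.compress(
        json.dumps([{"type": "click"}, {"type": "view"}]).encode()
    ),
}

decode_calls = 0  # counts how many times deserialisation is actually paid for


@lru_cache(maxsize=1024)
def deserialise(data_key):
    """Decompress and deserialise a value; repeated calls hit the cache.

    The cached events are shared between jobs, so jobs must treat them
    as read-only.
    """
    global decode_calls
    decode_calls += 1
    return tuple(json.loads(zlib.decompress(BLOBS[data_key])))


def count_type(event_type):
    """Build a toy processing job counting events of one type."""
    return lambda events: sum(1 for e in events if e["type"] == event_type)


# 20 parallel jobs working on the same data key.
jobs = [count_type(t) for t in ["click", "view"] * 10]
results = [job(deserialise("data:1400000000:dc1:0")) for job in jobs]
print(len(results), "jobs,", decode_calls, "deserialisation(s)")
```

With a per-node cache, deserialisation cost depends on the number of replicas holding the data rather than on the number of jobs.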
Last but not least, it's possible to create very heavy MapReduce jobs, easily consuming all the CPU time. This directly impacts the performance and reliability of the cluster, and in extreme cases the cluster may be unable to store incoming events at a sufficient pace. It's not trivial to fully protect the cluster against MapReduce misuse.
A better solution: post-commit hooks
We explored a different approach to real-time data processing on the cluster. It scales properly by deserialising data only once, allows us to cap its CPU usage, and lets us write the processing jobs in any language, while still bringing the code to the data and thus removing most of the internal and external network usage.
This technical solution is what is currently in production at Booking.com on our Riak events storage clusters, and it uses post-commit hooks and a companion service on the cluster nodes.
We'll explore this solution in detail in the next blog post, so stay tuned!
Using MapReduce on Riak is usually somewhat discouraged because most of the time it's misused, for instance to perform bulk fetches or bulk inserts, or to traverse a bucket. The MapReduce implementation in Riak is very powerful and efficient, but must be used properly. It works best on a small number of keys, even if the amount of data processed is very large: the fewer the keys, the less bookkeeping and the better the performance. In our case, there are only a couple of hundred keys for one second's worth of data (with somewhat large values, around 400KB each), which is not a lot. Hence the great MapReduce performance we've witnessed. YMMV.