In this post, we'll see how to push data to Riak, how to read it back, and how to process that data outside of Riak. As a reminder, we saw in the previous part that events are schema-less hash tables, grouped by epoch, data center, type, and subtype, then serialised using Sereal and highly compressed.
If you missed part 1
We strongly recommend that you read part one of this blog series. The previous part explains how Booking.com collects and stores events from its backend into a central storage, and why Riak was chosen to do so.
Pushing to Riak
Data is pushed to Riak by a number of relocators: daemons running on the aggregation layer that push event blobs to Riak.
Side note: Riak doesn't recommend storing values larger than 1-2MB (see the Riak FAQ). Since our blobs can be 5-10MB in size, we shard them into 500KB chunks. Each chunk is a valid Sereal document, which means we do not have to stitch chunks back together to read the data.
This means that we have quite a lot of blobs to send to Riak, so to make the most of our network, I/O, and CPU, it's best to send data massively in parallel. To do so, we maintain a pool of forked processes (20 per host is a good start), each of which pushes data to Riak.
Whatever protocol is used, it's important to maximise I/O utilisation. One way is to use an HTTP library that parallelises requests at the I/O level (YAHC is an example). Another is to use an asynchronous Riak client such as AnyEvent::Riak.
We use an in-house library to create and maintain a pool of forks, but several existing CPAN libraries, such as Parallel::ForkManager, do the same job.
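As a rough Python sketch of this pattern (not our in-house fork library), the snippet below keeps up to 20 pushes in flight. It swaps forked processes for a thread pool, which is a reasonable substitute when the work is I/O-bound, and `push_chunk` is a hypothetical callable standing in for whatever client call actually performs one PUT.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List


def push_all(chunks: Dict[str, bytes],
             push_chunk: Callable[[str, bytes], None],
             workers: int = 20) -> List[str]:
    """Push every (key, data) pair concurrently and return the keys that failed.

    push_chunk is a hypothetical callable performing one PUT; it must raise on error.
    """
    failed: List[str] = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # One task per chunk; the pool keeps at most `workers` requests in flight.
        futures = {pool.submit(push_chunk, key, data): key
                   for key, data in chunks.items()}
        for future in as_completed(futures):
            if future.exception() is not None:
                failed.append(futures[future])
    return sorted(failed)
```

Returning the failed keys lets the caller retry them, which matters because an epoch's metadata should only be written once all of its chunks are stored.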
PUT to Riak
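Writing data to Riak is rather simple. For a given epoch, we have a list of event blobs, each with a different DC/type/subtype combination (remember, DC is short for Data Center). For example, the blob 1413813813:2:type1:subtype1 holds the type1/subtype1 events from DC 2 for epoch 1413813813, and 1413813813:2:type3: holds the type3 events, which have no subtype.
The first task is to slice the blobs into 500KB chunks and append an index number to their name. If the type1/subtype1 blob needs two chunks, that gives 1413813813:2:type1:subtype1:0 and 1413813813:2:type1:subtype1:1, while the smaller type3 blob becomes 1413813813:2:type3::0.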
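The slicing step can be sketched in Python. The standard library has no Sereal encoder, so this sketch assumes `json` as a stand-in serialiser; what it illustrates is that each chunk is encoded as its own complete document rather than cut at an arbitrary byte offset, so any chunk can be decoded on its own. The greedy packing and the `chunk_blob` name are illustrative assumptions, not our production code.

```python
import json
from typing import Dict, List

CHUNK_LIMIT = 500 * 1024  # bytes per chunk


def encode(events: List[dict]) -> bytes:
    # Stand-in for Sereal (assumption): one call yields one complete document.
    return json.dumps(events).encode("utf-8")


def chunk_blob(blob_name: str, events: List[dict],
               limit: int = CHUNK_LIMIT) -> Dict[str, bytes]:
    """Split events into self-contained documents named <blob_name>:<index>.

    Each chunk stays under `limit` bytes unless a single event is larger.
    """
    groups: List[List[dict]] = [[]]
    size = 2  # the "[" and "]" around a JSON list
    for event in events:
        item = len(json.dumps(event))  # ASCII output, so chars == bytes
        extra = item + (2 if groups[-1] else 0)  # ", " between list items
        if groups[-1] and size + extra > limit:
            groups.append([])  # current chunk is full: start a new one
            size, extra = 2, item
        groups[-1].append(event)
        size += extra
    return {f"{blob_name}:{i}": encode(g) for i, g in enumerate(groups)}
```

A blob with no subtype keeps its empty field, so chunking 1413813813:2:type3: yields the key 1413813813:2:type3::0.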
Next, we store all the event blobs in the events bucket in Riak. We can simulate this with curl:

```shell
curl -XPUT -H "Content-Type: application/octet-stream" --data-binary @<chunk_file> "http://node:8098/buckets/events/keys/1413813813:1:type1:subtype1:0"
# ...
curl -XPUT -H "Content-Type: application/octet-stream" --data-binary @<chunk_file> "http://node:8098/buckets/events/keys/1413813813:2:type3::0"
```
Side note: we store all events in each of the available Riak clusters. In other words, events from all DCs are stored both in the Riak cluster in DC 1 and in the Riak cluster in DC 2. We do not use cross-DC replication to achieve this; instead, the relocators simply push data to all our clusters.
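The same PUT can be issued from Python with only the standard library, once per cluster since every cluster receives every event. This is a sketch: the cluster URLs are hypothetical, and it targets Riak's HTTP interface (`PUT /buckets/<bucket>/keys/<key>`) rather than the Protocol Buffers API.

```python
import urllib.parse
import urllib.request
from typing import List

# Hypothetical entry points, one per Riak cluster (one cluster per DC).
CLUSTERS = ["http://riak-dc1:8098", "http://riak-dc2:8098"]


def put_request(base_url: str, bucket: str, key: str, data: bytes,
                content_type: str = "application/octet-stream") -> urllib.request.Request:
    # Keys contain ":" which is safe in a URL path; quote anything else unusual.
    path = f"/buckets/{urllib.parse.quote(bucket)}/keys/{urllib.parse.quote(key, safe=':')}"
    return urllib.request.Request(base_url + path, data=data, method="PUT",
                                  headers={"Content-Type": content_type})


def put_everywhere(bucket: str, key: str, data: bytes,
                   clusters: List[str] = CLUSTERS) -> None:
    # No cross-DC replication: the writer itself sends the value to each cluster.
    for base_url in clusters:
        with urllib.request.urlopen(put_request(base_url, bucket, key, data),
                                    timeout=10) as resp:
            resp.read()  # urlopen raises HTTPError on 4xx/5xx responses
```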
Once all the event blobs are stored, we can store the metadata, which is the list of event keys, in the epochs bucket. This metadata is stored in one key per epoch and DC, so for the current example we will have two keys: 1413813813-1 and 1413813813-2. We have chosen to store the list of event blob names as pipe-separated values. Here is a simulation with curl for 1413813813-2:

```shell
curl -XPUT -H "Content-Type: text/plain" -d "type1:subtype1:0|type1:subtype1:1|type3::0" "http://node:8098/buckets/epochs/keys/1413813813-2"
```
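Because the epoch and DC are already in the key name, there is no need to repeat them in the value. It's important to push the metadata only after the data has been pushed, so that a reader never finds a chunk listed in the metadata before that chunk is actually stored.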
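A minimal sketch of building the metadata value and enforcing the data-then-metadata order, assuming `put(bucket, key, value)` is a hypothetical callable that performs one write (for example, a wrapper around an HTTP PUT):

```python
from typing import Callable, Dict, Iterable


def metadata_key(epoch: int, dc: int) -> str:
    return f"{epoch}-{dc}"


def metadata_value(chunk_keys: Iterable[str], epoch: int, dc: int) -> str:
    """Strip the '<epoch>:<dc>:' prefix and join the remaining names with '|'."""
    prefix = f"{epoch}:{dc}:"
    names = [k[len(prefix):] for k in chunk_keys if k.startswith(prefix)]
    return "|".join(names)


def store_epoch(epoch: int, dc: int, chunks: Dict[str, bytes],
                put: Callable[[str, str, bytes], None]) -> None:
    # `chunks` is assumed to hold only this epoch's chunks for this DC.
    # 1. Data first: every chunk must exist before anything points to it.
    for key, data in chunks.items():
        put("events", key, data)
    # 2. Metadata last, so readers never see a listed key that isn't stored yet.
    value = metadata_value(chunks, epoch, dc)
    put("epochs", metadata_key(epoch, dc), value.encode("utf-8"))
```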
When pushing data to the Riak cluster, we can change the way data is written through a set of parameters, either by specifying them per request when using the PBC (Protocol Buffers) API, or by setting them as bucket defaults.
Riak's documentation provides a comprehensive list of the parameters and their meaning. We have set these parameters as follows:

```json
{
  "n_val": 3,
  "allow_mult": false,
  "last_write_wins": true,
  "w": 3,
  "dw": 0,
  "pw": 0
}
```
Here is a brief explanation of these parameters:
- n_val: 3 means that the data is replicated three times
- last_write_wins: true prohibits sibling values; conflicts are resolved right away by keeping the last value written
- w: 3 means that a write only returns success once the data has been written to all three replica nodes
- dw: 0 means that Riak does not wait for the data to reach the storage backend on each node before returning success; it is enough for the data to have reached the node
- pw: 0 means it's OK if the nodes that store the replicas are not the primary nodes (i.e. the ones that are supposed to hold the data) but fallback nodes, in case the primary ones are unavailable
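Bucket defaults can be set through Riak's HTTP interface by sending `{"props": {...}}` as JSON to `PUT /buckets/<bucket>/props`. The sketch below only builds that request from the write parameters above; the host name is hypothetical.

```python
import json
import urllib.request

# The write parameters listed above.
WRITE_PROPS = {
    "n_val": 3,
    "allow_mult": False,
    "last_write_wins": True,
    "w": 3,
    "dw": 0,
    "pw": 0,
}


def bucket_props_request(base_url: str, bucket: str, props: dict) -> urllib.request.Request:
    body = json.dumps({"props": props}).encode("utf-8")
    return urllib.request.Request(f"{base_url}/buckets/{bucket}/props", data=body,
                                  method="PUT",
                                  headers={"Content-Type": "application/json"})

# Sending it requires a reachable node, e.g.:
# urllib.request.urlopen(bucket_props_request("http://node:8098", "events", WRITE_PROPS))
```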
In a nutshell, we have a reasonably robust way of writing data. Because our data is immutable and never modified, we don't want siblings or conflict resolution at the application level. In theory, data could be lost if a major network issue occurred just after a write was acknowledged but before the data reached the backend. However, in the worst case we would lose a fraction of one second of events, which is acceptable for us.
Reading from Riak
This is how the data and metadata for a given epoch are laid out in Riak:
```text
bucket: epochs
key:    1428415043-1
value:  1:cell0:WEB:app:chunk0|1:cell0:EMK::chunk0

bucket: events
key:    1428415043:1:cell0:WEB:app:chunk0
value:  <binary sereal blob>

bucket: events
key:    1428415043:1:cell0:EMK::chunk0
value:  <binary sereal blob>
```
Fetching one second of data from Riak is quite simple. Given a DC and an epoch, the process is as follows:
- Read the metadata by fetching the key <epoch>-<dc> from the epochs bucket
- Parse the metadata value, split on the pipe character to get data keys, and prepend the epoch to them
- Reject data keys that we are not interested in by filtering on type/subtype
- Fetch the data keys in parallel
- Deserialise the data
- Data is now ready for processing
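The read steps can be sketched as a single function. This is an illustration under assumptions: `fetch(bucket, key)` and `decode(blob)` are hypothetical callables (in practice a Riak GET and a Sereal decoder), and filtering on type/subtype is expressed as a caller-supplied predicate on each metadata entry.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def read_epoch(epoch: int, dc: int,
               fetch: Callable[[str, str], bytes],
               decode: Callable[[bytes], object],
               wanted: Callable[[str], bool] = lambda entry: True,
               workers: int = 16) -> List[object]:
    # Read the metadata key "<epoch>-<dc>" from the epochs bucket.
    meta = fetch("epochs", f"{epoch}-{dc}").decode("utf-8")
    # Split on "|", drop unwanted types/subtypes, and prepend the epoch.
    keys = [f"{epoch}:{entry}" for entry in meta.split("|")
            if entry and wanted(entry)]
    # Fetch the data keys in parallel (map preserves the metadata order).
    with ThreadPoolExecutor(max_workers=workers) as pool:
        blobs = list(pool.map(lambda key: fetch("events", key), keys))
    # Deserialise each chunk; the data is now ready for processing.
    return [decode(blob) for blob in blobs]
```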
Reading a time range of data works the same way. Fetching ten minutes of data starting at Wed, 01 Jul 2015 11:00:00 GMT would be done by enumerating all the epochs in that range, in this case:
1435748400 1435748401 1435748402 ... 1435748999
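Enumerating the epochs is plain arithmetic on Unix timestamps: the range is half-open, so ten minutes starting at 1435748400 covers 600 epochs. A minimal sketch (the function name is illustrative):

```python
from datetime import datetime, timezone
from typing import List


def epochs_in_range(start: datetime, seconds: int) -> List[int]:
    """All one-second epochs in [start, start + seconds)."""
    first = int(start.timestamp())
    return list(range(first, first + seconds))


epochs = epochs_in_range(datetime(2015, 7, 1, 11, 0, 0, tzinfo=timezone.utc), 10 * 60)
```

Each epoch is then read for each DC of interest, as described above.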
Then, for each epoch, fetch the data as described above. Riak is particularly well suited to this kind of workload, where multiple parallel processes perform a huge number of small requests on different keys. This is where distributed systems shine.
The events bucket (where the event data is stored) has the following read-related properties:

```json
{
  "r": 1,
  "pr": 0,
  "rw": "quorum",
  "basic_quorum": true,
  "notfound_ok": true
}
```
Again, let's look at these parameters in detail:
- r: 1 means that when fetching data, as soon as one replica node replies, Riak considers this a valid reply and doesn't compare it with the other replicas
- pr: 0 removes the requirement that the data comes from a primary node
- notfound_ok: true means that as soon as one node can't find a key, Riak considers that the key doesn't exist
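The same read parameters can also be passed per request as query parameters of Riak's HTTP fetch (`GET /buckets/<bucket>/keys/<key>`), overriding the bucket defaults. A sketch using only the standard library; the host name is hypothetical, and treating a 404 as "no data" is a choice made by this sketch:

```python
import urllib.error
import urllib.parse
import urllib.request
from typing import Optional

READ_PARAMS = {"r": 1, "pr": 0, "basic_quorum": "true", "notfound_ok": "true"}


def get_url(base_url: str, bucket: str, key: str, params: dict = READ_PARAMS) -> str:
    query = urllib.parse.urlencode(params)
    return f"{base_url}/buckets/{bucket}/keys/{urllib.parse.quote(key, safe=':')}?{query}"


def fetch(base_url: str, bucket: str, key: str) -> Optional[bytes]:
    try:
        with urllib.request.urlopen(get_url(base_url, bucket, key), timeout=10) as resp:
            return resp.read()
    except urllib.error.HTTPError as err:
        if err.code == 404:  # Riak answers 404 for a missing key
            return None
        raise
```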
These parameter values make fetching data as fast as possible. In theory, they don't protect against conflicts or data corruption. However, as described in the "Aggregated Events" section of the first post, every event blob ends with a checksum, which lets the consumer verify that a blob fetched from Riak is not corrupted. And because events are never modified, no version conflict can occur. This is why such "careless" parameter values are not an issue for this use case.
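The checksum format itself is described in the first post, not here. Purely as an illustration, this sketch assumes a CRC32 appended as the last four bytes (big-endian) of each blob, and shows how a consumer could verify and strip it before deserialising:

```python
import struct
import zlib


def add_checksum(payload: bytes) -> bytes:
    # Assumed layout: payload followed by a 4-byte big-endian CRC32.
    return payload + struct.pack(">I", zlib.crc32(payload))


def verify_and_strip(blob: bytes) -> bytes:
    if len(blob) < 4:
        raise ValueError("blob too short to carry a checksum")
    payload, (expected,) = blob[:-4], struct.unpack(">I", blob[-4:])
    if zlib.crc32(payload) != expected:
        raise ValueError("checksum mismatch: corrupted blob")
    return payload
```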
Real time data processing outside of Riak
After the events are properly stored in Riak, it's time to use them. The first use is quite simple: extract data from them and process it on dedicated machines, usually grouped in clusters of machines that perform the same kind of analysis. These machines are called consumers, and they usually run daemons that fetch data from Riak, either continuously or on demand. Most of the continuous consumers are actually small clusters of machines that spread the load of fetching data.
Some data processing needs to happen in near real time. This is the case for monitoring and building graphs. Booking.com heavily uses graphs at every layer of its technical stack, and a big portion of them are generated from events. Data is fetched every second from the Riak storage, processed, and the resulting graphing data is sent to an in-house Graphite cluster.
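Graphite's Carbon daemon accepts a simple plaintext protocol: one `<metric path> <value> <timestamp>` line per data point. The sketch below counts one second of events per type and formats those lines; the metric prefix and the `type` field name are hypothetical:

```python
from collections import Counter
from typing import Iterable, List


def graphite_lines(epoch: int, events: Iterable[dict],
                   prefix: str = "events.count") -> List[str]:
    # Count events per type for this one-second epoch.
    counts = Counter(event.get("type", "unknown") for event in events)
    return [f"{prefix}.{etype} {n} {epoch}" for etype, n in sorted(counts.items())]
```

The lines would then be written, newline-terminated, to Carbon's plaintext listener (port 2003 by default).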
Other forms of monitoring also consume the event stream: it is fetched continuously and aggregated per second, per minute, and per day into external databases, which are then made available to multiple departments through internal tools.
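Rolling per-second counts up into per-minute or daily aggregates comes down to flooring each epoch to the start of its period. A sketch that leaves out the write to the external database (daily buckets here are UTC days):

```python
from collections import defaultdict
from typing import Dict


def roll_up(per_second: Dict[int, int], period: int) -> Dict[int, int]:
    """Sum per-second counts into buckets of `period` seconds, keyed by bucket start."""
    totals: Dict[int, int] = defaultdict(int)
    for epoch, count in per_second.items():
        totals[epoch - epoch % period] += count
    return dict(totals)


per_minute = roll_up({1435748400: 3, 1435748459: 2, 1435748460: 5}, 60)
```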
These kinds of processes try to be as close as possible to real time. Currently there are 10 to 15 seconds of lag. This lag could be shorter: a portion of it is due to the collection part of the pipeline, and an even bigger part is due to the re-serialisation of the events as they are grouped together to reduce their size. A good deal of optimisation could be done there to bring the lag down to a couple of seconds. However, there was no operational requirement to reduce it, and 15 seconds is small enough for our current needs.
Another way of using the data is to stay close to real time but accumulate seconds into longer periods. One example is our Anomaly Detector, which continuously fetches events from the Riak clusters. However, instead of using the data right away, it accumulates it over short moving windows of time (every few minutes) and applies statistical algorithms to it. The goal is to detect anomalous patterns in our data stream and provide the first alert that prompts further action. Needless to say, this client is critical.
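The Anomaly Detector's actual statistics aren't described here. As a stand-in, this sketch keeps a moving window of per-second counts and flags a value lying more than three standard deviations from the window mean (a simple z-score test, chosen only for illustration):

```python
from collections import deque
from statistics import mean, pstdev


class MovingWindow:
    """Keep the last `size` values and flag outliers with a z-score test (assumption)."""

    def __init__(self, size: int = 300, threshold: float = 3.0):
        self.values = deque(maxlen=size)  # oldest values drop out automatically
        self.threshold = threshold

    def add(self, value: float) -> bool:
        """Return True if `value` is anomalous for the current window, then record it."""
        anomalous = False
        if len(self.values) >= 2:
            window = list(self.values)
            mu, sigma = mean(window), pstdev(window)
            anomalous = sigma > 0 and abs(value - mu) / sigma > self.threshold
        self.values.append(value)
        return anomalous
```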
A similar use is gathering data related to A/B testing. A large number of machines harvest data from the event stream, then process it and store the results in dedicated databases used by experimentation-related tooling.
There are a number of other uses of the data outside of Riak, including manually looking at events to check the behaviour of new features, or analysing past issues and outages.
Limitations of data processing outside of Riak
Fetching data outside of the Riak clusters raises some issues that are difficult to work around without changing the processing mechanism.
First of all, there is a clear network bandwidth limitation to the design: the more consumer clusters there are, the more network bandwidth is used. Even with large clusters (more than 30 nodes), it's relatively easy to exhaust the network capacity of all the nodes as more and more fetchers try to get data from them.
Secondly, each consumer cluster tends to use only a small part of the event stream. Even though consumers can filter on types, subtypes, and DCs, the resulting event blobs still contain a large quantity of data that is useless to the consumer. For storage efficiency, events need to be stored as large compressed serialised blobs, so splitting them further by adding more subtypes is not possible.
Additionally, statically splitting the event content is too rigid, since the use of the data changes over time and we do not want to be a bottleneck to change for our downstream consumers. Part of an event of a given type that was critical two years ago might only be used for minor monitoring now. A subtype that was heavily used for six months may now be rarely used because of a technical change in the producers.
Finally, the CPU time needed to uncompress, load, and filter the big event blobs is far from negligible. It usually takes around five seconds to fetch, uncompress, and filter one second's worth of events. This means that any real-time data crunching requires multiple threads and likely multiple hosts, usually a small cluster: with five seconds of work per second of data, at least five workers must run in parallel just to keep up. It would be much simpler if Riak could provide a real-time stream of data tailored exactly to each consumer's needs.
Next post: data filtering and processing inside Riak
What if we could remove the CPU limitations by doing processing on the Riak cluster itself? What if we could work around the network bandwidth issue by generating sub-streams on the fly and in real-time on the Riak cluster?
This is exactly what we implemented, using simple concepts and leveraging the ease of use and hackability of Riak. These concepts and implementations will be described in the next part of this blog post series!
Some optimisation has since been done: the main change was a module that splits a Sereal blob without deserialising it, which speeds up the process greatly. This module is available as Sereal::Splitter. Most of the time spent splitting Sereal blobs is now spent decompressing them. The next optimisation step would be to use a compression algorithm that decompresses faster than the gzip we currently use, for instance LZ4_HC.
At this point, the attentive reader may jump in the air and proclaim "LevelDB and Snappy compression!". It is indeed possible to use LevelDB as the Riak storage backend, which provides an option to use Snappy compression on the blocks of data stored. However, this compression algorithm is not good enough for our needs (using gzip reduced the size by a factor of almost 2). Also, LevelDB (or at least the eleveldb implementation used in Riak) doesn't provide automatic expiration, which is critical to us, and in versions below 2.x it had issues reclaiming free space after key deletions.