Taming the events: How we regained petabyte-scale Hive query power

When event sizes started to matter

In the beginning, four long years ago, simply having every data set in Hadoop was good enough. We were taking baby steps into the big data world, with a limited user base migrating from other systems to the big data clusters, which let them run heavy queries on large data sets in a timely manner. As the number of users and the size of the data sets grew, we were challenged by new performance problems.

The majority of those data sets were all sorts of server-generated events stored in JSON, a format that has since become the norm thanks to its ease of use and development. We had gone the typical route of NoSQL newcomers: allowing our developers maximum flexibility and getting rid of nearly all schemas (which had never really existed for these events).

To make this bunch of JSON objects easier to query and process, we used Hive[1], creating a big partitioned table with only a few columns for UUIDs[2], datacenter IDs, timestamps and a few other things, plus a very fat column containing the whole JSON. Using Hive was an obvious choice for us, as we had already imported many MySQL tables there for the analysts to use. Putting the events in the same pool allowed for some powerful scenarios, where you could easily join anything to anything. We could also use Perl, and all the business logic we had written in it, directly in Hive thanks to the TRANSFORM construct[3].

This already meant billions of events per day, coming from a wide range of application types and servers, the bulk of them from the web and mobile front-ends. Traffic kept growing steadily and more event types were added. Individual records became fatter, too: why limit yourself to a few metrics when you can store everything and keep it forever? Analysts and developers could, and would, scan ranges of several weeks, or even months, in a single Hive query, and everyone was pretty happy.

It worked quite well for a long time: Hive would hide the absurdly massive resources needed for querying petabytes behind a friendly face, and nobody (except the handful of people in charge of maintaining and expanding the whole thing) would know about it. But, due to this inflation combined with the rapidly growing number of users, queries soon started getting much slower.

Cool and simple ideas

We scratched our heads for a long time. How could we make it as efficient as it used to be, without imposing a fixed schema? How could we have the tons of scripts and queries that rely on these ugly blobs still work, but regain the efficiency they once had? All the papers that we read and the experience of the big players in the industry told us one thing: you definitely need a schema for readability by analysts (at the discovery phase) and for efficient querying.

Imposing a schema is also at odds with how things are done at Booking.com. We always go out of our way to make everyone's job easier and to avoid imposing restrictions, even when that means adopting creative solutions. Some of these may occasionally make your skin crawl, but as long as they do the job, that's what matters. The approach has served us well so far.

So we had this simple idea: instead of splitting the JSONs according to some schema kept in a registry, which would have been the natural thing to do, why not... do nothing? Keep the events as they always were, and live with it. They'd still be usable by most queries, just not the heaviest ones. And for these fat queries, we would make something brand new: a faster ORC[4] table, with the JSONs split according to the most common use patterns that we see in actual queries.

This means a table whose structure would be driven by the actual needs of our users. It would mutate by itself over time to fit the evolution of these needs, avoiding breaking legacy scripts (by keeping the old table) while allowing new scripts (or those that would need adjustments for performance reasons) to use the new one. A self-mutating table, as silly as it sounds.

What our users were looking for

Getting a full list of all the queries run on a cluster wasn't too difficult. We have loads of monitoring scripts, some of which collect data about individual jobs and store it in databases, with columns holding information about:

  • user ID
  • user name
  • job name
  • number of maps and reducers
  • resources used in CPU and MB-seconds
  • the query (in the case of Hive jobs)

On job completion, YARN archives most of this in one XML file per job, so that part is easy enough[5], and joining it with the extra information we needed wasn't difficult.
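As a rough illustration of reading one of these archived files, here is a minimal Python sketch. It assumes the file follows the standard Hadoop configuration XML layout (name/value pairs inside property elements) and that the user, job name and Hive query are stored under the property names listed in WANTED; treat those names as assumptions to check against your own archived files. Counters such as MB-seconds are not covered here.

```python
import xml.etree.ElementTree as ET

# Assumed property names (standard MapReduce/Hive keys); verify against
# the archived files on your own cluster.
WANTED = {
    "mapreduce.job.user.name": "user_name",
    "mapreduce.job.name": "job_name",
    "hive.query.string": "query",
}


def parse_job_conf(xml_text):
    """Return the wanted properties from a Hadoop configuration XML document."""
    root = ET.fromstring(xml_text)
    record = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        if name in WANTED:
            record[WANTED[name]] = prop.findtext("value")
    return record


sample = """<configuration>
  <property><name>mapreduce.job.user.name</name><value>jdoe</value></property>
  <property><name>mapreduce.job.name</name><value>sample_raw</value></property>
  <property><name>hive.query.string</name><value>SELECT 1</value></property>
  <property><name>io.sort.mb</name><value>100</value></property>
</configuration>"""

print(parse_job_conf(sample))
```

Each resulting record can then be inserted as one row of the monitoring database and joined with the other job metrics.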

Once we had this covered, and true to our usual ways, we went for the simplest thing that could possibly work: figure out what was usually fetched inside the JSONs by extracting all the get_json* calls (including calls to a faster custom JSON UDF that we developed) from the query strings. For this we used some Perl text-processing modules (like Text::Balanced) that are part of the usual junk-processing arsenal. Plugging something into Hive's query parser would have been marginally neater, but the Perl-based solution got the job done in no time.
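The extraction step can be sketched in Python as well. This is not the Perl code described above: it swaps Text::Balanced for a plain regular expression, so it only handles get_json_object() calls whose first argument contains no commas or parentheses and whose path is a single-quoted literal starting with '$.'. The custom UDF is not covered.

```python
import re

# Captures the dot-notation path from get_json_object(<expr>, '$.path').
# Simplification: no balanced-parenthesis handling, unlike Text::Balanced.
GET_JSON_RE = re.compile(
    r"get_json_object\s*\(\s*[^,()]+,\s*'\$\.([^']+)'\s*\)",
    re.IGNORECASE,
)


def extract_json_keys(query):
    """Return the JSON keys requested through get_json_object(), in order."""
    return GET_JSON_RE.findall(query)


query = (
    "SELECT get_json_object(json, '$.foo.bar'), "
    "GET_JSON_OBJECT(e.json , '$.action.name') FROM raw_events e"
)
print(extract_json_keys(query))
```

A regex is good enough for a first pass over thousands of query strings; queries it cannot parse simply contribute no keys.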

The extracted data sets gave us a long list of JSON keys in dot notation, like "foo.bar.baz", telling us what people were looking for in these events. We had two constraints in mind for the exercise: force some keys that we knew we'd need later to be counted as "seen" even though they were not present in any query (a whitelist), and prevent too much fragmentation (foo.bar.baz1 and foo.bar.baz2 should be seen as foo.bar and kept together as a single "leaf" JSON fragment). We wrote a script that does just that: it checks what people needed and when they needed it, and stores the resulting list of keys in a database table, along with timestamps telling when the corresponding queries ran.
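Here is a minimal Python sketch of those two constraints. The post doesn't spell out the exact collapsing rule, so this assumes a hypothetical min_siblings threshold: when a parent has at least that many distinct child keys, the children are replaced by the parent. Keys already covered by a kept ancestor are then dropped. The function names and the row layout for the key log table are illustrative only.

```python
from collections import defaultdict


def collapse_siblings(keys, min_siblings=2):
    """Replace keys sharing a parent with enough siblings by that parent.

    Single pass only: collapsing does not cascade further up the tree.
    """
    children = defaultdict(set)
    for key in keys:
        parent = key.rpartition(".")[0]
        if parent:
            children[parent].add(key)
    collapsed = set()
    for key in keys:
        parent = key.rpartition(".")[0]
        if parent and len(children[parent]) >= min_siblings:
            collapsed.add(parent)  # e.g. foo.bar.baz1 -> foo.bar
        else:
            collapsed.add(key)
    return collapsed


def drop_covered(keys):
    """Drop keys whose ancestor is already kept as a leaf fragment."""
    def ancestors(key):
        parts = key.split(".")
        return (".".join(parts[:i]) for i in range(1, len(parts)))
    return {k for k in keys if not any(a in keys for a in ancestors(k))}


def select_fragments(query_keys, whitelist, min_siblings=2):
    """Keys seen in queries plus whitelisted keys, reduced to leaf fragments."""
    keys = set(query_keys) | set(whitelist)  # whitelist counts as "seen"
    return sorted(drop_covered(collapse_siblings(keys, min_siblings)))


def key_log_rows(query_keys, ran_at, whitelist):
    """Rows for a hypothetical key log table: (key, when the query ran)."""
    return [(key, ran_at) for key in select_fragments(query_keys, whitelist)]


print(select_fragments(["foo.bar.baz1", "foo.bar.baz2", "action.name"], ["user.id"]))
```

Keeping a timestamp per key is what later lets the table forget keys nobody has asked for in a while.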

We'd give them everything they wanted

We needed to run something continuously on the stream of real events to:

  1. Figure out which of those keys return actual results, because everyone makes mistakes in their queries from time to time, and it's easy to type "bar" instead of "baz". If we trusted the queries without checking what they returned, we'd soon find ourselves populating many columns with NULLs, making the schema bloated and unusable.
  2. Figure out what data type the returned data could fit in once ingested in Hive, in order to specify the most efficient storage format for the destination columns at the splitting phase.
  3. Create destination partitions with the proper columns and types, clean up the schema when needed, and insert the split JSONs into the destination.

A handful of simple tricks

The script that we wrote does all of this in one pass.

  1. Obtain the keys extracted from users' queries (as described above), append them to the key log table in MySQL and merge the set with the keys found during previous runs.
  2. Run a Hive query that splits the JSON blobs for the processed hour into tiny fragments, according to this list of keys. Then, for every fragment of actual data, figure out which data type it fits best (about 8 types, from TINYINT to TEXT). This is done with a special UDF that does both the JSON splitting and the data type check in one go, outputting two columns for each fragment: the data and its guessed type. Since the guessed type for a given key depends on the individual JSON object being analyzed, it's essential that this check runs over all JSONs and all keys.
  3. From the temporary table created in the previous step, extract the "fattest" data type found for every destination data column / JSON key; this becomes the data type of the destination column. It also lets us detect which keys are just mistakes: they returned no data at all, so their detected data type is NULL. To figure all this out, we run a simple auto-generated aggregate query on the data type columns. Here's a picture that will hopefully make this clearer:
  4. Use this information to modify the schema of the destination table, stored in ORC format, on the fly. One of the nice things about ORC is that the partitions of a table don't all have to use the exact same schema: the column order has to stay the same, but the data types can vary. When a partition is created and populated, the data type specified in the table definition is used, so what is basically text is properly converted to numeric or other types and stored accordingly. When reading, however, the partition definition (not the table's) is authoritative. In a single query hitting several partitions, the same column can therefore have different data types, which are properly cast at runtime. For instance, a column can hold INTs of various widths spread over several partitions, starting with TINYINT when you began gathering data and growing all the way up to BIGINT as your IDs increased. And it just works.
  5. Finally, populate the destination partition with the contents of the temporary table, minus the data type columns.
  6. Update the table definition with per-column comments in JSON format, recording meta-information such as the first and last time actual data was seen for each column. This lets us effectively retire columns that are no longer used: since columns cannot be dropped, we reuse them for other keys once they have been empty for longer than the predefined time window. This is efficient in terms of storage space and keeps the schema slim.
  7. Drop partitions older than the time window. Done.
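The type guessing of step 2 and the "fattest type" aggregation of step 3 can be mimicked in memory with a short Python sketch. The actual UDF and the auto-generated aggregate query are not shown in the post, so the type ladder below is a hypothetical, reduced version (six types instead of about eight, with Hive's STRING standing in for TEXT), and each event is assumed to be a dict mapping a JSON key to its fragment text.

```python
import json

# Hypothetical type ladder, narrowest to widest (reduced from the post's ~8).
LADDER = ["TINYINT", "SMALLINT", "INT", "BIGINT", "DOUBLE", "STRING"]
INT_LIMITS = {"TINYINT": 2**7, "SMALLINT": 2**15, "INT": 2**31, "BIGINT": 2**63}


def guess_type(fragment):
    """Narrowest ladder type for one JSON fragment (given as JSON text).

    Returns None when the key was absent or its value was JSON null.
    """
    if fragment is None:
        return None
    try:
        value = json.loads(fragment)
    except ValueError:  # not valid JSON: keep it as text
        return "STRING"
    if value is None:
        return None
    if isinstance(value, bool):  # bool is a subclass of int: test it first
        return "STRING"
    if isinstance(value, int):
        for name, limit in INT_LIMITS.items():
            if -limit <= value < limit:
                return name
        return "STRING"  # too large even for BIGINT
    if isinstance(value, float):
        return "DOUBLE"
    return "STRING"  # strings, objects and arrays


def fattest_type(types):
    """Widest type seen for a key; None means it never returned any data."""
    seen = [t for t in types if t is not None]
    return max(seen, key=LADDER.index) if seen else None


def column_types(events, keys):
    """Per-key destination type over one batch of split events."""
    return {k: fattest_type(guess_type(e.get(k)) for e in events) for k in keys}


events = [
    {"user.id": "12", "action.name": '"explorer"'},
    {"user.id": "40000", "action.name": '"search"', "price": "9.5"},
]
print(column_types(events, ["user.id", "action.name", "acton.name", "price"]))
```

The misspelled "acton.name" comes out as None, which is exactly the signal used to keep mistaken keys out of the schema. In Hive, the same ordering would be expressed by mapping each type name to its rank and taking a MAX per key.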
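Steps 4 and 6 boil down to generating DDL from the detected types and the per-column metadata. The following Python sketch shows one way to plan those statements; the column naming rule (dots become slashes, as in `action/name`), the JSON comment layout and the function names are all assumptions. It only changes table-level types, relying on the behavior described in step 4 (new partitions take the table's types, existing partitions keep their own), and it recycles a retired column by renaming it, which assumes step 7 has already dropped the partitions that still hold its old data. Depending on the Hive version and metastore settings, some type changes on ORC tables may be rejected, so treat this as a planning sketch rather than a drop-in implementation.

```python
import json


def column_name(key):
    """Hypothetical naming rule: JSON key 'action.name' -> column 'action/name'."""
    return key.replace(".", "/")


def column_comment(first_seen, last_seen):
    """Per-column metadata stored as a JSON comment (step 6)."""
    return json.dumps({"first_seen": first_seen, "last_seen": last_seen})


def plan_ddl(table, columns, detected, now, window_start):
    """Return the ALTER TABLE statements to run before inserting one hour.

    columns:  current columns, {name: {"first_seen": ts, "last_seen": ts}}
    detected: {json_key: fattest type, or None for keys that returned no data}
    now, window_start: timestamps in a sortable text format (e.g. ISO 8601)
    """
    wanted = {column_name(k): t for k, t in detected.items() if t is not None}
    # Columns empty for longer than the window can be recycled for new keys.
    free = sorted(
        name for name, meta in columns.items()
        if meta["last_seen"] < window_start and name not in wanted
    )
    statements = []
    for name, col_type in sorted(wanted.items()):
        if name in columns:
            # Existing column: set this hour's type and bump last_seen.
            comment = column_comment(columns[name]["first_seen"], now)
            statements.append(
                f"ALTER TABLE {table} CHANGE COLUMN `{name}` `{name}` "
                f"{col_type} COMMENT '{comment}'")
        elif free:
            # New key: rename a retired column instead of growing the schema.
            comment = column_comment(now, now)
            statements.append(
                f"ALTER TABLE {table} CHANGE COLUMN `{free.pop(0)}` `{name}` "
                f"{col_type} COMMENT '{comment}'")
        else:
            comment = column_comment(now, now)
            statements.append(
                f"ALTER TABLE {table} ADD COLUMNS "
                f"(`{name}` {col_type} COMMENT '{comment}')")
    return statements


columns = {
    "user/id": {"first_seen": "2016-04-01T00", "last_seen": "2016-04-18T20"},
    "old/key": {"first_seen": "2016-01-01T00", "last_seen": "2016-02-01T00"},
}
detected = {"user.id": "INT", "action.name": "STRING",
            "acton.name": None, "page.type": "TINYINT"}
for stmt in plan_ddl("events_flat_web", columns, detected,
                     "2016-04-18T21", "2016-03-18T21"):
    print(stmt)
```

Keys with a None type never reach the DDL, and the stale `old/key` column is reused for the new `action/name` key before any column is added.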

Was it worth it?

This is the stage where you start questioning the sanity of this whole enterprise. After all, we found ourselves building a table that mutates by itself on demand (since it only uses information that users provide through their queries), and there ought to be a million ways this could fail horribly. You usually craft ALTER TABLE DDL statements carefully, rather than leave them to a script without direct supervision, right?

As it turns out, it works quite well. Besides, this is "only" derived data, and we could go back to the source if something went wrong.

The script that orchestrates all this is only a handful of lines long, including loads of comments about some of ORC's teething problems (i.e., bugs) that prevented us from making it even niftier, and the logic is pretty straightforward.

Let's have a look at an example: the kind of scan on a partition that gets run routinely. Before the new table appeared, the query would look like this. It uses event_parser, a table-generating UDF (UDTF) we wrote to make processing our JSONs easier and faster. Its syntax is even uglier than that of a query using the stock get_json_object(), but it runs quicker:

CREATE TEMPORARY FUNCTION event_parser AS 'com.booking.hive.udtf.EventParser';

CREATE TABLE mydb.sample_raw AS
SELECT  event.epoch
        -- , ... (remaining columns elided)
FROM default.raw_events AS event
LATERAL VIEW event_parser( event.json
        -- , ... (JSON keys to extract elided)
    ) response
WHERE   yyyy_mm_dd  = '2016-04-18'
    AND hh          = 21
    AND is_frontend = 1
    AND action_name = 'explorer';

Here is the same query run against the new table, with undoubtedly a whole lot better syntax:

CREATE TABLE mydb.sample_flat AS
SELECT  epoch
        -- , ... (remaining columns elided)
FROM default.events_flat_web
WHERE   yyyy_mm_dd    = '2016-04-18'
    AND hh            = 21
    AND is_frontend   = 1
    AND `action/name` = 'explorer';

In terms of storage, removing the lesser-used data sets and switching to ORC[4] with ZLIB compression instead of RCFile[6] with Snappy brought the volume down by 60%. For the hourly partition we're querying here, that means going from nearly 500GB to less than 200GB. But this is big data, and size doesn't matter that much, as usual (unless you're the one buying the hard drives). What about performance, then?

Here is the CPU time for the old version (the result is a table of nearly 100 million rows):

1 days 1 hours 25 minutes 10 seconds 20 msec

And for the new version:

0 days 2 hours 41 minutes 12 seconds 420 msec

That is 9,672 seconds versus 91,510, close to a 90% drop. What are we going to do with all these free CPU cycles on our hands?
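The arithmetic is easy to check. This small Python sketch converts the two CPU time summaries quoted above into seconds and computes the reduction; the unit names are taken from those summaries, and the parsing is a plain regular expression rather than anything Hive provides.

```python
import re

# Milliseconds per unit, as used in the CPU time summaries above.
UNIT_MS = {"days": 86_400_000, "hours": 3_600_000, "minutes": 60_000,
           "seconds": 1_000, "msec": 1}


def cpu_seconds(summary):
    """Convert e.g. '1 days 1 hours 25 minutes 10 seconds 20 msec' to seconds."""
    pairs = re.findall(r"(\d+) (days|hours|minutes|seconds|msec)", summary)
    return sum(int(n) * UNIT_MS[unit] for n, unit in pairs) / 1000


old = cpu_seconds("1 days 1 hours 25 minutes 10 seconds 20 msec")
new = cpu_seconds("0 days 2 hours 41 minutes 12 seconds 420 msec")
print(f"{old:,.0f} s vs {new:,.0f} s: {1 - new / old:.1%} less CPU time")
```

Working in integer milliseconds avoids accumulating floating-point error before the final division.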

We may want to keep it

Experimenting was fun, but the only thing that really matters is the benefit to our users and our infrastructure. In short:

  • Query runtimes were divided by about four
  • CPU time was divided by up to ten
  • Queries are easier to write and read, and more expressive

This makes it a nice hack, but first and foremost a successful experiment.

The funny thing is, this was initially meant to solve a short-term problem for a few queries, hence the limited time window to save storage space. But it turns out our users liked the new table so much they didn't want us to drop any old partitions, and started using the new table as a full replacement for the old one.

As has happened many times before, what started its life as a quick hack (in search of a better solution) is going to be sticking around quite a bit longer.

  1. Hive is a SQL-like interface for writing MapReduce jobs on Hadoop clusters.

  2. See the definition of Universally Unique Identifier.

  3. TRANSFORM is possibly one of the neatest things in Hive, allowing you to plug arbitrary code written in any language into an HQL query. This effectively turns Hive into a Hadoop MapReduce platform accessible from any language, in a handier way than plain Hadoop Streaming. More information on Hive TRANSFORM can be found in the Hive Wiki.

  4. The Hive ORC Manual explains the implementation and usage details of the file format.

  5. The job statistics and information are not retrieved from the YARN History Server's REST API, as you'd normally expect, because it turned out this front-end was unable to cope with multi-thousand-container jobs. Querying the History Server for such jobs made it so unresponsive that it couldn't do anything else, including archiving completed jobs' data, which turned it into a single point of failure for the whole cluster. Yet another case of Hadoop fun.

  6. See the definition of Record Columnar File.
