
Distributed Top-N Similarity Join with Hive and Perl Part I

At Booking.com we have a lot of data to manage. Most of the time MySQL does the job, but for very large datasets and heavy analytics we use Hadoop and Hive. Hive usually serves our analytical needs well, but there are cases where its built-in features are not enough. One such case is similarity analysis.

Similarity Analysis

In similarity analysis we try to quantify the similarity between different objects. This is usually done with a similarity function, which compares the attributes of two objects and scores the similarity between them. There are different ways to define a similarity function, but most of them define similarity as an inverse function of the distance between two objects, where the objects are represented as vectors of attributes. That way the similarity problem is reduced to calculating distances between vectors in N-dimensional space.
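To make the idea concrete, here is a minimal sketch in Perl. It assumes Euclidean distance and the mapping 1 / (1 + d) as the inverse function; both choices and the function names are illustrative assumptions, not something this post prescribes.

```perl
use strict;
use warnings;

# Euclidean distance between two equal-length attribute vectors (array refs).
sub euclidean_distance {
    my ($vec_a, $vec_b) = @_;
    die "vectors must have the same length\n" unless @$vec_a == @$vec_b;

    my $sum_of_squares = 0;
    for my $i (0 .. $#$vec_a) {
        $sum_of_squares += ($vec_a->[$i] - $vec_b->[$i]) ** 2;
    }
    return sqrt($sum_of_squares);
}

# One possible inverse mapping (an assumption): distance 0 gives similarity 1,
# and the similarity approaches 0 as the distance grows.
sub similarity {
    my ($vec_a, $vec_b) = @_;
    return 1 / (1 + euclidean_distance($vec_a, $vec_b));
}

printf "%.3f\n", similarity([0, 0], [3, 4]);   # distance is 5, prints 0.167
```

Any other distance (cosine, Jaccard, a weighted mix of attributes) could be swapped in without changing the overall shape.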

The problem arises when the number of objects we want to compare is very large, because in order to explicitly compare all possible vector pairs, we need a nested loop which scales as O(n^2). Accordingly, when n is very large O(n^2) gets very slow. Thus, it is a problem when you have a large dataset of vectors and you need to do a similarity analysis, but you need it to run fast.
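A minimal sketch of that nested loop, with hypothetical names (all_pairs and the toy scoring function are ours for illustration), shows where the quadratic cost comes from: every record in one set is scored against every record in the other.

```perl
use strict;
use warnings;

# Compare every record in @$set_a with every record in @$set_b.
# $score is a code ref that takes two records and returns a number.
# Returns a list of [index_a, index_b, score] triples.
sub all_pairs {
    my ($set_a, $set_b, $score) = @_;
    my @results;
    for my $i (0 .. $#$set_a) {
        for my $j (0 .. $#$set_b) {
            push @results, [ $i, $j, $score->($set_a->[$i], $set_b->[$j]) ];
        }
    }
    return @results;
}

my @points = (1, 4, 9, 16);
my @pairs  = all_pairs(\@points, \@points, sub { abs($_[0] - $_[1]) });
print scalar(@pairs), " comparisons for ", scalar(@points), " records\n";   # 16 comparisons
```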

There is no canonical solution for making similarity analysis fast. It all depends on the specific case, and every implementation is built to take advantage of that case's properties. The most common approach is to bucket the "likely similar" candidates in a clever way so that the number of comparisons is reduced. The algorithm for sampling candidates is not universal and also depends on the properties of the specific case, so, naturally, there are many different bucketing algorithms, the most common being locality-sensitive hashing (LSH) schemes like MinHash, and more recent ones like DISCO and DIMSUM. Furthermore, for many common cases there are open source libraries, like Mahout, Spark, and MLlib, which implement common bucketing schemes and common similarity functions like Cosine Similarity.

If your use case is a common one, like collaborative filtering, or document similarity matching based on term co-occurrence, you can use some of the mentioned solutions. However, if your use case is uncommon, then you have to implement the solution yourself.

Our Case

At Booking.com we deal with hotels. Naturally, we do similarity analysis on hotels by comparing vectors of their attributes. An attribute is some characteristic of a hotel relevant for analysis, for example its geographical location or the presence of a pool or Wi-Fi. Depending on the type of analysis we do, there are different groups we want to compare and different similarity functions we want to use.

For example, for revenue analysis it does not make sense to compare a small Bed & Breakfast to a large hotel with 200 rooms. In this case, we can use the number of rooms as a factor in bucketing hotels into "likely similar" groups. However, for customer experience analysis, the hotel size may not matter that much. Those two properties may be similar in terms of having a low number of complaints, high review scores, good hiking options, and historical ruins nearby. Therefore, depending on the type of similarity, we need a different, custom-made similarity function and a different bucketing scheme. In addition, we need to be able to compare fairly large buckets. In some cases it even makes sense to compare every possible combination. We want our analysts and data scientists to have that freedom when looking for data patterns. This means combining a brute force approach with candidate bucketing, with the freedom to choose the brute force ratio we want for a given case.
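As a rough illustration of bucketing by hotel size, the sketch below groups hotels into room-count bands and counts how many comparisons remain when only hotels in the same band are compared. The band boundaries, the record layout, and all function names are assumptions made up for this example.

```perl
use strict;
use warnings;

# Hypothetical bucketing rule: size bands by number of rooms.
sub size_bucket {
    my ($rooms) = @_;
    return 'small'  if $rooms < 20;
    return 'medium' if $rooms < 100;
    return 'large';
}

# Group hotel records ({ id => ..., rooms => ... }) by size bucket.
sub bucket_hotels {
    my (@hotels) = @_;
    my %buckets;
    push @{ $buckets{ size_bucket($_->{rooms}) } }, $_ for @hotels;
    return \%buckets;
}

# Number of ordered pairs left when we only compare within each bucket.
sub comparisons_within_buckets {
    my ($buckets) = @_;
    my $total = 0;
    $total += scalar(@$_) ** 2 for values %$buckets;
    return $total;
}

my @hotels = (
    { id => 1, rooms => 8 },
    { id => 2, rooms => 12 },
    { id => 3, rooms => 45 },
    { id => 4, rooms => 200 },
    { id => 5, rooms => 250 },
);
my $buckets = bucket_hotels(@hotels);
printf "%d comparisons within buckets instead of %d\n",
    comparisons_within_buckets($buckets), scalar(@hotels) ** 2;   # 9 instead of 25
```

Coarser bands move the job closer to pure brute force, finer bands reduce the work but risk missing matches across band boundaries.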

In the worst-case scenario we compare every pair: for 500,000 hotels that is 250 billion vector pairs to process. We want that done in a short amount of time, so that it feels more like an ad-hoc query than a long-running job.

Considering the power needed, the natural choice for this adventure was our big Hadoop cluster.

The requirements summed up in a short list are:

  1. In order to compare everything to everything, we need to be able to deal with O(n^2).
  2. Since we want the flexibility of comparing different attributes and experimenting with different similarity functions without restriction, we need to be able to deal with heavy similarity functions.
  3. Given O(n^2) and a heavy similarity function, the best way currently to make this fast is with massive parallel processing.
  4. We need the solution to be easy to use. Ideally, we want to be able to say: join these two datasets and here is the similarity function to use.
  5. From a pragmatic perspective, since we already have two large Hadoop clusters we want to take advantage of those resources.
  6. Since we have a Hive data warehouse, the similarity join should work on Hive tables out of the box.

The hardest part in this list is making O(n^2) play nice with Hadoop MapReduce, since MapReduce does not fit well into this type of a problem. While we were aware that this is not an ideal situation, there were still major benefits of being able to use the existing infrastructure and tools we already have.

Our Solution

The first thing to realize is that Hive joins are key-based, as they translate to MapReduce jobs, which are also, by paradigm, key-based. A similarity join is not key-based. We are not pairing up records based on key matching; instead we have a custom similarity function, which scores the similarity of every pair of records based on different attributes. Then, in the end result, we want to get the top N similar records. This discrepancy between the nature of a similarity join and a key-based join means that, in theory, Hive and MapReduce are not an ideal choice for this type of problem. However, in practice, our data warehouse is stored in Hive, our analysts and data scientists use Hive, and the SQL-like syntax of Hive has a large usability advantage over writing custom Java/Scala code. All of this means that it would be very valuable for us to be able to solve this problem in Hive.

Therefore, we wanted to try and see if Hive can be made to play nice with this type of a problem.

These are the different approaches we experimented with.

I. Naive approach:

Try to do a cross join in Hive and apply the similarity function in a SELECT TRANSFORM.

It turns out that this approach is dreadfully slow, and the main reason is that Hive by default directs all cross join record pairs to a single reducer, which cannot handle this load by itself.

Hive is designed to work with key based equality joins and the principle is that records which have a key match go to the same reducer. That makes perfect sense when we have a key condition, but in case of a cross join there is no key condition and therefore all possible combinations are a valid join-match. This means they all end up on one reducer. Hive is aware of this and warns you a cross join is happening. The problem is that we really need the cross join.

II. Brute force approach:

Since Hive needs keys, what about making an intermediate table with the IDs of all the record pairs?

It’s going to be a lot of them, but after all it's Hadoop. It should be able to handle it. In return you can join tables with a key match and you can have multiple reducers. While this can actually work, it's very slow and it's really an abuse of MapReduce. If you have one million records, then the join mid-table will have one trillion keys. You will need to maintain (or create every time) one trillion records just to cross join two tables. If you want to join some other tables in addition to the original two, you need to generate a new join table. In short, don’t do this.

III. Hive performance tuning approach:

What if there is a way to make Hive not force all the record combinations onto a single reducer?

Well, it turns out that it is possible. You can specify the number of reducers explicitly and then use DISTRIBUTE BY to distribute the map output to multiple reducers, so your similarity function can be run in parallel on multiple reducers. But, before you get too excited, there is a catch: even though Hive will distribute the join pairs to multiple reducers for further processing, Hive will first have to do the cross join to get all of the combinations that will be sent to the reducers. That process can be very slow because the number of mappers that will do this work is determined dynamically based on the size of the larger table. But with a similarity join the larger table is not that large by Hive standards. Even one million records, which is large for a similarity join, is still small for Hive and will give you only a few mappers. Obviously, a few mappers are way below what we need, and as the data size increases the number of mappers becomes a bottleneck. To deal with this you can tweak the split sizes to increase the number of mappers, but you will have to set very low limits compared to the Hive defaults. It will, at the very least, be an unconventional usage of Hive and MapReduce. If you don't mind this tweaking and a lot of nested queries, you will end up with something like this:

set mapred.reduce.tasks=100;
set mapred.min.split.size=100000;
set mapred.max.split.size=200000;
set mapred.min.split.size.per.node=100000;
set mapred.max.split.size.per.node=200000;
set mapred.min.split.size.per.rack = 100000;

ADD FILE ./distance.pl;
ADD FILE ./filter.pl;

SELECT TRANSFORM (
    id_a,
    id_b,
    distance
)
USING "/usr/local/bin/perl filter.pl"
AS (id_a, id_b, distance)

FROM ( --numerically sorted distances:

    SELECT
        id_a,
        id_b,
        cast(distance as double) as distance

    FROM ( -- distances:

        SELECT TRANSFORM (
            id_a,
            id_b,
            attribute_a_1,..., attribute_a_N,
            attribute_b_1,..., attribute_b_N
        )
        USING "/usr/local/bin/perl distance.pl"
        AS (id_a, id_b, distance)

        FROM ( -- All combos distributed

            SELECT
                id_a,
                id_b,

                attribute_a_1,
                ...,
                attribute_a_N,

                attribute_b_1,
                ...,
                attribute_b_N

                -- all pairs:
                FROM (

                    SELECT
                        A.id        as id_a,
                        B.id        as id_b,

                        A.attribute_1 as attribute_a_1,
                        ...,
                        A.attribute_N as attribute_a_N,

                        B.attribute_1 as attribute_b_1,
                        ...,
                        B.attribute_N as attribute_b_N

                    FROM (
                        SELECT
                            id,
                            attribute_1,...,attribute_N
                        FROM
                            table_A
                    ) A
                    CROSS JOIN
                    (
                        SELECT
                            id,
                            attribute_1,...,attribute_N
                        FROM
                            table_B
                    ) B

                ) ALL_COMBINATIONS

                CLUSTER BY
                    id_a

            ) ALL_COMBINATIONS_DISTRIBUTED

        ) DISTANCES

        DISTRIBUTE BY id_a
        SORT BY
            id_a, distance

    ) DISTANCES_DISTRIBUTED_SORTED

;

The above query does the following:

  1. Sets the max split size to 200KB so we get a lot of mappers.
  2. Does a cross join with multiple mappers.
  3. Distributes all pairs to multiple reducers.
  4. On the reduce side it uses our "distance.pl" reducer to score similarity of each pair.
  5. At the end it uses our "filter.pl" script to filter out pairs below the desired similarity threshold.
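The two scripts themselves are not shown here. As a rough idea of the shape of such a streaming step, the sketch below is one way the final filter could look if it kept the N closest matches per id_a. It relies on the tab-separated rows that Hive's TRANSFORM passes between steps; the function name, the value of N, and the assumption that rows arrive grouped by id_a and sorted by ascending distance are ours.

```perl
use strict;
use warnings;

# Keep the first $n rows for each id_a.
# Each row is a tab-separated "id_a\tid_b\tdistance" line. Rows are assumed to
# arrive grouped by id_a and sorted by ascending distance within each group,
# which is what the DISTRIBUTE BY / SORT BY step is meant to provide.
sub top_n_per_key {
    my ($n, @lines) = @_;
    my @kept;
    my ($current_id, $seen);
    for my $line (@lines) {
        chomp $line;
        my ($id_a) = split /\t/, $line;
        if (!defined $current_id || $id_a ne $current_id) {
            ($current_id, $seen) = ($id_a, 0);   # a new group starts here
        }
        push @kept, $line if $seen++ < $n;
    }
    return @kept;
}

# In-memory demo; a real TRANSFORM script would read its rows from STDIN
# line by line and print the kept rows to STDOUT.
my @sorted_rows = ("1\t7\t0.5\n", "1\t9\t1.2\n", "1\t3\t4.0\n", "2\t9\t0.1\n");
print "$_\n" for top_n_per_key(2, @sorted_rows);
```

The filter is cheap on its own; the expensive part is that every score has to be sorted and shipped to it first.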

So, we have a pure MapReduce way to run similarity joins. It works, and you can distribute distance calculation load over multiple reducers and have as many mappers as you want. However, there are some major problems with it:

  1. Although we can spawn as many mappers as we want, they are really not doing any calculation work. They are just getting all the pairs that need to be processed. In addition, the forwarding of all those pairs to reducers takes too long. In fact, only 5-10% of the time is spent on the reducer side processing the data, and 90-95% of the time is spent on Hive trying to get the data to the reducers. Basically, if you want to use Hive the wrong way, there is no better candidate than the cross join.
  2. As if the cross join issue was not enough, after the calculation of the similarity scores for all pairs, there is an additional step for filtering. This is very inefficient because for each record A it has to sort all similarity scores between record A and all of the records in table B. Aside from being a good lesson in inefficiency, there is nothing good about that. One way to do it efficiently is to remove the sort step and second reducer, and maintain a priority queue in the first reducer so we have our top N matches right after the first reducer without the need for sorting and filtering. But that would add an additional usability problem because every time you need a similarity join you have to deal with the plumbing instead of just focusing on the similarity function.
  3. Even if we didn't have the above-mentioned problems, from the usability point of view large nested queries like these are cumbersome and error-prone to develop and maintain. It is quite the opposite of easy-to-use, which was also an important requirement.
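To illustrate the priority queue idea from the second point, here is a minimal sketch that keeps only the N closest candidates seen so far, so no full sort is ever needed. For simplicity it uses a small sorted array rather than a real heap, which is a reasonable stand-in when N is small; the function name and data layout are assumptions.

```perl
use strict;
use warnings;

# Offer a candidate [$id, $distance] to @$best, which is kept sorted by
# ascending distance and never grows beyond $n entries.
sub offer {
    my ($best, $n, $candidate) = @_;
    return if $n < 1;

    # A full list only accepts candidates closer than its current worst entry.
    return if @$best >= $n && $candidate->[1] >= $best->[-1][1];

    # Find the insert position (linear scan; fine for small N).
    my $pos = 0;
    $pos++ while $pos < @$best && $best->[$pos][1] <= $candidate->[1];

    splice @$best, $pos, 0, $candidate;
    pop @$best if @$best > $n;   # drop the entry that fell out of the top N
    return;
}

my @best;
offer(\@best, 3, $_) for (['a', 7], ['b', 2], ['c', 9], ['d', 4], ['e', 1]);
print join(', ', map { "$_->[0]=$_->[1]" } @best), "\n";   # e=1, b=2, d=4
```

Memory per key stays bounded by N, regardless of how many records the key is compared against.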

In order to bring this to a satisfactory level, we needed to remove the slow useless parts (the all-pair forwarding step), keep the fast useful parts (reducers doing the actual work), and package it in a form that is easy to use.

IV. Perl steps in to help

Good packaging requires quality glue and, when it comes to glue, Perl has never let us down. We made a small Perl library that makes working with similarity joins a breeze. Also, with optimizations made, it is a lot faster than doing this with Hive alone. The optimizations made are:

  1. Direct data distribution to reducers:
    • First table is streamed directly to multiple reducers.
    • Second table is cached in a distributed cache so it is available to reducers on all data nodes.
  2. Joining the cached second table with streaming records from first table on multiple reducers in parallel.
  3. In each reducer a priority queue is maintained and returned when the streamed key has been processed, so we get the top N similar records for the key in sorted order directly from the reducer (no need for a sort step).

Basically this is a distributed reduce-side cross join coordinated from Perl, which uses Hive and Hadoop as a convenient mechanism for data flow and distributed caching. You can use HQL to specify the data sources for the join, and code the distance function in Perl. In principle, you could do all of this without Hadoop and Hive if you are willing to manage all the data distribution and caching yourself, but it is much easier and more cost effective to take advantage of Hadoop Streaming and the Distributed Cache for the data flow management during this process.
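Conceptually, the work done inside one reducer can be sketched as follows. This is a simplified, single-process illustration and not the library's actual code: the function name and data layout are assumptions, the "cached" table is just an in-memory array, and for brevity the top N is taken by sorting each record's scores instead of maintaining a priority queue.

```perl
use strict;
use warnings;

# $main_rows:      records streamed to this reducer (array ref of array refs)
# $reference_rows: the cached second table, held fully in memory
# $distance:       code ref returning [id_a, id_b, distance], or nothing to skip a pair
# Returns a hash ref: id_a => up to $n [id_b, distance] pairs, closest first.
sub top_n_join {
    my ($main_rows, $reference_rows, $distance, $n) = @_;
    my %top;
    for my $rec_a (@$main_rows) {
        my @scored;
        for my $rec_b (@$reference_rows) {
            my $result = $distance->($rec_a, $rec_b) or next;
            push @scored, [ $result->[1], $result->[2] ];
        }
        @scored = sort { $a->[1] <=> $b->[1] } @scored;
        splice @scored, $n if @scored > $n;   # keep only the N closest
        $top{ $rec_a->[0] } = \@scored;
    }
    return \%top;
}

my @cities = ([1, 0], [2, 10], [3, 3], [4, 8]);   # toy data: [id, position on a line]
my $line_distance = sub {
    my ($rec_a, $rec_b) = @_;
    return if $rec_a->[0] == $rec_b->[0];          # skip self-pairs
    return [ $rec_a->[0], $rec_b->[0], abs($rec_a->[1] - $rec_b->[1]) ];
};

# This "reducer" only receives the first two cities from the streamed table.
my $top = top_n_join([ @cities[0, 1] ], \@cities, $line_distance, 2);
for my $id (sort { $a <=> $b } keys %$top) {
    print "$id: ", join(', ', map { "$_->[0] ($_->[1])" } @{ $top->{$id} }), "\n";
}
```

Because each reducer only needs its slice of the first table plus the cached second table, no all-pairs dataset ever has to be materialized or shuffled.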

This is how the above example looks in Perl:

use strict;
use warnings;

use Hive::SimilarityJoin::Runner;

# Create a similarity join job
my $job = Hive::SimilarityJoin::Runner->new({
    # Performance:
    nr_reducers => 100,

    # IN:
    dataset_info => {
        main_dataset => {
            hql => q|
                SELECT
                    id,
                    attribute_1,...,attribute_N
                FROM
                    table_A
            |,
        },
        reference_data => {
            hql          => q|
                SELECT
                    id,
                    attribute_1,...,attribute_N
                FROM
                    table_B
            |,
        },
    },

    # Top-N bucket size:
    bucket_size => 5,

    # OUT:
    out_info => {
        out_dir  => '/tmp/simjoin_out',
        out_file => 'simjoin_result.tsv',
    },
});

$job->run();

# provide a similarity/distance function in the main script;
sub similarity_distance {
    my ($rec_a,$rec_b) = @_;
    my ($id_a, $attribute_a_1,..., $attribute_a_N) = @$rec_a;
    my ($id_b, $attribute_b_1,..., $attribute_b_N) = @$rec_b;

    my $distance;
    # ... do your calc

    return [$id_a, $id_b, $distance];
}

You can customize the distance calculation to suit your needs. For example, if we want to calculate geographical distances between cities, the similarity distance function becomes:

sub similarity_distance {
    my ($row_a, $row_b) = @_;

    my ($id_1, $lat_1, $long_1) = @$row_a;
    my ($id_2, $lat_2, $long_2) = @$row_b;

    return if $id_1 == $id_2;

    use Math::Trig qw(great_circle_distance :pi);
    my $earthRadius = 6371;
    my $degToRad = pi / 180.0;

    # convert to radians
    my $long_1_radians = $long_1 * $degToRad || 0;
    my $lat_1_radians  = $lat_1  * $degToRad || 0;
    my $long_2_radians = $long_2 * $degToRad || 0;
    my $lat_2_radians  = $lat_2  * $degToRad || 0;

    # sphere earth approximation
    my $geo_distance = 0;
    $geo_distance = great_circle_distance(
        $long_1_radians,
        pi/2 - $lat_1_radians,
        $long_2_radians,
        pi/2 - $lat_2_radians,
        $earthRadius
    );

    return [$id_1,$id_2, $geo_distance];
}

Also, you don’t need to have the library installed on the remote Hadoop data nodes. It is enough to have it on the box from where you run the script. All the HQL and reducer code will be generated at runtime and uploaded to the remote data nodes.

The source code is available on GitHub.

Benchmark:

We used the above-mentioned geo distance between two cities to compare the performance of the pure MapReduce approach and the hybrid Hive/Perl approach. Depending on the dataset size and the number of reducers, our module performs 4x to 27x faster than the ordinary MapReduce approach.

Here is the summary of the benchmark results for these two approaches:

Hive with performance tuning:
+---------+----------+----------+---------+
| dataset | reducers | reducers | runtime |
|  size   | total    | parallel |         |
+---------+----------+----------+---------+
| 10,000  | 100      | 100      | 8min    |
+---------+----------+----------+---------+
| 25,000  | 500      | 500      | 54min   |
+---------+----------+----------+---------+

Perl-Hive hybrid:
+---------+----------+----------+---------+
| dataset | reducers | reducers | runtime |
|  size   | total    | parallel |         |
+---------+----------+----------+---------+
| 10,000  | 100      | 100      | 2min    |
+---------+----------+----------+---------+
| 25,000  | 500      | 500      | 2min    |
+---------+----------+----------+---------+
| 50,000  | 1,000    | 500      | 3min    |
+---------+----------+----------+---------+
| 200,000 | 5,000    | 500      | 17min   |
+---------+----------+----------+---------+
| 500,000 | 25,000   | 500      | 115min  |
+---------+----------+----------+---------+

What is interesting to note is that the performance difference is not that dramatic for small datasets (the Perl-Hive hybrid is only 4x faster). But as the data grows, the performance difference becomes much more significant. For 25,000 records, the hybrid solution is 27 times faster. For larger datasets the non-hybrid solution was so slow that we decided not to compare at all.

At the end of the day we can process 250 billion distances in less than two hours.

End remarks

For some interesting examples of what we found by analysing our hotels, stay tuned for part two of this post.

Cheers and happy data crunching!
