Posted by Xuyen On on May 23, 2013 in Big Data

Hive is good at querying immutable data such as log files, which do not change after they are written. But what if you want to query data that can change? For example, users of our site frequently modify their family trees. Some of this data sits in very large, frequently updated transactional tables in our databases, with records that can be inserted, updated, and deleted. We would like to query this data to see how our users interact with the site. So how do you maintain mutable data in Hive? Below, we outline one strategy for doing so.

First we need to get the data from the database into Hive staging tables. We do this with an ETL (Extract, Transform, and Load) process that exports the SQL transaction logs for each table of interest into CSV (Comma-Separated Values) files. These CSV files record each insert, update, or delete operation applied to a given table. In addition to the fields you want to export into Hive, each file needs three extra fields:

1. row_id – This field is a unique identifier for each row. It should be an incrementing integer value that follows the order in which the update operations were applied to the table.

2. op – This field identifies the update operation: an insert, update, or delete. The column can be an enumeration or an integer type (e.g., insert = 1, update = 2, delete = 3).

3. op_timestamp – This field is a timestamp that records when the update operation was applied to the table.

Let’s take a simple example of two tables to track active users on the site:

UserActivity

| row_id | op     | op_timestamp        | user_id | last_login_time     | ancestor_ids           |
|--------|--------|---------------------|---------|---------------------|------------------------|
| 1      | Insert | 2013-01-01 01:30:00 | 100     | 2013-01-01 01:00:00 | 1000, 1010, 1011       |
| 2      | Update | 2013-01-02 13:10:00 | 100     | 2013-01-02 13:00:00 | 1000, 1010, 1011, 1050 |

ActiveUser

| row_id | op     | op_timestamp        | user_id | first_name | last_name |
|--------|--------|---------------------|---------|------------|-----------|
| 1      | Insert | 2013-01-01 01:30:00 | 100     | John       | Smith     |
| 2      | Delete | 2014-01-02 00:00:00 | 100     | John       | Smith     |

These tables are used to track the activities of active users. Here we have a user named John Smith who created a new account on 2013-01-01. He found and added three ancestors to his family tree (IDs 1000, 1010 and 1011), so we create a new insert record for this user in both UserActivity and ActiveUser. We see that John logged in the following day and found another ancestor with ID 1050, so he added it to his family tree. We add a new row with an update operation and the updated list of ancestor IDs: 1000, 1010, 1011 and 1050. But let's say a user leaves the site and doesn't come back for a year. The system looks for these inactive accounts and deletes them; this is recorded by row_id 2 in the ActiveUser table.
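For illustration, the two UserActivity operations above might look like this in the exported file (one line per operation, columns in the order row_id, op, op_timestamp, user_id, last_login_time, ancestor_ids). Using a pipe delimiter inside ancestor_ids is our assumption here, since embedded commas would collide with the comma field separator:

```csv
1,Insert,2013-01-01 01:30:00,100,2013-01-01 01:00:00,1000|1010|1011
2,Update,2013-01-02 13:10:00,100,2013-01-02 13:00:00,1000|1010|1011|1050
```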

So we need to create the Hive staging tables. For example:

CREATE TABLE IF NOT EXISTS UserActivityStg (
  row_id bigint,
  op string,
  op_timestamp timestamp,
  user_id int,
  last_login_time timestamp,
  ancestor_ids string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/hive_stage/UserActivityStg/';

CREATE TABLE IF NOT EXISTS ActiveUserStg (
  row_id bigint,
  op string,
  op_timestamp timestamp,
  user_id int,
  first_name string,
  last_name string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/hive_stage/ActiveUserStg/';
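Once the staging tables exist, each exported CSV file can be loaded into the matching table. A minimal sketch, with a hypothetical HDFS source path:

```sql
-- Move one day's export into the UserActivityStg table's location.
-- The source path below is hypothetical.
LOAD DATA INPATH '/incoming/user_activity_2013-01-02.csv'
INTO TABLE UserActivityStg;
```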

The CSV files can simply be copied into the staging directories we specified in the CREATE statements above. Analysts may want to query the historical usage or update patterns of the tables, and this data can get quite large, so we store the historical transaction logs in Hive tables instead of a database. But this doesn't capture the current state of the tables. To do that, we have a Java process that reads from the Hive staging tables through a JDBC connection and applies each transaction operation to a Hive table that is backed by HBase. Here is an example of how these tables would be created within Hive:

CREATE TABLE UserActivity (
  row_id bigint,
  op string,
  op_timestamp timestamp,
  user_id int,
  last_login_time timestamp,
  ancestor_ids string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:op,cf1:op_timestamp,cf1:user_id,cf1:last_login_time,cf1:ancestor_ids")
TBLPROPERTIES ("hbase.table.name" = "UserActivity");

We used the HBase Java API to apply the inserts, updates and deletes directly to the HBase tables. By mapping Hive tables on top of HBase, we get the best of both worlds: the ability to mutate the data through HBase and the ability to query it, with joins, through Hive. So everything should be great and our problems solved, right? Not quite. Although we were able to run queries with joins on these HBase-backed tables, they were four to five times slower than queries against regular, file-backed Hive tables, and query speed decreased further as more columns were added.
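For instance, once both example tables are mapped into Hive this way (the ActiveUser mapping, which we have not shown, is assumed to follow the same pattern as UserActivity), a join reads like ordinary HiveQL:

```sql
-- Hypothetical join across the HBase-backed tables; ActiveUser is
-- assumed to be mapped into Hive the same way as UserActivity.
SELECT u.user_id, a.first_name, a.last_name, u.last_login_time
FROM UserActivity u
JOIN ActiveUser a ON (u.user_id = a.user_id);
```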

So we are exploring different options to get everything working at acceptable speed. One option is to create another Hive table backed by a regular file and simply schedule a job to copy the data over from the HBase table. This works for smaller data sets, but it's still too slow for the larger ones, so we're tuning HBase and Hive to see if we can optimize the process.
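A point-in-time snapshot can also be computed directly from a transaction-log staging table. The sketch below assumes a staging table named ActiveUserStg with the ActiveUser columns shown earlier (row_id, op, user_id, first_name, last_name); the highest row_id per user is the latest operation, and users whose latest operation is a delete are excluded:

```sql
-- Hypothetical snapshot query: latest surviving row per user_id.
SELECT s.user_id, s.first_name, s.last_name
FROM ActiveUserStg s
JOIN (
  SELECT user_id, MAX(row_id) AS max_row_id
  FROM ActiveUserStg
  GROUP BY user_id
) latest
  ON (s.user_id = latest.user_id AND s.row_id = latest.max_row_id)
WHERE s.op != 'Delete';
```

We use a self-join on MAX(row_id) rather than windowing functions, since the latter were only added in Hive 0.11.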

If you have any experience querying and maintaining mutable data, we would love to hear from you.

About Xuyen On

Xuyen On is a Senior Software Engineer at Ancestry.com who works on the Data Services team, where he is building out a new infrastructure to collect Big Data and make it available to the company.

Comments

Abhinav Mithal 

Hey Xuyen, you have defined the problem pretty nicely in this blog.
I am in the process of solving exactly the same problem. The solution is mostly the same except for one small thing, which I will describe below.
Problem statement: we have an HBase table which gets updated periodically by several ETLs, and we want to be able to query the data in Hive with fast performance.
Here is the solution: I have a column in HBase called "update time" which always holds the last-update Unix timestamp for that row. In Hive I have an external table mapped to this HBase table. As a result, any query on this external Hive table gets all of the latest and greatest updates to the rows in HBase. But like you said, such queries are slow. So, just like your solution, I added a local Hive table (a managed table) and do an INSERT INTO the local Hive table from the external Hive table. Here is the performance enhancement piece: I only insert the rows into the local table which got updated since the last time I ran the insert process (by looking at the Unix timestamp column).
Then, when we query Hive, we will get multiple rows for each time an update happened to a row, but I sort them by the 'timestamp' column DESC and pick the top row returned from the set.
Hope this helps or opens more discussion / options.

January 16, 2014 at 5:21 pm
Abhinav Mithal 

Oh! And if you have solved this more efficiently already, I would appreciate it if you could share your thoughts around it. Thanks!

January 16, 2014 at 5:22 pm
    Xuyen On 

    Actually we're doing something very similar, Abhinav. We insert new data into HBase and it takes care of all the record updates internally. When we want to run a query, we grab the data by some range specified by the key in HBase and export it to Hive tables for further processing. We only grab the records with the latest timestamp so that we get the most recent snapshot of the table. This is far from ideal, but I think we will see better solutions later when OLTP is working on Hadoop.

    January 21, 2014 at 7:09 pm
