Posted by Xuyen On on February 5, 2014 in Big Data

In my last post, I introduced our first steps in creating a scalable, high-volume messaging system, and I would like to provide an update on our progress. We have built out a Kafka 0.7.2 cluster to start ingesting data from our servers. The cluster consists of the following:
5 x Kafka nodes
•    Dual 6 core Xeon CPUs
•    96 GB RAM
•    12 x 2TB hard disks (2 for OS and apps, 10 for data, configured in RAID 10)
•    10 Gb NIC

3 x Zookeeper nodes
•    Dual 6 core Xeon CPUs
•    96 GB RAM
•    1 x 2TB hard drive
•    10 Gb NIC

We have not yet reached any limits pushing data into Kafka from our producers, as we are still in the initial stages. However, we found that a single-threaded consumer was able to ingest data at a rate of approximately 100-150 Mb/sec on a virtual machine with a 10 Gb connection.
The initial logs that we're collecting are formatted in JSON and have dynamic schemas. We are storing them in a single Hive table and partitioning them by year-month-day, as well as by the stackid from which the message originates. In order to partition the data, we need to access the timestamp and stackid within nested JSON fields. One way to do this is to create a simple staging table that just uses the raw JSON files, for example:
CREATE EXTERNAL TABLE IF NOT EXISTS stg (json string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'ext_tables/stg/';

The raw JSON files can simply be copied into this folder, and they can then be queried through Hive. However, in order to make the processing more efficient, we would like to partition the data so that we don't have to scan the entire data set for every query. So we can create another Hive table that is partitioned by some fields, for example, timestamp and stackid:

CREATE EXTERNAL TABLE IF NOT EXISTS prtData (json string comment 'Raw JSON string log message')
partitioned by (ts string, stackid string)
stored as rcfile
LOCATION 'ext_tables/prtData/';
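
To see why partitioning helps, here is a small Python sketch (not Hive code) of the idea. Hive stores each partition of a table in its own directory named col=value under the table location, so a query that filters on ts and stackid only needs to read the matching directories. The function names and the sample partition values below are made up for illustration.

```python
def partition_dir(base, ts, stackid):
    """Directory for one (ts, stackid) partition, using col=value naming."""
    return f"{base.rstrip('/')}/ts={ts}/stackid={stackid}"


def prune(partitions, ts=None, stackid=None):
    """Keep only the partitions that a query with these filters must read."""
    return [
        (p_ts, p_stackid)
        for p_ts, p_stackid in partitions
        if (ts is None or p_ts == ts) and (stackid is None or p_stackid == stackid)
    ]


# Hypothetical partitions that already exist in the table.
existing = [
    ("2014-01-13", "Server1"),
    ("2014-01-13", "Server2"),
    ("2014-01-14", "Server1"),
]

# A query filtering on both partition columns touches a single directory.
needed = prune(existing, ts="2014-01-13", stackid="Server1")
paths = [partition_dir("ext_tables/prtData/", ts, sid) for ts, sid in needed]
print(paths)
```

Filtering on only one of the two columns, for example prune(existing, ts="2014-01-13"), still skips every directory belonging to other days.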

We can then read the raw JSON records from the stg table and insert them into the partitioned table, prtData, through the following Hive query:

insert into table prtData partition (ts,stackid)
select a.json as json, to_date(b.ts) as ts, c.stackid as stackid
from stg a
lateral view json_tuple(a.json, 'TimeStamp', 'Properties') b as ts,props
lateral view json_tuple(b.props, 'StackId') c as stackid
distribute by ts, stackid
sort by ts, stackid;

The above query assumes that the JSON data is well formed and has a TimeStamp field at the first level and a StackId field nested within a Properties field, for example:

{"EventType":"INFO","TimeStamp":"2014-01-13T06:10:59.97746Z", "Properties":{"StackId":"Server1","ComponentId":"AggService","ClientId":"1"}}
The Hive function json_tuple, a table-generating function (UDTF), allows us to handle a dynamic JSON schema through a lateral view, which creates columns on the fly. It is used when we want to access more than one field within the JSON data at a time. If we only need a single JSON field, we would use get_json_object instead. Another difference between the two worth noting is that with get_json_object, you can access a nested JSON field directly using the "." notation, for example:

get_json_object(stg.json, '$.Properties.StackId')

With json_tuple, you have to pass the nested JSON data to another json_tuple call in order to access the nested field, for example:

lateral view json_tuple(a.json, 'TimeStamp', 'Properties') b as ts,props
lateral view json_tuple(b.props, 'StackId') c as stackid

The first line extracts the TimeStamp field and the Properties JSON object into an alias table called b. This alias table stores the TimeStamp field in ts and the Properties JSON object in props. The TimeStamp field can be accessed directly as a string, but we only want the StackId field within the Properties JSON object. This is done by passing b.props into another json_tuple call, shown on the second line: lateral view json_tuple(b.props, 'StackId') c as stackid. We can now access both the TimeStamp and StackId fields and use them for partitioning the data.
If you are using Hive 0.9 or older, you need to sort the data before inserting it into another table. Otherwise, we found that records can get dropped or duplicated. This is done by the last two lines of the query:
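
To make the difference between the two functions concrete, here is a minimal Python sketch that mimics them on the sample record above. It is not Hive code: the helpers below only borrow the names json_tuple and get_json_object for readability, are assumptions written for illustration, and handle just the simple cases discussed in this post (top-level keys and dotted '$.a.b' paths).

```python
import json

# Sample record copied from the example earlier in this post.
RECORD = (
    '{"EventType":"INFO","TimeStamp":"2014-01-13T06:10:59.97746Z",'
    '"Properties":{"StackId":"Server1","ComponentId":"AggService","ClientId":"1"}}'
)


def json_tuple(raw, *keys):
    """Illustrative stand-in: return the requested top-level keys only."""
    try:
        obj = json.loads(raw)
    except (TypeError, ValueError):
        obj = None
    if not isinstance(obj, dict):
        # Malformed input yields a null (None) for every requested key.
        return tuple(None for _ in keys)
    values = []
    for key in keys:
        value = obj.get(key)
        # Nested objects are handed back as JSON text, so a second call
        # is needed to look inside them.
        if isinstance(value, (dict, list)):
            value = json.dumps(value, separators=(",", ":"))
        values.append(value)
    return tuple(values)


def get_json_object(raw, path):
    """Illustrative stand-in: follow a simple '$.a.b' path in one call."""
    if not path.startswith("$."):
        return None
    try:
        node = json.loads(raw)
    except (TypeError, ValueError):
        return None
    for part in path[2:].split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


# Two chained calls, mirroring the two lateral views in the query.
ts, props = json_tuple(RECORD, "TimeStamp", "Properties")
(stackid,) = json_tuple(props, "StackId")

# One call with a dotted path reaches the same nested field.
same_stackid = get_json_object(RECORD, "$.Properties.StackId")
print(ts, stackid, same_stackid)
```

Both routes end up with the same StackId value; the chained version simply needs one extra step per level of nesting.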
distribute by ts, stackid
sort by ts, stackid;

This technique is faster and more efficient than a simple order by clause because it distributes the sorting work across multiple reducers, with all rows that share the same ts and stackid values sent to the same reducer. However, ordering is done only within each reducer's share of the data. This is different from the order by clause, which gives a complete ordering over all the data but has to do so within a single reducer. That takes much longer, since all the data has to be processed on one machine.
In summary, I presented a way to handle log data in JSON format with dynamic schemas: querying data embedded within the raw JSON by using the Hive functions get_json_object() and json_tuple(), and processing the data efficiently by partitioning on the timestamp and stackid embedded within the JSON data. This allows the user to query subsets of the data based on partition values like '2014-01-01' for time and 'web_logs' for stackid instead of having to scan the entire data set for every query.
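
The behavior of distribute by followed by sort by can be pictured with a short Python sketch. This is only a simulation under stated assumptions: the reducer count, the sample rows, and the use of a CRC32 checksum to pick a reducer are all made up for illustration and do not reflect Hive's internal hashing.

```python
import zlib


def reducer_for(ts, stackid, num_reducers):
    """Pick a reducer from the (ts, stackid) key with a deterministic hash."""
    key = f"{ts}\x00{stackid}".encode("utf-8")
    return zlib.crc32(key) % num_reducers


def distribute_and_sort(rows, num_reducers):
    """Route each row by its key, then sort inside every reducer separately."""
    buckets = [[] for _ in range(num_reducers)]
    for row in rows:
        ts, stackid = row[0], row[1]
        buckets[reducer_for(ts, stackid, num_reducers)].append(row)
    # Each reducer sorts only its own rows; there is no global ordering.
    return [sorted(bucket, key=lambda r: (r[0], r[1])) for bucket in buckets]


# Hypothetical (ts, stackid, json) rows.
rows = [
    ("2014-01-14", "Server2", "{}"),
    ("2014-01-13", "Server1", "{}"),
    ("2014-01-14", "Server2", "{}"),
    ("2014-01-13", "Server2", "{}"),
    ("2014-01-13", "Server1", "{}"),
]

for index, bucket in enumerate(distribute_and_sort(rows, num_reducers=3)):
    print(index, bucket)
```

Rows with the same ts and stackid always land in the same bucket, and each bucket is sorted on its own. An order by would correspond to calling sorted() once over the full list, which is the single-reducer case.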

About Xuyen On

Xuyen On is a Senior Software Engineer at Ancestry.com who works on the Data Services Team, where he is building out a new infrastructure to collect Big Data and make it available to the company.

