Posted by Xuyen On on April 8, 2014 in Big Data

In my previous posts, I outlined how to import data into Hive tables using Hive scripts and dynamic partitioning. However, we found that this approach only works for small batch sizes and does not scale to larger jobs. It is faster and more efficient to partition the data as it is read from the Kafka brokers in a MapReduce job. The idea is to write the data into partitioned files and then simply move each file into the directory for its Hive partition.

The first thing we need to do is look into the data stream and build a JSON object for each record (line) read in. This is done with a fast JSON parser such as the one from the Jackson project (http://jackson.codehaus.org):

private static final String DEFAULT_DATE = "1970-01-01";

// Create the ObjectMapper once and reuse it for every record
private final org.codehaus.jackson.map.ObjectMapper objectMapper =
    new org.codehaus.jackson.map.ObjectMapper();

public String getKey(byte[] payload) {
    String timeStamp = DEFAULT_DATE;
    try {
        // Read the raw record into a JSON tree
        JsonNode jsonNode = objectMapper.readTree(payload);
        // Parse the partition date from the record
        timeStamp = parseEventDataTimeStamp(jsonNode);
    }
    catch (Exception ex) {
        // Records that cannot be parsed fall back to the default partition date
    }
    return String.format("partitiondate=%s", timeStamp);
}

Once we have the data in a JSON object, we can parse out the partition date. In this example we want to get the EventDate from the record and extract the date value with the regular expression "(^\\d{4}-\\d{2}-\\d{2}).*$". This regex matches timestamps that begin with a date in the format 2014-01-10, and the captured date becomes part of the filename generated for this data. The method below shows how to parse the partition date:

Sample Input JSON String:

{"EventType":"INFO","EventDescription":"Test Message","EventDate":"2014-01-10T23:06:22.8428489Z", …}

// Matches a leading yyyy-mm-dd; compiled once and reused
private static final Pattern DATE_PATTERN = Pattern.compile("(^\\d{4}-\\d{2}-\\d{2}).*$");

private String parseEventDataTimeStamp(JsonNode jsonNode) {
    String timeStamp = jsonNode.path("EventDate").getValueAsText();
    if (timeStamp == null) {
        // No usable EventDate value in this record
        return DEFAULT_DATE;
    }
    Matcher matcher = DATE_PATTERN.matcher(timeStamp);
    if (matcher.find()) {
        return matcher.group(1);
    }
    return DEFAULT_DATE;
}
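To see how the pattern behaves on its own, here is a small self-contained sketch that applies the same regex to a few inputs without needing Jackson. The class name PartitionDateDemo and the helper extractDate are made up for this illustration; only the regex and the default date come from the code above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PartitionDateDemo {
    static final String DEFAULT_DATE = "1970-01-01";

    // Same regex as above: capture a leading yyyy-mm-dd and ignore the rest.
    static final Pattern DATE_PATTERN = Pattern.compile("(^\\d{4}-\\d{2}-\\d{2}).*$");

    // Hypothetical helper: returns the leading date, or the default if there is none.
    static String extractDate(String timeStamp) {
        if (timeStamp == null) {
            return DEFAULT_DATE;
        }
        Matcher matcher = DATE_PATTERN.matcher(timeStamp);
        return matcher.find() ? matcher.group(1) : DEFAULT_DATE;
    }

    public static void main(String[] args) {
        System.out.println(extractDate("2014-01-10T23:06:22.8428489Z")); // 2014-01-10
        System.out.println(extractDate("not a date"));                   // 1970-01-01
        System.out.println(extractDate(null));                           // 1970-01-01
    }
}
```

Note that the pattern only checks the shape of the date. A value such as 2014-99-99 would still be accepted, so a stricter check would be needed if the input cannot be trusted.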

The getKey method returns the key as a string in the form "partitiondate=2014-01-10". We can use this to specify which directory the file should be moved to for the Hive table. For example, say we have a Hive table called EventData. There would be a directory named EventData for the Hive table in HDFS, with a subdirectory for each Hive partition. We have a separate application that manages these files and directories; it gets all of the necessary information from the filename generated by our process. So we would create a partition directory named partitiondate=2014-01-10 under the EventData directory and place the file containing all records for 2014-01-10 in there.

The getKey method can be used in a MapReduce Java application that reads from the Kafka brokers on multiple Hadoop nodes. You can use the method below to get the output stream for each record in the mapper process:

// This hashmap caches the open file and output stream for each partitiondate key
private Map<String, Data> cachedMessageStreams = new HashMap<>();

public Data getMessageStream(byte[] payload) throws IOException, CompressorException {
    // Generate a key for the partitiondate using information within the input payload message from Kafka.
    String key = fileFormat.getKey(payload);

    // We store the partitiondate information in a cache so that we only generate new files for records
    // with new partitiondates. If a file with the same partitiondate is already open, we don't have to
    // create a new file and just write to the existing one.

    // Execute this block if the record contains a new partitiondate
    if (!cachedMessageStreams.containsKey(key)) {
        String tempFileName = getFormattedFileName(key); // See the method definition below
        // Create a new output stream to write to a new file. We have custom formats for JSON and gzip files
        OutputStream outputStream =
            formatFactory.getCompressedStream(externalFileSystem.createFile(tempFileName));
        MessageStream messageStream = fileFormat.getStream(outputStream);
        Data data = new Data(messageStream, tempFileName);
        cachedMessageStreams.put(key, data);
        return data;
    }
    else {
        // The record contains a partitiondate key that is already cached,
        // i.e. a file and output stream are already open for that partitiondate.
        // Reuse the cached entry to write data to the corresponding
        // output stream and file.
        return cachedMessageStreams.get(key);
    }
}

// This is a generic method where you would write the data using the
// getMessageStream() method.
public void writeMessage(byte[] messageBytes) throws IOException {
    try {
        Data data = getMessageStream(messageBytes);
        data.messageStream.writeMessage(messageBytes);
        ++data.messageCount;
    }
    catch (CompressorException ce) {
        throw new IOException(ce);
    }
}
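The caching idea can be tried out without Kafka or HDFS. The sketch below keeps one open writer per partition key and reuses it for later records with the same key. Everything in it is a stand-in chosen for illustration: PartitionedWriterDemo is a made-up class, StringWriter replaces the compressed file stream, and computeIfAbsent (Java 8) replaces the containsKey/put pair used above.

```java
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionedWriterDemo {
    // One open writer per partition key; StringWriter stands in for a real file stream.
    private final Map<String, StringWriter> openWriters = new LinkedHashMap<>();

    // Opens a writer the first time a key is seen and reuses it afterwards.
    void write(String key, String record) {
        StringWriter writer = openWriters.computeIfAbsent(key, k -> new StringWriter());
        writer.write(record);
        writer.write('\n');
    }

    // Number of distinct partition keys seen so far.
    int openCount() {
        return openWriters.size();
    }

    // Everything written for a key so far, or an empty string for an unknown key.
    String contents(String key) {
        StringWriter writer = openWriters.get(key);
        return writer == null ? "" : writer.toString();
    }

    public static void main(String[] args) {
        PartitionedWriterDemo demo = new PartitionedWriterDemo();
        demo.write("partitiondate=2014-01-10", "{\"EventType\":\"INFO\"}");
        demo.write("partitiondate=2014-01-11", "{\"EventType\":\"WARN\"}");
        demo.write("partitiondate=2014-01-10", "{\"EventType\":\"ERROR\"}");
        System.out.println(demo.openCount()); // 2
    }
}
```

A real job would also need to close every cached stream when the mapper finishes so that the compressed output is flushed. That step is not shown in this post.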

// This is an example of how you could generate a filename with the partitiondate information
// along with other pertinent information
private String getFormattedFileName(String key) {
    // We have custom methods that get the Kafka topics from config files;
    // the topic name becomes the first part of the output filename.
    // The format string needs exactly one specifier per argument.
    String fileName = String.format("%s__%s__start=%s__mapper=%s", topicProperties.topic, key,
        getStartTime(), getHostName());
    return Paths.get(topicProperties.tempDir, fileName).toString();
}
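The naming scheme can also be tried in isolation. The following sketch is not our production code: FileNameDemo, tempName and finalName are hypothetical names, the timestamp layout is inferred from the example filename shown later in this post, and appending the record count and extension when the file is closed is an assumption about how such a name could be completed.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class FileNameDemo {
    // Assumed timestamp layout, e.g. 2014.01.10_14.36.08
    static final DateTimeFormatter START_FORMAT =
            DateTimeFormatter.ofPattern("yyyy.MM.dd_HH.mm.ss");

    // Name used while the file is still being written: four specifiers, four arguments.
    static String tempName(String topic, String key, LocalDateTime start, String host) {
        return String.format("%s__%s__start=%s__mapper=%s",
                topic, key, START_FORMAT.format(start), host);
    }

    // Assumption: the record count and the extension are appended once the file is closed.
    static String finalName(String tempName, long count) {
        return String.format("%s__count=%d.json.gz", tempName, count);
    }

    public static void main(String[] args) {
        String temp = tempName("eventdata", "partitiondate=2014-01-10",
                LocalDateTime.of(2014, 1, 10, 14, 36, 8), "hadoopnode01");
        System.out.println(finalName(temp, 5273));
    }
}
```

Passing the start time and hostname in as parameters keeps the method easy to test, since the result no longer depends on the clock or the machine it runs on.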

I’ve given an overview of a way to read message data from Kafka into partitioned files using MapReduce. The partition information is written to the filenames generated by the methods outlined above. An example of a generated filename is:

eventdata__partitiondate=2014-01-10__start=2014.01.10_14.36.08__mapper=hadoopnode01__count=5273.json.gz

We store the Kafka topic name, the partitiondate, the creation time of the partition file, the hostname of the Hadoop node the mapper ran on, and a count of the records in the file. The file extension tells you that the file contains JSON compressed with gzip.

We use this information in a separate application to move the file to the respective directory in HDFS, which loads it into the corresponding Hive table. We found this process to be much faster and more efficient than using Hive scripts to dynamically generate the partitions while loading Hive data.
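As a rough idea of what such an application has to do, the sketch below splits a filename of this form into its fields and derives a target path from the partitiondate. It is only an illustration under stated assumptions: FileNameParserDemo, parse and targetPath are hypothetical names, the table directory /data/EventData is invented, and the actual move in HDFS is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FileNameParserDemo {
    private static final String EXTENSION = ".json.gz";

    // Splits "topic__k1=v1__k2=v2.json.gz" into a map of its fields.
    static Map<String, String> parse(String fileName) {
        String base = fileName.endsWith(EXTENSION)
                ? fileName.substring(0, fileName.length() - EXTENSION.length())
                : fileName;
        String[] parts = base.split("__");
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("topic", parts[0]);
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq > 0) {
                fields.put(parts[i].substring(0, eq), parts[i].substring(eq + 1));
            }
        }
        return fields;
    }

    // Assumed layout: <tableDir>/partitiondate=<date>/<fileName>
    static String targetPath(String tableDir, String fileName) {
        String date = parse(fileName).get("partitiondate");
        if (date == null) {
            throw new IllegalArgumentException("No partitiondate in " + fileName);
        }
        return tableDir + "/partitiondate=" + date + "/" + fileName;
    }

    public static void main(String[] args) {
        String name = "eventdata__partitiondate=2014-01-10__start=2014.01.10_14.36.08"
                + "__mapper=hadoopnode01__count=5273.json.gz";
        System.out.println(parse(name));
        System.out.println(targetPath("/data/EventData", name));
    }
}
```

This simple split relies on the double underscore never appearing inside a topic name or hostname. If it could, a different separator or an escaping rule would be needed.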

What has your experience been in importing large amounts of data into Hadoop and Hive? What has worked, and what hasn’t?

About Xuyen On

Xuyen On is a Senior Software Engineer at Ancestry.com on the Data Services Team, where he is building out a new infrastructure to collect Big Data and make it available to the company.

