We have built out an initial logging framework with Kafka 0.7.2, a messaging system developed at LinkedIn. This blog post will go over some of the lessons we’ve learned by building out the framework here at Ancestry.com.
Most of our application servers are Windows-based, and we want to capture IIS logs from them. However, Kafka does not ship with any producers that run on the Microsoft .Net platform. Thankfully, we found an open source project with .Net libraries that could communicate with Kafka, which allowed us to develop our own custom producers for our Windows application servers. You may find that you also need to develop custom producers, because every platform is different: your applications may run on different OSes, or they may be written in different languages. The Apache Kafka site lists the platforms and programming languages that are supported. We plan to transition to Kafka 0.8, but we could not find corresponding library packages like those available for 0.7.
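Whatever the platform, a custom producer boils down to the same minimal contract: buffer messages, batch them, and hand each batch to a client library for delivery. The sketch below illustrates that shape in Python; the `InMemoryTransport` and its `send_batch` method are stand-ins for a real Kafka client library, not part of any Kafka API, and the topic name is made up.

```python
# A minimal sketch of the contract a custom log producer needs: buffer,
# batch, flush. "InMemoryTransport" stands in for a real Kafka 0.7 client;
# its send_batch() method is illustrative, not an actual Kafka API call.

class InMemoryTransport:
    """Stand-in for a real Kafka client connection."""
    def __init__(self):
        self.batches = []

    def send_batch(self, topic, messages):
        # A real transport would serialize and ship the batch over the wire.
        self.batches.append((topic, list(messages)))


class LogProducer:
    def __init__(self, transport, topic, batch_size=100):
        self.transport = transport
        self.topic = topic
        self.batch_size = batch_size
        self.buffer = []

    def send(self, message):
        # Accumulate messages and flush once a full batch is ready,
        # keeping per-message overhead on the application server low.
        self.buffer.append(message)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.transport.send_batch(self.topic, self.buffer)
            self.buffer = []
```

Batching like this is what keeps CPU and network overhead low on the application server, which matters because the producer shares the box with the application itself.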
Something to keep in mind when you design your producer is that it should be as lean and efficient as possible. The goal is the highest possible throughput for sending messages to Kafka while keeping CPU and memory overhead low, so as not to overload the application server. One design decision we made early on was to compress in our producers, to make communication between the producers and Kafka faster and more efficient. We initially used gzip because it was natively supported within Kafka. We achieved very good compression (10:1) and had the added benefit of saving storage space. We have two kinds of producers. The first runs as a separate service that simply reads log files from a specified directory where all the files to be sent are stored. This design is well suited to cases where the log data is not time critical, because the data is buffered in log files on the application server: if the Kafka cluster becomes unavailable, the data is still saved locally, which is a good safety measure against network failures and outages. The second kind of producer is coded directly into our applications, which send messages straight to Kafka. This is good for situations where you want to get the data to Kafka as fast as possible, and it could be interfaced with a component like Samza (another project from LinkedIn) for real-time analysis. However, messages can be lost if the Kafka cluster becomes unavailable, so a failover cluster would be needed to prevent message loss.
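The kind of compression ratio described above is plausible for IIS logs because log lines are highly repetitive. The sketch below, using only Python's standard-library `gzip` module, compresses a batch of made-up IIS-style lines before it would be handed off for sending; the log format and line count are illustrative assumptions, not our actual traffic.

```python
import gzip

def compress_batch(lines):
    """Gzip a batch of log lines into a single payload before sending."""
    payload = "\n".join(lines).encode("utf-8")
    return gzip.compress(payload)

# Made-up IIS-style access log lines; only the seconds field varies,
# which is typical of the repetition that makes logs compress so well.
lines = [
    "2013-09-17 12:00:%02d GET /search q=smith 200 0.042" % (i % 60)
    for i in range(1000)
]

raw = "\n".join(lines).encode("utf-8")
compressed = compress_batch(lines)
ratio = len(raw) / len(compressed)  # well above 1 for repetitive log data
```

Compressing per batch rather than per message is what makes the ratio worthwhile: gzip needs a window of repeated content to exploit, and a single log line does not provide one.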
To get data out of Kafka and into our Hadoop cluster, we wrote a custom Kafka consumer job implemented as a Hadoop map application. It is a continuous job that runs every 10-15 minutes. We partitioned our Kafka topics to have 10 partitions per broker. We have 5 Kafka brokers in our cluster, all treated equally, which means a message can be routed to any broker as determined by a load balancer. This architecture allows us to scale out horizontally: if we need more capacity in our Kafka cluster, we can simply add broker nodes, and conversely we can take nodes out as needed for maintenance. Having many partitions makes scaling out easier because we can increase the number of mappers in our job reading from Kafka. However, we have found that splitting the job into too many pieces can generate too many files. In some cases we were producing lots of small files, each well under our Hadoop block size of 128 MB. The problem became evident when a large batch ingest loaded over 40 million small files into our Hadoop cluster. This brought down our NameNode, which could not handle the sheer number of files within the directory; we had to increase the Java heap size to 16 GB just to be able to do an ls (list the contents) on the directory. Hadoop works best with a small number of very large files (much larger than the block size), so you may need to tune the number of partitions used for your Kafka topics, as well as how long each mapper writes to its files. Longer map times with fewer partitions will result in fewer, larger files, but they also delay how soon messages can be queried in Hadoop and can limit the scalability of your consumer job, since there will be fewer possible mappers to assign to it.
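The file-size tradeoff above is easy to reason about with back-of-the-envelope arithmetic: each mapper's file grows at roughly the per-partition throughput times the map duration. The numbers below are illustrative assumptions, not our actual traffic figures; only the broker/partition counts and the 128 MB block size come from our setup.

```python
# Back-of-the-envelope sizing for consumer output files. The throughput
# figure (200 MB/min cluster-wide) is an illustrative assumption.

BLOCK_SIZE_MB = 128  # our HDFS block size

def file_size_mb(cluster_mb_per_min, brokers, partitions_per_broker, map_minutes):
    """Approximate size of the file each mapper writes per run, assuming
    messages are spread evenly across all topic partitions."""
    total_partitions = brokers * partitions_per_broker
    return cluster_mb_per_min * map_minutes / total_partitions

# Our layout: 5 brokers x 10 partitions = 50 partitions, 10-minute runs.
# Each file comes out at ~40 MB, well under the 128 MB block size.
small = file_size_mb(cluster_mb_per_min=200, brokers=5,
                     partitions_per_broker=10, map_minutes=10)

# Fewer partitions and longer runs: 10 partitions, 30-minute maps give
# ~600 MB files, comfortably above the block size, but with higher
# latency and at most 10 mappers to parallelize across.
large = file_size_mb(cluster_mb_per_min=200, brokers=5,
                     partitions_per_broker=2, map_minutes=30)
```

The same formula shows why the two knobs fight each other: shrinking partition count grows the files but caps mapper parallelism, while lengthening map runs grows the files but delays when data is queryable.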
Another design decision we made was to partition the data within our consumer job. Each mapper creates a new file whenever it detects a new partition value, recording the topic and partition values in the filename. We created a separate process that watches the staging directory in HDFS where these files are generated. It inspects each filename and determines whether a corresponding table and partition already exist in Hive. If they do, it simply moves the file into the corresponding directory under the Hive external table directory in HDFS; if the partition does not yet exist, it dynamically creates a new one. We also compress the data within the consumer job to save disk space. We initially tried gzip, which gave good compression rates but dramatically slowed down our Hive queries due to the processing overhead. We are now trying bzip2, which gives less compression, but our Hive queries run faster. We chose bzip2 for its lower impact on query times, but also because it is a splittable format, which means Hadoop can split a large .bz2 file and assign multiple mappers to work on it.
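The mover process above is essentially a filename-to-path translation. The sketch below shows one way it could work; the filename scheme (`topic__partitionkey=value__sequence`, with `__` as separator) and the warehouse root are hypothetical stand-ins for whatever naming convention your consumer job actually uses.

```python
# Hypothetical sketch of the staging-directory mover. The filename layout
# "topic__key=value__seq" and the /hive/warehouse root are illustrative
# assumptions, not the actual scheme used in our consumer job.

HIVE_WAREHOUSE = "/hive/warehouse"  # illustrative external-table root

def hive_destination(filename):
    """Map a staged file to its Hive external-table partition directory,
    parsing the topic and partition value out of the filename."""
    parts = filename.split("__")
    if len(parts) != 3 or "=" not in parts[1]:
        raise ValueError("unrecognized staging filename: %s" % filename)
    topic, partition, _sequence = parts
    # e.g. topic "iislogs", partition "dt=2013-09-17"
    return "%s/%s/%s" % (HIVE_WAREHOUSE, topic, partition)
```

In the real process, the computed path is where the file gets moved within HDFS, and a missing partition triggers an `ALTER TABLE ... ADD PARTITION` in Hive before the move; encoding the metadata in the filename keeps the mover stateless.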
That covers a few of the lessons learned thus far as we build out our messaging framework here at Ancestry. I hope you will be able to use some of the information covered here so that you can avoid the pitfalls we encountered.