At Ancestry.com we are becoming more data driven. That means we want to capture more data about our systems, including how our users are interacting with them. Part of that strategy is to capture the log files from our application servers and put them into our Hadoop cluster.
We have tried using MSMQ and RabbitMQ in a custom framework to capture messages, but found that the approach did not scale well enough for our requirements. So we are trying out a new messaging framework from LinkedIn called Kafka.
Kafka is a distributed open source messaging system that captures logs into files. It is designed to handle very large volumes of data, on the order of 10-50 MB/sec, depending on the configuration of the Kafka cluster. There are three main parts to the framework:
Figure 1 High Level diagram of Kafka Messaging Framework.
1) Producer – This is a process that sits on the application server. Its job is to capture the log data and send it as messages to the brokers in the Kafka cluster. Messages are categorized into message streams called “topics,” and the messages within a topic can be split into partitions.
Figure 2 How partitions are structured in a topic.
Partitioning allows messages in a topic to be written across many brokers, which is what makes the system scalable. The producer can be written in almost any language. We will most likely be developing custom producers in .NET and Java. Initially, we will also be writing custom wrapper classes that intercept method calls using Aspect-Oriented Programming (AOP) techniques. This will let us collect a broad range of data quickly and easily, so that we can get early insight into our systems. Later on, we will add more instrumentation to our code to get deeper insight into specific parts of our systems.
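As a rough illustration of the two ideas above (intercepting method calls and spreading messages across partitions), here is a minimal sketch. It is written in Python only for brevity; our actual producers will most likely be in .NET and Java. Every name in it (`instrumented`, `send`, `choose_partition`, `NUM_PARTITIONS`, `lookup_record`) is hypothetical, the in-memory dictionary merely stands in for real brokers, and the crc32-based partition choice is an assumption for illustration rather than Kafka's own partitioner.

```python
import functools
import json
import time
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count for a single topic

# In-memory stand-in for a topic's partitions; a real producer would send to brokers.
partitions = {p: [] for p in range(NUM_PARTITIONS)}


def choose_partition(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition index using a stable hash (crc32)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def send(key, message):
    """Hypothetical send: append the message to the partition chosen for its key."""
    partitions[choose_partition(key)].append(message)


def instrumented(func):
    """Wrap a function so that every call is reported as a JSON message."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        status = "ok"
        try:
            return func(*args, **kwargs)
        except Exception:
            status = "error"
            raise
        finally:
            # Runs on both success and failure, so every call is captured.
            record = {
                "method": func.__name__,
                "status": status,
                "elapsed_ms": round((time.time() - start) * 1000, 3),
            }
            send(func.__name__, json.dumps(record))
    return wrapper


@instrumented
def lookup_record(record_id):
    """Hypothetical application method whose calls we want to capture."""
    return {"id": record_id}


lookup_record(42)
```

Because the wrapper is applied from the outside, the application method itself does not change, which is the appeal of the AOP approach for getting broad coverage quickly.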
2) Kafka cluster – The cluster is made up of brokers that run on multiple nodes. The brokers store the messages as a set of log files. These log files are kept as a running window of messages for a given amount of time. We will be keeping about 7 days’ worth of messages before they are deleted to make room for new data. The Kafka cluster uses ZooKeeper to keep track of the state of each broker so that brokers can be dynamically added or removed. ZooKeeper is also used to reroute messages if a broker goes down for any reason.
3) Consumer – The consumer’s job is to capture the messages within the Kafka cluster while they exist, and then forward them to another component, which could be a real-time reporting system, a data warehouse, or a simple file dump. Our initial goal is to get the system log data from our application servers into Hadoop, and load it into Hive tables so that we can easily query the data. We’re trying out another open source project from LinkedIn called Camus to achieve this. Camus is a specialized MapReduce job that reads from the Kafka brokers and writes the files out directly into HDFS. Consumers keep track of the last position they read as an offset from the start of a log file. Camus saves these offset values in HDFS so that the next time it starts, it knows where it left off in the last run.
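The offset bookkeeping described above can be sketched as follows. This is not how Camus is implemented; it is a minimal Python illustration that assumes newline-delimited messages in a local file and a local checkpoint file standing in for the offsets Camus keeps in HDFS. The function names and file names are hypothetical.

```python
import os
import tempfile


def read_offset(checkpoint_path):
    """Return the saved byte offset, or 0 if no checkpoint exists yet."""
    if not os.path.exists(checkpoint_path):
        return 0
    with open(checkpoint_path) as f:
        return int(f.read().strip() or 0)


def consume(log_path, checkpoint_path):
    """Read messages appended since the last checkpoint, then save the new offset."""
    offset = read_offset(checkpoint_path)
    with open(log_path, "rb") as log:
        log.seek(offset)
        data = log.read()
    # Consume only complete (newline-terminated) messages.
    end = data.rfind(b"\n") + 1
    complete = data[:end]
    messages = complete.split(b"\n")[:-1] if complete else []
    # Save the offset only after the messages have been read.
    with open(checkpoint_path, "w") as f:
        f.write(str(offset + end))
    return messages


workdir = tempfile.mkdtemp()
log_path = os.path.join(workdir, "topic-0.log")
checkpoint_path = os.path.join(workdir, "topic-0.offset")

with open(log_path, "ab") as f:
    f.write(b"m1\nm2\n")
first = consume(log_path, checkpoint_path)   # first run starts at offset 0

with open(log_path, "ab") as f:
    f.write(b"m3\n")
second = consume(log_path, checkpoint_path)  # second run resumes after m2
```

In this sketch the offset is written only after the messages are read, so a crash between those two steps would cause the same messages to be read again on the next run.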
We are starting out with Kafka 0.7 because it has been in use longer, and our hope is that it is more stable than the latest version, which at the time of this writing is 0.8. Some of the issues we have run into so far include:
- We tried using the built-in Avro encoders and file format, but ran into issues when trying to read the resulting Avro files in Hive. So for now we are using JSON stored as strings.
- We are finding that duplicates occasionally appear in the output of our Camus jobs, so we will need to handle them during our Hive loading process.
- A Kafka broker can go down, so we are writing an app that will monitor the brokers and restart them if necessary.
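For the duplicate records mentioned above, one simple option is to drop repeats before loading. The sketch below is only an illustration in Python, not our Hive loading process. It assumes duplicates are exact, character-for-character copies of a JSON line, and the sample records and the `deduplicate` name are hypothetical.

```python
import hashlib


def deduplicate(lines):
    """Yield each distinct line once, preserving first-seen order."""
    seen = set()
    for line in lines:
        # Store a fixed-size digest rather than the full line to bound memory per record.
        digest = hashlib.sha1(line.encode("utf-8")).digest()
        if digest not in seen:
            seen.add(digest)
            yield line


records = [
    '{"host": "web01", "msg": "start"}',
    '{"host": "web02", "msg": "start"}',
    '{"host": "web01", "msg": "start"}',  # exact duplicate of the first record
]
unique = list(deduplicate(records))
```

If duplicates can differ in field order or whitespace, the lines would need to be parsed and normalized before hashing.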
We will be setting up a test cluster and will try to do some load testing to see how much our cluster can handle. Stay tuned!
1,2 – 2014-01-12: http://kafka.apache.org/documentation.html