Storing log messages in Hadoop

In part 1 of this article series we described the various challenges of dealing with large amounts of logging data in a heavily distributed software ecosystem. After evaluating different approaches, we quickly selected Hadoop as our technology of choice. In this article we describe some of the pitfalls we had to deal with when using Hadoop to store log messages.

Before we begin, let’s sum up the requirements for the logging system we are building:

  • It should be able to store at least 40 GB of data per day.
  • The total data size will be around 20 TB.
  • Searching for log messages should be done in real time. The acceptable response time for a search query is less than 10 seconds.
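
To put these numbers into perspective, here is a small back-of-the-envelope calculation. It is only a sketch: it assumes decimal units (1 TB = 1000 GB) and a constant ingest of exactly 40 GB per day, neither of which is stated in the requirements.

```python
# Back-of-the-envelope sizing. Decimal units and a constant daily
# volume are assumptions made for this sketch.
GB = 10**9
TB = 10**12

daily_volume = 40 * GB      # minimum daily ingest from the requirements
total_capacity = 20 * TB    # expected total data size

# How many days of log data fit into the total data size.
days_of_history = total_capacity // daily_volume

# Average write rate if the daily volume arrived evenly over 24 hours.
ingest_rate_kb_per_s = daily_volume / (24 * 60 * 60) / 1000

print(f"history held: {days_of_history} days")
print(f"average ingest rate: {ingest_rate_kb_per_s:.0f} kB/s")
```

Real traffic is bursty, so the peak write rate will be well above this average.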

The transport and aggregation of the log messages are not part of this article series; we will concentrate on the storage backend.

We are using Hadoop as the storage backend and Lucene for implementing the real time search functionality.

Today we will concentrate on the storage of log messages in Hadoop.

Overview of Hadoop

In this section, we give a brief overview of the basic concepts of Hadoop. If you are already familiar with Hadoop, you may skip to the next section.

Hadoop consists of two components:

  • Hadoop Distributed Filesystem (HDFS): This is a file system which spans all servers in the Hadoop cluster and provides a single namespace for storing files. It is optimized for reading and writing large files. The files are broken down into blocks of 64 MB or more. Each block is stored on multiple servers to increase the reliability of the filesystem. Data access is done by connecting directly to the servers where the relevant blocks are stored. This keeps network traffic low and distributes the IO load throughout the cluster.
    HDFS is based on the Google File System.
  • Map/Reduce: The basic idea behind Map/Reduce is that a user submits a processing job to the so-called JobTracker. The job is then split into many tasks, which are distributed among the servers in the Hadoop cluster. Wherever possible, each task processes only data blocks that are stored locally on the server it is running on. The JobTracker monitors all running tasks and restarts tasks that have failed, hang or have disappeared. This first phase is called the Map phase.
    When the map tasks have finished, their intermediate output is sorted by key and combined into the overall output, which is written to HDFS. This is called the Reduce phase.
    You can find a detailed explanation of this mechanism in Google's MapReduce paper.
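
The two phases can be illustrated with a toy example in plain Python. This is a single-process sketch, not Hadoop code: the `blocks` list, the log lines and the function names are made up for illustration, and each inner list merely stands in for one HDFS block.

```python
from collections import defaultdict

# Hypothetical input: each inner list stands for one HDFS block of log lines.
blocks = [
    ["ERROR disk full", "INFO started", "INFO stopped"],
    ["WARN slow query", "ERROR timeout"],
]

def map_task(block):
    """Map phase: emit a (level, 1) pair for every log line in one block."""
    return [(line.split(" ", 1)[0], 1) for line in block]

def shuffle(map_outputs):
    """Group the intermediate pairs by key and sort the keys."""
    grouped = defaultdict(list)
    for output in map_outputs:
        for key, value in output:
            grouped[key].append(value)
    return dict(sorted(grouped.items()))

def reduce_task(key, values):
    """Reduce phase: combine all values of one key into a single result."""
    return key, sum(values)

# One map task per block; in a cluster these would run on different servers.
map_outputs = [map_task(block) for block in blocks]
counts = dict(reduce_task(k, v) for k, v in shuffle(map_outputs).items())
print(counts)
```

The result is the number of log lines per log level across all blocks.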

Storing log messages in Hadoop

The first challenge in implementing a storage for log messages was to find a file format which allows fast and direct access to a single log message while storing hundreds of thousands of log messages in a single file. Each file needs to have a size of at least 64 MB to make use of the large block size of HDFS.

Hadoop provides two file formats for grouping multiple entries in a single file:

  • SequenceFile: A flat file which stores binary key/value pairs. The output of Map/Reduce tasks is usually written into a SequenceFile.
  • MapFile: Consists of two SequenceFiles. The data file is identical to the SequenceFile and contains the data stored as binary key/value pairs. The second file is an index file, which contains a key/value map with seek positions inside the data file to quickly access the data.

Figure: A SequenceFile stores entries as binary key/value pairs.

We started out using the SequenceFile format to store log messages. It turned out that, while this format is well suited for storing log messages and processing them with Map/Reduce jobs, direct access to specific log messages is very slow. The API for reading data from a SequenceFile is iterator based, so it is necessary to step from entry to entry until the target entry is reached.

Since one of our most important use cases is searching for log messages in real time, slow random access performance is a showstopper.
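
The cost of iterator-based access can be sketched with a simplified flat file of length-prefixed key/value records. Note that this is not the real SequenceFile on-disk format or API; the record layout, the keys and the helper names are assumptions made for this illustration.

```python
import io
import struct

def write_record(stream, key, value):
    """Append one record: two 4-byte big-endian lengths, then key and value."""
    stream.write(struct.pack(">II", len(key), len(value)))
    stream.write(key)
    stream.write(value)

def read_records(stream):
    """Iterate over all records from the current position to the end."""
    while True:
        header = stream.read(8)
        if len(header) < 8:
            return
        key_len, value_len = struct.unpack(">II", header)
        yield stream.read(key_len), stream.read(value_len)

def find_sequential(stream, wanted_key):
    """Scan from the start; return (value, number of records read)."""
    stream.seek(0)
    steps = 0
    for key, value in read_records(stream):
        steps += 1
        if key == wanted_key:
            return value, steps
    return None, steps

# Build a flat in-memory "file" with 10,000 hypothetical log entries.
data = io.BytesIO()
for i in range(10_000):
    write_record(data, f"msg-{i:06d}".encode(), f"log message {i}".encode())

value, steps = find_sequential(data, b"msg-009000")
print(value, steps)
```

Finding an entry near the end of the file means reading almost every record before it, which is what makes random access expensive.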

Figure: MapFiles use two files. The index file stores seek positions for every n-th key in the data file. The data file stores the data as binary key/value pairs.

Therefore we moved to MapFiles. MapFiles have the disadvantage that a random access needs to read from two separate files. This sounds slow, but the indexes which store the seek positions for our log entries are small enough to be cached in memory. Once the seek position is identified, only the relevant portion of the data file is read. Overall this leads to a substantial performance gain.
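
The lookup strategy can be sketched as follows. This is a simplified in-memory model rather than the Hadoop MapFile API: list positions stand in for byte offsets, and the keys, the interval of 16 and the function name are chosen for illustration only.

```python
import bisect

INDEX_INTERVAL = 16  # hypothetical skip rate: every 16th key is indexed

# Sorted "data file": the list position stands in for the byte offset
# (seek position) that a real data file would use.
data_file = [(f"msg-{i:06d}", f"log message {i}") for i in range(10_000)]

# "Index file": every INDEX_INTERVAL-th key together with its seek position.
index_positions = list(range(0, len(data_file), INDEX_INTERVAL))
index_keys = [data_file[pos][0] for pos in index_positions]

def lookup(wanted_key):
    """Return (value, records scanned) using the in-memory index."""
    # Binary search for the last indexed key that is <= wanted_key.
    slot = bisect.bisect_right(index_keys, wanted_key) - 1
    if slot < 0:
        return None, 0
    scanned = 0
    # Scan forward from the seek position until the key is found or passed.
    for pos in range(index_positions[slot], len(data_file)):
        key, value = data_file[pos]
        scanned += 1
        if key == wanted_key:
            return value, scanned
        if key > wanted_key:
            break
    return None, scanned

value, scanned = lookup("msg-009000")
print(value, scanned)
```

With the index held in memory, a lookup touches at most `INDEX_INTERVAL` records of the data file instead of thousands.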

Google Protocol Buffers

Since MapFiles and SequenceFiles use binary key/value pairs, we need a data format to store log messages in these files. As we have seen in part 1 of this article series, a log message typically consists of a couple of data fields followed by the free text message. In order to be able to search efficiently for log messages, we wanted to use the data fields as separate entities. Storing the whole log message as a single string was therefore out of the question.

We ended up using Google Protocol Buffers as the format to transfer and store log messages. Protocol Buffers is a way of encoding structured data. The most important reasons for choosing this format were:

  • Speed: Deserialization speed is one of the most important factors when evaluating file formats. Map/Reduce jobs that crunch through the whole dataset stored in HDFS rely especially on fast object deserialization.
    The project “thrift-protobuf-compare” is dedicated to comparing different Java serialization libraries. According to their benchmarks, Protocol Buffers is one of the fastest frameworks. Object deserialization with Protocol Buffers is 16 times faster than with pure Java serialization.
  • Size: Having to store billions of serialized objects, the size of the resulting data is another key factor. Again, the benchmarks of the thrift-protobuf-compare project show that Protocol Buffers produces some of the smallest objects. Serialized objects are around 4 times smaller than those produced by the standard Java serialization.
  • Migrations: One valuable feature of Protocol Buffers is the ability to change the message format without losing backwards compatibility. It is possible to add or remove fields from an object without breaking working implementations. This is a very important feature when serializing objects for long-term storage.
  • Platform- & Language-independent: Protocol Buffer objects may be accessed from multiple languages on any operating system. This feature allows us to use Protocol Buffers as the sole data format throughout the whole processing chain.
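
To give an idea of why the format is compact and tolerant of schema changes, here is a hand-written sketch of the Protocol Buffers wire encoding for two field types (varints and length-delimited fields). In practice the code is generated from a .proto schema; the field numbers and names below (timestamp, level, message, host) are hypothetical and not our actual schema, and unlike the real libraries this sketch simply discards unknown fields.

```python
VARINT, LENGTH_DELIMITED = 0, 2  # the two wire types handled in this sketch

def encode_varint(value):
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def decode_varint(buf, pos):
    """Decode a varint starting at pos; return (value, next position)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def encode_field(number, value):
    """Encode an int as a varint field, bytes as a length-delimited field."""
    if isinstance(value, int):
        return encode_varint((number << 3) | VARINT) + encode_varint(value)
    tag = encode_varint((number << 3) | LENGTH_DELIMITED)
    return tag + encode_varint(len(value)) + value

def decode_message(buf, known_fields):
    """Decode all fields, skipping field numbers the reader does not know."""
    pos, message = 0, {}
    while pos < len(buf):
        tag, pos = decode_varint(buf, pos)
        number, wire_type = tag >> 3, tag & 0x07
        if wire_type == VARINT:
            value, pos = decode_varint(buf, pos)
        elif wire_type == LENGTH_DELIMITED:
            length, pos = decode_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        if number in known_fields:
            message[known_fields[number]] = value
    return message

# A newer writer adds field 4 (host); an older reader only knows fields 1 to 3.
encoded = (encode_field(1, 1300000000) + encode_field(2, b"ERROR")
           + encode_field(3, b"disk full") + encode_field(4, b"web-01"))
old_reader_fields = {1: "timestamp", 2: "level", 3: "message"}
decoded = decode_message(encoded, old_reader_fields)
print(len(encoded), decoded)
```

Because every field carries its own number and wire type, the older reader can step over the field it does not know and still decode the rest of the message.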

Compression and Seek-Position-Grouping

Log messages consist of textual data which can be compressed very efficiently. MapFiles and SequenceFiles both offer a transparent compression mechanism, which we are using in our solution. It is possible to compress each log entry individually or use a block level compression where multiple entries are compressed together.

Our log messages have an average size of 500 characters. Most of our messages are a lot smaller, with the occasional stack trace or data dump pushing up the average message size. Compressing each log message individually reduces the size to around 80% of the original message size. This is not very efficient.

Using block compression, it is possible to reduce the size to around 20% of the original message size. Hadoop's default setting uses a block size of 1 MB of uncompressed data, which in our case means that around 2000 log messages are compressed together in a single block.
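
The difference between the two modes can be reproduced with a small experiment. The sketch below uses Python's zlib on synthetic log lines rather than Hadoop's codecs on real data, so the ratios it prints will differ from the figures above; the message layout and service names are invented for the example.

```python
import random
import zlib

random.seed(42)  # make the synthetic data reproducible
LEVELS = ["INFO", "WARN", "ERROR"]
SERVICES = ["auth", "billing", "search", "gateway"]  # hypothetical names

def fake_log_message(i):
    """Build a synthetic log line; real messages will compress differently."""
    return (f"2011-03-14 12:{i % 60:02d}:{random.randrange(60):02d} "
            f"{random.choice(LEVELS)} [{random.choice(SERVICES)}] "
            f"request id={random.randrange(10**9)} handled in "
            f"{random.randrange(2000)} ms for user {random.randrange(10**6)}"
            ).encode()

messages = [fake_log_message(i) for i in range(2000)]
raw_size = sum(len(m) for m in messages)

# Record compression: every message is compressed on its own, so the
# compressor cannot exploit repetition between messages.
record_size = sum(len(zlib.compress(m)) for m in messages)

# Block compression: all messages are compressed together as one block.
block_size = len(zlib.compress(b"\n".join(messages)))

print(f"record compression: {record_size / raw_size:.0%} of raw size")
print(f"block compression:  {block_size / raw_size:.0%} of raw size")
```

Short messages share most of their structure with their neighbours, which is why compressing them together pays off so much more than compressing them one by one.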

We discovered that there is a downside to using block compression. With block compression, each block has to be read completely and decompressed before a single entry can be accessed. This is not really a problem on its own, but in combination with the seek positions stored in the index file it becomes one.

Index files store the seek position of every n-th entry in the data file. To save memory, for example, only every 16th entry is written to the index file. The seek position is the position at which the entry can be found in the compressed data file. If 2000 log entries are stored in a single block and every 16th entry is written to the index file, we end up with 125 index entries with identical seek positions, because all 2000 log entries start at the same block. This is a waste of memory. We call this the seek position grouping issue.

In addition, when 2000 entries are found at the same seek position, we need to iterate over the entries stored in the block until we reach the requested entry. This seriously impacts random access performance.
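
The arithmetic behind the seek position grouping issue can be checked with a few lines of Python. The sketch treats 1 MB as 10**6 bytes, uses block numbers in place of real byte offsets, and assumes a file of 10,000 entries; all of these are simplifications for illustration.

```python
ENTRY_SIZE = 500        # average message size in bytes
BLOCK_SIZE = 1_000_000  # uncompressed block size, taking 1 MB as 10**6 bytes
INDEX_INTERVAL = 16     # every 16th entry is written to the index
TOTAL_ENTRIES = 10_000  # hypothetical number of entries in the file

entries_per_block = BLOCK_SIZE // ENTRY_SIZE

# With block compression, the seek position of an entry is the start of
# the block that contains it. Block numbers stand in for byte offsets.
index = [(entry, entry // entries_per_block)
         for entry in range(0, TOTAL_ENTRIES, INDEX_INTERVAL)]

index_entries_for_first_block = sum(1 for _, block in index if block == 0)
distinct_seek_positions = len({block for _, block in index})

# A lookup starts at the beginning of the block and scans forward, so on
# average it passes about half of the entries in the block.
average_scan = entries_per_block / 2

print(f"entries per block:            {entries_per_block}")
print(f"index entries for one block:  {index_entries_for_first_block}")
print(f"index entries in total:       {len(index)}")
print(f"distinct seek positions:      {distinct_seek_positions}")
print(f"average entries scanned:      {average_scan:.0f}")
```

In this model the index holds 625 entries but only 5 distinct seek positions, and an average lookup still has to step over roughly 1000 entries.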

Lessons learned

It is absolutely necessary to tweak the block compression size and the skip rate in the index file to find the optimal compromise between compression factor and the number of entries per block. The skip rate should be chosen so that each block has only one entry in the index file.

Before the optimizations, we needed around 10 seconds to read 100 random entries from HDFS. After optimizing the block size and skip rate, the time for reading 100 random entries dropped to around 200 ms, a speedup by a factor of 50. At the same time, the compressed size went up by only 5%. Our sweet spot is a block size of 32 kB, which means that we store around 64 log messages per block.
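
The relationship between block size, entries per block and skip rate can be written down as a small helper. This is a sketch of the rule of thumb above, not a tuning tool: the function name is made up, 32 kB is taken as 32,000 bytes, and the average entry size of 500 bytes is the figure from our data. In Hadoop the two knobs are, to the best of our knowledge, the configuration properties io.seqfile.compress.blocksize and io.map.index.interval; please verify the names against your Hadoop version.

```python
def tuned_settings(block_size_bytes, avg_entry_bytes):
    """Return (entries per block, skip rate) for one index entry per block."""
    entries_per_block = max(1, block_size_bytes // avg_entry_bytes)
    # One index entry per block means the skip rate equals the number of
    # entries that fit into a block.
    skip_rate = entries_per_block
    return entries_per_block, skip_rate

AVG_ENTRY_BYTES = 500

default_entries, default_skip = tuned_settings(1_000_000, AVG_ENTRY_BYTES)
tuned_entries, tuned_skip = tuned_settings(32_000, AVG_ENTRY_BYTES)

# Measured times for reading 100 random entries, in milliseconds.
speedup = 10_000 / 200

print(f"default block: {default_entries} entries, skip rate {default_skip}")
print(f"tuned block:   {tuned_entries} entries, skip rate {tuned_skip}")
print(f"speedup: {speedup:.0f}x")
```

The helper only derives a starting point from the average entry size; the final values should still be confirmed by measuring random access times and compressed size on real data.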

Now that the log messages are stored in HDFS, we need an efficient way to search for these messages. This will be the topic of the next article.