The Big Data technology landscape is expanding, and choosing the right frameworks from a growing range of technologies is becoming a challenge in projects. Performance comparisons of frequently used operations such as cluster analyses can help with the selection. This is exactly what a cooperation between mgm and the University of Leipzig has been about. In the master's thesis “Skalierbares Clustering geotemporaler Daten auf verteilten Systemen” (“Scalable clustering of geotemporal data in distributed systems”), written by Paul Röwer at the chair of Prof. Dr. Martin Middendorf for parallel processing and complex systems, the k-means algorithm was implemented for four open source technologies of the Apache Software Foundation: Hadoop, Mahout, Spark, and Flink. Benchmarks were then carried out to compare runtime and scalability.
Cluster analysis is a process for grouping unstructured data without extensive preliminary considerations. Geotemporal cluster analysis groups objects on the basis of spatial and temporal similarities. Two key challenges arise in the context of Big Data:
- The cluster analysis has to complete in an acceptable period of time, especially when data volumes vary and can be very high and computing power has to be provisioned accordingly. Therefore, distributed, highly scalable algorithms are necessary.
- The clustering of temporal geodata is a three-dimensional optimization problem. In addition to longitude and latitude, the timestamps of the data points have to be considered. Adding more factors such as temperature complicates the cluster analysis further.
Implementations of the k-means algorithm in Big Data frameworks
The k-means algorithm belongs to the group of partitioning clustering techniques, which are based on minimizing variance. Due to its linear complexity and adaptability, it is well suited to Big Data scenarios. Modifications such as k-means++ or Streaming k-means open up further uses. The following section outlines how the k-means algorithm was adapted to each of the frameworks compared in the benchmark.
In order to use the k-means algorithm with Hadoop, it was parallelized by means of MapReduce. The clustering heuristic of k-means was split into two parts: the reassignment of points to centres and the recalculation of centres. W. Zhao et al. describe all necessary steps in the paper “Parallel k-means clustering based on MapReduce”.
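The two-part split can be illustrated with a minimal single-process sketch in Python. It is not the thesis implementation: the function names are hypothetical, the "shuffle" is a plain dictionary, and Euclidean distance on `(latitude, longitude, time)` tuples stands in for whatever distance function the actual implementation used.

```python
from collections import defaultdict
from math import dist


def map_assign(point, centres):
    """Map phase: emit (index of the nearest centre, point)."""
    nearest = min(range(len(centres)), key=lambda i: dist(point, centres[i]))
    return nearest, point


def reduce_recompute(points):
    """Reduce phase: the new centre is the component-wise mean of its points."""
    n = len(points)
    return tuple(sum(coords) / n for coords in zip(*points))


def kmeans_iteration(points, centres):
    """One round: group the mapped pairs by key, then reduce each group."""
    groups = defaultdict(list)
    for point in points:
        key, value = map_assign(point, centres)
        groups[key].append(value)
    # A centre that received no points keeps its old position.
    return [reduce_recompute(groups[i]) if i in groups else centres[i]
            for i in range(len(centres))]


def kmeans(points, centres, max_iterations=30):
    """Driver: repeat rounds until the centres stop moving or the limit is hit."""
    for _ in range(max_iterations):
        new_centres = kmeans_iteration(points, centres)
        if new_centres == centres:
            break
        centres = new_centres
    return centres
```

In a real MapReduce job, each round is a separate job whose output centres are fed into the next one, which is why the input data has to be read again in every iteration.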
Apache Mahout has been a top-level project of the Apache Software Foundation since 2010. It ranks among the most comprehensive machine learning libraries. The implementations of the library’s algorithms rely on Apache Hadoop and MapReduce. Users can now alternatively choose the programming model of Apache Spark.
Mahout already contains an implementation of the k-means algorithm. In order to use it, the objects have to be converted to Mahout’s three-dimensional vector format. This might prove difficult in use cases such as the clustering of text. Adjusting the distance function, however, did not present any difficulties.
In-memory technologies represent the next step towards better performance. The designers of Apache Spark promise computation times up to a hundred times faster than Apache Hadoop’s MapReduce. In contrast to MapReduce implementations, which have to reload the input data from disk in every iteration of the cluster analysis, in-memory techniques keep the required data in memory during the whole computation.
Apache Spark already provides a machine learning library. In addition to standard k-means, it contains a variant of k-means++ which offers better performance and result quality thanks to its optimized seeding. So far, however, it is limited to clustering in Euclidean space, so the algorithm had to be implemented anew for cluster analysis on geotemporal data. Spark’s library is still at an early stage of development, and the interfaces required for adapting its algorithms might be delivered in the near future.
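To illustrate why a Euclidean-only implementation is not enough, here is one possible geotemporal distance in Python. The article does not state which distance function the thesis used, so everything here is an assumption: the great-circle (haversine) distance for the spatial part, a hypothetical `km_per_day` weight that converts time differences into kilometres, and the way both parts are combined.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points given in degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlambda = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def geotemporal_distance(p, q, km_per_day=100.0):
    """Distance between two (latitude, longitude, day) tuples.

    km_per_day is an assumed weight: one day of temporal separation counts
    as much as this many kilometres of spatial separation.
    """
    spatial = haversine_km(p[0], p[1], q[0], q[1])
    temporal = abs(p[2] - q[2]) * km_per_day
    return sqrt(spatial ** 2 + temporal ** 2)
```

With a distance like this, the arithmetic mean is no longer guaranteed to be the point that minimizes the distances within a cluster, which is one reason a library restricted to Euclidean space cannot simply be reused.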
The Apache Flink project is the most recent technology presented here. Like Apache Spark, it belongs to the area of in-memory computing. The Flink framework features a functional programming model and facilitates massively parallel data analyses. The concepts of bulk and delta iterations enable Apache Flink to process iterative data analyses efficiently.
Both iteration types repeatedly apply a step function to the state produced by the previous iteration until a termination condition is reached. A bulk iteration recomputes the entire partial solution in each step. A delta iteration maintains a solution set and, in each step, processes only a workset of elements that changed in the previous step.
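The difference between the two iteration types can be sketched in plain Python. This is a conceptual illustration only, not the Flink API: `bulk_iterate`, `delta_iterate`, and the small graph example are hypothetical names introduced here.

```python
def bulk_iterate(initial, step, converged, max_iterations):
    """Bulk iteration: each step recomputes the whole partial solution."""
    solution = initial
    for _ in range(max_iterations):
        next_solution = step(solution)
        if converged(solution, next_solution):
            return next_solution
        solution = next_solution
    return solution


def delta_iterate(solution_set, workset, step, max_iterations):
    """Delta iteration: each step only processes the elements in the workset.

    step returns (updates to merge into the solution set, next workset);
    the loop ends early once the workset is empty.
    """
    solution_set = dict(solution_set)
    for _ in range(max_iterations):
        if not workset:
            break
        updates, workset = step(solution_set, workset)
        solution_set.update(updates)
    return solution_set


# Example: label every vertex with the smallest vertex id in its component.
edges = {1: [2], 2: [1, 3], 3: [2], 4: [5], 5: [4]}


def propagate(labels, changed):
    """Push the label of each changed vertex to neighbours with a larger label."""
    updates = {}
    for vertex in changed:
        for neighbour in edges[vertex]:
            candidate = labels[vertex]
            if candidate < updates.get(neighbour, labels[neighbour]):
                updates[neighbour] = candidate
    return updates, set(updates)


components = delta_iterate({v: v for v in edges}, set(edges), propagate, 30)
```

k-means fits the bulk pattern, because every centre is recomputed from all points in each round. The graph example fits the delta pattern, because vertices whose label no longer changes drop out of the workset.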
To date, the Apache Flink project does not include a library implementation of the k-means algorithm. Consultation with the developers, however, revealed that one is in progress. This source code, which has not been officially published yet, served as the basis for the k-means implementation used in our benchmark.
Evaluation: Clustering Benchmark
The purpose of the evaluation was to examine both the scalability and the performance of the k-means algorithm on the Big Data technologies introduced above. We first describe the test data and the computer cluster that were used. After that, we summarize the experiments and discuss the results.
The test data originates from the event database of the GDELT project (Global Database of Events, Language and Tone). It is available in a variant of the CAMEO format (Conflict and Mediation Event Observations) — an ontology for describing current events.
On inspection, the raw data turned out to be inconsistent. Therefore, it was necessary to extract the consistent records and repackage them. The selected records were written out in CSV format and stored in HDFS. In addition to the essential attributes latitude, longitude and timestamp, the records contained further event information. For our evaluation, however, only the three attributes mentioned were used.
A benchmark had to be designed for the runtime measurements. Since empirical runtime measurements depend on the executing system, here is a detailed overview of all relevant hardware and software:
|Number of Computers in System||3|
|Hardware per Computer||i7 processor (8 cores) at 3.4 GHz, 16 GB RAM, 256 GB SSD|
|Operating System||Ubuntu Linux 14.10 (64-bit)|
|Distribution||Cloudera Hadoop, version 5.4.4|
|Frameworks||Apache Hadoop 2.6.0, Apache Mahout 0.8, Apache Spark 1.3.0, Apache Flink 0.9.0|
The memory configuration of the computer clusters for the respective technologies complements the system’s configuration:
|mapreduce.map.memory.mb||4GB|
|mapreduce.reduce.memory.mb||3GB|
|mapreduce.map.java.opts.max.heap||3584MB|
|mapreduce.reduce.java.opts.max.heap||3GB|
|yarn.nodemanager.resource.memory-mb||12GB|
|yarn.scheduler.minimum-allocation-mb||3GB|
|yarn.scheduler.increment-allocation-mb||512MB|
|yarn.scheduler.maximum-allocation-mb||12GB|
|yarn.scheduler.maximum-allocation-vcores||32|
|Apache Spark Submit||--master yarn-cluster --executor-memory 6485M --total-executor-cores 16|
|Apache Flink Cluster on YARN||-n 3 (number of YARN containers), -jm 1024 (memory for job manager container in MB), -tm 3072 (memory for task manager container in MB)|
The experiments aimed at collecting data about runtimes and their standard deviations. On this basis, assessments about the scalability of the selected technologies can be made.
Setup and Configuration
The benchmark consists of the following program sequence: during the first execution, the k-means algorithm creates the initial set of cluster centers (the seed). Once this first cluster analysis has terminated, another run is started with the newly generated seed. All frameworks carry out the cluster analysis using the same initial centers and input data in order to guarantee comparability.
In all executions of the k-means algorithm, the k parameter is set to ten and the number of iterations is limited to 30.
A note on Spark: Measuring the runtime for Apache Spark was not possible for the largest amount of test data due to memory shortages. We therefore determined the upper limit, which was 30 million records.
A note on Mahout: Apache Mahout did not accept the configured split size of 128 MB, which ensures that additional containers are started from a certain input size onwards. Measuring the runtime in Mahout was therefore only possible for up to one million records.
The runtime measurements of the k-means implementations have been carried out with six different datasets. The collected time values have been averaged in the end.
Contrary to the general consensus that in-memory technologies provide better runtimes, the results show that with Apache Spark this is only the case if enough memory is available. For Apache Flink, however, the statement holds. Because it combines memory and hard drive storage, Apache Flink performs best from a certain point onwards.
For exploring the scalability of the four Big Data technologies we take the natural logarithm of the time scale of the diagram in figure one.
All four Apache technologies scale nearly linearly. The slope of the graphs of the two in-memory technologies, however, differs from that of the MapReduce variants. The graphs of Apache Spark and Apache Flink rise more steeply than the graphs of Apache Hadoop MapReduce and Apache Mahout.
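One way to put a number on such a comparison is to fit a straight line to the logarithms of input size and runtime and to read off its slope. The sketch below assumes that both axes are logarithmic and uses made-up values purely for illustration; they are not benchmark results.

```python
from math import log


def loglog_slope(sizes, runtimes):
    """Least-squares slope of ln(runtime) against ln(input size).

    A slope close to 1 indicates roughly linear scaling; a larger slope
    means the runtime grows faster than the input.
    """
    xs = [log(n) for n in sizes]
    ys = [log(t) for t in runtimes]
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    variance = sum((x - mean_x) ** 2 for x in xs)
    return covariance / variance


# Synthetic values for illustration only (not measured data).
sizes = [1_000_000, 10_000_000, 100_000_000]
linear_runtimes = [2.0, 20.0, 200.0]
slope = loglog_slope(sizes, linear_runtimes)
```

For the synthetic values above, the runtime grows tenfold whenever the input grows tenfold, so the fitted slope is 1 up to floating-point rounding.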
The runtimes and their standard deviations indicate how reliably applications developed with these frameworks will perform in terms of time. In order to guarantee identical inputs, all k-means runs of a framework have to operate on one and the same initial configuration. We used 30 iterations and 10 million data points for this analysis (except for Mahout, where we had to use 1 million).
The calculated results for all frameworks are summarized in the following table:
|Apache Hadoop MapReduce||0.27%|
Taken as a whole, all standard deviations amount to about one per cent, which is quite positive.
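A percentage of this kind can be computed from repeated runtime measurements as follows. The sketch assumes that the reported values are sample standard deviations relative to the mean runtime, which the article does not state explicitly; the runtimes in the example are made up.

```python
from statistics import mean, stdev


def relative_std_percent(runtimes):
    """Sample standard deviation as a percentage of the mean runtime."""
    return 100.0 * stdev(runtimes) / mean(runtimes)


# Made-up runtimes in seconds, for illustration only.
example_runtimes = [99.0, 100.0, 101.0]
example_percent = relative_std_percent(example_runtimes)
```

Expressing the deviation relative to the mean makes frameworks with very different absolute runtimes comparable.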
In summary, the runtime measurements and the behaviour in terms of resource requirements and scalability revealed the following: the runtime of MapReduce grows more slowly with input size than that of the in-memory technologies. Whether this scaling remains linear for very large inputs has yet to be examined. From a programmer’s point of view, the self-made implementation with MapReduce provides better adaptability and transparency of the program flow than the ready-made implementation of Apache Mahout.
For Apache Spark and Apache Flink, the experiments showed that Flink supports temporarily spilling data to disk. In case of memory shortages, Apache Flink is still able to carry out the requested computations, while Apache Spark 1.3 is not. The Apache Spark project, however, has been working on a similar solution, which is included in all versions after 1.5. Apache Flink performed best in terms of runtime and scalability in the experiments carried out.
If you look at the results in a larger context, the difficulty of choosing the right technology remains. Even if MapReduce offers the best scalability, its computations are clearly slower compared to using Spark and Flink. But even between these two rivals, there is no clear winner.
Flink can use the hard drive for computations whose data does not have to be held exclusively in RAM. Its native integration in distributions like Cloudera, Hortonworks and MapR, however, is not very advanced yet. Spark is much better integrated, but it lacks the ability to process data which doesn’t fit into RAM.
It will be very interesting to keep observing the technologies and to see whether Flink advances its integration into the distributions sooner than Spark advances its processing of larger amounts of data. In any case, there is still a need for capable software architects who adapt the algorithms to the features of the respective technologies.
Geotemporal clustering has proved to be well suited to our benchmark. On the one hand, frameworks can be observed and analysed over a long period of time at high load. On the other hand, k-means is a simple process which can be easily controlled, quickly implemented and adapted to different scenarios in the respective frameworks.