The Design of NativeTask
Author: Binglin Chang (decstery@gmail.com)
NativeTask is a high performance C++ API & runtime for Hadoop MapReduce. It is called NativeTask because it is a native computing unit that focuses only on data processing, which is exactly what a Task does in the Hadoop MapReduce context. In other words, NativeTask is not responsible for resource management, job scheduling or fault tolerance; those are all handled by the original Hadoop components, unchanged. But the actual data processing and computation, which consume most of the cluster's resources, are delegated to this highly efficient data processing unit.
NativeTask is designed to be very fast, with a native C++ API, so that more efficient data analysis applications can be built upon it, like the LLVM based query execution engine mentioned in Google's Tenzing. This is in fact the main objective of NativeTask: to provide an efficient native Hadoop framework, so that much more efficient data analysis tools can be built on top of it:
- Data warehousing tools using state-of-the-art query execution techniques found in parallel DBMSs, such as compression, vectorization and dynamic compilation. These techniques are easier to implement in native code, as most existing implementations (Vectorwise, Vertica) are written in C/C++.
- High performance data mining/machine learning libraries. Most of these algorithms are CPU intensive, involve a lot of numerical computation, or have already been implemented in native languages; a native runtime permits better performance, or easier porting of these algorithms to Hadoop.
From the user's perspective, NativeTask is a lot like Hadoop Pipes: using the header files and dynamic libraries provided by the NativeTask library, you compile your application or class library into a dynamic library rather than an executable program (because we use JNI), then use a Submitter tool to submit your job to the Hadoop cluster, much like streaming or pipes do. Tutorials and manuals are not available yet, but you can read the examples in src/main/native/examples. The main features of NativeTask are:
- High performance, more cost effective for your Hadoop cluster;
- C++ API, so users can develop native applications or apply more aggressive optimizations not available or convenient in Java, like SSE/AVX instructions, LLVM, GPU computing, coprocessors, etc.;
- Support for no-sort dataflow: by removing sort, the shuffle stage barrier can be eliminated, yielding better data processing throughput;
- Support for a foldl-style API, which is much faster for aggregation queries;
- Binary based MapReduce API, with no serialization/deserialization overhead;
- Compatible with Hadoop 0.20-0.23 (needs the task-delegation patch).
That's the topic people are most interested in, but before explaining the technical details of NativeTask, a more appropriate question to begin with is:
Is Hadoop fast enough?
Actually, no. It is common to see a well written C++ program process 1GB of data in just a few seconds, while a MapReduce task may take minutes to process the same data, and much research has shown that Hadoop MapReduce is not efficient compared to traditional parallel DBMSs for analytical workloads.
On the other hand, Hadoop does better at scalability and fault tolerance. Although it is not efficient enough today, I believe there is no technical limitation preventing Hadoop from reaching the same performance as hand written native programs. So:
How fast can it get?
Let's do some back-of-the-envelope computation. For example, consider a commodity server:
Dell PowerEdge C2100
CPU: 2 * 6 core Xeon5600
Memory: 48GB
Disk: 12 * 2TB SATA
This server can run 12 tasks in parallel, each task using 1 core (2 threads), 4GB memory and 1 SATA disk. A typical map task's data flow and its ideal speed would be:
Stage | Ideal speed |
---|---|
Read data from HDFS | 100MB/s (data local task) |
Decompression | 700-2000MB/s, ratio 2-5x (snappy or lz4) |
RecordReader + Mapper | 2000MB/s (LineRecordReader + IdenticalMapper) |
Sort | 300-600MB/s (varies a lot; faster if key/value are large) |
Compression | 250-500MB/s (varies a lot, depending on data type) |
Write to local disk | 100MB/s (2000MB/s with page cache) |
One thing to notice here is that with lightweight compression enabled, disk is no longer the bottleneck; system throughput is increasingly determined by raw CPU cost.
So if all things are perfect, a map task should handle 1GB (250MB compressed) of data in:
Stage | Time |
---|---|
Read + decompression | 2.5s |
RecordReader + Mapper | 0.5s |
Sort | 2s |
Compression + write | 3s |
Total | 8s |
So that is 1GB / 8s = 125MB/s. Furthermore, for selection/filter/join/aggregation queries, sort is not needed, the output is much smaller than the input, and each core has 2 hardware threads to use in one task; combining all these factors, it is possible to process 1GB of data in just 3s, about 333MB/s. For the whole server that is 12 * 333MB/s ≈ 4GB/s. This means that under the best conditions (totally balanced scheduling, perfect data locality, no slow nodes or failures), a 25 node cluster with 10GbE should:
- Complete a 1TB Terasort in 58 seconds (27s map + 10s shuffle + 21s reduce), if the input, map output, and final output are all compressed (Terasort is an IO test by default and does not allow compression, but it can serve as a typical MapReduce framework test).
- Answer a simple aggregation query against a 1TB dataset in 10 seconds.
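As a rough sanity check on the map figure, using only the per-task numbers derived above and assuming a perfectly even distribution: each node runs 12 map tasks at 125MB/s (the with-sort figure), about 1.5GB/s per node; 1TB spread over 25 nodes is 40GB per node; 40GB / 1.5GB/s ≈ 27s, which is where the 27s map estimate comes from. The shuffle and reduce estimates depend on compressed sizes and network layout, so they are not re-derived here.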
Sure, there are lots of assumptions in the argument above, but again there are no technical limits in any stage of the whole processing flow. With this processing throughput, it is possible to set up a Hadoop based data warehouse at very low cost compared to commercial data warehousing solutions, but with comparable performance. The server mentioned above costs about 10-20K$ per node, with 8TB (3 replication) / 24TB (decompressed) capacity, that is 1-2K$ per core and 1-2K$/TB. As hardware costs continue to decrease, this cost will continue to drop.
Although this sounds amazing, it is a long way to get there. Currently, a well written Hadoop map task can process 1GB of data in about 40-120s, i.e. 10-30MB/s; Hive/Pig tasks may take much longer because of their high level abstractions. Apparently this is far from the maximum possible speed (100-300MB/s). This leads to the next question:
Why doesn't Hadoop perform well enough, and how can it be improved?
Here are some of the top reasons (not all of them):
- I/O bottleneck. Most Hadoop workloads are data intensive, so if no compression is used for input, mid-output and output, I/O (disk, network) can become a bottleneck.
The solution is to use compression everywhere. Luckily there are amazing general lightweight compression algorithms out there: snappy & lz4. With 2x-5x compression ratios (actually much higher for typical Hadoop workload data types), the effective I/O bandwidth is virtually 2x-5x the raw I/O bandwidth.
Another thing worth mentioning is high speed networking. Today's servers are much more powerful than a few years ago; with more cores and RAM per node, a server can run more tasks concurrently, so high speed networks like 10-40GbE will become the standard setup for Hadoop clusters. Whether the current Hadoop network stack (jetty/netty based) can sustain such throughput is also questionable.
- Inefficient implementation. This inefficiency lies everywhere:
  - Map side sort: the current sort can be 10x slower than a well written sort, because the current implementation suffers from cache locality problems and is not partition based. This is likely to improve in the latest Hadoop versions, but it is still not optimal.
  - Serialization/deserialization: this leads to unavoidable object creation, lots of small buffer copies, heavy stream abstractions, primitive type boxing/unboxing, suboptimal compare operations, etc. Ser/deser is overused both at the MapReduce framework level and at the query execution level (Hive/Pig), and this is the main reason for Hadoop's poor data processing throughput. There have been discussions about this for a long time, but no progress yet. Here is my thought: at the MR framework level, a pure binary interface is enough and efficient for a query execution engine built upon it, or even more aggressively, don't use the MR API at all, just use the task input split and the data redistribution utility (shuffle) provided by the MR framework. At the query execution level, ser/deser is not necessary either; the most efficient way is to use some sort of schema to describe the data, use a C-struct-like binary representation to store it, then use LLVM to generate native code directly from the schema and the logical query plan. This can lead to a big boost in processing throughput; Google reported a 6x-12x throughput boost from using LLVM in Tenzing.
  - Shuffle: Hadoop 0.23 has done many optimizations to shuffle (netty, batch fetch, etc.), but it can be optimized further (for example, the shuffle in the latest Hadoop version is still slower than Baidu's internal version). When sort is not needed, there are more optimizations to exploit, and there will certainly be a lot of tuning work to fully utilize high speed Ethernet too.
- Data locality. This is one of the main advantages of parallel DBMSs over Hadoop: with advanced data partitioning, indexing and sophisticated query plans, most data is processed locally and data movement is reduced to a minimum. Hive has done some similar optimizations, but more can be done; some optimizations also need a more flexible computing model than MapReduce.
- Scheduling & starting overhead. This has a big impact on small jobs and multi-iteration jobs.
- Inflexible programming paradigm. MapReduce is a very general data processing model; this generality gives it its strength, but also limits its performance. For some specific tasks there are more efficient approaches. There are many examples in the Tenzing paper, and there has also been a lot of recent research on improving query performance on MapReduce. Hive has done many optimizations at the application level, but some framework level optimizations/interfaces are needed, such as hash aggregation with no sort for aggregation queries, map-side join with a dictionary server, chained MapReduce jobs (combining the reducer with the mapper of the next MR job), etc.
These factors lead directly to the design principles of NativeTask:
- Native implementation. I'm fully aware that Java is quite efficient for normal tasks, and the JVM has certain runtime optimization techniques which are much harder to realize in C/C++; for example, dynamic optimizations such as lock coarsening and virtual function inlining are very difficult to do in C++. But some tasks and optimizations, which I believe are essential for this project, are better done in a native runtime:
  - Compression: nearly all of the fastest compression algorithms are written in native code. Currently Hadoop uses JNI to call these libraries in a bulk processing manner, but there is still some overhead in crossing the JNI boundary, especially when decompression is very fast (>1GB/s), and techniques like lazy decompression and direct operations on compressed data do not fit the bulk processing model.
  - SSE/SIMD: similar to compression, Hadoop currently uses JNI to leverage SSE optimizations such as CRC checksums, but again this is not a general solution.
  - LLVM: as mentioned before, one of the main objectives of this project is to provide a native runtime to support high level query execution engines, and it is almost certain that LLVM will be used. Since LLVM is a native C++ library, C++ is the more suitable choice.
- Avoid serialization and memory copy. As mentioned before, serialization has a lot of overhead. To get maximum throughput, it is better to abandon serialization, or to introduce a serialization method that can operate directly on serialized data, and to avoid object creation and memory copy. Again this is hard or not user friendly in Java, but convenient and straightforward in native code, for example with a C-struct-like data representation (see the sketch after this list). In addition, when the whole data flow is on the native side (CRC checksum, decompression, reader, processing, writer, compression, CRC checksum), a lot of small memory copies can and should be eliminated, so the interfaces and underlying processing flow are designed to eliminate most memory copies.
- Keep it simple. This project focuses purely on data processing; unlike typical distributed systems, there shouldn't be many complex things involved, such as multi-threaded programming & synchronization, heavy abstractions or complex systems programming. For example, this project tries to avoid the asynchronous output collector, I/O stream abstractions and other complexities existing in the current MapReduce design.
- Less concern for compatibility. As mentioned before, the main objective of this project is to let higher level data analysis tools/libraries be built upon it, so compatibility should be constrained at a higher level (such as the query language level), while permitting more flexibility at the lower level, so that we can experiment with various things. The new MRv2/YARN framework also permits experimenting with new frameworks. Finally, since this project is at a very early stage, lots of things will certainly go through radical changes during development.
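To make the "C struct like data representation" idea above concrete (see the bullet on avoiding serialization), here is a minimal sketch; the record layout is a hypothetical example, not a NativeTask type, and it assumes fixed-size, naturally aligned records packed back to back in a buffer:

```cpp
#include <stddef.h>
#include <stdint.h>

// Hypothetical fixed-layout record: fields live directly in the byte
// buffer, so "deserialization" is just a pointer cast -- no object
// creation, no field-by-field decode, no copies.
struct OrderRecord {
  int64_t orderId;     // 8 bytes
  int32_t customerId;  // 4 bytes
  float   amount;      // 4 bytes -> sizeof(OrderRecord) == 16, 8-byte aligned
};

// Sum the amount field over a buffer of densely packed OrderRecords,
// assuming the buffer itself is 8-byte aligned.
double SumAmounts(const char* buf, size_t len) {
  double total = 0;
  for (size_t off = 0; off + sizeof(OrderRecord) <= len; off += sizeof(OrderRecord)) {
    const OrderRecord* rec = reinterpret_cast<const OrderRecord*>(buf + off);
    total += rec->amount;
  }
  return total;
}
```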
NativeTask consists of two major parts: the Java side and the native side. The Java side is responsible for bypassing the normal Java data flow and delegating data processing to the native side, and the native side does the actual computation. The two sides communicate with each other using JNI, in a synchronous, batch-processing (block based) way. This is different from the IPC mechanisms used in Streaming and [Pipes](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/pipes/package-summary.html). Sockets and pipes are fast enough for data processing, but they consume a lot of CPU and introduce multi-threaded programming and asynchronous processing.
To bypass the normal Java data flow, NativeTask introduces a task delegation interface. It inserts bypass logic at the beginning of MapTask and ReduceTask (this needs a modification to the current MapReduce source code). The bypass logic checks whether a delegator is configured in the JobConf; if there is one, it uses the configured delegator to run the task, bypassing the original logic. The delegation interface looks like this:
MapTask: void run(TaskAttemptID taskID,
JobConf job,
TaskUmbilicalProtocol umbilical,
DelegateReporter reporter,
Object split)
ReduceTask: void run(TaskAttemptID taskID,
JobConf job,
TaskUmbilicalProtocol umbilical,
DelegateReporter reporter,
RawKeyValueIterator rIter)
For MapTask, split information is needed; currently only FileSplit is supported by the native RecordReader. For ReduceTask, shuffle and merge are still done on the Java side unchanged, so a RawKeyValueIterator is passed to the delegator. A native implementation of shuffle and merge will certainly give better performance in the future. I have proposed another possible (and more general) solution, [Extensible Task (MAPREDUCE-3246)](https://issues.apache.org/jira/browse/MAPREDUCE-3246), to make the task extensible, but in practice I found the delegation interface more convenient, because there is still a lot of work that can't be done on the native side right now. Anyway these are minor issues, since both are easy to refactor.
Currently delegation supports two modes of dataflow:
- Native Mapper/Reducer only: compatible with the existing InputFormat/OutputFormat and RecordReader/Writer; key/value pairs are passed to/from the native side in batches. The dataflow of a typical MapTask:
RecordReader -> Serialize -> [DirectByteBuffer] -> Native Mapper -> Native Output Collector(Sort & Spill)
The dataflow of a typical ReduceTask:
RawKeyValueIterator -> [DirectByteBuffer] -> Native Reducer -> [DirectByteBuffer] -> Deserialize -> RecordWriter
- Native Mapper/Reducer with native RecordReader/Writer: InputFormat/OutputFormat still exist for input splits and output commit, but the RecordReader/Writer are native, so a native task can implement a RecordReader/Writer to read input or write output directly, yielding better performance and flexibility. The dataflow of a typical MapTask:
Input Split -> Native RecordReader -> Native Mapper -> Native Output Collector
The dataflow of a typical ReduceTask:
RawKeyValueIterator -> [DirectByteBuffer] -> Native Reducer -> Native RecordWriter
As described before, the Java side and the native side pass serialized K/V data in a block based batch processing pattern, rather than record by record. This is because JNI calls have considerable overhead, and batch processing minimizes the number of JNI calls. The block size is about 32KB~128KB, smaller than the L2 cache.
The JNI based batch processing is implemented in the Java class NativeBatchProcessor and the native C++ class BatchHandler; the JNI machinery is isolated in these two classes, so the rest of the project does not need to deal with the complexity of JNI.
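To make the batch idea concrete, here is a minimal sketch of how the native side could walk one block of length-prefixed key/value pairs handed over through a DirectByteBuffer. The record framing shown here ([keyLen][valueLen][key][value]) is an assumption for illustration, not necessarily the exact layout NativeTask uses:

```cpp
#include <stdint.h>
#include <string.h>

// Callback invoked for each key/value pair found in the block.
typedef void (*KVHandler)(const char* key, uint32_t keyLen,
                          const char* value, uint32_t valueLen);

// Walk one block of serialized pairs. The block is 32KB~128KB, so it
// fits in the L2 cache and a single JNI call delivers many records.
void ProcessBlock(const char* block, uint32_t blockLen, KVHandler handle) {
  uint32_t pos = 0;
  while (pos + 2 * sizeof(uint32_t) <= blockLen) {
    uint32_t keyLen, valueLen;
    memcpy(&keyLen, block + pos, sizeof(keyLen));
    memcpy(&valueLen, block + pos + sizeof(keyLen), sizeof(valueLen));
    pos += 2 * sizeof(uint32_t);
    if (pos + keyLen + valueLen > blockLen) break;  // truncated record
    handle(block + pos, keyLen, block + pos + keyLen, valueLen);
    pos += keyLen + valueLen;
  }
}
```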
One problem with C++ is its lack of reflection, which makes it difficult to set up mapper, reducer, record reader and writer classes in the JobConf on the client side and create them dynamically in the task. Pipes uses static linking; unlike Pipes, NativeTask uses something more dynamic: a class-library based structure. A typical application based on NativeTask consists of several dynamic libraries (serving as class libraries), for example:
[Task JVM]
|
delegation
|
|--load-> [libnativetask.so]
|--load-> [userlibrary.so]
|--load-> [application.so]
|
create native objects
|
run mapper/reducer
|
|----------------|
done()
NativeTask uses a few template tricks to realize a very simple equivalent of Hadoop's ReflectionUtils.newInstance(). Consider .so libraries as class libraries (like .jar files): every .so library has an entrance function to create C++ objects of the classes in that library. The dynamic library libnativetask.so is the NativeTask runtime, but it also serves as a class library, with some predefined Mapper/Reducer, Partitioner and RecordReader/Writer classes, such as IdenticalMapper/Reducer, HashPartitioner, TotalOrderPartitioner, LineRecordReader/Writer, etc.
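A minimal sketch of what the template trick and the per-library entrance function could look like; the names here (NativeObject, ObjectFactory, CreateLibraryObject) are illustrative, not NativeTask's actual symbols:

```cpp
#include <stddef.h>
#include <map>
#include <string>

// Common base class so the framework can hold any user-defined object.
class NativeObject {
 public:
  virtual ~NativeObject() {}
};

typedef NativeObject* (*ObjectCreator)();

// One generic creator function per class, instantiated at registration
// time -- a small template substitute for ReflectionUtils.newInstance().
template <typename T>
NativeObject* CreateObject() { return new T(); }

class ObjectFactory {
 public:
  static void Register(const std::string& name, ObjectCreator creator) {
    Registry()[name] = creator;
  }
  static NativeObject* Create(const std::string& name) {
    std::map<std::string, ObjectCreator>& reg = Registry();
    std::map<std::string, ObjectCreator>::iterator it = reg.find(name);
    return it == reg.end() ? NULL : (it->second)();
  }
 private:
  static std::map<std::string, ObjectCreator>& Registry() {
    static std::map<std::string, ObjectCreator> registry;
    return registry;
  }
};

// Example user class living in a class library (.so).
class WordCountMapper : public NativeObject { /* map() logic omitted */ };

// The entrance function every class library exports; the runtime loads
// the .so with dlopen(), resolves this symbol, and asks it to create
// objects by the class names configured in the JobConf.
extern "C" NativeObject* CreateLibraryObject(const char* clazz) {
  static bool registered = false;
  if (!registered) {
    ObjectFactory::Register("WordCountMapper", &CreateObject<WordCountMapper>);
    registered = true;
  }
  return ObjectFactory::Create(clazz);
}
```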
The drawback of dynamic linking is the poor ABI compatibility of C++, but since this is an open source project that mainly targets Linux and homogeneous computing environments, and based on my experience with HCE (Hadoop C++ Extension), this is not a serious problem.
To minimize buffer copies, two lightweight I/O buffers are introduced: ReadBuffer & AppendBuffer. These are different from the decorator-pattern based Java & Hadoop I/O streams: ReadBuffer & AppendBuffer inline their most frequently invoked methods, and they add a code path to avoid one buffer copy when supporting compression/decompression. This doesn't mean NativeTask avoids decorator based streams entirely, but they are only used in batch mode, such as file read/write and CRC checksums.
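As an illustration of the inlining idea (not the actual NativeTask classes), an AppendBuffer-style writer can keep the hot path down to a bounds check plus a memcpy, pushing the rare buffer-full case into an out-of-line flush:

```cpp
#include <stdint.h>
#include <string.h>

// Sketch of an append buffer: the common case is fully inlined; only
// when the buffer is full does control leave the fast path.
class SimpleAppendBuffer {
 public:
  SimpleAppendBuffer(char* buff, uint32_t capacity)
      : _buff(buff), _capacity(capacity), _pos(0) {}

  // Hot path: bounds check + memcpy. Records larger than the whole
  // buffer would need a separate slow path, omitted in this sketch.
  inline void append(const void* data, uint32_t len) {
    if (_pos + len > _capacity) {
      flush();
    }
    memcpy(_buff + _pos, data, len);
    _pos += len;
  }

 private:
  // Cold path: hand the filled buffer to the next stage
  // (compressor, CRC, file writer) and reset the position.
  void flush() { _pos = 0; }

  char* _buff;
  uint32_t _capacity;
  uint32_t _pos;
};
```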
It is much easier to add a compression codec in native code; currently snappy, lz4 and gzip have been integrated into NativeTask.
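As a sketch of why this is straightforward, a block codec can sit directly on the compression library's buffer-to-buffer API; the wrapper below uses snappy's public C++ functions and leaves out the block framing (length headers) a real codec stream would add:

```cpp
#include <string>
#include <snappy.h>

// Compress/decompress one block at a time; block-oriented codecs map
// naturally onto the batch-oriented buffers described above.
bool CompressBlock(const char* input, size_t len, std::string* out) {
  snappy::Compress(input, len, out);
  return true;
}

bool DecompressBlock(const char* compressed, size_t len, std::string* out) {
  return snappy::Uncompress(compressed, len, out);
}
```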
The dataflow and main logic of map/reduce tasks are almost the same as in the original implementation; the differences are in the implementation details. The general difference is that the native implementation tends to be simpler and therefore easier to optimize, and the mapper/reducer and reader/writer APIs are designed to make zero copy possible.
This part contributes a lot of the performance gains. As mentioned before, the sort implementation in current Hadoop is suboptimal, so a different, partition based sort & spill method is used. The main components of this method are described below.
Basically, the map output collector is a partitioned key/value buffer. The mapper emits key/value pairs, a partition number is generated by the partitioner, and the map output collector finds a PartitionBucket to put the key/value pair into. A PartitionBucket has an array of MemoryBlocks to hold KV pairs; if the last MemoryBlock is full, it allocates a new MemoryBlock from the MemoryPool, and if there is not enough memory left in the MemoryPool, a spill is triggered.
MemoryPool holds a buffer of size io.sort.mb and tracks the current buffer usage. Notice that this buffer only occupies virtual memory, not RSS (memory actually used), as long as the memory is not touched; this is better than Java, because Java initializes its arrays.
MemoryBlock is a small chunk of memory backed by the MemoryPool and used by a PartitionBucket. The default size of a MemoryBlock is ceil(io.sort.mb / partition / 4 / MIN_BLOCK_SIZE) * MIN_BLOCK_SIZE, where MIN_BLOCK_SIZE is currently 32K, and the maximum MemoryBlock size is 1M; it should be tuned dynamically according to the partition number & io.sort.mb in the future. The purpose of MemoryBlock is to reduce CPU cache misses: when sorting large, indirectly addressed KV pairs, the sort time is dominated by random RAM reads, so MemoryBlock gives each bucket relatively contiguous memory.
PartitionBucket stores the KV pairs for one partition. It has two arrays: vector<MemoryBlock *> blocks (the blocks used by this bucket) and vector<uint32_t> offsets (the start offset of each KV pair in the MemoryPool). These vectors are not yet under memory control (not counted in io.sort.mb), but in practice they do not affect the memory footprint much.
This approach does not work well when the partition number and key/value size are both large, but that is a rare case and it can be improved; for example, we could use the MemoryPool directly (disabling MemoryBlock) when io.sort.mb / partition number is too small.
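A condensed sketch of the structures described above; this is a simplification for illustration, the real NativeTask classes differ in detail:

```cpp
#include <stdint.h>
#include <vector>

// MemoryPool: one io.sort.mb sized buffer; blocks are carved out of it.
// Untouched pages only cost virtual memory, not RSS.
class MemoryPool {
 public:
  MemoryPool(char* buff, uint32_t capacity)
      : _buff(buff), _capacity(capacity), _used(0) {}

  // Returns NULL when the pool is exhausted, which triggers a spill.
  char* AllocBlock(uint32_t size) {
    if (_used + size > _capacity) return NULL;
    char* ret = _buff + _used;
    _used += size;
    return ret;
  }

 private:
  char* _buff;
  uint32_t _capacity;
  uint32_t _used;
};

// MemoryBlock: a small chunk of the pool used by one PartitionBucket,
// keeping that bucket's KV bytes relatively contiguous to reduce cache
// misses during the per-partition sort. Default size is roughly
// io.sort.mb / partitions / 4, rounded to a multiple of 32K, capped at 1M.
struct MemoryBlock {
  char* base;
  uint32_t size;
  uint32_t used;
};

// PartitionBucket: all key/value pairs belonging to one partition.
struct PartitionBucket {
  std::vector<MemoryBlock*> blocks;  // blocks used by this bucket
  std::vector<uint32_t> offsets;     // start offset of each KV pair in the pool
};
```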
Since the map output buffer is partitioned, we can sort each partition separately; this is different from Java's single buffer approach. By doing so, the sort can be much faster: sorting one big array is much slower than sorting many small arrays, small arrays also mean fewer cache misses, and the partition number does not need to be compared during the sort. My tests have shown a 10x-20x speedup in sort performance.
Currently only a binary comparator is supported, because it is efficient and sufficient for most applications. Fixed length key comparison and user defined compare functions may be useful; they can be implemented in the future.
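Putting the two points together, sorting one bucket can look like the following sketch: only the small offsets array is permuted, the comparator is a memcmp over the key bytes, and no partition id is involved because the bucket already holds a single partition. The record framing ([keyLen][valueLen][key][value]) is again an assumption for illustration:

```cpp
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <vector>

// Compares two records by key bytes, given their start offsets in the
// map output buffer.
struct OffsetComparator {
  const char* base;
  explicit OffsetComparator(const char* b) : base(b) {}

  bool operator()(uint32_t a, uint32_t b) const {
    uint32_t alen, blen;
    memcpy(&alen, base + a, sizeof(alen));
    memcpy(&blen, base + b, sizeof(blen));
    const char* akey = base + a + 2 * sizeof(uint32_t);
    const char* bkey = base + b + 2 * sizeof(uint32_t);
    int c = memcmp(akey, bkey, alen < blen ? alen : blen);
    return c != 0 ? c < 0 : alen < blen;
  }
};

// Sort one partition bucket: the KV bytes themselves never move.
void SortBucket(const char* base, std::vector<uint32_t>& offsets) {
  std::sort(offsets.begin(), offsets.end(), OffsetComparator(base));
}
```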
A no-sort dataflow is easy to implement on the native map side: just do not sort each PartitionBucket. Since the combiner relies on grouping KV pairs together, the combiner is not supported in the no-sort dataflow, but in many cases combining can be done in the mapper logic instead. Originally I planned to implement a grouping dataflow that does support the combiner, but after sort was optimized there seems to be very little benefit in supporting grouping.
Since reduce side shuffle and merge are not implemented yet, the no-sort dataflow on the reduce side is implemented in Java. A patch with both map and reduce side implementations has been submitted to MAPREDUCE-3246.
Since the map output KV buffer is partitioned, parallel sort and spill become possible, but this needs some changes to the original Hadoop code, so I have left it unimplemented. For example, for a map task with 100 reducers, instead of spilling to one file we could spill to one directory:
output
|- partition0-49.out
|_ partition50-99.out
Then sorting, combining, spilling and compression can all be done in parallel, to fully utilize CPU resources and reduce task execution time.
Shuffle and merge are not implemented yet, so there is nothing special there. Two new interfaces are introduced for the combiner and reducer stages, so you can use the mapper or folder interface in the combiner and/or reducer stage. These are both passive interfaces, which are suitable for implementing aggregation style workloads in the no-sort dataflow. The Mapper API is for users who want to manage their hashtable themselves; the Folder API is for users who want the framework to manage the hashtable for them. This work is experimental and not finished yet.
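To give a feel for the difference between the two styles, here is a hypothetical shape for these passive interfaces; the real experimental API is not finalized and may look quite different:

```cpp
#include <stdint.h>

// Mapper-style passive interface: user code owns the aggregation state
// (e.g. its own hashtable) and emits results in Finish().
class PassiveMapper {
 public:
  virtual ~PassiveMapper() {}
  virtual void Process(const char* key, uint32_t keyLen,
                       const char* value, uint32_t valueLen) = 0;
  virtual void Finish() = 0;
};

// Folder-style (foldl) interface: the framework owns the per-key state
// table and calls Fold() to combine each incoming value into it.
class Folder {
 public:
  virtual ~Folder() {}
  virtual void InitState(char* state, uint32_t stateLen) = 0;
  virtual void Fold(char* state, uint32_t stateLen,
                    const char* value, uint32_t valueLen) = 0;
};
```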
To increase usability, a few classes are built into the NativeTask library: LineRecordReader/LineRecordWriter, IdenticalMapper/IdenticalReducer, HashPartitioner and TotalOrderPartitioner. More readers/writers will be added, to support other Input/OutputFormats such as SequenceFile and RCFile.
I have also implemented Terasort & WordCount, bundled with the NativeTask library, to make performance testing easier.
There is an example in the "example" directory, a simple version of Hadoop Streaming, to illustrate a relatively complex use case.
Quite a few utility classes that exist in Java are missing in C++, so I had to re-implement them, such as synchronization utilities, process & pipe helpers, random generators, etc. Some of them are copied and modified from the JDK and google-leveldb.
This project uses a lot of open source projects from Google: snappy, gtest, cityhash, leveldb, and probably sparsehash for the hash aggregation implementation in the future. Another project is LZ4; I'm quite impressed by its simplicity and amazing speed.
I tested hadoop-1.0 and NativeTask using simple MapReduce applications: Terasort and WordCount, on a 15 node cluster.
The test cluster has 16 nodes connected by 1Gb Ethernet, each node has:
CPU: Xeon(R) CPU E5645 * 2, 2.4GHz, 12 core, 24 thread
Memory: 32GB
Disk: 12 * 1T SATA
JDK: 1.6 u23
Map task slots per node: 7
Reduce task slots per node: 7
I used Hadoop version 1.0 patched with the task delegation patch. The namenode and jobtracker are deployed on the same node; datanodes and tasktrackers are deployed on the other 15 nodes, so the whole cluster has 105 map slots and 105 reduce slots. The block size is configured to 256MB.
The NativeTask library was compiled with gcc 3.4.5, because that is the only compiler available in the test environment; this compiler is very old and probably generates poor native code. On my own computer, a MacBook Pro with gcc 4.2.1 (Apple Inc. build 5659), the results are much better (50%-70% faster); its CPU is an Intel Core i5 2.3GHz, which should have performance similar to the Xeon E5645. I suggest anyone who is interested compile the code, run it in their own environment and let me know; I don't think I will have the resources and time to do large scale tests soon :(
Standard Terasort is actually an IO test and does not allow compression, but for the purpose of this experiment, which is to evaluate data processing throughput, snappy compression is used for the input, mid-output and final output; this effectively moves the bottleneck from disk and network IO to CPU. This test focuses on pure framework performance: key/value pairs are passed straight through the mapper and reducer, without object creation or copying.
WordCount is a simple aggregation workload, with some computation at the application level. The original WordCount demo implementation is inefficient, involving lots of type casts, object creation and copying. I made an optimized version using the same implementation approach as NativeTask; both test results are included.
Here are some characteristics of Terasort and WordCount:
 | Terasort | WordCount |
---|---|---|
Key/value size | 100 | 8-16 |
Combiner | No | Yes |
Input | 200G (44G compressed) | 100G (52G compressed) |
Map tasks | 200 (1G/task) | 200 (500M/task) |
Reduce tasks | 200 | 100 |
Compression ratio | about 0.2 | about 0.5 |
Input/Output | 1:1 | 1:0 (almost) |
Input data generation commands:
Terasort
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar teragen 2000000000 /tera200G-snappy
WordCount
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar randomtextwriter -Dtest.randomtextwrite.total_bytes=100000000000 -Dtest.randomtextwrite.bytes_per_map=500000000 -outFormat org.apache.hadoop.mapred.TextOutputFormat /text100G-snappy
Test execution commands:
Terasort Java
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar terasort /tera200G-snappy /terasort200G-java
Terasort NativeTask
bin/hadoop jar lib/hadoop-nativetask-0.1.0.jar terasort /tera200G-snappy /terasort200G-nt
WordCount Java
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar wordcount /text100G-snappy /wordcount-100G-java
WordCount Java Optimized
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar wordcount -Dwordcount.enable.fast.mapper=true /text100G-snappy /wordcount-100G-java-opt
WordCount NativeTask
bin/hadoop jar lib/hadoop-nativetask-0.1.0.jar -reader NativeTask.LineRecordReader -writer NativeTask.TextIntRecordWriter -mapper NativeTask.WordCountMapper -reducer NativeTask.IntSumReducer -combiner NativeTask.IntSumReducer -input /text100G-snappy -output /wordcount-100G-nt
Terasort
Terasort 200G(io.sort.mb=1200M, no merge) 200Map,200Reduce | Total Time(s) | Map Avg(s) | Map Best(s) | Sort(s) | Shuffle Avg(s) | Shuffle Best(s) | Reduce Avg(s) | Reduce Best(s) | Map CPU(ms) | Reduce CPU(ms) | Map Memory(M) | Reduce Memory(M) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
java | 220 | 51 | 47 | 23.336 | 31 | 20 | 20 | 14 | 10357020 | 11466330 | 292001 | 338160 |
native | 139 | 15 | 14 | 3.476 | 30 | 20 | 17 | 11 | 295510 | 10595440 | 259581 | 336060 |
ratio | 1.583 | 3.4 | 3.36 | 6.71 | 1.03 | 1 | 1.176 | 1.273 | 3.504 | 1.082 | 1.125 | 1.006 |
WordCount
WordCount 100G(io.sort.mb=300M) 200Map, 100Reduce | Total Time(s) | Merge Segments | Map Avg(s) | Map Best(s) | Sort(s) | Shuffle Avg(s) | Shuffle Best(s) | Reduce Avg(s) | Reduce Best(s) | Map CPU(ms) | Reduce CPU(ms) | Map Memory(M) | Reduce Memory(M) |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
java | 266 | 5 | 124 | 117 | 45 | 8 | 8 | 1 | 1 | 25324990 | 410990 | 211082 | 21153 |
java optimized | 243 | 5 | 112 | 95 | 46 | 8 | 8 | 1 | 1 | 22909200 | 412430 | 104078 | 21054 |
native | 55 | 4 | 17 | 16 | 5.52 | 8 | 8 | 1 | 1 | 3287460 | 443890 | 104350 | 21706 |
ratio | 4.42 | - | 6.59 | 5.93 | 8.33 | 1 | 1 | 1 | 1 | 6.869 | 0.939 | 0.997 | 0.970 |
There is a large performance gain in map tasks, because the map side is entirely native and has a relatively efficient implementation of sort and spill. The speedup is higher for WordCount than for Terasort because the KV size in Terasort is much larger than in WordCount, so for the same amount of input WordCount processes more records; the framework has some constant overhead per record, and sort performance is related to record count, so the smaller the records (i.e. the more records there are), the larger NativeTask's speed advantage.
The reduce side does not change much, about 8% in the Terasort test case. This is because reduce side shuffle and merge are still done in Java, and shuffle and merge take most of the CPU and execution time of a reduce task; there is also extra serialization overhead when crossing the JNI boundary. After shuffle and merge are implemented natively, or maybe just merge, similar (perhaps smaller) gains are expected there.
As mentioned before, the shuffle implementation is suboptimal in hadoop-1.0; although the current trunk version has improved shuffle performance a lot, it can still be optimized. Finally, this test environment only used a 1GbE network; we would get a better whole-job speedup if a high speed network like 10GbE were used.
As I said before, the NativeTask library used in the experiment is probably suboptimal. For example, a native WordCount task unit test runs in about 11s on my laptop and 16s in the test environment, and a native Terasort task unit test runs in about 9s on my laptop and 14s in the test environment. Here are some logs generated by the tests:
On my laptop:
12/01/04 17:35:30 INFO Native Mapper with MapOutputCollector, RecordReader: NativeTask.LineRecordReader Combiner: NativeTask.IntSumReducer Partitioner: default
12/01/04 17:35:33 INFO Spill 0 [0,100) collect: 1.515s sort: 1.192s spill: 0.227s, record: 12841142, key: 1000, block: 400, size 17855, real: 18895
12/01/04 17:35:36 INFO Spill 1 [0,100) collect: 1.226s sort: 1.154s spill: 0.223s, record: 12778865, key: 1000, block: 400, size 17855, real: 18907
12/01/04 17:35:39 INFO Spill 2 [0,100) collect: 1.463s sort: 1.167s spill: 0.224s, record: 12748890, key: 1000, block: 400, size 17855, real: 18894
12/01/04 17:35:40 INFO Sort 3 [0,100) time: 0.699
12/01/04 17:35:41 INFO Merge 4 segments: record 0, key: 1000, size 17855, real 18958, time: 0.383
On test environment:
12/01/04 15:54:56 INFO Native Mapper with MapOutputCollector, RecordReader: NativeTask.LineRecordReader Combiner: NativeTask.IntSumReducer Partitioner: default
12/01/04 15:55:01 INFO Spill 0 [0,100) collect: 2.426s sort: 1.557s spill: 0.352s, record: 12841142, key: 1000, block: 400, size 17855, real: 18895
12/01/04 15:55:05 INFO Spill 1 [0,100) collect: 2.097s sort: 1.507s spill: 0.287s, record: 12778865, key: 1000, block: 400, size 17855, real: 18907
12/01/04 15:55:09 INFO Spill 2 [0,100) collect: 2.077s sort: 1.506s spill: 0.399s, record: 12748890, key: 1000, block: 400, size 17855, real: 18894
12/01/04 15:55:11 INFO Sort 3 [0,100) time: 0.951
12/01/04 15:55:11 INFO Merge 4 segments: record 0, key: 1000, size 17855, real 18958, time: 0.491
On the other hand, the same Java task unit tests run at about the same speed on my laptop and in the test environment, so the difference is very likely a compiler issue. Excluding this factor, NativeTask should have an extra speed advantage of about 40%-60%.
Generally, NativeTask outperforms the original MapReduce framework by about 3x-7x for map tasks, 1x-1.1x for reduce tasks, and 1.5x-5x for whole jobs. If the compiler hypothesis holds, the map task speedup could be 4.5x-12x, and the whole-job speedup would be correspondingly larger. The main reasons for NativeTask's performance are avoiding serialization, avoiding heavy abstractions, better use of compression, and the speed advantage of C++ over Java. Since this project is at a very early stage, I expect more improvements in the future: as mentioned before, it is possible for a single map task to reach 300MB/s, while NativeTask is currently at about 50-100MB/s, so there is room for improvement.
NativeTask only addresses some aspects of Hadoop's inefficiency; other aspects like shuffle, data locality, and scheduling & startup overhead are not in the scope of this project, but they may become the dominant factors for some workloads. These aspects are better addressed at a higher level, for example in data warehousing tools like Hive, or BSP workloads like Giraph. The next steps for this project are to integrate the no-sort dataflow, support the Folder API, implement reduce side shuffle and merge, and implement parallel sort and spill. Again, the main objective of this project is to provide an efficient native Hadoop framework, so that much more efficient data analysis tools can be built upon it, with performance comparable to commercial systems.
I am thinking about a modified version of Hive which transforms its physical query plan to LLVM IR and runs on top of NativeTask. According to Google's Tenzing paper and the current status of Hive and NativeTask, a 10x speedup for Hive is entirely possible, and with more advanced techniques that already exist in commercial databases, it is possible to reach performance comparable to commercial data warehousing products.
Another possible direction is a Hadoop distribution for a single fat node or a very small cluster. Most analytical workloads at small companies are at TB scale; only a few large companies really need PB scale. With manycore processors and very dense disk storage, a commodity server in the near future can have the same computing power and capacity as today's small Hadoop clusters, and a single fat-node Hadoop can perform many optimizations that are impossible in distributed mode: there is no network bottleneck, data can be shared directly, and combined with the performance boost of NativeTask, small workloads won't need a cluster to run anymore. In the future, perhaps every data analyst will be able to use Hadoop to analyze TBs of data on their own computer, and when more processing power is needed, just connect to the cloud and submit the same Hadoop application unchanged.
If anyone has similar thoughts and wants to start open source projects or realize them in existing projects, please let me know :)
For more information about vectorization or dynamic compilation:
- Efficiently Compiling Efficient Query Plans for Modern Hardware
- MonetDB/X100: Hyper-pipelining query execution
An interesting article about future hardware trends and programming models: