Lately I have been doing a lot of work using Microsoft’s Hadoop offering, HDInsight. I suspect a lot of people who read my blog are unfamiliar with what Hadoop actually is so I thought I’d recount a recent test I did that exhibits the scale-out nature of processing data on Hadoop.
The test used a mapreduce job (written in Java) to process an incoming CSV file and load it into an HBase database on that same cluster (HBase is a NoSQL data store, for the purposes of this blog post that’s all you need to know right now). Mapreduce is a technique for doing data processing on Hadoop’s scale out architecture where incoming data is split according to some expression over the incoming rows and then rows with the same result of that expression are combined using some sort of aggregation. The splitting and aggregation can be done on multiple nodes in the Hadoop cluster and hence I like to refer to mapreduce as distributed GroupBy.
The actual work that our mapreduce job is doing isn’t important, but for your information its combining many rows of data pertaining to a single customer into a JSON document and loading that document into HBase. Upon successful completion of the job HBase contained 3125000 JSON documents.
I loaded 1.6GB of raw source data (62.5m rows) into HBase using HDInsight, the quickest I was able to complete this was 34m2s.The key to speeding up our throughput was to (a) use a bigger HDInsight cluster and (b) split our input data into multiple files thus forcing the processing to be distributed over more map tasks. With more performance tuning (aka scaling out to more HDInsight nodes) I am confident we get this much lower.
Note that it is possible to specify the number of tasks that the map phase uses rather than Hadoop guessing how many it should use, for this test I chose not to specify that. In other words, splitting the incoming data over multiple files is not a necessity, it was just a trick I pulled to affect the mapreduce job.
I generated a 1.6GB (1,677,562,500B) file containing 62 500 000 rows. On the first run I used an HDInsight cluster that had 2 worker nodes. The entire mapreduce job took ~1hr13m50s. The map phase took ~58m, reduce phase took ~1hr6m (so clearly they overlapped – that is because the reduce phase starts as soon as the first map task completes and as you will see below the map tasks completed at wildly different times).
Even though the reduce phase took longer its actually the map phase which caused the long execution time. To try and pinpoint why it took so long I dove into the logs that Hadoop produces. Unless you tell Hadoop otherwise it determines how many tasks it should spin up in the map phase and in this case it determined it needed 4 map tasks:
I’ve highlighted the elapsed times for each, note the 4th is much lower. This would explain why the reduce phase took so long, it started as soon as the first map task completed but then had to wait ~52minutes until all the other map tasks were complete.
Each one of those tasks has its own task log and from those task logs I found the following information:
Processing split: wasb://firstname.lastname@example.org/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:0+536870912
Processing split: wasb://email@example.com/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:536870912+536870912
Processing split: wasb://firstname.lastname@example.org/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:1073741824+536870912
Processing split: wasb://email@example.com/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:1610612736+66949764
The numbers at the end represent the byte ranges that each task is processing (the first one starts from byte 0 as you would expect). Notice the last one (1610612736+66949764). That means it is starting from byte 1610612736 and processing the next 66949764 bytes. Given that task is the 4th task of 4 it shouldn’t surprise you to know that if you add those two numbers together they come to 1677562500 which is exactly the same size as the input file. In other words, the logs tell us exactly what we should expect, that the input data has been split over the 4 tasks that it deemed necessary to process this file.
Notice that the first 3 tasks processed 536 870 912B, the 4th processed only about 12% of that, 66 949 764B. This would explain why the 4th task completed so much quicker than the others. The data has not been split evenly, and clearly that’s a problem because one of the map tasks completed so much quicker than the others which ultimately means the reduce phase has to sit around waiting for all the data – the uneven split of the data has caused inefficient use of our cluster.
We can infer some things from this:
- The less data that a task has to process, then the less time that task takes to complete (pretty obvious)
- If we can increase the number of tasks, the data will be distributed more uniformly over those tasks and they should complete much quicker (and in roughly the same amount of time) due to having less data to process and less competition for resources.
Thus I ran the same test again changing only one variable, the number of nodes in the HDInsight cluster – I increased it from 2 to 20. I hoped that this would increase the number of map tasks. Unfortunately the job failed (my fault, I left some output files lying around from a previous run and that caused a failure) however it got as far as completing the map phase which is pretty much all I cared about:
As you can see there were still only 4 tasks and they actually took longer. So, we didn’t achieve more tasks and thus we didn’t speed the job up. That’s not good. I can’t explain right now why they actually took longer. The same number of tasks (4) distributed over a greater number of nodes (20) would, you would think, be slightly quicker due to less resource contention. Bit of a weird one that and I can’t explain it right now.
I wondered if splitting the input file into lots of smaller files would make a difference so I split that file into 20 equally sized smaller files and ran the job again on the 2-node cluster. This time we got 20 tasks:
Which is great, however the map phase failed due to out-of-memory issues:
So, I uploaded those same 20 files to the 20node cluster and ran again. We got 20 tasks in the map phase and, thankfully, this time they all completed successfully. The entire job (map + reduce) completed in 34m2s (less than half the time taken on the 2node cluster when loading the single file), the map phase completed in 10m34s, reduce phase took 24m46s. The overlap there is only 1m18s and that’s because the durations of the map tasks were more uniformly distributed due to the data being separated over more tasks. Here are the 20 map tasks with durations:
That has been a bit of a braindump but I figured it might be interesting to anyone starting out on the path of doing data processing on Hadoop. Please post questions in the comments.
UPDATE, Thanks to Paul Lambert I’ve found this very useful primer on attempting to artificially set the number of mappers and reducers: HowManyMapsAndReduces