Hadoop and Spark shuffling
Shuffling is generally the most costly operation we encounter in Hadoop and Spark processing. It has a huge impact on processing performance and can become a bottleneck when our big data requires a lot of grouping or joining. That’s why I think it’s worth spending a while to understand how both engines handle shuffle.
Shuffling is a process of data redistribution. Data is sent through the network so that, as a result, it is grouped differently across the cluster of nodes. It is needed for all operations that require grouping, such as counting the occurrences of different words in a textbook or finding a company’s average sales. In other words, shuffling is a process of changing the data partitioning: we rearrange the data between machines, by key, so that the subsequent operations can be handled.
Spark and Hadoop take different approaches to handling shuffles.
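To make the idea concrete, here is a minimal single-process Python sketch (not Hadoop or Spark code) of a shuffle for word counting. Two hypothetical "nodes" hold (word, 1) pairs; the made-up helper shuffle_by_key redistributes them by hashing the key, so every occurrence of a word lands in exactly one partition and can be counted there.

```python
from collections import defaultdict

def shuffle_by_key(node_outputs, num_partitions):
    """Hypothetical helper: redistribute (key, value) pairs so that all pairs
    sharing a key end up in the same target partition."""
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for records in node_outputs:                 # records produced on one node
        for key, value in records:
            target = hash(key) % num_partitions  # the key decides the destination
            partitions[target][key].append(value)
    return partitions

# (word, 1) pairs emitted on two different "nodes".
node_a = [("spark", 1), ("hadoop", 1), ("spark", 1)]
node_b = [("hadoop", 1), ("shuffle", 1), ("spark", 1)]

partitions = shuffle_by_key([node_a, node_b], num_partitions=2)
# After the shuffle each partition can count its words independently.
word_counts = {word: sum(ones) for part in partitions for word, ones in part.items()}
print(word_counts)
```

The counting step needs no further communication because the shuffle already guaranteed that no word is split across partitions.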
Hadoop implements the so-called Shuffle and Sort mechanism, a phase that happens between every Map and Reduce phase. As a reminder, Map and Reduce operate on data organised into key-value pairs. Each Mapper writes its results into an in-memory buffer, where they are partitioned by target Reducer and sorted by key. Whenever the buffer fills up past a configurable threshold, its content spills to the machine’s local disk (this intermediate data is removed once the job finishes), and when the Mapper is done, its spill files are merged into a single partitioned and sorted output file. The Reducers then copy their partitions from the Mappers’ machines (the shuffle, or copy, phase), so that all records associated with the same key end up in the same Reducer. This copying starts as soon as each individual Mapper finishes rather than after all of them are done, which spreads the network traffic over time instead of flooding the network at once. Depending on the setup (mapreduce.job.reduce.slowstart.completedmaps), Reducers may therefore start copying before all Mappers have finished, although the reduce function itself runs only once all map outputs have arrived. When the data reaches a Reducer, the sorted segments coming from different Mappers are merged so that they remain sorted by key. The results are then written to HDFS or whichever filesystem is used.
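The Map-side sort and the Reduce-side merge can be modelled in a few lines of Python. This is a simplified sketch under stated assumptions, not Hadoop’s implementation: the helper names map_side and reduce_side are hypothetical, Python’s hash() stands in for Hadoop’s HashPartitioner, and buffers, spills and the network copy are omitted. Because every Mapper delivers its segment already sorted, the Reducer only has to merge them (heapq.merge) instead of sorting everything again.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def map_side(records, num_reducers):
    """Partition one Mapper's output by key and sort each partition by key
    (a stand-in for Hadoop's sorted, partitioned map output file)."""
    segments = [[] for _ in range(num_reducers)]
    for key, value in records:
        segments[hash(key) % num_reducers].append((key, value))
    for segment in segments:
        segment.sort(key=itemgetter(0))   # sort by key within each partition
    return segments

def reduce_side(map_outputs, reducer_id, reduce_fn):
    """Collect this Reducer's segment from every Mapper, merge the already
    sorted segments and call reduce_fn once per key."""
    fetched = [segments[reducer_id] for segments in map_outputs]
    merged = heapq.merge(*fetched, key=itemgetter(0))   # merge, not a full re-sort
    return {key: reduce_fn([value for _, value in group])
            for key, group in groupby(merged, key=itemgetter(0))}

mapper_inputs = [
    [("b", 1), ("a", 1), ("c", 1)],
    [("a", 1), ("b", 1), ("a", 1)],
]
map_outputs = [map_side(records, num_reducers=2) for records in mapper_inputs]

results = {}
for reducer_id in range(2):
    results.update(reduce_side(map_outputs, reducer_id, sum))
print(results)
```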
To reduce the amount of data shuffled through the network, we may define a Combiner function, which performs the same kind of calculation as the Reducer but on the Mapper side, before the data is transferred. The role of the Combiner is simply to pre-aggregate the data so that less of it is sent over the network. However, there is no way to control whether, or how many times, the Combiner is actually applied; Hadoop decides that. The Combiner must therefore give correct results whether it runs zero, one or several times, which holds for commutative and associative operations such as summing counts.
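The saving from a Combiner can be illustrated with a toy word count in Python. The functions below are hypothetical illustrations, not Hadoop classes: combine pre-sums counts per Mapper, and the final totals are identical with or without it, because summation does not care how often or in what order it is applied.

```python
from collections import Counter

def map_words(line):
    """Mapper: emit (word, 1) for every word in the line."""
    return [(word, 1) for word in line.split()]

def combine(pairs):
    """Combiner: pre-sum the counts per word on the Mapper side."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return list(totals.items())

def reduce_counts(pairs):
    """Reducer: the same summation, applied after the shuffle."""
    return dict(combine(pairs))

lines = ["to be or not to be", "to do or not to do"]   # one line per Mapper
raw = [map_words(line) for line in lines]
combined = [combine(pairs) for pairs in raw]

shuffled_without = sum(len(pairs) for pairs in raw)      # records sent, no Combiner
shuffled_with = sum(len(pairs) for pairs in combined)    # records sent after combining
print(shuffled_without, shuffled_with)                   # 12 8

without_combiner = reduce_counts([p for pairs in raw for p in pairs])
with_combiner = reduce_counts([p for pairs in combined for p in pairs])
```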
Hadoop’s default Shuffle & Sort mechanism sorts keys by their natural ordering (for text keys, byte-wise lexicographic order) and distributes them across Reducers by hashing. A custom mechanism can be implemented by providing your own versions of the following classes:
- Partitioner – decides which Reducer each key is sent to
- RawComparator – responsible for sorting the data on the Mapper side
- RawComparator – responsible for grouping the keys into reduce calls on the Reducer side
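A common reason to override all three is the secondary-sort pattern. In a Hadoop job they would be registered with Job.setPartitionerClass, Job.setSortComparatorClass and Job.setGroupingComparatorClass; the Python sketch below only simulates their roles, and the functions partition, sort_key and group_key as well as the (user, timestamp) data are hypothetical. Partitioning and grouping use the user alone, while sorting uses the full composite key, so each Reducer sees one user’s events already ordered by time.

```python
from itertools import groupby

NUM_REDUCERS = 2

def partition(composite_key, num_reducers):
    """Partitioner role: route by the natural key (user) only."""
    user, _timestamp = composite_key
    return hash(user) % num_reducers

def sort_key(composite_key):
    """Sort comparator role: order by user, then by timestamp."""
    return composite_key

def group_key(composite_key):
    """Grouping comparator role: same user means same reduce call."""
    return composite_key[0]

records = [(("bob", 3), "logout"), (("alice", 2), "click"),
           (("bob", 1), "login"), (("alice", 1), "login")]

reducer_inputs = [[] for _ in range(NUM_REDUCERS)]
for key, value in records:
    reducer_inputs[partition(key, NUM_REDUCERS)].append((key, value))

sessions = {}
for bucket in reducer_inputs:
    bucket.sort(key=lambda kv: sort_key(kv[0]))
    for user, group in groupby(bucket, key=lambda kv: group_key(kv[0])):
        sessions[user] = [value for _, value in group]   # already time-ordered
print(sessions)
```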
Hadoop 2.9.0 also allows, as an experimental feature, replacing the default Shuffle and Sort with other pluggable implementations.
Aware of MapReduce’s shortcomings, the Spark creators implemented a modified shuffle mechanism, which has been updated along the way. For the sake of comparison, in Spark it is common to call the stage before a shuffle the Map phase and the stage after it the Reduce phase.
In Spark, shuffling is triggered by operations such as distinct, join, repartition, coalesce (only when called with shuffle = true) and most *By and *ByKey functions, e.g. groupByKey, reduceByKey or sortBy. The shuffle can be skipped when the input is already partitioned with the partitioner the operation needs.
The first implementation was the Hash shuffle. Instead of being sorted on the Map side, the data was hash-partitioned: each Mapper created a separate file for every Reducer, containing the portion of data destined for it. As a result, a large number of files was usually created (#Mappers * #Reducers). The consolidated version let Mappers running on the same machine share files, limiting the number of files to #Cores * #Reducers. Hashing was definitely faster than sorting, but the number of files became a bottleneck (IO overhead, performance degradation). The Hash shuffle was removed entirely in Spark 2.0.
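A quick calculation shows why the file count mattered. The job size used here (1,000 map tasks, 1,000 Reducers, 40 cores in total) is a hypothetical example, and the sort-shuffle figure assumes one data file plus one index file per map task.

```python
def hash_shuffle_files(num_map_tasks, num_reducers):
    """Basic hash shuffle: every map task writes one file per Reducer."""
    return num_map_tasks * num_reducers

def consolidated_hash_shuffle_files(total_cores, num_reducers):
    """Consolidated hash shuffle: map tasks running one after another on the
    same core append to a shared set of files, one per Reducer."""
    return total_cores * num_reducers

def sort_shuffle_files(num_map_tasks):
    """Sort shuffle: one data file and one index file per map task."""
    return 2 * num_map_tasks

print(hash_shuffle_files(1000, 1000))             # 1000000
print(consolidated_hash_shuffle_files(40, 1000))  # 40000
print(sort_shuffle_files(1000))                   # 2000
```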
In Spark 1.2 sorting was “restored” in Spark shuffling, when sort-based shuffle became the default. In sort shuffle, each Mapper creates 2 files. The first is a data file in which the records are ordered by the Reduce-side partition they belong to (sorted in memory). The second is an index file, which holds the positions where each partition begins and ends in the data file. If the data does not fit into memory during sorting, spills are generated; they are merged into the single data file when the Mapper writes its final output. If map-side combining is required, the data is not only sorted but also pre-aggregated within partitions. Each Reducer then fetches only its own part of the data file, located using the offsets from the index file. If the operation requires it (e.g. sortByKey), the data is also sorted on the Reduce side.
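The data-file-plus-index layout can be sketched in Python as follows. This is a simplified model, not Spark’s writer: records are encoded as tab-separated text in an in-memory buffer instead of serialized (and possibly compressed) blocks on disk, spills and map-side combining are left out, and the partitioner (key length modulo 3) and helper names are hypothetical.

```python
def write_map_output(records, num_partitions, partitioner):
    """Sort one map task's records by partition ID, write them into a single
    data buffer and build an index of partition boundaries."""
    ordered = sorted(records, key=lambda kv: partitioner(kv[0]))  # stable sort
    data = bytearray()
    index = [0] * (num_partitions + 1)   # partition p is data[index[p]:index[p + 1]]
    for key, value in ordered:
        data += f"{key}\t{value}\n".encode()
        index[partitioner(key) + 1] = len(data)
    # Empty partitions end where the previous partition ended.
    for p in range(1, num_partitions + 1):
        index[p] = max(index[p], index[p - 1])
    return bytes(data), index

def fetch_partition(data, index, reducer_id):
    """A Reducer reads only its own byte range, located via the index."""
    chunk = data[index[reducer_id]:index[reducer_id + 1]]
    return [tuple(line.split("\t")) for line in chunk.decode().splitlines()]

def by_length(key):
    """Hypothetical partitioner used only for this example."""
    return len(key) % 3

records = [("spark", 1), ("io", 2), ("hdfs", 3), ("yarn", 4)]
data, index = write_map_output(records, num_partitions=3, partitioner=by_length)
print(index)                           # [0, 0, 14, 27]
print(fetch_partition(data, index, 1))
```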
This approach is not efficient when only a small number of partitions needs to be created. Therefore, if the number of Reduce partitions does not exceed spark.shuffle.sort.bypassMergeThreshold (200 by default) and no map-side aggregation is needed, the data is instead hashed into separate per-partition files without sorting, and these files are then concatenated into one data file.
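The bypass path and the condition for taking it can be modelled like this. It is a sketch under the same simplifications as a plain text encoding in memory; the function names are hypothetical and only loosely mirror the check Spark performs, and the partitioner (key length modulo 3) is made up for the example.

```python
BYPASS_MERGE_THRESHOLD = 200   # default of spark.shuffle.sort.bypassMergeThreshold

def should_bypass_merge_sort(num_partitions, map_side_combine,
                             threshold=BYPASS_MERGE_THRESHOLD):
    """Bypass only for small partition counts without map-side aggregation."""
    return not map_side_combine and num_partitions <= threshold

def bypass_write(records, num_partitions, partitioner):
    """Write each record straight into its partition's "file" (no sorting),
    then concatenate the files and record the partition boundaries."""
    files = [bytearray() for _ in range(num_partitions)]   # one file per partition
    for key, value in records:
        files[partitioner(key)] += f"{key}\t{value}\n".encode()
    data, index = bytearray(), [0]
    for part in files:             # concatenate in partition-ID order
        data += part
        index.append(len(data))
    return bytes(data), index

records = [("spark", 1), ("io", 2), ("hdfs", 3), ("yarn", 4)]
if should_bypass_merge_sort(num_partitions=3, map_side_combine=False):
    data, index = bypass_write(records, 3, lambda key: len(key) % 3)
    print(index)                   # [0, 0, 14, 27]
```

The resulting layout is the same as with sorting; the difference is that no sort is needed, at the cost of keeping one open file per partition, which is only cheap when there are few partitions.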
Tungsten shuffle-sort improvements were gradually introduced from Spark 1.4 onwards to lower memory usage and Java object overhead during shuffle and to speed up sorting. A so-called Unsafe API (based on sun.misc.Unsafe, which bypasses Java memory safety) was introduced, allowing data to be hashed and compared without deserializing it. Tungsten uses its own binary memory management and takes advantage of the data schema; with the UnsafeRow format, data is managed at the byte level without safety checks. As a result, sorting now operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overhead. Spill merging is optimised as well: serialized and compressed spill partitions can simply be concatenated, provided the compression codec supports it.
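The core trick of sorting serialized data can be illustrated by sorting small integers that point into a byte buffer instead of sorting record objects. The sketch below is loosely modelled on the idea behind Spark’s PackedRecordPointer (a 24-bit partition ID packed with a 40-bit record address into one 64-bit value); the buffer layout with a 4-byte length prefix, the constants and the helper names are assumptions for illustration, and Python integers stand in for 64-bit longs.

```python
PARTITION_BITS = 24
ADDRESS_BITS = 40
ADDRESS_MASK = (1 << ADDRESS_BITS) - 1

def pack(partition_id, address):
    """Pack a partition ID and a record's byte offset into one integer."""
    if not 0 <= partition_id < (1 << PARTITION_BITS):
        raise ValueError("partition ID does not fit in 24 bits")
    if not 0 <= address <= ADDRESS_MASK:
        raise ValueError("address does not fit in 40 bits")
    return (partition_id << ADDRESS_BITS) | address

def unpack(pointer):
    """Split a packed pointer back into (partition ID, address)."""
    return pointer >> ADDRESS_BITS, pointer & ADDRESS_MASK

def read_record(buffer, address):
    """Read one length-prefixed serialized record from the buffer."""
    size = int.from_bytes(buffer[address:address + 4], "big")
    return bytes(buffer[address + 4:address + 4 + size])

# Records stay serialized in one buffer; only the packed pointers are sorted.
buffer = bytearray()
pointers = []
for partition_id, payload in [(2, b"spark"), (0, b"hdfs"), (1, b"yarn"), (0, b"hive")]:
    pointers.append(pack(partition_id, len(buffer)))
    buffer += len(payload).to_bytes(4, "big") + payload

pointers.sort()   # sorting plain integers orders the records by partition ID
layout = [(pid, read_record(buffer, addr)) for pid, addr in map(unpack, pointers)]
print(layout)
```

No record is ever deserialized during the sort; the payload bytes are touched only when the sorted output is written.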
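As of Spark 2.2, this improved sort-based shuffle is the only built-in implementation, which means org.apache.spark.shuffle.sort.SortShuffleManager is the only ShuffleManager shipped with Spark. Depending on the job, it writes the map output using the bypass-merge path, the serialized (Tungsten) sort or the regular sort.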
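How SortShuffleManager picks one of its three writers can be summarised as a small decision function. This Python sketch mirrors the checks as I understand them in Spark 2.x (bypass first, then serialized shuffle, otherwise regular sort); the ShuffleDependencyInfo class is hypothetical, and in Spark the relocation flag comes from the serializer (for example true for Kryo with default settings, false for Java serialization).

```python
from dataclasses import dataclass

MAX_SERIALIZED_PARTITIONS = 1 << 24   # partition IDs must fit in 24 bits

@dataclass
class ShuffleDependencyInfo:
    """Hypothetical summary of the properties the choice depends on."""
    num_partitions: int
    map_side_combine: bool
    serializer_supports_relocation: bool

def choose_writer(dep, bypass_merge_threshold=200):
    """Return the name of the shuffle writer Spark would use for dep."""
    if not dep.map_side_combine and dep.num_partitions <= bypass_merge_threshold:
        return "BypassMergeSortShuffleWriter"   # hash into files, then concatenate
    if (dep.serializer_supports_relocation
            and not dep.map_side_combine
            and dep.num_partitions <= MAX_SERIALIZED_PARTITIONS):
        return "UnsafeShuffleWriter"            # serialized (Tungsten) sort
    return "SortShuffleWriter"                  # regular sort, can combine map-side

print(choose_writer(ShuffleDependencyInfo(1000, False, True)))
```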
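Unlike in Hadoop, there is no copy phase overlapping with the Map phase: in Spark the Reducers start pulling shuffle data only after the whole Map stage has finished, whereas Hadoop Reducers start fetching map outputs as soon as individual Mappers complete.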
| Hadoop shuffle | Spark shuffle |
|---|---|
| happens between Map and Reduce | is triggered by some operations |
| data gets sorted on Map side | data is hashed or sorted on Map side |
| data gets merge-sorted on Reduce side | data may be sorted on Reduce side |
| data spilled on the disks | data spilled on the disks |
| Reducers fetch map outputs as each Mapper finishes | Reducers fetch map outputs after the whole Map stage finishes |
| | Tungsten improvements added |