Spark performance tuning
Spark job performance depends heavily on the kind of task you aim to accomplish and the data you are dealing with. Because of that, there is no single magic recipe to follow when creating a job. However, there are several things that consistently impact job execution. Those I will consider here are:
- file format selection
- small data files problem
- data caching
- proper partitioning
- limited shuffling
- efficient joins
- limited UDFs
- DataFrames instead of RDDs
- Catalyst optimizations
- session options
A Spark job usually starts and ends with data files being read from and written to a filesystem. Because of that, selecting a proper data format may significantly impact execution time. Columnar data formats have proved efficient for processing big data sets; the most popular among them is Parquet.
Parquet is a binary, column-oriented format. It is effective for processing big files due to its structure and the metadata it stores. Each data chunk comes with statistics that allow Spark to skip reading unnecessary rows or columns (so-called push-down optimizations). As a result, reading is faster than with row-oriented data formats. On the other hand, because of that metadata, writing takes longer, as the statistics need to be calculated. But since most Spark installations run on HDFS, which is a write-once, read-many-times filesystem, this is in most cases not a problem. Additionally, Parquet supports flexible compression options such as Snappy or Gzip.
For a while now, the ORC data format has been getting more and more popular. Initially it was optimized to work with Hive rather than Spark. Even so, native support was introduced in Spark 2.3, and it looks more promising with each release; it already outperforms Parquet in a few scenarios. Additionally, it provides more lightweight compression: the created files are smaller than Parquet files. It also supports ACID operations. With each data update, delta directories are created, and when queried, the newest deltas are incorporated into the results. A big drawback of ORC is that not all other big data tools support it yet, but it is definitely a format worth considering.
Around the time of Spark 3, the so-called Delta Lake also appeared: a storage layer that adds functionality on top of Parquet. It provides ACID transactions and allows for data versioning. It aims to give you an experience as if your big data were stored in a relational database. You can check my previous post about Delta Lake, from back when it was announced.
Selecting the proper data format for the job is important. Data scientists like to work with CSVs, which is fine when the files are rather small. However, the bigger the data, the greater the advantage of using one of the columnar formats.
HDFS is not a fan of small data files, and neither is Spark. Each file eats up about 150 bytes of the NameNode's memory. Additionally, the more files you read, the more partitions Spark is going to create (one file = one partition). That means more tasks need to run to perform the job, and for small files, scheduling those tasks may take more time than the calculations themselves. Ideally, a target file should be approximately a multiple of the HDFS block size (128 MB by default).
So simply check the files you work with and consider merging them if they are small or if Spark takes a long time to read them. That also applies to the files backing DataFrame tables (e.g. in the Hive metastore).
Data caching may improve job performance. That applies when you aim to reuse some intermediate calculations for other processing. As each action triggers Spark computations, having several of them may cause multiple jobs to perform the exact same transformations before they arrive at the final results. Caching data allows Spark to reuse what has already been calculated. You can decide where the data should be stored: in memory or on disk. However, if you cache in memory, whatever does not fit will still be recomputed. In such a case, consider caching on disk.
Mind that caching too often is not a good idea, though. Too many data writes may kill performance if you cache on disk. When caching in memory, if you use up all the available space, only the most recent data will be kept anyway.
Spark can run one concurrent task for every data partition (up to the number of cores). The more partitions there are, the more tasks need to run. On one hand, parallel execution is exactly what we are looking for when using Spark, so from this perspective, the more the better. However, having too many partitions may cause as many problems as having too few. Lots of partitions may mean small or even empty ones, for which scheduling the tasks may take more time than the processing itself. Also, depending on the operations applied, you may end up with lots of shuffles, which is generally undesirable.
How many partitions are optimal, then? There is no single answer. It is a trade-off that needs to be established for each case separately, by looking at the execution times of individual tasks and at partition sizes. If some tasks take significantly longer to finish than others, it is a clear indication that partitioning should be changed. In such a case your data is most likely skewed, and it may be worth considering an additional column just for the sake of data partitioning.
Partitioning can also be changed using the coalesce and repartition functions. The first reduces the number of partitions; the second redistributes the same keys to the same partitions and hence implies a data shuffle. Mind that coalesce is pushed down as early as possible in the plan, so make sure it changes partitioning exactly where you intend. It is always worth checking the job's physical execution plan to establish a proper balance among partitions.
It is also good to know that in Spark 2 the default number of partitions after a DataFrame shuffle is 200 (the spark.sql.shuffle.partitions parameter), regardless of the amount of data. That changed in Spark 3, where adaptive query execution can automatically reduce the number when needed.
Shuffling is the most costly operation in Spark. It means redistributing data across the cluster nodes so that it is grouped differently across partitions. It is triggered by *By and *ByKey operations and by transformations like distinct, join or repartition. Two things add up to the cost of shuffling. Firstly, data is written to disk before it is transferred. Secondly, data may need to be sorted before and after the shuffle, depending on the amount of data and the following operations. For that reason, consider structuring your job so that shuffles are avoided as much as possible. For example, use coalesce instead of repartition when you want to reduce the number of partitions, as coalesce avoids the shuffle, whereas repartition always triggers one. It is also good to reduce the amount of data before it is transferred.
Data joins can also be costly due to the shuffling they imply. In Spark 2 there are two main joining strategies: sort-merge join and broadcast hash join. The first is the default one: it shuffles both tables so that the same keys end up on the same nodes. The second broadcasts the smaller table to each node where the bigger one resides; for Spark to pick it automatically, the smaller table needs to fit below spark.sql.autoBroadcastJoinThreshold (10 MB by default). There is only one join hint that can be used, broadcast, which means we can either rely on Spark to choose the joining strategy for us or force it to broadcast.
With Spark 3, more hints emerged: merge, shuffle_hash and shuffle_replicate_nl, giving us more control over the joining strategy. The first two are suited to joining two big tables. The shuffle_hash strategy does not require sorting the data as merge does, but it may lead to out-of-memory errors when the data is skewed.
Selecting the proper joining strategy may have a significant impact on job performance. Usually Spark selects the optimal way to process the data, but if a job takes longer than expected, it is worth trying to enforce another strategy.
User-defined functions allow us to incorporate additional functionality into Spark. Unfortunately, when used from the Python or R API, they require starting Python/R processes on the nodes and translating JVM objects into those languages. With the help of the Arrow library, which changes row-by-row processing into batch processing, this translation is significantly sped up. Tungsten improvements, thanks to which some operations do not require serialization and deserialization, also help performance. However, whenever possible, it is better to use Spark's native functions instead of UDFs.
Another issue with UDFs is that they are not optimized by Spark in any way; they are treated as black boxes. The more you use them, the less efficient a job you may expect.
There are fewer and fewer use cases where it makes sense to use the RDD Spark API. You may check my previous post describing the differences between Spark's data abstractions. It used to make sense to use RDDs when working e.g. with binaries; however, since Spark 3, binary files and images can already be read into DataFrames. Operations on RDDs are not optimized by Catalyst, which makes it harder to use this API efficiently. So if you do not have a very good reason to use RDDs, simply don't: the DataFrame and Dataset APIs are much more efficient performance-wise.
Catalyst is Spark's query optimizer, translating queries against DataFrames and Datasets into an optimal execution plan. It is responsible, among other things, for constant folding, simplification of boolean expressions, pipelining filters and selecting the type of join.
Catalyst does a nice job with its optimizations. However, it is not always wise to rely on it fully to select the best possible plan. First of all, Catalyst cannot optimize UDFs; it treats them as black boxes. Also, since UDFs are treated as deterministic by default, there is no guarantee that they will not be invoked more than once.
Moreover, you should remember that Catalyst does not guarantee the order of evaluation of boolean expressions. For that reason, construct your queries so that they do not rely on the order of 'and's and 'or's. It may also be good to check how many data conversions happen throughout your job, as each type incompatibility may enforce one.
The Spark Session is the entry point to Spark functionality. When establishing one, you may set lots of its properties. Here are some worth mentioning when considering job performance:
- dynamic executor allocation
spark.dynamicAllocation.minExecutors = 5
spark.dynamicAllocation.maxExecutors = 10
spark.dynamicAllocation.enabled = true
By specifying these, we ensure that processing starts as soon as the specified minimum number of executors is available. With the static approach, you need to wait until you have exactly the number you specified, and failed executors will not be replaced until you're done.
- locality based scheduling
spark.locality.wait = 3s
This parameter specifies the maximum time Spark will wait before giving up on scheduling a task on the node where its data lies. Lowering it may imply more remote data reads, but reduces the time tasks spend waiting to start.
- executors and java memory
spark.executor.memory = 1G
spark.driver.extraJavaOptions = -Xss64M
spark.executor.extraJavaOptions = -Xss64M
Increasing the memory available for processing may also reduce execution time.
There may be several things that impact the performance of a Spark job. What I always suggest is to:
- choose proper data format
- persist the data whenever you plan to reuse it
- check if data partitions are of equal sizes
- reduce the data before shuffling
- check how tables/files are stored
- verify processing plan of long queries
- mind that tuning one thing may affect the other