Spark 3 highlights
Recently Apache Spark 3.1.1 was released. Let's take a look at some of the new features provided in Spark 3.
HIGHLIGHTS
Adaptive query execution
Adaptive query execution (AQE) allows Spark to change the execution plan at runtime, as statistics from the running job are updated. In other words, after some processing steps are already done and stats from them are obtained, the execution plan can be automatically updated to leverage this additional information. AQE builds on the plan created by the Catalyst optimizer. Current optimizations include:
- Dynamically coalescing shuffle partitions
If it turns out that after shuffling some partitions are small, they are automatically combined so as not to generate the additional overhead of running many tiny tasks.
- Dynamically switching join strategies
The join strategy may not be ideally chosen if it is preceded by operations for which it is hard to establish proper statistics, such as reading from compressed files or UDF processing. The sizes of the joined tables may then be miscalculated. With AQE, Spark can now assess and change the join strategy after running those calculations and re-estimating the table sizes.
- Dynamically optimizing skew joins
Join time is always determined by the largest data partition: the bigger the partition, the longer the join. If some partition is significantly bigger than the others, it will now be automatically split into smaller portions and the join will be performed on each portion separately. This results in more tasks doing the actual work, but the duration of those tasks should be more even, so ultimately the job should finish faster.
Note: By default AQE is turned off, but you can enable it by setting spark.sql.adaptive.enabled to true.
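As a quick sketch, here is how those switches could be set when building a session in PySpark. spark.sql.adaptive.coalescePartitions.enabled and spark.sql.adaptive.skewJoin.enabled are the Spark 3 flags controlling the first and third optimization listed above; the app name is just illustrative.

from pyspark.sql import SparkSession

# Build a session with adaptive query execution turned on.
spark = (
    SparkSession.builder
    .appName("aqe-demo")                                              # illustrative name
    .config("spark.sql.adaptive.enabled", "true")                     # master switch for AQE
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # combine small shuffle partitions
    .config("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed partitions in joins
    .getOrCreate()
)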
Modified hints for Python UDFs
With Spark 2.3 Arrow was introduced, which allows for faster execution of native Python code through Python UDFs. These so-called Pandas UDFs require us to specify which type of processing we want to do. We can choose between:
- pandas Series -> pandas Series, hint PandasUDFType.SCALAR
- pandas DataFrame -> pandas DataFrame, hint PandasUDFType.GROUPED_MAP
- pandas Series -> scalar, hint PandasUDFType.GROUPED_AGG
With the Spark 3 release an additional one popped up:
- Iterator[pandas Series] -> Iterator[pandas Series], hint PandasUDFType.SCALAR_ITER
Those hints were provided as follows:
@pandas_udf("long", PandasUDFType.SCALAR)
def function(v):
    ...
Now we have a more Pythonic way, specifying the function's input and output types, so we can actually forget about the hints:
@pandas_udf("long")
def function(v: pd.Series) -> pd.Series:
    ...
More info can be found here.
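For completeness, a minimal sketch of the new iterator variant written with type hints (the plus_one function and the column used below are purely illustrative):

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def plus_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Each element of the iterator is one batch of the input column as a pandas Series.
    for batch in batches:
        yield batch + 1

# Example usage:
# spark.range(5).select(plus_one("id")).show()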
Arrow improvements
These are visible especially in SparkR, as Python has had them for a while already. Performance enhancements were incorporated into vectorized versions of the gapply(), dapply(), createDataFrame() and collect() functions. With this change processing happens much faster, as calculations are done on batches of records instead of row by row.
Dynamic partition pruning
Projection pruning and predicate pushdown have been present in Spark since the introduction of Catalyst. Projection pruning means eliminating columns not needed for processing as soon as possible. Predicate pushdown ensures that unneeded rows are not dragged along (in simpler words, data filtering happens as early as possible). Both of these are done statically at compilation time.
Dynamic partition pruning, on the other hand, happens at runtime, when additional information about the data is acquired. It is especially useful when we join fact and dimension tables and filter on the dimension columns. Before this feature the large fact table was filtered only after the expensive join. Now the fact-table filtering keys can be derived dynamically from the dimension filter, which results in filtering before the join (even during the table scan if possible) and a faster join itself.
This dynamic pruning is controlled by the spark.sql.optimizer.dynamicPartitionPruning.enabled setting; set it to true to make sure the optimization is active.
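A sketch of the query shape that benefits; the fact_sales and dim_date tables and their columns are hypothetical, with fact_sales assumed to be partitioned by date_id:

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

fact = spark.table("fact_sales")  # large table, partitioned by date_id
dim = spark.table("dim_date")     # small dimension table

result = (
    fact.join(dim, fact.date_id == dim.date_id)
        .where(dim.year == 2021)  # filter on a dimension column
        .groupBy(dim.month)
        .sum("amount")
)
result.explain()  # the fact table scan should now include a dynamic pruning subquery on date_id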
New join hints
In Spark 2 we had one join hint, for forcing a broadcast hash join. With Spark 3 three more arrived, covering the remaining join strategies: sort merge join, shuffle hash join, and shuffle-and-replicate nested loop join. That means that currently we have the following hints (see the sketch after this list for how to pass them):
- broadcast hash / nested loop join – BROADCAST
Useful when one of the tables is small (below spark.sql.autoBroadcastJoinThreshold, 10 MB by default). That table is then transferred to all the nodes holding the other table's partitions and the join is performed locally.
- sort merge join – MERGE
Performs sorting and shuffling of both tables, so it works well when both tables are rather big.
- shuffle hash join – SHUFFLE_HASH
Hashes the tables instead of sorting them; it can handle large tables, but may run out of memory if the data is skewed.
- shuffle nested loop join – SHUFFLE_REPLICATE_NL
Performs a cross join and hence iterates over both of the tables.
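A hedged sketch of how these hints are passed, either through the DataFrame API or in SQL; the orders and customers DataFrames are illustrative toy data:

from pyspark.sql.functions import broadcast

# Toy DataFrames to join on customer_id.
orders = spark.range(1000).withColumnRenamed("id", "customer_id")
customers = spark.range(100).withColumnRenamed("id", "customer_id")

# DataFrame API: hint() takes the strategy name and applies it to that side of the join.
orders.join(customers.hint("SHUFFLE_HASH"), "customer_id")

# broadcast() is the DataFrame equivalent of the BROADCAST hint, available since Spark 2.
orders.join(broadcast(customers), "customer_id")

# SQL: hints go into a comment right after SELECT.
orders.createOrReplaceTempView("t1")
customers.createOrReplaceTempView("t2")
spark.sql("""
    SELECT /*+ MERGE(t1) */ *
    FROM t1 JOIN t2 ON t1.customer_id = t2.customer_id
""")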
Formatted execution plan
The explain command provides us with the query execution plan, which is extremely helpful while debugging performance issues. With Spark 3 we can view this plan in four different modes:
- EXTENDED – generates the parsed, analysed and optimized logical plans together with the physical plan.
The parsed logical plan is an unresolved plan created from the provided query. The analysed logical plan comes after the code is verified and its semantics are resolved. The optimized logical plan is then created by Catalyst, which applies code optimizations. In the end the physical plan is created, which presents the final execution plan.
- FORMATTED – generates two sections: a physical plan outline and node details.
It gives a simplified and more readable version of the plan, with the names of the operators and their inputs and outputs.
- COST – besides the physical plan, if plan statistics are available, generates a logical plan with those statistics.
With that you can see the estimated data sizes at different steps of the execution.
- CODEGEN – shows the generated Java code that will be executed.
Usage example can be found in the docs.
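In PySpark the mode can also be passed straight to explain(); a small illustrative example:

# A toy aggregation to have something to explain.
df = spark.range(100).selectExpr("id % 10 AS key").groupBy("key").count()

df.explain(mode="extended")   # parsed, analysed and optimized logical plans plus the physical plan
df.explain(mode="formatted")  # physical plan outline followed by node details
df.explain(mode="cost")       # logical plan with statistics, when available
df.explain(mode="codegen")    # generated Java code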
Clearer error messages in Python
Up until Spark 3, Python errors were wrapped in Java exceptions, which made troubleshooting quite complex. With the new release Python error messages are exposed directly, making them more Pythonic.
Improved Python documentation
A long-awaited change, which can be found here.
Dropped support for Python 2 and R < 3.4
Requires no comment 😉
For more news check the Spark release notes.