Spark and Hive
Spark gives us the ability to use SQL for data processing. With it we can connect over JDBC and ODBC to pretty much any database, or use structured data formats like Avro, Parquet and ORC. We can also connect to Hive and use all the structures we have there. In Spark 2.0 the separate entry points for SQL (SQLContext) and Hive (HiveContext) were replaced with a single object – SparkSession. SparkSession allows you to read from and write to Hive, use the HiveQL language and Hive UDFs. Not all of the Hive features are supported yet, but they probably will be at some point (here is a list).
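As a quick illustration of the plain SQL side before we move on to Hive, here is a minimal sketch of querying a Parquet file through SparkSession; the file path, view name and column names are only placeholders:

// Minimal sketch: running SQL over a Parquet file with SparkSession.
// The path and column names are placeholders, not from a real dataset.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sql-example").getOrCreate()

// Read the file and register it as a temporary view so it can be queried with SQL.
val events = spark.read.parquet("/data/events.parquet")
events.createOrReplaceTempView("events")

spark.sql("SELECT event_date, count(*) AS cnt FROM events GROUP BY event_date").show()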
If you have the Spark and Hive setup ready, accessing Hive from Spark is pretty straightforward.
HOW TO
I was using Spark 2.2 and Hive 1.1 with Kerberos.
If you’re using Maven you need to add the following dependencies:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>version</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-metastore</artifactId>
  <version>version</version>
</dependency>
First you need to initialise the SparkSession with Hive support. If you want to write to partitioned Hive tables you also need to set the Hive dynamic partitioning properties.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("hive.metastore.uris", "hive_metastore_uri")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
Those properties can also be set after the creation of SparkSession.
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
The Hive tables can be created either in Hive itself or through Spark SQL commands.
// Spark
spark.sql("CREATE EXTERNAL TABLE output_table1 (col1 int, col2 string, col3 int) " +
  "STORED AS PARQUET LOCATION 'my_location'")

-- Hive
CREATE EXTERNAL TABLE output_table1 (col1 int, col2 string, col3 int)
STORED AS PARQUET
LOCATION 'my_location';
And now we can read and write:
val df1 = spark.sql("SELECT * FROM output_table1")

// Spark non-partitioned write
df1.write.option("path", "my_table_path")
  .format("parquet")
  .mode("overwrite")
  .saveAsTable("output_table1")

// Spark partitioned write
df1.write.option("path", "my_table_path")
  .partitionBy("col1", "col2")
  .format("parquet")
  .mode("overwrite")
  .saveAsTable("output_table2")
Instead of using the write method you can use a SQL command with INSERT:
df1.createOrReplaceTempView("df1_table")

spark.sql("""INSERT OVERWRITE TABLE output_table2 PARTITION(col1 = 2000, col2 = "monday")
             SELECT col3 FROM df1_table""")
Note: Without the path option Spark tries to write into the default data warehouse directory, even if the table definition specifies a different location.
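To make the note concrete, here is a small sketch; the table names output_table3 and output_table4 are just illustrative:

// With an explicit path the files land under my_table_path.
df1.write.option("path", "my_table_path")
  .format("parquet")
  .mode("overwrite")
  .saveAsTable("output_table3")

// Without the path option the files go to the default warehouse
// directory configured by spark.sql.warehouse.dir.
df1.write.format("parquet")
  .mode("overwrite")
  .saveAsTable("output_table4")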
Note: For the Cloudera distribution of Hive and Spark before 5.14 there is a (bug): you cannot read data from a table that you wrote to without specifying the path.
Note: By default Spark uses its own Parquet support instead of the Hive SerDe for better performance. This behaviour is controlled by the spark.sql.hive.convertMetastoreParquet parameter.
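If you prefer Spark to go through the Hive SerDe for Parquet tables instead, a minimal sketch of turning the parameter off might look like this; whether you set it when building the session or at runtime is up to you:

import org.apache.spark.sql.SparkSession

// Set it when building the session...
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.convertMetastoreParquet", "false")
  .getOrCreate()

// ...or change it later on an existing session.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")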