Apache Beam and HCatalog
HCatalog gives you the flexibility to read and write data in Hive metastore tables without specifying the table schemas. Apache Beam provides a transform for querying Hive data, called HCatalogIO. Here I show how to use it in a Kerberized environment.
HOW TO
I was using the Beam 2.3 Java SDK and Spark 2.2 on a CDH Hadoop cluster (v. 5.12.1) with Kerberos.
For how to start working with Beam, check my previous post.
Using HCatalogIO may look like this:
//HCatOptions may extend PipelineOptions or HadoopFileSystemOptions
HCatReadWrite.HCatOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(HCatReadWrite.HCatOptions.class);

Pipeline p = Pipeline.create(options);

//We need to provide the hive.metastore.uris
Map<String, String> configProperties = new HashMap<String, String>();
configProperties.put("hive.metastore.uris", "my_hive_metastore_uri");

p.apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("my_database")
        .withTable("my_input_table"))
 .apply(HCatalogIO.write()
        .withConfigProperties(configProperties)
        .withDatabase("my_database")
        .withTable("my_output_table"));

//run the pipeline
p.run().waitUntilFinish();
Nothing fancy happens here: the pipeline simply reads from my_input_table and writes the records back to my_output_table. Of course, you can add transforms in between, for example:
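Here is a minimal sketch of a transform placed between the read and the write. The DoFn below just drops records whose first column is null – the column index and the filtering condition are assumptions made for this example, not part of the original pipeline (HCatRecord comes from org.apache.hive.hcatalog.data, DoFn and ParDo from org.apache.beam.sdk.transforms):

//Hypothetical filter between HCatalogIO.read() and HCatalogIO.write():
//keeps only records whose first column is non-null (column 0 is an assumption)
p.apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("my_database")
        .withTable("my_input_table"))
 .apply(ParDo.of(new DoFn<HCatRecord, HCatRecord>() {
     @ProcessElement
     public void processElement(ProcessContext c) {
       HCatRecord record = c.element();
       if (record.get(0) != null) {
         c.output(record);
       }
     }
 }))
 .apply(HCatalogIO.write()
        .withConfigProperties(configProperties)
        .withDatabase("my_database")
        .withTable("my_output_table"));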
Kerberos authentication
With HCatalogIO the trickiest part is to authenticate with Kerberos. In order to do that I had to:
- set the following Spark properties:
• spark.driver.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false
• spark.executor.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false
to prevent the error:

Exception in thread "main" org.apache.spark.SparkException: Keytab file:hdfs://.../mykeytab.keytab does not exist

- pass the keytab file twice, each time under a different name: once with the --files option and once with the --keytab option. The first one needs to be available on HDFS with proper read permissions set, while the second one needs to be stored locally due to a pending issue. The keytabs need to have different names, otherwise I was getting:

Exception in thread "main" java.lang.IllegalArgumentException: Attempt to add (file:/.../my_keytab.keytab#tmp.keytab) multiple times to the distributed cache.

- pass the principal parameter
- specify hive.metastore.uris in order to connect to the metastore (see the sketch after this list for passing it as a pipeline option)
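Instead of hard-coding the metastore URI as in the first snippet, it can be passed as a pipeline option. Here is a minimal sketch of what the HCatOptions interface could look like – the option name hiveMetastoreUris is my own assumption, not part of the original code:

import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.Description;

//Hypothetical options interface – it extends HadoopFileSystemOptions so HDFS
//paths keep working; the hiveMetastoreUris option name is made up for this sketch
public interface HCatOptions extends HadoopFileSystemOptions {
  @Description("Thrift URI(s) of the Hive metastore, e.g. thrift://host:9083")
  String getHiveMetastoreUris();
  void setHiveMetastoreUris(String value);
}

The configProperties map would then be filled with options.getHiveMetastoreUris() instead of a hard-coded value, and the URI could be passed on the command line as --hiveMetastoreUris=thrift://host:9083.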
Old Hive version
Due to the old Hive version (1.1) distributed with CDH, I had to modify the HCatalogIO class in the Beam SDK to use the HCatUtil.getHiveClient method instead of HCatUtil.getHiveMetastoreClient.
//modified part
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
  Configuration conf = new Configuration();
  for (Entry<String, String> entry : spec.getConfigProperties().entrySet()) {
    conf.set(entry.getKey(), entry.getValue());
  }
  HiveMetaStoreClient client = null;
  try {
    HiveConf hiveConf = HCatUtil.getHiveConf(conf);
    client = HCatUtil.getHiveClient(hiveConf);
    Table table = HCatUtil.getTable(client, spec.getDatabase(), spec.getTable());
    return StatsUtils.getFileSizeForTable(hiveConf, table);
  } finally {
    if (client != null) {
      client.close();
    }
  }
}
Without that I was getting errors like:
User class threw exception: java.lang.RuntimeException: java.lang.LinkageError: loader constraint violation: when resolving method "org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/ql/metadata/Table;)J" the class loader (instance of org/apache/spark/util/ChildFirstURLClassLoader) of the current class, org/apache/beam/sdk/io/hcatalog/HCatalogIO$BoundedHCatalogSource, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/apache/hadoop/hive/ql/stats/StatsUtils, have different Class objects for the type org/apache/hadoop/hive/conf/HiveConf used in the signature

User class threw exception: java.lang.RuntimeException: com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;[Ljava/lang/Class;[Ljava/lang/Object;Ljava/lang/String;)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;

User class threw exception: java.lang.RuntimeException: com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchFieldError: METASTORE_CLIENT_SOCKET_LIFETIME

java.io.InvalidClassException: org.apache.hadoop.hive.metastore.api.Table; local class incompatible: stream classdesc serialVersionUID = 7046373721250106722, local class serialVersionUID = 398473631015277182
For Beam 2.4 there is a workaround described on the official Beam website.
Libraries
Additionally, I had to attach the following libraries:
- beam-sdks-java-core-2.3.0.jar
- beam-runners-spark-2.3.0.jar
- beam-runners-direct-java-2.3.0.jar
- beam-runners-core-construction-java-2.3.0.jar
- beam-runners-core-java-2.3.0.jar
- beam-sdks-java-io-hadoop-file-system-2.3.0.jar
- beam-sdks-java-io-hcatalog-2.3.0.jar
- beam-sdks-java-io-hadoop-common-2.3.0.jar
- hive-hcatalog-core-1.1.0-cdh5.12.1.jar because of

ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/apache/hive/hcatalog/data/transfer/ReaderContext

- snappy-java-1.1.7.1.jar because of

Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I)
Running job
I used the spark2-submit command to run my job:
spark2-submit \
--jars beam-sdks-java-core-2.3.0.jar,\
beam-runners-spark-2.3.0.jar,\
beam-runners-direct-java-2.3.0.jar,\
beam-runners-core-construction-java-2.3.0.jar,\
beam-runners-core-java-2.3.0.jar,\
beam-sdks-java-io-hadoop-file-system-2.3.0.jar,\
beam-sdks-java-io-hcatalog-2.3.0.jar,\
beam-sdks-java-io-hadoop-common-2.3.0.jar,\
hive-hcatalog-core-1.1.0-cdh5.12.1.jar,\
snappy-java-1.1.7.1.jar \
--conf spark.driver.userClassPathFirst=true \
--conf spark.driver.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
--conf spark.executor.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
--files hdfs://.../my_keytab1.keytab \
--principal my_name \
--keytab my_keytab2.keytab \
--class hcatalog.HCatReadWrite \
--master yarn \
--queue=my_queue \
--deploy-mode cluster \
my_jar.jar \
--runner=SparkRunner \
--sparkMaster=yarn
My pom.xml contained the following dependencies:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hcatalog</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive.hcatalog</groupId>
  <artifactId>hive-hcatalog-core</artifactId>
  <version>2.1.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>2.1.0</version>
</dependency>
Features
I tested several features of HCatalogIO:
- Reading/writing to tables in text format
I had no problems with reading or writing data in text format. I also had no issues with Avro format.
Note: The table needs to be created upfront.

- Reading tables in Parquet format

I could read Parquet tables regardless of whether they were partitioned or not. Surprisingly, the withFilter reading option works only on partitioning columns and not on non-partitioning ones. It would be cool if it did.

- Writing to Parquet tables

This doesn’t work. It seems to be related not to Beam itself but to the fact that HCatalog currently simply doesn’t write to Parquet, which is a known issue:

Caused by: java.lang.RuntimeException: Should never be used
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getRecordWriter(MapredParquetOutputFormat.java:76)

- Writing to partitioned table

To partition the data you need to define the partitions manually, like this:

Map<String, String> partitions = new HashMap<>();
partitions.put("partition1", "value1");
partitions.put("partition2", "value2");

p.apply(HCatalogIO.write()
    .withConfigProperties(configProperties)
    .withDatabase("my_database")
    .withTable("my_output_table")
    .withPartition(partitions));

Note: While writing, no filtering is applied – everything gets written to the specified partition!
- Overwriting partitions
This also doesn’t work: instead of being overwritten, the data gets appended. Additionally, while writing to a specified partition, the data isn’t automatically filtered based on the partitioning condition – all passed data is written to the specified partition, so the filtering needs to be done manually (see the sketch after this list). A side effect is that many empty folders are generated while writing to a partitioned table. The table to write to needs to be created upfront; it cannot be generated automatically.

- Dynamic partitioning

There is no dynamic partitioning support. The partitions to write to need to be specified manually upfront. If you specify fewer partitions than the table has, you get:

User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.hive.hcatalog.common.HCatException : 9001 : Exception occurred while processing HCat request : Failed while writing. Cause : org.apache.hive.hcatalog.common.HCatException : 2010 : Invalid partition values specified : Unable to configure dynamic partitioning for storage handler, mismatch between number of partition values obtained[0] and number of partition values required[1]

If you specify an empty partition, all the results are written to the __HIVE_DEFAULT_PARTITION__ partition.

If you specify no partitions at all, you get:

org.apache.beam.sdk.util.UserCodeException: org.apache.hive.hcatalog.common.HCatException : 9001 : Exception occurred while processing HCat request : Failed while writing. Cause : org.apache.hive.hcatalog.common.HCatException : 2010 : Invalid partition values specified : Unable to configure dynamic partitioning for storage handler, mismatch between number of partition values obtained[0] and number of partition values required[2]
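As a rough sketch of working around these limitations – using the read-side withFilter (which does work on partition columns) to do the "manual" filtering and the write-side withPartition to target the matching partition – something like the following could be used. The table names, the partition column dt and its value are made up for this example:

//Read a single partition (withFilter works on partition columns only)
//and write it to the corresponding partition of the output table.
//Note: the write itself does no filtering – the read-side filter does it.
Map<String, String> partitions = new HashMap<>();
partitions.put("dt", "2018-04-01");

p.apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("my_database")
        .withTable("my_partitioned_table")
        .withFilter("dt=\"2018-04-01\""))
 .apply(HCatalogIO.write()
        .withConfigProperties(configProperties)
        .withDatabase("my_database")
        .withTable("my_output_table")
        .withPartition(partitions));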
So I would say that HCatalogIO is not very mature yet, and many typical processing cases require workarounds.
Comments

I have the same problem with dynamic partitioning – it just doesn’t seem to work.
Do you have a reference to any official Beam documentation that supports this statement, though?
Great post.
How do you attach the JARs? Are they available on the server? File system / HDFS?
Or do you mean they are already included in the uber JAR?
For example, when you say:
--jars beam-sdks-java-core-2.3.0.jar
Where is that beam-sdks-java-core-2.3.0.jar?
Inside your uber JAR, according to your POM file?
Hi,
HDFS, but there are a few options available: https://spark.apache.org/docs/latest/submitting-applications.html
Hi Alice,
Is there a way to use Beam + HCatalogIO without a Thrift server? Instead of the Thrift server, could we just use the Hive metastore database details or JDBC? The reason is that we are on GCP and we don’t have a Thrift server running all the time.
Thanks,
Paresh