
Apache Beam and HCatalog

HCatalog gives you the flexibility to read and write data in Hive metastore tables without specifying the table schemas. Apache Beam provides a transform that allows querying Hive data: it’s called HCatalogIO. Here I show how to use it in a Kerberised environment.

HOW TO

I was using the Beam 2.3 Java SDK and Spark 2.2 on a CDH Hadoop cluster v. 5.12.1 with Kerberos.

For how to start working with Beam, check my previous post.

 
Using HCatalogIO may look like this:
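
A minimal sketch; the metastore URI, database name and options handling are assumptions here, while the table names match the description below:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hcatalog.HCatalogIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class HCatalogExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // hive.metastore.uris is an assumption -- point it at your own metastore
        Map<String, String> configProperties = new HashMap<>();
        configProperties.put("hive.metastore.uris", "thrift://metastore-host:9083");

        pipeline
            // read HCatRecords from the input table
            .apply(HCatalogIO.read()
                .withConfigProperties(configProperties)
                .withDatabase("default")
                .withTable("my_input_table"))
            // and write them back, unchanged, to the output table
            .apply(HCatalogIO.write()
                .withConfigProperties(configProperties)
                .withDatabase("default")
                .withTable("my_output_table"));

        pipeline.run().waitUntilFinish();
      }
    }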

Nothing fancy happens here: just a simple read from my_input_table and a write back to my_output_table. Of course, you may add some transforms in between.

 
 

Kerberos authentication

With HCatalogIO the trickiest part is authenticating with Kerberos. In order to do that I had to (a sketch of the relevant submit options follows the list):

  • set the following Spark properties:
    spark.driver.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false
    spark.executor.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false
    to prevent the error:
  • pass the keytab file twice, each time under a different name: once with the files option and once with the keytab option. The first copy needs to be available on HDFS with proper read permissions set, while the second one needs to be stored locally due to a pending issue.
    The keytabs need to have different names, otherwise I was getting:

  • pass the principal parameter

  • specify hive.metastore.uris in order to connect to the metastore
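
Putting it together, the Kerberos-related part of the submit command looked roughly like this (a sketch: the paths, keytab names and principal are placeholders):

    spark2-submit \
      --conf spark.driver.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
      --conf spark.executor.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
      --files hdfs:///user/me/beam.keytab \
      --keytab /home/me/beam-local.keytab \
      --principal me@EXAMPLE.COM \
      ...

Note the two keytab copies with different names: the one passed with --files lives on HDFS, the one passed with --keytab is local.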

 

Old Hive version

Due to the old Hive version (1.1) distributed in CDH, I had to modify the HCatalogIO class in the Beam SDK to use the HCatUtil.getHiveClient method instead of HCatUtil.getHiveMetastoreClient.
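
Simplified, the change amounts to a single line (the surrounding Beam code is omitted in this sketch):

    // Beam 2.3 obtains the metastore client via a method that Hive 1.1 lacks:
    //   IMetaStoreClient client = HCatUtil.getHiveMetastoreClient(hiveConf);
    // for CDH's Hive 1.1 the older helper works instead:
    HiveMetaStoreClient client = HCatUtil.getHiveClient(hiveConf);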

Without that I was getting errors like:

For Beam 2.4 there is a workaround described on the official Beam webpage.

 

Libraries

Additionally, I had to attach the following libraries:

  • beam-sdks-java-core-2.3.0.jar
  • beam-runners-spark-2.3.0.jar
  • beam-runners-direct-java-2.3.0.jar
  • beam-runners-core-construction-java-2.3.0.jar
  • beam-runners-core-java-2.3.0.jar
  • beam-sdks-java-io-hadoop-file-system-2.3.0.jar
  • beam-sdks-java-io-hcatalog-2.3.0.jar
  • beam-sdks-java-io-hadoop-common-2.3.0.jar
  • hive-hcatalog-core-1.1.0-cdh5.12.1.jar because of:
  • snappy-java-1.1.7.1.jar because of:

 

Running job

I used the spark2-submit command to run my job:
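
Roughly like this; the jar list is abbreviated, and the class name, paths and principal are placeholders:

    spark2-submit \
      --master yarn \
      --conf spark.driver.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
      --conf spark.executor.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
      --files hdfs:///user/me/beam.keytab \
      --keytab /home/me/beam-local.keytab \
      --principal me@EXAMPLE.COM \
      --jars beam-sdks-java-io-hcatalog-2.3.0.jar,hive-hcatalog-core-1.1.0-cdh5.12.1.jar,snappy-java-1.1.7.1.jar,... \
      --class com.example.HCatalogExample \
      my-beam-job.jar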


My pom.xml contained the following dependencies:
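
The key entries looked roughly like this (a sketch; the version numbers match the jars listed above):

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hcatalog</artifactId>
      <version>2.3.0</version>
    </dependency>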

 

Features

I tested several features of HCatalogIO:

  • Reading/writing to tables in text format
    I had no problems with reading or writing data in text format. I also had no issues with the Avro format.
    Note: The table needs to exist upfront, as HCatalogIO does not create it.

  • Reading tables in Parquet format
    I could read Parquet tables regardless of whether they were partitioned or not. Surprisingly, the withFilter reading option works only on partitioning columns and not on the non-partitioning ones. It would be cool if it did.

  • Writing to Parquet tables
    This doesn’t work. It seems to be related not to Beam itself but to the fact that HCatalog currently simply doesn’t write to Parquet, which is a known issue.

  • Writing to partitioned table
    To partition the data you need to define the partition manually, like this:
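
    For example, for a table partitioned by a hypothetical day column (configProperties as in the read/write example above):

        Map<String, String> partitionValues = new HashMap<>();
        partitionValues.put("day", "2018-03-28"); // hypothetical partition column and value

        HCatalogIO.write()
            .withConfigProperties(configProperties)
            .withTable("my_output_table")
            .withPartition(partitionValues)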

    Note: While writing there is no filtering applied – everything gets written to the specified partition!


  • Overwriting partitions
    This also doesn’t work: instead of being overwritten, the data gets appended. Additionally, while writing to a specified partition the data isn’t automatically filtered based on the partitioning condition – all passed data is written to the specified partition, so the filtering needs to be done manually. A side effect is that many empty folders are generated while writing to a partitioned table. The table to write to needs to exist upfront; it cannot be created automatically.

  • Dynamic partitioning
    There is no dynamic partitioning support. Partitions to write to need to be manually specified upfront.

    If you specify fewer partition columns than the table has, you get:

    If you specify an empty partition value, all the results are written to the _HIVE_DEFAULT_PARTITION_ partition.

    If you specify no partitions, then you get:

 
So I would say that HCatalogIO is not very mature yet, and for many typical processing cases workarounds are needed.
