Apache Beam and HBase
HBase is a NoSQL database that lets you store data in many different formats (such as pictures, PDFs, text files and others) and provides fast and efficient data lookups. HBase has two APIs to choose from: the Java API and the HBase shell. We can also connect HBase to tools like Hive or Phoenix and use SQL. HBase also integrates with Apache Beam via the HBaseIO transform. Here I show how to connect HBase and Beam in a Kerberised environment.
How to
I was using the Beam 2.3 Java SDK and Spark 2.2 on a CDH Hadoop cluster v5.12.1 with Kerberos.
To learn how to start working with Beam, check my previous post.
Here’s an example use of HBaseIO:
```java
//HBaseOptions may extend PipelineOptions or HadoopFileSystemOptions
HBaseReadWrite.HBaseOptions options = PipelineOptionsFactory
        .fromArgs(args).withValidation()
        .as(HBaseReadWrite.HBaseOptions.class);

Pipeline p = Pipeline.create(options);

//We need to create a Configuration object to pass some options
Configuration myConfiguration = new Configuration(true);
myConfiguration.set("hbase.security.authentication", "kerberos");
myConfiguration.set("hbase.zookeeper.quorum", "one,two,three");

PCollection<Result> inputData = p.apply("read", HBaseIO.read()
        .withConfiguration(myConfiguration)
        .withTableId("my_input_table"));

PCollection<Mutation> dataToWrite = inputData
        .apply(new MyTransform());

dataToWrite.apply("write", HBaseIO.write()
        .withConfiguration(myConfiguration)
        .withTableId("my_output_table"));

p.run().waitUntilFinish();
```
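The HBaseOptions interface itself is not shown above. Here is a minimal sketch of what it might look like; the property names, descriptions and defaults are my own assumptions, only the PipelineOptions getter/setter pattern and the annotations come from the Beam SDK:

```java
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

//Sketch of a custom options interface. Extending HadoopFileSystemOptions
//keeps the Hadoop configuration options available on the command line.
public interface HBaseOptions extends HadoopFileSystemOptions {

    @Description("HBase table to read from")
    @Default.String("my_input_table")
    String getInputTable();
    void setInputTable(String value);

    @Description("HBase table to write to")
    @Default.String("my_output_table")
    String getOutputTable();
    void setOutputTable(String value);
}
```

Beam generates the implementation at runtime, so only the interface needs to be declared; values can then be passed as `--inputTable=...` on the command line.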
The tricky thing is that HBaseIO.read() returns Result objects, whereas for writing we need Mutation objects. This means we need to put a transform in between:
```java
public static class MyTransform extends PTransform<PCollection<Result>, PCollection<Mutation>> {
    @Override
    public PCollection<Mutation> expand(PCollection<Result> data) {
        return data.apply(
            ParDo.of(new DoFn<Result, Mutation>() {
                @DoFn.ProcessElement
                public void processElement(ProcessContext c) throws IOException {
                    //Take the latest cell of one column and wrap it in a Put
                    Cell cell = c.element()
                        .getColumnLatestCell(Bytes.toBytes("key"), Bytes.toBytes("first_column"));
                    Put put = new Put(c.element().getRow());
                    put.add(cell);
                    c.output(put);
                }
            }));
    }
}
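The transform above copies only a single column. If the whole row should be written over unchanged, the body of processElement could instead iterate over all cells of the Result. A sketch, using the rawCells() and Put.add(Cell) methods of the HBase 1.2 client:

```java
//Sketch of an alternative processElement body that copies every cell
//of the incoming row into the Put, instead of a single column:
@DoFn.ProcessElement
public void processElement(ProcessContext c) throws IOException {
    Put put = new Put(c.element().getRow());
    for (Cell cell : c.element().rawCells()) {
        put.add(cell); //Put.add(Cell) in the HBase 1.2 client API
    }
    c.output(put);
}
```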
The data read can be limited with the withScan, withKeyRange or withFilter methods:
```java
Scan scanner = new Scan();
scanner.setMaxResultSize(2);

p.apply("read", HBaseIO.read()
        .withConfiguration(myConfiguration)
        .withTableId("my_input_table")
        .withScan(scanner));
        //.withKeyRange(new byte[1], new byte[3])) -- for a certain key range
        //.withFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
        //        new SubstringComparator("my_key")))) -- for lookups
```
More info can be found in the HBaseIO documentation.
Kerberos authentication
To make HBaseIO work with Kerberos I had to:
- pass the keytab file twice, each time under a different name: once with the --files option and once with the --keytab option. The first copy needs to be available on HDFS with proper read permissions set, while the second one needs to be stored locally due to a pending issue. The keytabs need to have different names, otherwise I was getting:

```
Exception in thread "main" java.lang.IllegalArgumentException:
Attempt to add (file:/.../my_keytab.keytab#tmp.keytab) multiple times
to the distributed cache.
```

- pass the principal parameter
- set hbase.security.authentication=kerberos
- specify the hbase.zookeeper.quorum property
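Since the second keytab copy lives on the local filesystem, one way to use it is an explicit UserGroupInformation login before the pipeline is built. A sketch, with a placeholder principal and keytab path:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

//Sketch: explicit Kerberos login from the locally stored keytab.
//The principal and the keytab path below are placeholders.
Configuration conf = new Configuration(true);
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
        "my_name@MY.REALM.COM", "/local/path/my_keytab2.keytab");
```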
Libraries
The following jars were needed for my processing:
- beam-sdks-java-io-hadoop-common-2.3.0.jar
- snappy-java-1.1.7.1.jar
- beam-sdks-java-io-hbase-2.4.0.jar
- hbase-client-1.2.6.jar
- hbase-server-1.2.6.jar
- hbase-common-1.2.6.jar
- hbase-protocol-1.2.6.jar
- htrace-core-3.1.0-incubating.jar
Note: the HBase jars passed cannot have a version higher than 1.2.6 (the one used by Beam 2.3). Using newer ones may result in errors such as:
```
ClassCastException: org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl
cannot be cast to org.apache.hadoop.yarn.api.records.Priority
```

```
User class threw exception: java.lang.NoClassDefFoundError:
org/apache/hadoop/hbase/shaded/com/google/common/collect/ListMultimap
```
Running the job
I used the spark2-submit command to run my job:
```shell
spark2-submit \
  --jars beam-sdks-java-core-2.3.0.jar,beam-runners-spark-2.3.0.jar,\
beam-runners-direct-java-2.3.0.jar,beam-runners-core-construction-java-2.3.0.jar,\
beam-runners-core-java-2.3.0.jar,beam-sdks-java-io-hadoop-file-system-2.3.0.jar,\
beam-sdks-java-io-hadoop-common-2.3.0.jar,snappy-java-1.1.7.1.jar,\
beam-sdks-java-io-hbase-2.4.0.jar,hbase-client-1.2.6.jar,hbase-server-1.2.6.jar,\
hbase-common-1.2.6.jar,hbase-protocol-1.2.6.jar,htrace-core-3.1.0-incubating.jar \
  --conf spark.driver.userClassPathFirst=true \
  --principal my_name \
  --keytab my_keytab1.keytab \
  --files hdfs://.../my_keytab2.keytab \
  --class hbase.HBaseReadWrite \
  --master yarn \
  --queue=my_queue \
  --deploy-mode cluster \
  my_jar.jar \
  --runner=SparkRunner \
  --sparkMaster=yarn
```
My pom.xml contained the following dependencies:
```xml
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hbase</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
```
Features
With HBaseIO I tested three scenarios:
- Reading data from HBase
I had no issues reading data from HBase.
- Writing data to HBase
No issues detected while writing to HBase either.
Note: the HBase table needs to exist upfront.
- HBase lookups for retrieving data by key
This scenario also passed the test. Beam integrates very nicely with HBase and lets you easily retrieve data by specifying a record's row key.
In my opinion, Beam and HBase integrate really nicely. I spotted no issues and no missing features along the way.