Apache Beam JDBC

With Apache Beam we can connect to different databases – HBase, Cassandra, MongoDB – using dedicated Beam APIs. There is also JdbcIO for JDBC connections. Here I show how to connect to an MSSQL database with Beam and do some data importing and exporting in a Kerberised environment.

HOW TO

I was using the Beam 2.3 Java SDK and Spark 2.2 on a CDH Hadoop cluster (v5.12.1) with Kerberos.

For how to start working with Beam check my previous post.

 

Data Import

To import data from a database we need to specify:

  • JDBC driver
  • connection string
  • db username
  • db password
  • query for data retrieval
  • data coders
  • ResultSet mapping to database columns

That leads us to:
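A minimal sketch of such a read, assuming the Microsoft JDBC driver; the host, database, table and column names below are placeholders:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class JdbcImport {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Driver, connection string, credentials, query, coder and row mapper --
    // all connection details here are made up for illustration.
    PCollection<KV<String, String>> rows = p.apply(JdbcIO.<KV<String, String>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.microsoft.sqlserver.jdbc.SQLServerDriver",
            "jdbc:sqlserver://dbhost:1433;databaseName=testdb")
            .withUsername("beam_user")
            .withPassword("secret"))
        .withQuery("SELECT id, name FROM source_table")
        .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        // Map each ResultSet row to a KV of the two selected columns.
        .withRowMapper(resultSet ->
            KV.of(resultSet.getString(1), resultSet.getString(2))));

    p.run().waitUntilFinish();
  }
}
```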

I didn’t particularly like putting the password in the code, but that is how the API works. As an alternative, you can store it in a file and parse it at startup.
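One way to do that, sketched with standard Java only (the file path and property key names are just an example):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class DbCredentials {
  // Loads credentials such as db.username and db.password
  // from a Java properties file kept outside the source tree.
  public static Properties load(String path) throws IOException {
    Properties props = new Properties();
    try (InputStream in = new FileInputStream(path)) {
      props.load(in);
    }
    return props;
  }
}
```

The loaded values can then be passed to `withUsername(...)` and `withPassword(...)` instead of hard-coded literals.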

 

Data Export

Data exporting requires properties similar to importing:

  • JDBC driver
  • connection string
  • db username
  • db password
  • INSERT statement
  • setting the column data types
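
A write sketch along those lines, reusing the placeholder connection details from the import example (table and column names are made up):

```java
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical export: 'rows' is a PCollection<KV<String, String>>.
rows.apply(JdbcIO.<KV<String, String>>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "jdbc:sqlserver://dbhost:1433;databaseName=testdb")
        .withUsername("beam_user")
        .withPassword("secret"))
    .withStatement("INSERT INTO target_table (id, name) VALUES (?, ?)")
    .withBatchSize(1000)  // the default; see the note below about setting it to 1
    // Bind each element to the INSERT parameters with explicit column types.
    .withPreparedStatementSetter((element, statement) -> {
      statement.setString(1, element.getKey());
      statement.setString(2, element.getValue());
    }));
```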

Putting a String into KV format is rather straightforward:
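For example, assuming each input String is a tab-separated id/name pair and `lines` is a `PCollection<String>`:

```java
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Split each line at the first tab into a key/value pair.
PCollection<KV<String, String>> keyed = lines.apply(
    MapElements.via(new SimpleFunction<String, KV<String, String>>() {
      @Override
      public KV<String, String> apply(String line) {
        String[] parts = line.split("\t", 2);
        return KV.of(parts[0], parts[1]);
      }
    }));
```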

Before exporting data we need to make sure that the table exists in the database. Beam doesn’t provide an option to create it automatically. You can work around this with a CREATE TABLE statement before the INSERT, or a custom stored procedure, but that requires setting the batch size to 1 (.withBatchSize(1)); otherwise the CREATE statement would be executed several times.

Note: By default exported data gets appended.

Note: Beam doesn’t support bulk exports. There is a batchSize option (1000 by default) that can be set. There is also a custom extension for bulk processing, which requires data grouping; it is currently pending approval for incorporation into Beam.

 

Running the Job

To run the processing I had to specify the following jars:

  • beam-sdks-java-core-2.3.0.jar
  • beam-runners-spark-2.3.0.jar
  • beam-runners-direct-java-2.3.0.jar
  • beam-runners-core-construction-java-2.3.0.jar
  • beam-runners-core-java-2.3.0.jar
  • beam-sdks-java-io-hadoop-file-system-2.3.0.jar
  • beam-sdks-java-io-hadoop-common-2.3.0.jar
  • snappy-java-1.1.7.1.jar
  • beam-sdks-java-io-jdbc-2.3.0.jar
  • commons-dbcp2-2.2.0.jar
  • commons-pool2-2.5.0.jar
  • sqljdbc4.jar

To avoid conflicting jar versions I had to set spark.driver.userClassPathFirst=true; without it the job was failing with classpath conflict errors.

Alternatively, you can place the proper jars in the Spark library path.
 
I used the spark2-submit command to run my job:
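A sketch of the invocation; the main class and application jar names are placeholders, and the --jars list is abbreviated to the JDBC-related jars from the list above:

```shell
spark2-submit \
  --master yarn \
  --deploy-mode client \
  --conf spark.driver.userClassPathFirst=true \
  --jars beam-sdks-java-io-jdbc-2.3.0.jar,commons-dbcp2-2.2.0.jar,commons-pool2-2.5.0.jar,sqljdbc4.jar \
  --class com.example.JdbcImport \
  my-beam-job.jar \
  --runner=SparkRunner
```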

My pom.xml contained the following dependencies:
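A sketch of the relevant dependency section; the Beam artifact IDs and versions match the jars listed above, while the MSSQL driver coordinates are an assumption (the post used a local sqljdbc4.jar, which is not published to Maven Central):

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>2.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-jdbc</artifactId>
    <version>2.3.0</version>
  </dependency>
  <!-- Assumed coordinates for the Microsoft JDBC driver -->
  <dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <version>6.4.0.jre8</version>
  </dependency>
</dependencies>
```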

Beam JdbcIO works quite nicely, although some additional features like a staging table, data overwriting or bulk exporting would be cool add-ons.
