Apache Beam JDBC
With Apache Beam we can connect to different databases such as HBase, Cassandra or MongoDB using dedicated Beam APIs. There is also JdbcIO for JDBC connections. Here I show how to connect to an MSSQL database with Beam and do some data importing and exporting in a Kerberised environment.
HOW TO
I was using the Beam 2.3 Java SDK and Spark 2.2 on a CDH Hadoop cluster v. 5.12.1 with Kerberos.
For how to start working with Beam, check my previous post.
Data Import
To import data from a database we need to specify:
- JDBC driver
- connection string
- db username
- db password
- query for data retrieval
- data coders
- ResultSet mapping to database columns
That leads us to:
JDBCImportExport.ImportExportOptions options = PipelineOptionsFactory
        .fromArgs(args).withValidation()
        .as(JDBCImportExport.ImportExportOptions.class);
Pipeline p = Pipeline.create(options);

PCollection<KV<String, Integer>> data = p
        .apply(JdbcIO.<KV<String, Integer>>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "my_driver", "db_connection_string")
                        .withUsername("my_username")
                        .withPassword("my_password"))
                .withQuery("select name, age from Person.person")
                .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))
                .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
                    public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
                        return KV.of(resultSet.getString(1), resultSet.getInt(2));
                    }
                }));
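The ImportExportOptions interface itself is not shown in the post. A minimal sketch of what it might look like (an assumption on my part; only the --driver option is confirmed by the spark2-submit call later) is:

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Sketch of the nested options interface used above; the exact shape is an assumption.
public interface ImportExportOptions extends PipelineOptions {
    @Description("JDBC driver class, e.g. com.microsoft.sqlserver.jdbc.SQLServerDriver")
    String getDriver();
    void setDriver(String value);
}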
I didn’t particularly like putting passwords in the code, but that is how it is. You can try putting them in a file and parsing it properly.
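A minimal sketch of that idea, assuming a made-up db.properties file with db.user and db.password entries:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper (file name and keys are made up): load the DB credentials
// from a local properties file instead of hard-coding them.
public static Properties loadDbCredentials() throws IOException {
    Properties props = new Properties();
    try (InputStream in = new FileInputStream("db.properties")) {
        props.load(in);   // expects db.user=... and db.password=... entries
    }
    return props;
}

The values can then be passed to .withUsername() and .withPassword() instead of the literals above.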
Data Export
Data exporting requires similar properties as importing:
- JDBC driver
- connection string
- db username
- db password
- INSERT statement
- setting the column data types
JDBCImportExport.ImportExportOptions options = PipelineOptionsFactory
        .fromArgs(args).withValidation()
        .as(JDBCImportExport.ImportExportOptions.class);
Pipeline p = Pipeline.create(options);

PCollection<String> data = p.apply(TextIO.read().from("hdfs://BeamData/*"));

data.apply(MapElements.via(new JDBCImportExport.FormatToColumnsFn()))
        .apply(JdbcIO.<KV<String, Integer>>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "my_driver", "db_connection_string")
                        .withUsername("my_username")
                        .withPassword("my_password"))
                .withStatement("insert into Person.person values(?, ?)")
                .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<String, Integer>>() {
                    public void setParameters(KV<String, Integer> element, PreparedStatement query)
                            throws SQLException {
                        query.setString(1, element.getKey());
                        query.setInt(2, element.getValue());
                    }
                }));
Converting a String line into the KV format is rather straightforward:
public static class FormatToColumnsFn extends SimpleFunction<String, KV<String, Integer>> {
    @Override
    public KV<String, Integer> apply(String input) {
        String[] columns = input.split(": ");
        return KV.of(columns[0], Integer.valueOf(columns[1]));
    }
}
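For example, an input line "John: 42" gets mapped to KV.of("John", 42).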
Before exporting data we need to make sure that the table exists in the database. Beam doesn’t provide an option to create it automatically. You can work around this with a CREATE TABLE statement before the INSERT, or with a custom stored procedure, but that requires setting the batch size to 1 (.withBatchSize(1)). Otherwise the CREATE statement will be executed several times.
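A minimal sketch of the first workaround, run once before launching the pipeline (the connection placeholders match the examples above; the column types are assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Create the target table up front with plain JDBC, so JdbcIO only has to run the INSERTs.
try (Connection conn = DriverManager.getConnection(
             "db_connection_string", "my_username", "my_password");
     Statement stmt = conn.createStatement()) {
    // T-SQL: create the table only if it does not exist yet
    stmt.execute("IF OBJECT_ID('Person.person', 'U') IS NULL "
            + "CREATE TABLE Person.person (name VARCHAR(100), age INT)");
}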
Note: By default exported data gets appended.
Note: Beam doesn’t support bulk exports. There is a batchSize option (1000 by default) which can be set. There is also a custom extension for bulk processing, which requires data grouping; it is currently pending approval to be incorporated into Beam.
Running the job
To run the processing I had to specify the following jars:
- beam-sdks-java-core-2.3.0.jar
- beam-runners-spark-2.3.0.jar
- beam-runners-direct-java-2.3.0.jar
- beam-runners-core-construction-java-2.3.0.jar
- beam-runners-core-java-2.3.0.jar
- beam-sdks-java-io-hadoop-file-system-2.3.0.jar
- beam-sdks-java-io-hadoop-common-2.3.0.jar
- snappy-java-1.1.7.1.jar
- beam-sdks-java-io-jdbc-2.3.0.jar
- commons-dbcp2-2.2.0.jar
- commons-pool2-2.5.0.jar
- sqljdbc4.jar
To avoid conflicting jar versions I had to set spark.driver.userClassPathFirst=true. Without that I was getting errors like:
User class threw exception: java.lang.RuntimeException:
java.lang.NoSuchMethodError: org.apache.hive.hcatalog.common.HCatUtil.getHiveMetastoreClient
(Lorg/apache/hadoop/hive/conf/HiveConf;)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
User class threw exception: java.lang.RuntimeException:
java.lang.NoSuchMethodError: org.apache.hive.common.util.ShutdownHookManager
.addShutdownHook(Ljava/lang/Runnable;)
Alternatively, you can put the proper jars on the Spark library path.
I used the spark2-submit command to run my job:
spark2-submit \
  --jars beam-sdks-java-core-2.3.0.jar,beam-runners-spark-2.3.0.jar,beam-runners-direct-java-2.3.0.jar,beam-runners-core-construction-java-2.3.0.jar,beam-runners-core-java-2.3.0.jar,beam-sdks-java-io-hadoop-file-system-2.3.0.jar,beam-sdks-java-io-hadoop-common-2.3.0.jar,snappy-java-1.1.7.1.jar,beam-sdks-java-io-jdbc-2.3.0.jar,commons-dbcp2-2.2.0.jar,commons-pool2-2.5.0.jar,sqljdbc4.jar \
  --conf spark.driver.userClassPathFirst=true \
  --class jdbc.JDBCImportExport \
  --master yarn \
  --queue=root.my_queue \
  --deploy-mode cluster \
  my_jar.jar \
  --driver=com.microsoft.sqlserver.jdbc.SQLServerDriver \
  --runner=SparkRunner \
  --sparkMaster=yarn
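Note that everything after my_jar.jar (--driver, --runner, --sparkMaster) is passed to the main class as application arguments and parsed by PipelineOptionsFactory, while the options before it are consumed by spark2-submit itself.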
My pom.xml contained the following dependencies:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-jdbc</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.xerial.snappy</groupId>
  <artifactId>snappy-java</artifactId>
  <version>1.1.7.1</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.6.0</version>
  <exclusions>
    <exclusion>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
  <version>2.3.0</version>
</dependency>
Beam's JdbcIO works quite nicely, although some additional features like a staging table, data overwriting or bulk exporting would be a cool add-on.