Apache Beam – getting started

Apache Beam is an open source, unified programming model for defining and executing data processing pipelines, including ETL, batch, and stream processing. It lets you write data pipelines in Java or Python without tying the code to a specific execution engine: the same code can run on MapReduce, Spark, Flink, Apex, or another engine. Of course, not all functionalities make sense for all engines.

More info about Beam can be found on the project website, https://beam.apache.org/.

Apache Beam is a growing project, which changes quite fast. When I first heard about it 2 years ago, I could barely use it. Right now it can read many different file formats and can connect with many big data tools like HBase, Hive, Kudu or Solr. I decided to check how mature it actually is.

HOW TO

Getting started was quite easy. I decided to go with Java 8 and ran my tests on a CDH Hadoop cluster, version 5.12.1. I used Beam 2.3 and Spark 2.2.

First I started with the WordCount example. To make it run and interact with HDFS I had to:

  • include the following Beam jars:

    • beam-sdks-java-core-2.3.0.jar
    • beam-runners-spark-2.3.0.jar
    • beam-runners-direct-java-2.3.0.jar
    • beam-runners-core-construction-java-2.3.0.jar
    • beam-runners-core-java-2.3.0.jar
    • beam-sdks-java-io-hadoop-file-system-2.3.0.jar

  • set spark.driver.userClassPathFirst=true so that Spark would use the jars I provided instead of those on its library path

To run Beam on Spark I used the spark2-submit command:
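
The exact command did not survive in this post; below is a minimal sketch of how such an invocation could look. The --conf setting and the jar list come from the points above, while the main class, application jar, and HDFS paths are placeholders of mine:

    spark2-submit \
      --master yarn \
      --conf spark.driver.userClassPathFirst=true \
      --jars beam-sdks-java-core-2.3.0.jar,beam-runners-spark-2.3.0.jar,beam-runners-direct-java-2.3.0.jar,beam-runners-core-construction-java-2.3.0.jar,beam-runners-core-java-2.3.0.jar,beam-sdks-java-io-hadoop-file-system-2.3.0.jar \
      --class org.example.WordCount \
      word-count.jar \
      --runner=SparkRunner --inputFile=hdfs:///user/me/input.txt --output=hdfs:///user/me/counts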


To interact with HDFS (read and write) I had to use HadoopFileSystemOptions instead of plain PipelineOptions:
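
The original snippet is missing here; the following is a minimal sketch of how HadoopFileSystemOptions can be wired up in Beam 2.3, where the fs.defaultFS address and the HDFS paths are placeholder assumptions of mine:

    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    // Inside main(String[] args): parse the command-line arguments into
    // options that carry an HDFS configuration.
    HadoopFileSystemOptions options = PipelineOptionsFactory
        .fromArgs(args).withValidation()
        .as(HadoopFileSystemOptions.class);

    // Tell Beam where the cluster's HDFS lives (placeholder address).
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020");
    options.setHdfsConfiguration(Collections.singletonList(conf));

    // Build and run a trivial read/write pipeline against HDFS.
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.read().from("hdfs:///user/me/input.txt"))
     .apply(TextIO.write().to("hdfs:///user/me/output"));
    p.run().waitUntilFinish();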


My pom.xml contained the following dependencies:
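
The dependency list itself was lost; judging by the jar names above, it presumably looked roughly like this (the runners-core artifacts are pulled in transitively by the Spark runner):

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
      <version>2.3.0</version>
    </dependency>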


After successfully running the example I checked whether it was possible to run it through an Oozie workflow and also to collect statistics of such a run.


Running with Oozie

The Spark2 Oozie action is not supported in CDH 5.12.1, so I couldn't test it.

I tried to run the processing as a Java action, but the direct runner was chosen by default instead of Spark, and I found no way of changing that.

Another idea was to run it as a Shell action, but that resulted in an error.


Beam Metrics

Beam has so-called Beam Metrics, which let you define custom metrics to be collected. Unfortunately, they are scoped only to the transforms within the pipeline, and there's no way to use them outside. For example, I found no way to retrieve the number of records written to the output file, as that happens beyond the transform scope.

Example metric:
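
The snippet did not survive; below is a minimal counter of the kind described, defined inside a DoFn. The namespace and metric name are illustrative choices of mine:

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    public class CountingFn extends DoFn<String, String> {
      // Custom metric, visible in the runner's metric results after the run.
      private final Counter processedRecords =
          Metrics.counter(CountingFn.class, "processedRecords");

      @ProcessElement
      public void processElement(ProcessContext c) {
        processedRecords.inc();  // counts every element seen by this transform
        c.output(c.element());
      }
    }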

I also tested how Beam interacts with some of the tools I have used recently. The results of those tests can be found in my next posts.
