Pig HBase lookups
Pig can read from and write to HBase nicely, as I described here. Additionally, we may use a Pig UDF to work with data in HBase – for example, retrieving values for a given key. There is one difficulty though – ZooKeeper limits the number of concurrent connections to HBase, and if our application exceeds that limit, the whole job will simply fail with a message like:
Exception in thread "Thread-26" javax.jdo.JDODataStoreException
org.apache.hadoop.hbase.ZooKeeperConnectionException: HBase is able to connect to ZooKeeper
but the connection closes immediately. This could be a sign that the server has too many
connections (200 is the default). Consider inspecting your ZK server logs for that error and
then make sure you are reusing HBaseConfiguration as often as you can.
Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase
To prevent that, you can raise the connection limit with the ZooKeeper parameter maxClientCnxns or enforce a smaller number of tasks running concurrently.
maxClientCnxns = 200
Here I post a simple Pig UDF that does lookups in HBase. It:
- connects to HBase
- retrieves the names of the table’s columns in order to create the output tuple
- fetches the row from HBase for the key passed to the LookUp function
- appends fetched data to the processed tuple
In this case one connection is established for each running mapper, so at some point this may still go beyond the specified connection limit.
HOW TO
I used CDH 5.8.4 with Kerberos.
If you work with Maven, there are four dependencies that need to be declared (or more in the case of more sophisticated code):
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.8.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.2.0-cdh5.8.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.8.4</version>
</dependency>
<dependency>
    <groupId>org.apache.pig</groupId>
    <artifactId>pig</artifactId>
    <version>0.17.0</version>
</dependency>
Pig Code
In order to run a Pig UDF we obviously need some Pig code. This can be done via the Grunt shell or with Oozie. Let's assume that we want to retrieve some persons' PESEL numbers based on last name, country and ID number. First we need to define the LookUp function using the constructor, which will establish the connection to HBase. Then we can execute the function as with any Pig UDF.
register udf-1.0-SNAPSHOT.jar;

DEFINE LookUp LookUp('hbase_namespace:hbase_table');

personal_data = LOAD 'input' USING PigStorage('\t') AS (
    country: chararray,
    first_name: chararray,
    last_name: chararray,
    address: chararray,
    age: int,
    id_number: int
);

personal_with_lookups = FOREACH personal_data GENERATE
    country,
    first_name,
    last_name,
    address,
    age,
    id_number,
    FLATTEN(LookUp(country, last_name, id_number).pesel_number) AS pesel;

STORE personal_with_lookups INTO 'output' USING PigStorage('\t');
Java Code
Here I created three classes – one for handling HBase communication, a second for the UDF logic and a third for building the lookup key used to retrieve the data. Each Pig record being processed is combined with data from the HBase table and a flag LOOKUP_MATCH specifying whether the data was found or not. I assumed that each HBase row key follows the pattern country_last_name_id_number. Also, each column whose name ends with "number" has an int datatype and the rest have the chararray type.
Class for handling HBase connection:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HBaseConnector {

    private static Connection connection;

    public static Table connect(String tableName) throws IOException {
        // initiating HBase connection
        Configuration conf = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(conf);
        return connection.getTable(TableName.valueOf(tableName));
    }

    public static List<String> getData(Table table, String lookUpKey) throws IOException {
        List<String> results = new ArrayList<>();
        // retrieve the result for the specified row key
        Get g = new Get(Bytes.toBytes(lookUpKey));
        Result result = table.get(g);
        if (!result.isEmpty()) {
            // iterate through the result and get column values. Each Cell is a value for a column
            for (Cell cell : result.listCells()) {
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                results.add(value);
            }
        }
        return results;
    }

    public static List<String> getColumnNames(Table table) throws IOException {
        // retrieve the names of the columns from the table
        List<String> columns = new ArrayList<>();
        Scan scanner = new Scan();
        scanner.setMaxResultSize(1);
        ResultScanner resultScanner = table.getScanner(scanner);
        Result result = resultScanner.next();
        for (Cell cell : result.listCells()) {
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            columns.add(qualifier);
        }
        return columns;
    }

    public static void closeConnection() throws IOException {
        // closing connection
        connection.close();
    }
}
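Since the UDF constructor calls connect() in every task and connect() always opens a brand new Connection, a small guard can keep a single Connection per JVM and so reduce the number of ZooKeeper sessions. A minimal sketch of such a variant (my own tweak, not part of the class above):

// Hypothetical variant of connect(): reuse the existing Connection if it is still open,
// so repeated UDF instantiations within one JVM do not create extra ZooKeeper sessions
public static synchronized Table connect(String tableName) throws IOException {
    if (connection == null || connection.isClosed()) {
        Configuration conf = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(conf);
    }
    return connection.getTable(TableName.valueOf(tableName));
}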
LookUp key class – for lookup key creation:
import java.io.IOException;
import java.util.List;

public class LookUpKey {

    public static String createLookUpKey(List<Object> tupleFields) throws IOException {
        String lookUpKey = "";
        for (Object tupleField : tupleFields) {
            String tupleString = tupleField.toString();
            lookUpKey = lookUpKey + "_" + tupleString;
        }
        return lookUpKey.substring(1);
    }
}
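A quick usage illustration (with hypothetical values) – the fields are joined with underscores and the leading underscore is trimmed, which yields keys matching the country_last_name_id_number pattern described above:

import java.util.Arrays;
import java.util.List;

public class LookUpKeyDemo {
    public static void main(String[] args) throws Exception {
        // hypothetical input: country "PL", last name "Smith", id number 12345
        List<Object> fields = Arrays.<Object>asList("PL", "Smith", 12345);
        // createLookUpKey joins the fields with "_" and strips the leading separator
        System.out.println(LookUpKey.createLookUpKey(fields)); // prints PL_Smith_12345
    }
}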
Class with Pig UDF logic:
import org.apache.hadoop.hbase.client.Table;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.SchemaUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class LookUp extends EvalFunc<Tuple> {

    public static final String LOOKUP_MATCH = "lookup_match";

    private Table table;
    private Schema resultSchema;

    public LookUp(String tableName) {
        super();
        if (tableName == null || tableName.isEmpty()) {
            throw new IllegalArgumentException("Wrong table name for the lookup provided: " + tableName);
        }
        try {
            this.table = HBaseConnector.connect(tableName);
            this.resultSchema = createSchema(table);
        } catch (IOException e) {
            throw new RuntimeException("Unable to retrieve data from Hbase ", e);
        }
    }

    @Override
    public Tuple exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            throw new IllegalArgumentException("no fields for the lookup key provided. At least one field needs to be specified");
        }
        List<Object> tupleFields = tuple.getAll();
        String lookUpKey = "";
        List<String> dataResult;
        try {
            lookUpKey = LookUpKey.createLookUpKey(tupleFields);
        } catch (IOException e) {
            throw new RuntimeException("Unable to create a lookup key for tuple " + tuple, e);
        }
        try {
            dataResult = HBaseConnector.getData(table, lookUpKey);
        } catch (Exception e) {
            throw new RuntimeException("Unable to retrieve data from HBase ", e);
        }
        return generateResultTuple(dataResult);
    }

    public Tuple generateResultTuple(List<String> dataResult) throws FrontendException {
        // create empty Tuple
        Tuple resultTuple = TupleFactory.getInstance().newTuple(0);
        Schema fieldSchema = resultSchema.getField(0).schema;
        for (int i = 0; i < fieldSchema.size(); i++) {
            String fieldName = fieldSchema.getField(i).alias.toLowerCase();
            if (fieldName.endsWith("number")) {
                resultTuple.append(dataResult.isEmpty() ? -1 : Integer.parseInt(dataResult.get(i)));
            } else if (fieldName.equals(LOOKUP_MATCH)) {
                resultTuple.append(dataResult.isEmpty() ? false : true);
            } else {
                resultTuple.append(dataResult.isEmpty() ? "" : dataResult.get(i));
            }
        }
        return resultTuple;
    }

    public Schema createSchema(Table table) {
        List<String> fieldNames = new ArrayList<>();
        List<Byte> fieldTypes = new ArrayList<>();
        try {
            fieldNames = HBaseConnector.getColumnNames(table);
        } catch (IOException e) {
            throw new RuntimeException("Unable to retrieve the tuple schema ", e);
        }
        for (String fieldName : fieldNames) {
            if (fieldName.toLowerCase().endsWith("number")) {
                fieldTypes.add(DataType.INTEGER);
            } else {
                fieldTypes.add(DataType.CHARARRAY);
            }
        }
        // adding info about the lookup
        fieldNames.add(LOOKUP_MATCH);
        fieldTypes.add(DataType.BOOLEAN);
        Schema outputSchema = new Schema();
        try {
            outputSchema = SchemaUtil.newTupleSchema(fieldNames, fieldTypes);
        } catch (FrontendException e) {
            throw new RuntimeException(e);
        }
        return outputSchema;
    }

    @Override
    public Schema outputSchema(Schema input) {
        return resultSchema;
    }

    @Override
    // method run after the processing
    public void finish() {
        try {
            HBaseConnector.closeConnection();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
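To make the schema handling more tangible: for the pesel example above, and assuming the HBase table exposes columns named last_name and pesel_number (illustrative names, not taken from a real table), createSchema would build a tuple schema along these lines. A standalone sketch using the same SchemaUtil call:

import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.SchemaUtil;

import java.util.Arrays;

public class SchemaDemo {
    public static void main(String[] args) throws Exception {
        // hypothetical columns: pesel_number ends with "number" so it maps to int,
        // last_name maps to chararray; the lookup_match flag is always appended
        Schema schema = SchemaUtil.newTupleSchema(
                Arrays.asList("last_name", "pesel_number", LookUp.LOOKUP_MATCH),
                Arrays.asList(DataType.CHARARRAY, DataType.INTEGER, DataType.BOOLEAN));
        // prints something like: {t: (last_name: chararray,pesel_number: int,lookup_match: boolean)}
        System.out.println(schema);
    }
}

This is also why the Pig script above can project and FLATTEN .pesel_number from the tuple returned by LookUp.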