Pig Java UDF
We can define Pig UDF in few languages: Java, Jython, JavaScript, Ruby, Groovy and Python. But currently the biggest choice of options we have in Java, so I’ll stick to it in this post.
When it comes to Pig Java UDFs we can distinguish few types, depending on the type of operation we want to perform. But lets start with the common part. Once the UDF is ready we need to include its jar in the Pig script:
1 |
REGISTER myudf.jar |
Remember to include the package name when using the UDF, or specify the alias for it with DEFINE, like that:
1 |
DEFINE myFunction mypackage.myUDFunction(); |
Contents
HOW TO
Let’s take a look at the UDF’s types by following different use cases:
1. One parameter function, working on a single row
Such function takes one value and replaces it with some other value.
1 |
B = FOREACH A GENERATE myFunction(fieldA); |
To define such UDF you need to extend EvalFunc class and implement the logic in exec method. In <> following EvalFunc specify which data type will be returned.
Example:
1 2 3 4 5 6 7 8 9 10 11 12 |
public class LOWERCASE extends EvalFunc<String> { public String exec(Tuple inputTuple) throws IOException { if (inputTuple == null || input.size() == 0) return null; try{ String str = (String)inputTuple.get(0); return str.toLowerCase(); }catch(Exception e){ //exception logic } } } |
2. Function working on a set of columns or the whole tuple, for a single row
This function can replace some values by mixing them all together.
1 |
B = FOREACH A GENERATE FLATTEN(myFunction($0..$21)); |
Here you need to specify all needed columns of the tuple. FLATTEN function ensures that the result will have a columnar unnested representation. Similar to previous case you need to extend EvalFunc (this time Tuple will be returned) and implement exec method.
Example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class CONCATENATE extends EvalFunc<Tuple> { public Tuple exec(Tuple inputTuple) throws IOException { if (inputTuple == null || input.size() == 0) return null; try{ Tuple outputTuple = TupleFactory.getInstance().newTuple(1); String first = (String)inputTuple.get(0); String second = (String)inputTuple.get(1); String concatenated = first + “ | ” + second; return outputTuple.set(0, concatenated); }catch(Exception e){ //exception logic } } } |
3. Function working on a set of Tuples – DataBag
In this case we talk about the aggregating function.
1 2 3 4 5 6 7 8 9 |
B = GROUP A BY ( col1, col2 ); C = FOREACH B GENERATE col1, col2, myFunction(A); |
To create an aggregating UDF working on set of tuples we actually need to implement a set of functions. This comes from the parallel MapReduce processing. Separate function for map, combine and reduce step is needed.
For this purpose we need to create a class which implements Algebraic interface that consist of definition of three classes derived from EvalFunc: Initial (for map phase), Intermed (for combine phase), Final (for reduce phase). All of this classes need to implement its own exec method. Additionally Algebraic interface has 3 methods to be implemented:
1 2 3 |
public String getInitial(); public String getIntermed(); public String getFinal(); |
Here is an example derived from the Pig documentation:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public class COUNT extends EvalFunc (Long) implements Algebraic{ public Long exec(Tuple input) throws IOException {return count(input);} public String getInitial() {return Initial.class.getName();} public String getIntermed() {return Intermed.class.getName();} public String getFinal() {return Final.class.getName();} static public class Initial extends EvalFunc (Tuple) { public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(count(input));} } static public class Intermed extends EvalFunc (Tuple) { public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(sum(input));} } static public class Final extends EvalFunc (Long) { public Tuple exec(Tuple input) throws IOException {return sum(input);} } static protected Long count(Tuple input) throws ExecException { Object values = input.get(0); if (values instanceof DataBag) return ((DataBag)values).size(); else if (values instanceof Map) return new Long(((Map)values).size()); } static protected Long sum(Tuple input) throws ExecException, NumberFormatException { DataBag values = (DataBag)input.get(0); long sum = 0; for (Iterator (Tuple) it = values.iterator(); it.hasNext();) { Tuple t = it.next(); sum += (Long)t.get(0); } return sum; } } |
4. Function working on a small set of Tuples – DataBag
Function which requires some custom logic to be applied combining values from just few rows.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
grouped_data = GROUP data BY ( column_a, column_b, column_c, column_d) ; processed_data = FOREACH grouped_data { sorted = ORDER data BY column_d, column_e, column_f; calculated = myFunction(sorted); GENERATE FLATTEN(calculated); }; |
If you want to apply some custom logic on a set of tuples (like tuples with a given key field) you need to ensure that they all go to the same reducer. The simplest way to do it is by using a GROUP function. You may also control the order of the grouped tuples coming to given UDF with ORDER function.
Note: Created groups shouldn’t be too big, otherwise some of your reducers may be overwhelmed or running very long.
To process such ordered tuple set your UDF needs to extend EvalFunc and implement exec method.
Example:
1 2 3 4 5 6 7 |
public class myFunction extends EvalFunc<DataBag> { public DataBag exec(Tuple tuple) throws IOException { DataBag inputBag = (DataBag) tuple.get(0); for (Tuple inputRow : inputBag) { //some custom logic } } |
5. Filtering function
This function can be used for filtering out some records, when applied to the record level. When applied to the DataBag level, checks if the DataBag matches some given condition.
1 |
B = FILTER A BY myFunction(fieldA); |
To create a filtering function you need to extend FilterFunc class and implement exec method returning Boolean.
Example:
1 2 3 4 5 6 7 8 |
public class TOOLONG extends FilterFunc { public Boolean exec(Tuple inputTuple) throws IOException { if (inputTuple == null || input.size() < 10) return false; else return true; } } |
Bookmarked!, I love it!