Pig で RowNumber を使う

Pocket

Oracle や PostgreSQL に入っている Row Number 関数。
Pig でも使いたいけど、UDF 書かないといけないのよね。
と思っていたら、こちらのサイトにソースが落っこちていた

この UDF で少しつまずいたので簡単な使い方を説明。
拾ってきたソースの先頭に、package pigUDF; を追加し、
outputSchema 付近の処理がデフォルトだと cdh3 だと動かなかったので修正。
こちらのサイトのソースを参考にして修正した。
これで rowNumber() 実行後に必ず STORE しないと行けない問題を解決。

package pigUDF;

import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;

public class rowNumber extends EvalFunc<DataBag> {
  TupleFactory mTupleFactory = TupleFactory.getInstance();
  BagFactory mBagFactory = BagFactory.getInstance();
  public DataBag exec(Tuple input) throws IOException {
    try {
      DataBag output = mBagFactory.newDefaultBag();
      DataBag bg = (DataBag)input.get(0);
      Iterator it = bg.iterator();
      Long count = new Long(1);
      while(it.hasNext())
      {
        Tuple t = (Tuple)it.next();
              t.append(count);
              output.add(t);
              count = count + 1;
            }
      return output;
    }
    catch (ExecException ee) {
        // error handling here.
        throw ee;
    }
  }

    @Override
    public Schema outputSchema(Schema input)
    {
        try {
            Schema.FieldSchema inputFieldSchema = input.getField(0);

            if (inputFieldSchema.type != DataType.BAG)
            {
                throw new RuntimeException("Expected a BAG as input");
            }

            Schema inputBagSchema = inputFieldSchema.schema;
            Schema outputBagSchema = inputBagSchema.clone();

            outputBagSchema.add(new Schema.FieldSchema("rownumber", DataType.LONG));
            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
                                                                .getName()
                                                                .toLowerCase(), input),
                                                outputBagSchema,
                                                DataType.BAG));
        }
        catch (CloneNotSupportedException e) {
            throw new RuntimeException(e);
        }
        catch (FrontendException e) {
            throw new RuntimeException(e);
        }
    }
}

これを hoge.jar としてホームディレクトリに準備しておく

準備した hoge.tsv は下記のようなもの。

2012-04-30     hoge
2012-05-31     fuga
2013-01-04     sunny

これを HDFS にアップロードしておく。

$ hadoop fs -copyFromLocal hoge.tsv .

続いて準備する rownumber.pig は下記のような感じ

register hoge.jar;
define RowCounter pigUDF.rowCounter();

data = LOAD 'hoge.tsv' AS (date:chararray,name:chararray);
gr = GROUP data ALL;
fr = FOREACH gr GENERATE FLATTEN(RowCounter(data));
DUMP fr;

これを実行した結果は下記のような感じ

$ pig -f rownumber.pig
(2012-04-30,hoge,1)
(2012-05-31,fuga,2)
(2013-01-04,sunny,3)

最後に RowCounter の結果が追加されているのがわかる。

追記:
 2013/04/05 rowNumber の outputSchema の処理を変更

コメントを残す