Pig で RowNumber を使う

Pocket

Oracle や PostgreSQL に入っている Row Number 関数。
Pig でも使いたいけど、UDF 書かないといけないのよね。
と思っていたら、こちらのサイトにソースが落っこちていた

この UDF で少しつまずいたので簡単な使い方を説明。
拾ってきたソースの先頭に、package pigUDF; を追加し、
outputSchema 付近の処理がデフォルトだと cdh3 だと動かなかったので修正。
こちらのサイトのソースを参考にして修正した。
これで rowNumber() 実行後に必ず STORE しないと行けない問題を解決。

<br />
package pigUDF;</p>
<p>import java.io.IOException;<br />
import java.util.Iterator;<br />
import org.apache.pig.EvalFunc;<br />
import org.apache.pig.backend.executionengine.ExecException;<br />
import org.apache.pig.data.BagFactory;<br />
import org.apache.pig.data.DataBag;<br />
import org.apache.pig.data.Tuple;<br />
import org.apache.pig.data.TupleFactory;<br />
import org.apache.pig.impl.logicalLayer.FrontendException;<br />
import org.apache.pig.impl.logicalLayer.schema.Schema;<br />
import org.apache.pig.data.DataType;</p>
<p>public class rowNumber extends EvalFunc&lt;DataBag&gt; {<br />
  TupleFactory mTupleFactory = TupleFactory.getInstance();<br />
  BagFactory mBagFactory = BagFactory.getInstance();<br />
  public DataBag exec(Tuple input) throws IOException {<br />
    try {<br />
      DataBag output = mBagFactory.newDefaultBag();<br />
      DataBag bg = (DataBag)input.get(0);<br />
      Iterator it = bg.iterator();<br />
      Long count = new Long(1);<br />
      while(it.hasNext())<br />
      {<br />
        Tuple t = (Tuple)it.next();<br />
              t.append(count);<br />
              output.add(t);<br />
              count = count + 1;<br />
            }<br />
      return output;<br />
    }<br />
    catch (ExecException ee) {<br />
        // error handling here.<br />
        throw ee;<br />
    }<br />
  }</p>
<p>    @Override<br />
    public Schema outputSchema(Schema input)<br />
    {<br />
        try {<br />
            Schema.FieldSchema inputFieldSchema = input.getField(0);</p>
<p>            if (inputFieldSchema.type != DataType.BAG)<br />
            {<br />
                throw new RuntimeException(&quot;Expected a BAG as input&quot;);<br />
            }</p>
<p>            Schema inputBagSchema = inputFieldSchema.schema;<br />
            Schema outputBagSchema = inputBagSchema.clone();</p>
<p>            outputBagSchema.add(new Schema.FieldSchema(&quot;rownumber&quot;, DataType.LONG));<br />
            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()<br />
                                                                .getName()<br />
                                                                .toLowerCase(), input),<br />
                                                outputBagSchema,<br />
                                                DataType.BAG));<br />
        }<br />
        catch (CloneNotSupportedException e) {<br />
            throw new RuntimeException(e);<br />
        }<br />
        catch (FrontendException e) {<br />
            throw new RuntimeException(e);<br />
        }<br />
    }<br />
}<br />

これを hoge.jar としてホームディレクトリに準備しておく

準備した hoge.tsv は下記のようなもの。

<br />
2012-04-30     hoge<br />
2012-05-31     fuga<br />
2013-01-04     sunny<br />

これを HDFS にアップロードしておく。

<br />
$ hadoop fs -copyFromLocal hoge.tsv .<br />

続いて準備する rownumber.pig は下記のような感じ

<br />
register hoge.jar;<br />
define RowCounter pigUDF.rowCounter();</p>
<p>data = LOAD 'hoge.tsv' AS (date:chararray,name:chararray);<br />
gr = GROUP data ALL;<br />
fr = FOREACH gr GENERATE FLATTEN(RowCounter(data));<br />
DUMP fr;<br />

これを実行した結果は下記のような感じ

<br />
$ pig -f rownumber.pig<br />
(2012-04-30,hoge,1)<br />
(2012-05-31,fuga,2)<br />
(2013-01-04,sunny,3)<br />

最後に RowCounter の結果が追加されているのがわかる。

追記:
 2013/04/05 rowNumber の outputSchema の処理を変更

コメントを残す