udf 란?

hive에서 사용하는 function 을 사용자가 직접구현하는 사용자 정의 펑션(user-defined-function의 약자)이다.

즉 user 하이브에서 사용할 function 을 구현하는 것이다. 일반 데이터베이스의 function 을(오라클 to_date, sum 등)을 직접 구현한다고 생각하면 된다.

 

syntax>

select <user-defined-function>(<colum_number>) from <table> ;


구현소스 

@Description(

  name="SimpleUDFExample",

  value="returns 'even x' or return 'odd x', where x is whatever you give it (Int)",

  extended="SELECT simpleudfexample('world') from foo limit 1;"

  )

class SimpleUDFExample extends UDF {

  

    public Text evaluate(IntWritable input) {

        if(input == null) return null;

        return new Text( (input.get()%2 == 0 ?"even ":"odd " )+ input.toString());

    }

}

  

보여주고자 하는 타입에 따라 return type을 설정하고, 매개변수는 function 의 입력 파라미터의 수 및 타입이 맞아야 한다.

매개변수는 Writable 을 상속한 클래스여야 한다.


 public Text evaluate(Text param1, IntWritable param2.... ) {

위와 같이 증가 시킬 수 있다.


evaluate 는 리턴은 void 는 안된다.( 리턴값이 function 의 실행 결과 같이 된다.)

 


hive 에서의 사용 예제


예시 데이타>

hive> select noticeseq from t_test;

1

2

3

4

 

 등록후 실행>

hive> ADD JAR ./hivetemplate.jar; 

* 해당 udf가 구현된 jar를 hive 에 등록한다.

hive> CREATE TEMPORARY FUNCTION examudf as 'SimpleUDFExample';

* 해당 udf 구현클래스를 function 으로 생성한다.

hive> select examudf(noticeseq ) from t_test;

odd 1

even 2

odd 3

even 4


'IT > 빅데이터(bigData)' 카테고리의 다른 글

hive 에서 형변환 함수  (0) 2014.12.02
Hive 의 Serde 2  (0) 2014.11.25
Hive 의 Serde 1  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22

Hive  Serde


Serde 란 ?

직렬화와 역직렬화를 뜻하며, HDFS  HIVE 간의 데이터 교환을 뜻한다.

역직렬화(Deserializer)

HDFS files  ▶ InputFileFormat  <key, value>  Deserializer  Row object

HDFS 의 파일을 Hive 테이블에 저장

 

직렬화(Serializer)

Row object  Serializer  <key, value>  OutputFileFormat  HDFS files

Hive 테이블의 정보를 HDFS 의 파일로 저장


'IT > 빅데이터(bigData)' 카테고리의 다른 글

하이브(hive) udf  (0) 2014.11.25
Hive 의 Serde 2  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22
여러 기능의 맵리듀스를 하나의 jar 로 묶을때  (0) 2014.11.22

mapreduce 프로그래밍을 하면 counter가 필요한 곳은 대부분 reducer 이다

counter 는 job 안에 존재하며,

reducer의 context 가 가진 counter 를 조회한다면 그 값은 mapper 들에서 취합된 값이 아니므로 항상 0 이 나온다

 

따라서, reducer 를 동작중인 job 을 획득하여 counter 를 조회하여야 한다.

보통 reduce 메소드가 돌기전에 획득해 놓고 사용하기 때문에 setup 에서 구현한다.

 

 long totalVal = 0;


 @Override

 public void setup(Context context) throws IOException, InterruptedException{

   Configuration conf = context.getConfiguration();

   Cluster cluster = new Cluster(conf);

  Job currentJob = cluster.getJob(context.getJobID());

  totalCount = currentJob.getCounters().findCounter(MATCH_COUNTER.TOTAL_COUNT).getValue();  }

 

mapper 는 기본적으로 자신의 데이터노드에서 동작하기 때문에 여러 데이터노드의 경우 서로의 값의 공유할 수가 없다.

그에 때문에 hadoop 에서는 counter 를 통해 서로의 정보를 업데이트하여 reducer 에게 전달할 수 있다.

(단 long에 대한 증가/감소만 가능)

 

카운터의 등록은 enum 으로 등록하여야 하며, 필요한 enum 을 개발자는 생성해야 한다.

 

 아래와 같은 enum 을 생성했다며

 

 public static enum MATCH_COUNTER {

  TOTAL_COUNT

 };

 

  mapper 에서는 아래와 같은 로직을 통해 특정 값을 증가 시킬 수 있다.

  @Override

  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

    StringTokenizer itr = new StringTokenizer(value.toString());

        while (itr.hasMoreTokens()) {

   word.set(itr.nextToken());

             context.write(word, one);

             context.getCounter(MATCH_COUNTER.TOTAL_COUNT).increment(1); 

             // counter 증가

       }

  }

 

 

이렇게 증가된 값은 아래와 같이 사용할 수 있다.

 public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "word count");

    .... 중략

    job.waitForCompletion(true)

    long totalCnt = job.getCounters().findCounter(MATCH_COUNTER.TOTAL_COUNT)).getValue();  

     ... 중략

  }

 

 

 

 namenode 1  :  outdata { "a","b","c"} 3개

 namenode 2  :  outdata { "d,"f"} 2개

 

counter 의 값은 5가 된다.

Hadoop 의 기본예제인 WordCount 를 기반으로 설명한다.

프로그래밍은 아래와 같은 4가지 부분에 대하여 작성한다.

 

기본 분산처리된 데이터는 각 네임노드의 mapper 와 combiner 의 context에 저장된 값이 reducer 로 취합되며

reduce 취합시에는 key 로 정렬된다.

 

흐름은 아래와 같다.

 

 Job 등록 및 시작


Mapper

Combiner

shuffle 

Reducer

 최종결과 hdfs에 저장

 

 


Mapper 작성

Mapper를 작성시에는 Mapper <KEYIN,  VALUEIN,  KEYOUT, VALUEOUT> 클래스의 KEYIN, VALUEIN, KEYOUT, VALUEOUT의 4가지를 정의 하여 상속한다.  Map 의 경우에는 KEYIN key, VALUEIN value, Context context  3가지 파라미터로 구현하며, Type 은 클래스에서 정의 한 타입과 동일하게 구현한다.

 public static class TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{

 

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

 

    public void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

}

 

 

 

Mapper 구현 부분의 map 함수가 파일을 TextInputFormat 형태로데이터의 한줄(line)씩 읽은 값이 파리미터 Text value 로 받는다이 값을StringTokenizer 를 이용하여 (line)  token 으로 나누고 key-value pair  <word, 1>  Combiner 또는 Reducer 에 전달한다.

 

 

Combiner 작성

Combiner  Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.

 public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

 

    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

 

 

WordCount  combine를 정의하며.  map 의 결과물 (output)  local combiner 에게 전달되어 정렬되고합산되어 Reducer 에 전달된다.

 

 

Reducer 작성

Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.

 public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

 

    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

 

 

 

Reducer 구현 부분의 reduce 함수는 각 key 별로 shuffle되어 같은 값으로 모아진 Iterable<IntWritable> values 를 카운팅하여 해당 Text 값을 key 로 개수를 최종값으로 저장한다.

 

 

Job 등록

위와 같은 방식으로 작성된 Mapper, Combiner, Reducer  Job 으로 등록하여 실행한다.

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "word count");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }

} 

 

 

run 함수는 input/output 경로나, key-value 의 타입, input/output 의 포맷등을 JobConf 를 통해서 job 의 여러 성질을 정의한다.그리고 JobClient.runJob 을 호출하여 job 을 실행하고 진행상황을 모니터한다.


이 경우에는 combiner 와 reducer 에 같은 클래스를 등록했다. 두 클래스를 다루는 자세한 내용은 이후에 다루도록 하겠다

+ Recent posts