Hadoop 의 기본예제인 WordCount 를 기반으로 설명한다.

프로그래밍은 아래와 같은 4가지 부분에 대하여 작성한다.

 

기본 분산처리된 데이터는 각 네임노드의 mapper 와 combiner 의 context에 저장된 값이 reducer 로 취합되며

reduce 취합시에는 key 로 정렬된다.

 

흐름은 아래와 같다.

 

 Job 등록 및 시작


Mapper

Combiner

shuffle 

Reducer

 최종결과 hdfs에 저장

 

 


Mapper 작성

Mapper를 작성시에는 Mapper <KEYIN,  VALUEIN,  KEYOUT, VALUEOUT> 클래스의 KEYIN, VALUEIN, KEYOUT, VALUEOUT의 4가지를 정의 하여 상속한다.  Map 의 경우에는 KEYIN key, VALUEIN value, Context context  3가지 파라미터로 구현하며, Type 은 클래스에서 정의 한 타입과 동일하게 구현한다.

 public static class TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{

 

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

 

    public void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

}

 

 

 

Mapper 구현 부분의 map 함수가 파일을 TextInputFormat 형태로데이터의 한줄(line)씩 읽은 값이 파리미터 Text value 로 받는다이 값을StringTokenizer 를 이용하여 (line)  token 으로 나누고 key-value pair  <word, 1>  Combiner 또는 Reducer 에 전달한다.

 

 

Combiner 작성

Combiner  Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.

 public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

 

    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

 

 

WordCount  combine를 정의하며.  map 의 결과물 (output)  local combiner 에게 전달되어 정렬되고합산되어 Reducer 에 전달된다.

 

 

Reducer 작성

Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.

 public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

 

    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

 

 

 

Reducer 구현 부분의 reduce 함수는 각 key 별로 shuffle되어 같은 값으로 모아진 Iterable<IntWritable> values 를 카운팅하여 해당 Text 값을 key 로 개수를 최종값으로 저장한다.

 

 

Job 등록

위와 같은 방식으로 작성된 Mapper, Combiner, Reducer  Job 으로 등록하여 실행한다.

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "word count");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }

} 

 

 

run 함수는 input/output 경로나, key-value 의 타입, input/output 의 포맷등을 JobConf 를 통해서 job 의 여러 성질을 정의한다.그리고 JobClient.runJob 을 호출하여 job 을 실행하고 진행상황을 모니터한다.


이 경우에는 combiner 와 reducer 에 같은 클래스를 등록했다. 두 클래스를 다루는 자세한 내용은 이후에 다루도록 하겠다

+ Recent posts