Mapper 와 Reducer (Combiner도 Reducer 를 상속하여 구현한다) 는 job 에 의해

 

내부의 run 메소드가 실행된다.

 

Mapper 의 경우

 public void run(Context context) throws IOException, InterruptedException {

    setup(context);

    try {

      while (context.nextKeyValue()) {

        map(context.getCurrentKey(), context.getCurrentValue(), context);

      }

    } finally {

      cleanup(context);

    }

  }

 

 

위와 같은 로직이 동작하며, mapper 를 상속하여 구현하는 map 이 호출되기 전에 setup 이

완료후에는 cleanup 이 호출되는 것을 확인할 수 있다.

 

setup -> map (해당 datanode 에서 읽은 파일의 라인수 만큼 반복) -> cleanup


즉 이런 식으로 호출된다.

 

즉 initialize 형태의 로직이 필요하다면 setup 을 상속해서

완료이후의 로직이 필요하다면 cleanup을 상속해서 처리 할 수 있다.

'IT > 빅데이터(bigData)' 카테고리의 다른 글

Hive 의 Serde 1  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
여러 기능의 맵리듀스를 하나의 jar 로 묶을때  (0) 2014.11.22
Hadoop 의 Counter 이야기  (0) 2014.11.22
Reduce 에서 counter 조회  (0) 2014.11.22

 

여러 기능의 맵리듀스를 하나의 jar 로 묶을때 다음과 같이 ProgramDriver 에 해당 class 들을 등록하고

ProgramDriver  에 각 클래스를 등록한 class 를 jar 의 main 클래스로 잡아서 jar 로 압축한다.

 

실행시 해당 jar 에 등록된 alias 로 실행한다.

$ hadoop jar analyserDriver.jar max   < 기타 max 에서 구현된 파라미터 >

 

 public class AnalyserDriver {

 

   public static void main(String argv[]) {

     int exitCode = -1;

     ProgramDriver programDriver = new ProgramDriver();

     try {

          programDriver.addClass("standarddeviation", StandardDeviationDriver.class, 

                "A map/reduce program that calculate the standarddeviation in the input files.");

 programDriver.addClass("mode", ModeDriver.class, 

                "A map/reduce program that calculates the mode in the input files.");

programDriver.addClass("max", Max.class, 

  "A map/reduce program that max the total row in the input files.");

          programDriver.addClass("min", Min.class, 

                "A map/reduce program that min the total row in the input files.");

          programDriver.addClass("median", Median.class,

                "A map/reduce program that median the total row in the input files.");

        exitCode = programDriver.run(argv); 

    } catch (Throwable e) {

          e.printStackTrace();

    }

    System.exit(exitCode);

  }

}

 

 

'IT > 빅데이터(bigData)' 카테고리의 다른 글

MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22
Hadoop 의 Counter 이야기  (0) 2014.11.22
Reduce 에서 counter 조회  (0) 2014.11.22
hadoop counter 사용  (0) 2014.11.21

mapreduce 프로그래밍을 하면 counter가 필요한 곳은 대부분 reducer 이다

counter 는 job 안에 존재하며,

reducer의 context 가 가진 counter 를 조회한다면 그 값은 mapper 들에서 취합된 값이 아니므로 항상 0 이 나온다

 

따라서, reducer 를 동작중인 job 을 획득하여 counter 를 조회하여야 한다.

보통 reduce 메소드가 돌기전에 획득해 놓고 사용하기 때문에 setup 에서 구현한다.

 

 long totalVal = 0;


 @Override

 public void setup(Context context) throws IOException, InterruptedException{

   Configuration conf = context.getConfiguration();

   Cluster cluster = new Cluster(conf);

  Job currentJob = cluster.getJob(context.getJobID());

  totalCount = currentJob.getCounters().findCounter(MATCH_COUNTER.TOTAL_COUNT).getValue();  }

 

Hadoop 의 기본예제인 WordCount 를 기반으로 설명한다.

프로그래밍은 아래와 같은 4가지 부분에 대하여 작성한다.

 

기본 분산처리된 데이터는 각 네임노드의 mapper 와 combiner 의 context에 저장된 값이 reducer 로 취합되며

reduce 취합시에는 key 로 정렬된다.

 

흐름은 아래와 같다.

 

 Job 등록 및 시작


Mapper

Combiner

shuffle 

Reducer

 최종결과 hdfs에 저장

 

 


Mapper 작성

Mapper를 작성시에는 Mapper <KEYIN,  VALUEIN,  KEYOUT, VALUEOUT> 클래스의 KEYIN, VALUEIN, KEYOUT, VALUEOUT의 4가지를 정의 하여 상속한다.  Map 의 경우에는 KEYIN key, VALUEIN value, Context context  3가지 파라미터로 구현하며, Type 은 클래스에서 정의 한 타입과 동일하게 구현한다.

 public static class TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{

 

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

 

    public void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

}

 

 

 

Mapper 구현 부분의 map 함수가 파일을 TextInputFormat 형태로데이터의 한줄(line)씩 읽은 값이 파리미터 Text value 로 받는다이 값을StringTokenizer 를 이용하여 (line)  token 으로 나누고 key-value pair  <word, 1>  Combiner 또는 Reducer 에 전달한다.

 

 

Combiner 작성

Combiner  Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.

 public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

 

    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

 

 

WordCount  combine를 정의하며.  map 의 결과물 (output)  local combiner 에게 전달되어 정렬되고합산되어 Reducer 에 전달된다.

 

 

Reducer 작성

Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.

 public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

 

    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

 

 

 

Reducer 구현 부분의 reduce 함수는 각 key 별로 shuffle되어 같은 값으로 모아진 Iterable<IntWritable> values 를 카운팅하여 해당 Text 값을 key 로 개수를 최종값으로 저장한다.

 

 

Job 등록

위와 같은 방식으로 작성된 Mapper, Combiner, Reducer  Job 으로 등록하여 실행한다.

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "word count");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }

} 

 

 

run 함수는 input/output 경로나, key-value 의 타입, input/output 의 포맷등을 JobConf 를 통해서 job 의 여러 성질을 정의한다.그리고 JobClient.runJob 을 호출하여 job 을 실행하고 진행상황을 모니터한다.


이 경우에는 combiner 와 reducer 에 같은 클래스를 등록했다. 두 클래스를 다루는 자세한 내용은 이후에 다루도록 하겠다

+ Recent posts