Hadoop 의 기본예제인 WordCount 를 기반으로 설명한다.
프로그래밍은 아래와 같은 4가지 부분에 대하여 작성한다.
기본 분산처리된 데이터는 각 네임노드의 mapper 와 combiner 의 context에 저장된 값이 reducer 로 취합되며
reduce 취합시에는 key 로 정렬된다.
흐름은 아래와 같다.
Job 등록 및 시작 ↓ Mapper ↓ Combiner ↓ shuffle ↓ Reducer ↓ 최종결과 hdfs에 저장 |
Mapper 작성
Mapper를 작성시에는 Mapper <KEYIN, VALUEIN, KEYOUT, VALUEOUT> 클래스의 KEYIN, VALUEIN, KEYOUT, VALUEOUT의 4가지를 정의 하여 상속한다. Map 의 경우에는 KEYIN key, VALUEIN value, Context context 의 3가지 파라미터로 구현하며, Type 은 클래스에서 정의 한 타입과 동일하게 구현한다.
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } |
Mapper 구현 부분의 map 함수가 파일을 TextInputFormat 형태로, 데이터의 한줄(line)씩 읽은 값이 파리미터 Text value 로 받는다. 이 값을StringTokenizer 를 이용하여, 줄 (line) 을 token 으로 나누고 key-value pair 인 <word, 1> 를 Combiner 또는 Reducer 에 전달한다.
Combiner 작성
Combiner 는 Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } |
WordCount 의 combine를 정의하며. map 의 결과물 (output) 은 local combiner 에게 전달되어 정렬되고, 합산되어 Reducer 에 전달된다.
Reducer 작성
Reduce 의 동일한 ReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>를 상속하여 구현한다.
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } |
Reducer 구현 부분의 reduce 함수는 각 key 별로 shuffle되어 같은 값으로 모아진 Iterable<IntWritable> values 를 카운팅하여 해당 Text 값을 key 로 개수를 최종값으로 저장한다.
Job 등록
위와 같은 방식으로 작성된 Mapper, Combiner, Reducer 는 Job 으로 등록하여 실행한다.
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
run 함수는 input/output 경로나, key-value 의 타입, input/output 의 포맷등을 JobConf 를 통해서 job 의 여러 성질을 정의한다.그리고 JobClient.runJob 을 호출하여 job 을 실행하고 진행상황을 모니터한다.
이 경우에는 combiner 와 reducer 에 같은 클래스를 등록했다. 두 클래스를 다루는 자세한 내용은 이후에 다루도록 하겠다
'IT > 빅데이터(bigData)' 카테고리의 다른 글
여러 기능의 맵리듀스를 하나의 jar 로 묶을때 (0) | 2014.11.22 |
---|---|
Hadoop 의 Counter 이야기 (0) | 2014.11.22 |
Reduce 에서 counter 조회 (0) | 2014.11.22 |
hadoop counter 사용 (0) | 2014.11.21 |
하이브(hive)에 대한 외부접속 (0) | 2014.11.21 |