오라클 등에서는 to_number 와 같은 함수를 지원하는데 

그에 대해서 하이브는 cast 라는 함수를 지원한다.

사용법은 다음과 같다.



syntax> cast(컬럼 as 타입)

타입은 int, double, string 과 같은 형태를 말한다.


* 예시

 select stddev_pop(cast(population as double)) stddev ,

          variance(cast(population as double)) variance

 from lar_ca where population is not null;


기타로 stddev_pop 는 표준편차를

variance 은 분산을 구하는 함수이다.

'IT > 빅데이터(bigData)' 카테고리의 다른 글

하이브(hive) udf  (0) 2014.11.25
Hive 의 Serde 2  (0) 2014.11.25
Hive 의 Serde 1  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22

udf 란?

hive에서 사용하는 function 을 사용자가 직접구현하는 사용자 정의 펑션(user-defined-function의 약자)이다.

즉 user 하이브에서 사용할 function 을 구현하는 것이다. 일반 데이터베이스의 function 을(오라클 to_date, sum 등)을 직접 구현한다고 생각하면 된다.

 

syntax>

select <user-defined-function>(<colum_number>) from <table> ;


구현소스 

@Description(

  name="SimpleUDFExample",

  value="returns 'even x' or return 'odd x', where x is whatever you give it (Int)",

  extended="SELECT simpleudfexample('world') from foo limit 1;"

  )

class SimpleUDFExample extends UDF {

  

    public Text evaluate(IntWritable input) {

        if(input == null) return null;

        return new Text( (input.get()%2 == 0 ?"even ":"odd " )+ input.toString());

    }

}

  

보여주고자 하는 타입에 따라 return type을 설정하고, 매개변수는 function 의 입력 파라미터의 수 및 타입이 맞아야 한다.

매개변수는 Writable 을 상속한 클래스여야 한다.


 public Text evaluate(Text param1, IntWritable param2.... ) {

위와 같이 증가 시킬 수 있다.


evaluate 는 리턴은 void 는 안된다.( 리턴값이 function 의 실행 결과 같이 된다.)

 


hive 에서의 사용 예제


예시 데이타>

hive> select noticeseq from t_test;

1

2

3

4

 

 등록후 실행>

hive> ADD JAR ./hivetemplate.jar; 

* 해당 udf가 구현된 jar를 hive 에 등록한다.

hive> CREATE TEMPORARY FUNCTION examudf as 'SimpleUDFExample';

* 해당 udf 구현클래스를 function 으로 생성한다.

hive> select examudf(noticeseq ) from t_test;

odd 1

even 2

odd 3

even 4


'IT > 빅데이터(bigData)' 카테고리의 다른 글

hive 에서 형변환 함수  (0) 2014.12.02
Hive 의 Serde 2  (0) 2014.11.25
Hive 의 Serde 1  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22



예제파일 : TemplateSerde.java


Serde 구현

 

기존방법

권고방법

비고

차이점

인터페이스를 상속하여 구현(implements)

곧 Deprecated 예정 

추상화 클래스를 상속하여 구현(extends)

인터페이스에서 추상화 클래스로 변경됨(클래스 위치: org.apache.hadoop.

hive.serde2)

직렬화/역직렬화

모두 구현시 상속

SerDe 인터페이스 상속

곧 Deprecated 예정 

 

AbstractSerDe 추상화클래스 상속하여 구현(extends)

동일 구현 메소드

Initialize

getSerializedClass

serialize

deserialize

getObjectInspector

getSerDeStats

직렬화만 구현

Serializer 인터페이스 상속

AbstractSerializer 추상화클래스 상속하여 구현(extends)

구현 메소드

Initialize

getSerializedClass

serialize

getSerDeStats

역직렬화만 구현

Deserializer 인터페이스 상속

 

AbstractDeserializer 추상화클래스 상속하여 구현(extends)

 

구현 메소드

Initialize

deserialize

getObjectInspector

getSerDeStats

 

1). 공통구현 메소드

 

- Initialize

 

해당 테이블의 컬럼 개수컬럼의 Type 정보를 얻을 수 있으며기본 구분자는 “,” 를 사용한다. 

 

 

- getSerDeStats

 

row 수 사이즈 등을 저장한 정보를 제공(불필요시에 Null 을 리턴)

 

 

 

2) 직렬화 구현 메소드

 

- getSerializedClass

 

HDFS 에 저장시 사용될 Writable을 상속한 클래스를 정의하여 리턴한다.

 

- serialize

 

테이블이 조회되어 한 row 에 대한 정보가 들어오며 해당 정보를 조합하여 HDFS 에 저장될 Line 을 생성한다.

 

 

 

3) 역직렬화 구현 메소드

 

- getObjectInspector

 

Table 에 저장될 각 컬럼의 타입을 저장하여 리턴한다.

 

- deserialize

 

HDFS 에서 조회된 한 Line 별 정보가 입력되며 해당 값을 필요에 따라 Parsing 하여 테이블에 저장될 구조의 array 를 만들어서 리턴한다.

 

 

 

2실행

 

1) 직렬화

 

※ 직렬화 jar 의 경우 HDFS 에서 사용하기 때문에 경로에 대한 path 또는 HDFS 에서 사용하는 jar 에 포함 시켜야함

 

1-1) 실행

 

INSERT OVERWRITE LOCAL DIRECTORY '저장될 LOCAL 경로’ ROW FORMAT SERDE ‘등록한 직렬화클래스’ SELECT * FROM 테이블명;

 

※ INSERT OVERWRITE DIRECTORY 처럼 LOCAL 을 생략시 HDFS 에 저장됨단 구현한 직렬화 클래스가사용안됨

 

1-2) 확인

 

LOCAL syntax 사용 여부에 따라 로컬 경로 또는 hdfs 의 경로로 해당 파일 조회(000000_0 으로 해당 폴더에 파일이 생성됨)


 

2) 역직렬화

 

2-1) 해당 Serde 을 하이브에 등록한다.

 

Add jar ./파일명.jar

 

2-2) Table 을 생성한다.

 

Create table IF NOT EXISTS 테이블명 {

 

Id STRING,

 

name STRING

 

..(이와 같은 형태로 테이블의 구조를 생성)

 

}

 

ROW FORMAT SERDE ‘등록할 역직렬화가 구현된 클래스


3) HDFS 의 데이터 로드

 

Syntax> LOAD data <LOCAL> inpath <file path> into table [tablename]

 

LOAD DATA INPATH 'HDFS 상의 데이터 경로' OVERWRITE INTO TABLE 생성한 테이블명;


4) 확인

SELECT * from 생성한 테이블명;




'IT > 빅데이터(bigData)' 카테고리의 다른 글

hive 에서 형변환 함수  (0) 2014.12.02
하이브(hive) udf  (0) 2014.11.25
Hive 의 Serde 1  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22

Hive  Serde


Serde 란 ?

직렬화와 역직렬화를 뜻하며, HDFS  HIVE 간의 데이터 교환을 뜻한다.

역직렬화(Deserializer)

HDFS files  ▶ InputFileFormat  <key, value>  Deserializer  Row object

HDFS 의 파일을 Hive 테이블에 저장

 

직렬화(Serializer)

Row object  Serializer  <key, value>  OutputFileFormat  HDFS files

Hive 테이블의 정보를 HDFS 의 파일로 저장


'IT > 빅데이터(bigData)' 카테고리의 다른 글

하이브(hive) udf  (0) 2014.11.25
Hive 의 Serde 2  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22
여러 기능의 맵리듀스를 하나의 jar 로 묶을때  (0) 2014.11.22

 

 

 

 

test.txt 에 아래와 같은 데이터가 있다면

1,2,1,2

B,A,B,A

3,3,2,3

A,B,B,B

 

$ hadoop fs -put test.txt /user/test.txt

를 통해 hdpf 에 저장되어 있다 가정하자

이때 아래와 같이 2개의 datanode 로 데이터가 분산되어 있다면

 

datanode 1의 데이터는

1,2,1,2

B,A,B,A

 

datanode 2의 데이터는

3,3,2,3

A,B,B,B

 

이렇게 분산되어 있다라고 가정하자.

 

 

 

이 데이터의 최빈도(가장 많은 단어 mode)를 획득하려고 한다. 그래서 아래와 같이 구현했다고 가정하자

mapper 에서는 각 데이터를 분할하고

reduce 에서는 각 데이터의 최종합을 구하도록  mapreduce 

 

 

mapper 의 input 은 해당 데이터의 line 별로 입력된다.

보통 입력된 데이터를 필요한 포맷으로 split 하여 사용한다. 여기서는 , 로 split 한다.

 

즉 datanode 1 의 mapper 에서는  

1)   1,2,1,2

2)   B,A,B,A

이러한 데이터로 두번 mapper 의 map 메소드가 호출되고

 

즉 datanode 2 역시

1)   3,3,2,3

2)   A,B,B,B

이러한 데이터로 두번 mapper 의 map 메소드가 호출된다.

 

 이동작을 표로 구분하면 다음과 같다.

 

1. combiner 가 없는 경우

 

동작

데이터노드1

데이터노드 2

HDFS 데이터

1,2,1,2

B,A,B,A

3,3,2,3

A,B,B,B

Mapper 의 map 메소드 입력

1,2,1,2

3,3,2,3

B,A,B,A

A,B,B,B

Mapper 의 map 메소드 출력

(1, NullWritable)

(3, NullWritable)

(2, NullWritable)

(3, NullWritable)

(1, NullWritable)

(2, NullWritable)

(2, NullWritable)

(3, NullWritable)

(B, NullWritable)

(A, NullWritable)

(A, NullWritable)

(B, NullWritable)

(B, NullWritable)

(B, NullWritable)

(A, NullWritable)

(B, NullWritable)

Shuffle(key 로 sorting)

Reducer 

reduce 메소드의 입력

NullWritable => N 으로 기술

(1, [N,N])

(2, [N,N,N])

(3, [N,N,N])

(A, [N,N,N])

(B, [N,N,N,N,N])

Reducer 

reduce 메소드의 출력(최빈도) 

(B, 5)

 

 

 

 2. combiner 를 구현한 경우

combiner 에서는 value 를 각 단어의 개수를 저장함

 

동작

데이터노드1

데이터노드 2

HDFS 데이터

1,2,1,2

B,A,B,A

3,3,2,3

A,B,B,B

Mapper 의 map 메소드 입력

1,2,1,2

3,3,2,3

B,A,B,A

A,B,B,B

Mapper 의 map 메소드 출력

(1, NullWritable)

(3, NullWritable)

(2, NullWritable)

(3, NullWritable)

(1, NullWritable)

(2, NullWritable)

(2, NullWritable)

(3, NullWritable)

(B, NullWritable)

(A, NullWritable)

(A, NullWritable)

(B, NullWritable)

(B, NullWritable)

(B, NullWritable)

(A, NullWritable)

(B, NullWritable)

Combiner 

reduce 메소드의 입력

NullWritable => N 으로 기술

 

(1, [N,N])

(2, [N])

(2, [N,N])

(3, [N,N,N])

(A, [N,N])

(A, [N])

(B, [N,N])

(B, [N,N,N])

Combiner 

reduce 메소드의 출력

 

(1, 2)

(2, 1)

(2, 2)

(3, 3)

(A, 2)

(A, 1)

(B, 2)

(B, 3)

Shuffle(key 로 sorting)

Reducer 

reduce 메소드의 입력

(1, [2])

(2, [2, 1])

(3, [3])

(4, [1])

(A, [2, 1])

(B, [2, 3])

Reducer 

reduce 메소드의 출력 (최빈도) 

(B, 5)

 

'IT > 빅데이터(bigData)' 카테고리의 다른 글

Hive 의 Serde 2  (0) 2014.11.25
Hive 의 Serde 1  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22
여러 기능의 맵리듀스를 하나의 jar 로 묶을때  (0) 2014.11.22
Hadoop 의 Counter 이야기  (0) 2014.11.22

Mapper 와 Reducer (Combiner도 Reducer 를 상속하여 구현한다) 는 job 에 의해

 

내부의 run 메소드가 실행된다.

 

Mapper 의 경우

 public void run(Context context) throws IOException, InterruptedException {

    setup(context);

    try {

      while (context.nextKeyValue()) {

        map(context.getCurrentKey(), context.getCurrentValue(), context);

      }

    } finally {

      cleanup(context);

    }

  }

 

 

위와 같은 로직이 동작하며, mapper 를 상속하여 구현하는 map 이 호출되기 전에 setup 이

완료후에는 cleanup 이 호출되는 것을 확인할 수 있다.

 

setup -> map (해당 datanode 에서 읽은 파일의 라인수 만큼 반복) -> cleanup


즉 이런 식으로 호출된다.

 

즉 initialize 형태의 로직이 필요하다면 setup 을 상속해서

완료이후의 로직이 필요하다면 cleanup을 상속해서 처리 할 수 있다.

'IT > 빅데이터(bigData)' 카테고리의 다른 글

Hive 의 Serde 1  (0) 2014.11.25
MapReduce 데이터 흐름 분석  (0) 2014.11.25
여러 기능의 맵리듀스를 하나의 jar 로 묶을때  (0) 2014.11.22
Hadoop 의 Counter 이야기  (0) 2014.11.22
Reduce 에서 counter 조회  (0) 2014.11.22

 

여러 기능의 맵리듀스를 하나의 jar 로 묶을때 다음과 같이 ProgramDriver 에 해당 class 들을 등록하고

ProgramDriver  에 각 클래스를 등록한 class 를 jar 의 main 클래스로 잡아서 jar 로 압축한다.

 

실행시 해당 jar 에 등록된 alias 로 실행한다.

$ hadoop jar analyserDriver.jar max   < 기타 max 에서 구현된 파라미터 >

 

 public class AnalyserDriver {

 

   public static void main(String argv[]) {

     int exitCode = -1;

     ProgramDriver programDriver = new ProgramDriver();

     try {

          programDriver.addClass("standarddeviation", StandardDeviationDriver.class, 

                "A map/reduce program that calculate the standarddeviation in the input files.");

 programDriver.addClass("mode", ModeDriver.class, 

                "A map/reduce program that calculates the mode in the input files.");

programDriver.addClass("max", Max.class, 

  "A map/reduce program that max the total row in the input files.");

          programDriver.addClass("min", Min.class, 

                "A map/reduce program that min the total row in the input files.");

          programDriver.addClass("median", Median.class,

                "A map/reduce program that median the total row in the input files.");

        exitCode = programDriver.run(argv); 

    } catch (Throwable e) {

          e.printStackTrace();

    }

    System.exit(exitCode);

  }

}

 

 

'IT > 빅데이터(bigData)' 카테고리의 다른 글

MapReduce 데이터 흐름 분석  (0) 2014.11.25
Mapper 클래스 구성 및 사용  (0) 2014.11.22
Hadoop 의 Counter 이야기  (0) 2014.11.22
Reduce 에서 counter 조회  (0) 2014.11.22
hadoop counter 사용  (0) 2014.11.21

기본 카운터의 구현은 이전 포스트에서 다른적이 있다. 

 

hadoop 에서는 이미 counter 를 많이 사용하고 있다.

hadoop 에 jar를 올려 실행하면 실행결과 최종에 나오는 값들이 모두 카운터를 통해 구현된 값이다. 

해당 jar 실행시 아래와 같은 최종 결과가 나온다

counter 는 총 43개 이며 해당 카운터에 mapreduce 프로그램이 동작할 사용한 주요 값들을 확인할 수 있다.

대표적으로 map, combine, reduce 에 input 된 record 수와 output된 record 수를 확인 할 수 있있으며, 실행시간 shuffle 된 byte 수도 확인이 가능한다.

이것을 통해 튜닝포인트를 잡을 수가 있게 된다.

 

기본 실행 결과 예시> combiner 를 구현하지 않은 경우

INFO mapreduce.Job: Counters: 43

        File System Counters

                FILE: Number of bytes read=37834707

                FILE: Number of bytes written=76663746

                FILE: Number of read operations=0

                FILE: Number of large read operations=0

                FILE: Number of write operations=0

                HDFS: Number of bytes read=1397535171

                HDFS: Number of bytes written=21

                HDFS: Number of read operations=36

                HDFS: Number of large read operations=0

                HDFS: Number of write operations=2

        Job Counters

                Launched map tasks=11

                Launched reduce tasks=1

                Data-local map tasks=11

                Total time spent by all maps in occupied slots (ms)=88360

                Total time spent by all reduces in occupied slots (ms)=11532

        Map-Reduce Framework

                Map input records=2007594

                Map output records=2007593

                Map output bytes=33819515

                Map output materialized bytes=37834767

                Input split bytes=1474

                Combine input records=0

                Combine output records=0

                Reduce input groups=4

                Reduce shuffle bytes=37834767

                Reduce input records=2007593

                Reduce output records=1

                Spilled Records=4015186

                Shuffled Maps =11

                Failed Shuffles=0

                Merged Map outputs=11

                GC time elapsed (ms)=1029

                CPU time spent (ms)=26570

                Physical memory (bytes) snapshot=10722308096

                Virtual memory (bytes) snapshot=23048646656

                Total committed heap usage (bytes)=12089032704

        Shuffle Errors

                BAD_ID=0

                CONNECTION=0

                IO_ERROR=0

                WRONG_LENGTH=0

                WRONG_MAP=0

                WRONG_REDUCE=0

        File Input Format Counters

                Bytes Read=1397533697

        File Output Format Counters

                Bytes Written=21

위의 결과를 보면 combiner 가 구현되지 않았기 때문에 combine input records 와 output records 가 0 이며,

reduce 에서 shuffle 된 bytes 와 records 수가 매우 높은 것을 알 수 있다.

그에 따라 reduce 에서 해당 값을 처리하기 위해 11532 ms 를 소모했다.

 

아래 결과는 combiner 를 구현한 경우이다.

INFO mapreduce.Job: Counters: 43

        File System Counters

                FILE: Number of bytes read=908

                FILE: Number of bytes written=998468

                FILE: Number of read operations=0

                FILE: Number of large read operations=0

                FILE: Number of write operations=0

                HDFS: Number of bytes read=1397535171

                HDFS: Number of bytes written=21

                HDFS: Number of read operations=36

                HDFS: Number of large read operations=0

                HDFS: Number of write operations=2

        Job Counters

                Launched map tasks=11

                Launched reduce tasks=1

                Data-local map tasks=11

                Total time spent by all maps in occupied slots (ms)=86586

                Total time spent by all reduces in occupied slots (ms)=5220

        Map-Reduce Framework

                Map input records=2007594

                Map output records=2007593

                Map output bytes=33819515

                Map output materialized bytes=968

                Input split bytes=1474

                Combine input records=2007593

                Combine output records=44

                Reduce input groups=4

                Reduce shuffle bytes=968

                Reduce input records=44

                Reduce output records=1

                Spilled Records=88

                Shuffled Maps =11

                Failed Shuffles=0

                Merged Map outputs=11

                GC time elapsed (ms)=1057

                CPU time spent (ms)=24930

                Physical memory (bytes) snapshot=10712588288

                Virtual memory (bytes) snapshot=23029968896

                Total committed heap usage (bytes)=12059148288

        Shuffle Errors

                BAD_ID=0

                CONNECTION=0

                IO_ERROR=0

                WRONG_LENGTH=0

                WRONG_MAP=0

                WRONG_REDUCE=0

        File Input Format Counters

                Bytes Read=1397533697

        File Output Format Counters

                Bytes Written=21

첫번째의 결과와 다른 것은 combiner (각 datanode 에서 동작하여 분산처리됨)의 구현을 통해 reduce 에서 취합되는 데이터의 양을 최소화 시켰다. 즉 combine output 은 44 records 로 줄였으며, 그로 인해 reduce 의 input shuffle 등의 값이 현저하게 주는 것을 확인할 수 있다.

결과를 보면 maps 의 소모시간은 조금 증가하였지만, reduce 의 소모시간은 반으로 준 것을 확인할 수 있다. 

이와 같이 counter 는 개발자의 데이터 공유뿐만 아니라 mapreduce 동작의 기본값을 확인 할 수 있기 때문에 활용도가 매우 높다.

이런 기본 counter 값도 reduce 동작중 획득하여 사용할 수 있다.

mapreduce 프로그래밍을 하면 counter가 필요한 곳은 대부분 reducer 이다

counter 는 job 안에 존재하며,

reducer의 context 가 가진 counter 를 조회한다면 그 값은 mapper 들에서 취합된 값이 아니므로 항상 0 이 나온다

 

따라서, reducer 를 동작중인 job 을 획득하여 counter 를 조회하여야 한다.

보통 reduce 메소드가 돌기전에 획득해 놓고 사용하기 때문에 setup 에서 구현한다.

 

 long totalVal = 0;


 @Override

 public void setup(Context context) throws IOException, InterruptedException{

   Configuration conf = context.getConfiguration();

   Cluster cluster = new Cluster(conf);

  Job currentJob = cluster.getJob(context.getJobID());

  totalCount = currentJob.getCounters().findCounter(MATCH_COUNTER.TOTAL_COUNT).getValue();  }

 

mapper 는 기본적으로 자신의 데이터노드에서 동작하기 때문에 여러 데이터노드의 경우 서로의 값의 공유할 수가 없다.

그에 때문에 hadoop 에서는 counter 를 통해 서로의 정보를 업데이트하여 reducer 에게 전달할 수 있다.

(단 long에 대한 증가/감소만 가능)

 

카운터의 등록은 enum 으로 등록하여야 하며, 필요한 enum 을 개발자는 생성해야 한다.

 

 아래와 같은 enum 을 생성했다며

 

 public static enum MATCH_COUNTER {

  TOTAL_COUNT

 };

 

  mapper 에서는 아래와 같은 로직을 통해 특정 값을 증가 시킬 수 있다.

  @Override

  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

    StringTokenizer itr = new StringTokenizer(value.toString());

        while (itr.hasMoreTokens()) {

   word.set(itr.nextToken());

             context.write(word, one);

             context.getCounter(MATCH_COUNTER.TOTAL_COUNT).increment(1); 

             // counter 증가

       }

  }

 

 

이렇게 증가된 값은 아래와 같이 사용할 수 있다.

 public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "word count");

    .... 중략

    job.waitForCompletion(true)

    long totalCnt = job.getCounters().findCounter(MATCH_COUNTER.TOTAL_COUNT)).getValue();  

     ... 중략

  }

 

 

 

 namenode 1  :  outdata { "a","b","c"} 3개

 namenode 2  :  outdata { "d,"f"} 2개

 

counter 의 값은 5가 된다.

+ Recent posts