AI

Spark : RDD

JungGwig 2022. 7. 27. 16:48

RDD

  • 외부 데이터를 읽어서 처리하거나, 자체적으로 컬렉션 데이터를 생성하여 처리
  • 데이터 처리는 파티션 단위로 분리하여 작업을 처리
  • 연산
    • Transformation : 필터링 같은 작업으로 RDD → RDD
    • Action : RDD로 작업을 처리하여 결과를 드라이버에 반환하거나, FS에 쓰는 연산
    • 스파크는 지연 처리를 지원하여 Transformation을 호출할 때는 작업을 처리하지 않고, Action을 호출하는 시점에 작업을 처리하여 작업의 효율성을 제공
  • RDD는 액션이 실행될 때마다 새로운 연산을 처리. 작업의 처리 결과를 재사용하고 싶으면 persist() 메소드를 사용하여 결과를 메모리에 유지
  • RDD는 스파크 컨텍스트 객체를 이용하여 생성할 수 있음.

 


SparkContext Init

  • SparkConf 객체를 이용해서 설정값을 생성하고, 이를 이용해서 초기화할 수 있다.
  • Spark-shell의 경우 REPL쉘이 스파크 컨텍스트 객체를 생성한다.

 


RDD Init

내부 데이터 이용

  • SparkContext의 parallelize() 메소드를 이용하여 처리
  • 생성한 객체는 RDD 타입이고, 해당 객체는 map(), reduce(), filter() 등의 RDD 연산을 이용하여 처리 가능

외부 데이터 이용

  • SparkContext의 textFile() 메소드를 이용하여 처리

 


RDD 연산

Transformations

함수 설명
map(func) func로 처리된 새로운 데이터셋 반환
filter(func) func에서 true를 반환한 값으로 필터링
flatMap(func) func는 배열 혹은 Seq를 반환하고, 하나의 배열로 반환
distinct([numPartitions]) 데이터셋의 중복 제거
groupByKey([numPartitions]) 키를 기준으로 그룹핑. (K,V) → (K, Iterable)
reduceByKey(func, [numPartitions]) 키를 기준으로 주어진 func로 처리된 결과를 (K,V)로 반환
sortByKey([ascending], [numPartitions]) 키를 기준으로 정렬

Actions

함수 설명
reduce(func) func를 이용하여 집계(두 개의 인수를 받아서 하나를 반환). 병렬처리 가능 필수
collect() 처리 결과를 배열로 반환. 필터링 등 작은 데이터 집합을 반환하는데 유용
count() 데이터셋의 개수 반환
first() 데이터셋의 첫번째 아이템 반환
take(n) 테이터셋의 첫번째부터 n개의 배열 반환
saveAsTextFile(path) 데이터셋을 텍스트 파일로 지정한 위치에 저장
countByKey() 키를 기준으로 카운트 반환
foreach(func) 데이터셋의 각 엘리먼트를 func로 처리. 보통 Accmulator와 함께 사용
추가 Action : https://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.rdd.RDD

함수 전달

  • RDD 연산을 처리할 때 매번 구현하지 않고, 함수로 구현하여 작업을 처리할 수 있다.
  • 함수를 전달 할 때는 외부의 변수를 이용하지 않는 순수 함수를 이용하는 것을 권장한다.

캐쉬 이용

  • RDD는 처리 결과를 메모리나 디스크에 저장하고 다음 계산에 이용할 수 있습니다. 반복잡업의 경우 이 캐쉬를 이용해서 처리 속도를 높일 수 있습니다. 단일 작업의 경우 오버헤드가 발생하여 오히려 처리 시간이 느려질 수 있다.
  • persist(), cache() 메소드를 이용하여 캐쉬를 이용할 수 있다. 캐싱한 데이터에 문제가 생기면 자동으로 복구한다. 
저장 방법
설정 설명
MEMORY_ONLY (default) RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부만 저장하고 필요할 때마다 계산
MEMORY_AND_DISK RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부는 메모리, 일부는 디스크에 저장
DISK_ONLY RDD를 디스크에 저장

Accumulator

  • 스파크는 PC에서 단독으로 처리되는 것이 아니라 클러스터에서 처리하기 때문에 Closure를 이용하면 결과가 달라질 수 있다.
  • foreach()와 같은 반복문 매소드 내에서 로컬 변수를 사용한다면 클러스터 모드의 경우 각 노드에 로컬 변수가 존재하고 해당 변수 값을 이용하기 때문에 원하는 결과가 나오지 않을 수 있다. 이때, 스파크에서는 이를 위해 맵리듀스의 카운터와 유사한 역할을 하는 Accumulator를 이용하여 모든 노드가 공유할 수 있는 변수를 제공한다.
  • Accumulator는 SparkContext를 이용해서 생성한다. ex) sc.longAccumulator()

Broadcast

  • 맵리듀스의 Distribute Cache와 유사한 역할을 하며 모든 노드에서 공유되는 읽기 전용 값
  • 조인에 이용되는 값들을 선언하여 이용할 수 있다.

Shuffle

  • 스파크에서 조인, 정렬 작업은 Shuffle 작업을 실행
  • 셔플은 파티션간에 그룹화된 데이터를 배포하는 매커니즘
  • 임시 파일의 복사, 이동이 있기 때문에 많이 비용이 든다.

 

참고 : https://wikidocs.net/book/2350