AI
Spark : RDD
JungGwig
2022. 7. 27. 16:48
RDD
- 외부 데이터를 읽어서 처리하거나, 자체적으로 컬렉션 데이터를 생성하여 처리
- 데이터 처리는 파티션 단위로 분리하여 작업을 처리
- 연산
- Transformation : 필터링 같은 작업으로 RDD → RDD
- Action : RDD로 작업을 처리하여 결과를 드라이버에 반환하거나, FS에 쓰는 연산
- 스파크는 지연 처리를 지원하여 Transformation을 호출할 때는 작업을 처리하지 않고, Action을 호출하는 시점에 작업을 처리하여 작업의 효율성을 제공
- RDD는 액션이 실행될 때마다 새로운 연산을 처리. 작업의 처리 결과를 재사용하고 싶으면 persist() 메소드를 사용하여 결과를 메모리에 유지
- RDD는 스파크 컨텍스트 객체를 이용하여 생성할 수 있음.
SparkContext Init
- SparkConf 객체를 이용해서 설정값을 생성하고, 이를 이용해서 초기화할 수 있다.
- Spark-shell의 경우 REPL쉘이 스파크 컨텍스트 객체를 생성한다.
RDD Init
내부 데이터 이용
- SparkContext의 parallelize() 메소드를 이용하여 처리
- 생성한 객체는 RDD 타입이고, 해당 객체는 map(), reduce(), filter() 등의 RDD 연산을 이용하여 처리 가능
외부 데이터 이용
- SparkContext의 textFile() 메소드를 이용하여 처리
RDD 연산
Transformations
함수 | 설명 |
map(func) | func로 처리된 새로운 데이터셋 반환 |
filter(func) | func에서 true를 반환한 값으로 필터링 |
flatMap(func) | func는 배열 혹은 Seq를 반환하고, 하나의 배열로 반환 |
distinct([numPartitions]) | 데이터셋의 중복 제거 |
groupByKey([numPartitions]) | 키를 기준으로 그룹핑. (K,V) → (K, Iterable) |
reduceByKey(func, [numPartitions]) | 키를 기준으로 주어진 func로 처리된 결과를 (K,V)로 반환 |
sortByKey([ascending], [numPartitions]) | 키를 기준으로 정렬 |
Actions
함수 | 설명 |
reduce(func) | func를 이용하여 집계(두 개의 인수를 받아서 하나를 반환). 병렬처리 가능 필수 |
collect() | 처리 결과를 배열로 반환. 필터링 등 작은 데이터 집합을 반환하는데 유용 |
count() | 데이터셋의 개수 반환 |
first() | 데이터셋의 첫번째 아이템 반환 |
take(n) | 테이터셋의 첫번째부터 n개의 배열 반환 |
saveAsTextFile(path) | 데이터셋을 텍스트 파일로 지정한 위치에 저장 |
countByKey() | 키를 기준으로 카운트 반환 |
foreach(func) | 데이터셋의 각 엘리먼트를 func로 처리. 보통 Accmulator와 함께 사용 |
추가 Action : https://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.rdd.RDD |
함수 전달
- RDD 연산을 처리할 때 매번 구현하지 않고, 함수로 구현하여 작업을 처리할 수 있다.
- 함수를 전달 할 때는 외부의 변수를 이용하지 않는 순수 함수를 이용하는 것을 권장한다.
캐쉬 이용
- RDD는 처리 결과를 메모리나 디스크에 저장하고 다음 계산에 이용할 수 있습니다. 반복잡업의 경우 이 캐쉬를 이용해서 처리 속도를 높일 수 있습니다. 단일 작업의 경우 오버헤드가 발생하여 오히려 처리 시간이 느려질 수 있다.
- persist(), cache() 메소드를 이용하여 캐쉬를 이용할 수 있다. 캐싱한 데이터에 문제가 생기면 자동으로 복구한다.
저장 방법 | |
설정 | 설명 |
MEMORY_ONLY (default) | RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부만 저장하고 필요할 때마다 계산 |
MEMORY_AND_DISK | RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부는 메모리, 일부는 디스크에 저장 |
DISK_ONLY | RDD를 디스크에 저장 |
Accumulator
- 스파크는 PC에서 단독으로 처리되는 것이 아니라 클러스터에서 처리하기 때문에 Closure를 이용하면 결과가 달라질 수 있다.
- foreach()와 같은 반복문 매소드 내에서 로컬 변수를 사용한다면 클러스터 모드의 경우 각 노드에 로컬 변수가 존재하고 해당 변수 값을 이용하기 때문에 원하는 결과가 나오지 않을 수 있다. 이때, 스파크에서는 이를 위해 맵리듀스의 카운터와 유사한 역할을 하는 Accumulator를 이용하여 모든 노드가 공유할 수 있는 변수를 제공한다.
- Accumulator는 SparkContext를 이용해서 생성한다. ex) sc.longAccumulator()
Broadcast
- 맵리듀스의 Distribute Cache와 유사한 역할을 하며 모든 노드에서 공유되는 읽기 전용 값
- 조인에 이용되는 값들을 선언하여 이용할 수 있다.
Shuffle
- 스파크에서 조인, 정렬 작업은 Shuffle 작업을 실행
- 셔플은 파티션간에 그룹화된 데이터를 배포하는 매커니즘
- 임시 파일의 복사, 이동이 있기 때문에 많이 비용이 든다.