AI
Spark : Dataset / DataFrame
JungGwig
2022. 7. 29. 16:27
SparkSession
세션 초기화
import org.apache.spark.sql.SparkSession
val ss = SparkSession.builder()
.appName("Spark App Name")
.config("spark.some.config.option", "some-value")
.getOrCreate()
- spark-shell을 이용할 경우 REPL쉘이 스파크 세션 객체를 초기화 해준다.
Hive Metasore Connect
- 사파크 세션은 단독으로 사용할 수 있지만, Hive Metasore와 연결하여 사용할 수도 있다. 이때는 세션 생성시에 "hive.metastore.uris" 값을 설정하면 된다.
val ss = SparkSession.builder()
.appName("Spark App Name")
.config("hive.metastore.uris", "thrift://hive_metastore_ip:port")
.enableHiveSupport()
.getOrCreate()
ss.sql("show databases").show()
DataFrame
데이터 프레임 초기화
- 데이터 프레임은 SparkSession의 read 메소드로 생성할 수 있다.
- read는 json, parquet, orc, text 등 다양한 형식의 데이터를 읽을 수 있다.
- RDD를 이용한 데이터 프레임 초기화
- RDD를 이용해서 데이터 프레임을 생성할 경우 스키마 구조를 지정할 수도 있고, 지정하지 않으면 임시 칼럼명이 지정된다.
- 외부 데이터를 통한 데이터 프레임 초기화
- json 형태의 파일은 구조를 가지고 있기 때문에 자동으로 스키마를 생성한다.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val conf = new SparkConf().setAppName("sample").setMaster("yarn")
val sc = new SparkContext(conf)
// 배열 RDD --> DF
val wordsRDD = sc.parallelize(Array("a", "b", "c", "d", "a", "a", "b", "b", "c", "d", "d", "d", "d"))
val wordsDF = wordsRDD.toDF("alphabet") // 컬럼명을 'alphabet'으로 지정
// 복합구조의 RDD --> DF
val peopleRDD = sc.parallelize(
Seq( ("David", 150),
("White", 200),
("Paul", 170) )
)
val peopleDF = peopleRDD.toDF("name", "height")
// 스미카를 생성하여 DF init
val peopleSchema = new StructType()
.add(StructField("name", StringType, true))
.add(StructField("height", IntegerType, true))
// StructField(Column Name, Data Type, Nullable, Metadata[Optional])
peopleDF = spark.createDataFrame(peopleRDD, peopleSchema)
// Json File --> DF
peopleDF = spark.read.json("data_path"+"/test.json")
// Text File --> DF
// people.txt
// A, 29
// B, 30
// C, 19
peopleRDD = sc.textFile("data_path"+"/people.txt")
val sepPeopleRDD = peopleRDD.map(line => line.split(',')).map(x => Row(x(0), x(1).trim.toInt))
peopleDF = spark.createDataFrame(sepPeopleRDD, peopleSchema)
데이터 프레임 연산
- 데이터 프레임은 구조화된 형태의 데이터를 명령어 체인을 이용하여 연산 할 수 있고, SQL을 이용할 수도 있다.
// DataFrame 스키마 조회
df.printSchema()
// DataFrame 조회
// 컬럼 데이터에 대한 연산을 하고 싶을때는 '$' 기호를 이용한다.
df.select("column").show()
df.select($"name", $"age" + 1).show() // name, age 순으로 age에 값을 1 더하여 조회
// show 함수 설정 [show() 함수로 DF 출력 시 데이터의 길이와 컬럼 사이즈를 제한]
def show(numRow: Int, truncate: Boolean):
unit = println(showString(numRows, truncate))
show(10, false)
show(100, true)
// DataFrame 필터링
df.filter($"age" > 21).show()
df.select($"name", $"age").filter($"age" > 20).show()
// DataFrame groupBy
df.groupBy("age").count().show()
// DataFrame 컬럼 추가
df.withColumn("isAdult", when($"age" > 19, True).otherwise(False)).show()
// SQL 사용한 DataFrame 연산
// DataFrame --> View
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE isAdult = True").show()
spark.sql("SELECT isAdult, count(isAdult) FROM people GROUP BY isAdult).show()
Dataset
- 데이터셋은 RDD와 유사하지만 객체를 직렬화 할 때 Java Serialization이나 kyro를 사용하지 않고, Spark의 Encoder를 이용하여 RDD 보다 속도가 빠르다.
데이터셋 초기화
// 내부 데이터를 이용한 초기화
val seq = Seq(
("David", 150),
("White", 200),
("Paul", 170)
)
val peopleDS = seq.toDS() // Default Column = _# --> peopleDS.columns = [_1, _2]
peopleDS.select("_1").show()
// Case Class를 이용한 초기화
case class People(name: String, salary: Int)
seq = Seq(
People("David", 150),
People("White", 200),
People("Paul", 170)
)
peopleDS = seq.toDS() // peopleDS.columns = [name, salary]
// RDD --> Dataset
// RDD를 Dataset으로 변환하기 위해서는 RDD --> DataFrame --> Dataset 과정을 거쳐야한다.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val peopleRDD = sc.textFile("people.txt")
val peopleSchema = new StructType().add(StructField("name", StringType, true)).add(StructField("age", IntegerType, true))
val sepPeopleRdd = peopleRDD.map(line => line.split(",")).map(x => Row(x(0), x(1).trim.toInt))
val peopleDF = spark.createDataFrame(sepPeopleRdd, peopleSchema)
case class People(name: String, age: Long)
peopleDS = peopleDF.as[People] // DataFrame --> Dataset
데이터셋 연산
case class People(name: String, salary: Int)
seq = Seq(
People("David", 150),
People("White", 200),
People("Paul", 170)
)
peopleDS = seq.toDS()
// Dataset 스키마 출력
peopleDS.printSchema()
peopleDS.show()
// Dataset 조회 [ Dataframe과 동일 ]
peopleDS.select("name").show()
peopleDS.select($"name", $"salary" + 1).show()
// Dataset 필터링
peopleDS.filter("salary is not null").show()
peopleDS.select($"name", $"salary").filter($"salary" > 170).show()
// Groupby
peopleDS.filter("salary is not null").groupBy("salary").count().show()
// map 연산
peopleDS.filter("salary is not null").map(p => p.name + ' / ' + p.salary).show()
// SQL을 이용한 데이터 조회
peopleDS.createOrReplaceTempView("people")
spark.sql(
"SELECT name, count(*) AS cnt
FROM people
WHERE age IS NOT NULL
GROUP BY name"
).show()
저장 / 불러오기
데이터 저장
val peopleDF = spark.read.json("/user/people.json")
case class People(name: String, age: Long)
val peopleDS = peopleDF.as[People]
peopleDS.write.save("/user/ds")
peopleDF.write.save("/user/df")
peopleDS.select("name").write.format("json").save("/user/ds_1") // Save to json format
peopleDS.select("name").write.format("csv").save("/user/ds_1") // Save to csv format
peopleDS.select("name").write.format("json").option("compression", "snappy").save("/user/ds_1")
peopleDS.select("name").write.format("json").option("compression", "gzip").save("/user/ds_1")
// Hive Metastore에 연결되어 있다면 Metastore의 정보를 이용해서 저장
peopleDF.select("name", "age").write.saveAsTable("people")
// 버켓팅
peopleDF.write.bucketBy(42, "name")
.sortBy("age")
.saveAsTable("people_bucketed")
// 파티셔닝
peopleDF.write.partitionBy("age")
.format("json")
.save("/user/people/")
// 파티셔닝, 버켓팅
peopleDF.write.partitionBy("age")
.bucketBy(42, "name")
.saveAsTable("people_p_b")
// 버켓팅, 파티셔닝의 경우 Hive에서 등장하는 개념
저장 모드 (SaveMode)
설정 | 비고 |
SaveMode.ErrorIfExists | 파일이 있으면 에러 처리 |
SaveMode.Append | 다른 이름으로 파일 추가 |
SaveMode.Overwrite | 기존 파일을 삭제하고 추가 |
SaveMode.Ignore | 파일이 있으면 저장하지 않고, 에러 처리도 하지 않음 |
import org.apache.spark.sql._
val peopleDF = spark.read.json("/user/people.json")
peopleDF.select("name", "age").write.mode(SaveMode.Overwrite).save("/user/people/")
데이터 불러오기
val peopleDF = spark.read.format("json").load("/user/ds_1/")
peopleDF = spark.read.json("/user/ds_1/")
peopleDF = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true").load("/user/people.csv")