SparkSession
세션 초기화
import org.apache.spark.sql.SparkSession
val ss = SparkSession.builder()
.appName("Spark App Name")
.config("spark.some.config.option", "some-value")
.getOrCreate()
- spark-shell을 이용할 경우 REPL쉘이 스파크 세션 객체를 초기화 해준다.
Hive Metasore Connect
- 사파크 세션은 단독으로 사용할 수 있지만, Hive Metasore와 연결하여 사용할 수도 있다. 이때는 세션 생성시에 "hive.metastore.uris" 값을 설정하면 된다.
val ss = SparkSession.builder()
.appName("Spark App Name")
.config("hive.metastore.uris", "thrift://hive_metastore_ip:port")
.enableHiveSupport()
.getOrCreate()
ss.sql("show databases").show()
DataFrame
데이터 프레임 초기화
- 데이터 프레임은 SparkSession의 read 메소드로 생성할 수 있다.
- read는 json, parquet, orc, text 등 다양한 형식의 데이터를 읽을 수 있다.
- RDD를 이용한 데이터 프레임 초기화
- RDD를 이용해서 데이터 프레임을 생성할 경우 스키마 구조를 지정할 수도 있고, 지정하지 않으면 임시 칼럼명이 지정된다.
- 외부 데이터를 통한 데이터 프레임 초기화
- json 형태의 파일은 구조를 가지고 있기 때문에 자동으로 스키마를 생성한다.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val conf = new SparkConf().setAppName("sample").setMaster("yarn")
val sc = new SparkContext(conf)
val wordsRDD = sc.parallelize(Array("a", "b", "c", "d", "a", "a", "b", "b", "c", "d", "d", "d", "d"))
val wordsDF = wordsRDD.toDF("alphabet")
val peopleRDD = sc.parallelize(
Seq( ("David", 150),
("White", 200),
("Paul", 170) )
)
val peopleDF = peopleRDD.toDF("name", "height")
val peopleSchema = new StructType()
.add(StructField("name", StringType, true))
.add(StructField("height", IntegerType, true))
peopleDF = spark.createDataFrame(peopleRDD, peopleSchema)
peopleDF = spark.read.json("data_path"+"/test.json")
peopleRDD = sc.textFile("data_path"+"/people.txt")
val sepPeopleRDD = peopleRDD.map(line => line.split(',')).map(x => Row(x(0), x(1).trim.toInt))
peopleDF = spark.createDataFrame(sepPeopleRDD, peopleSchema)
데이터 프레임 연산
- 데이터 프레임은 구조화된 형태의 데이터를 명령어 체인을 이용하여 연산 할 수 있고, SQL을 이용할 수도 있다.
df.printSchema()
df.select("column").show()
df.select($"name", $"age" + 1).show()
def show(numRow: Int, truncate: Boolean):
unit = println(showString(numRows, truncate))
show(10, false)
show(100, true)
df.filter($"age" > 21).show()
df.select($"name", $"age").filter($"age" > 20).show()
df.groupBy("age").count().show()
df.withColumn("isAdult", when($"age" > 19, True).otherwise(False)).show()
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE isAdult = True").show()
spark.sql("SELECT isAdult, count(isAdult) FROM people GROUP BY isAdult).show()
Dataset
- 데이터셋은 RDD와 유사하지만 객체를 직렬화 할 때 Java Serialization이나 kyro를 사용하지 않고, Spark의 Encoder를 이용하여 RDD 보다 속도가 빠르다.
데이터셋 초기화
val seq = Seq(
("David", 150),
("White", 200),
("Paul", 170)
)
val peopleDS = seq.toDS()
peopleDS.select("_1").show()
case class People(name: String, salary: Int)
seq = Seq(
People("David", 150),
People("White", 200),
People("Paul", 170)
)
peopleDS = seq.toDS()
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val peopleRDD = sc.textFile("people.txt")
val peopleSchema = new StructType().add(StructField("name", StringType, true)).add(StructField("age", IntegerType, true))
val sepPeopleRdd = peopleRDD.map(line => line.split(",")).map(x => Row(x(0), x(1).trim.toInt))
val peopleDF = spark.createDataFrame(sepPeopleRdd, peopleSchema)
case class People(name: String, age: Long)
peopleDS = peopleDF.as[People]
데이터셋 연산
case class People(name: String, salary: Int)
seq = Seq(
People("David", 150),
People("White", 200),
People("Paul", 170)
)
peopleDS = seq.toDS()
peopleDS.printSchema()
peopleDS.show()
peopleDS.select("name").show()
peopleDS.select($"name", $"salary" + 1).show()
peopleDS.filter("salary is not null").show()
peopleDS.select($"name", $"salary").filter($"salary" > 170).show()
peopleDS.filter("salary is not null").groupBy("salary").count().show()
peopleDS.filter("salary is not null").map(p => p.name + ' / ' + p.salary).show()
peopleDS.createOrReplaceTempView("people")
spark.sql(
"SELECT name, count(*) AS cnt
FROM people
WHERE age IS NOT NULL
GROUP BY name"
).show()
저장 / 불러오기
데이터 저장
val peopleDF = spark.read.json("/user/people.json")
case class People(name: String, age: Long)
val peopleDS = peopleDF.as[People]
peopleDS.write.save("/user/ds")
peopleDF.write.save("/user/df")
peopleDS.select("name").write.format("json").save("/user/ds_1")
peopleDS.select("name").write.format("csv").save("/user/ds_1")
peopleDS.select("name").write.format("json").option("compression", "snappy").save("/user/ds_1")
peopleDS.select("name").write.format("json").option("compression", "gzip").save("/user/ds_1")
peopleDF.select("name", "age").write.saveAsTable("people")
peopleDF.write.bucketBy(42, "name")
.sortBy("age")
.saveAsTable("people_bucketed")
peopleDF.write.partitionBy("age")
.format("json")
.save("/user/people/")
peopleDF.write.partitionBy("age")
.bucketBy(42, "name")
.saveAsTable("people_p_b")
저장 모드 (SaveMode)
설정 |
비고 |
SaveMode.ErrorIfExists |
파일이 있으면 에러 처리 |
SaveMode.Append |
다른 이름으로 파일 추가 |
SaveMode.Overwrite |
기존 파일을 삭제하고 추가 |
SaveMode.Ignore |
파일이 있으면 저장하지 않고, 에러 처리도 하지 않음 |
import org.apache.spark.sql._
val peopleDF = spark.read.json("/user/people.json")
peopleDF.select("name", "age").write.mode(SaveMode.Overwrite).save("/user/people/")
데이터 불러오기
val peopleDF = spark.read.format("json").load("/user/ds_1/")
peopleDF = spark.read.json("/user/ds_1/")
peopleDF = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true").load("/user/people.csv")