ABOUT ME

Today
Yesterday
Total
  • Spark : Dataset / DataFrame
    AI 2022. 7. 29. 16:27

    SparkSession

     

    세션 초기화

    import org.apache.spark.sql.SparkSession
    
    val ss = SparkSession.builder()
                .appName("Spark App Name")
                .config("spark.some.config.option", "some-value")
                .getOrCreate()
    • spark-shell을 이용할 경우 REPL쉘이 스파크 세션 객체를 초기화 해준다. 

     


    Hive Metasore Connect

    • 사파크 세션은 단독으로 사용할 수 있지만, Hive Metasore와 연결하여 사용할 수도 있다. 이때는 세션 생성시에 "hive.metastore.uris" 값을 설정하면 된다.
    val ss = SparkSession.builder()
            .appName("Spark App Name")
            .config("hive.metastore.uris", "thrift://hive_metastore_ip:port")
            .enableHiveSupport()
            .getOrCreate()
            
    ss.sql("show databases").show()

     


     

    DataFrame

     

    데이터 프레임 초기화

    • 데이터 프레임은 SparkSession의 read 메소드로 생성할 수 있다.
    • read는 json, parquet, orc, text 등 다양한 형식의 데이터를 읽을 수 있다.
    • RDD를 이용한 데이터 프레임 초기화
      • RDD를 이용해서 데이터 프레임을 생성할 경우 스키마 구조를 지정할 수도 있고, 지정하지 않으면 임시 칼럼명이 지정된다.
    • 외부 데이터를 통한 데이터 프레임 초기화
      • json 형태의 파일은 구조를 가지고 있기 때문에 자동으로 스키마를 생성한다.
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    
    val conf = new SparkConf().setAppName("sample").setMaster("yarn")
    val sc = new SparkContext(conf)
    
    // 배열 RDD --> DF
    val wordsRDD = sc.parallelize(Array("a", "b", "c", "d", "a", "a", "b", "b", "c", "d", "d", "d", "d"))
    val wordsDF = wordsRDD.toDF("alphabet") // 컬럼명을 'alphabet'으로 지정
    
    
    // 복합구조의 RDD --> DF
    val peopleRDD = sc.parallelize(
      Seq( ("David", 150),
           ("White", 200),
           ("Paul",  170) )
    )
    
    val peopleDF = peopleRDD.toDF("name", "height")
    
    
    // 스미카를 생성하여 DF init
    val peopleSchema = new StructType()
                            .add(StructField("name", StringType, true))
                            .add(StructField("height", IntegerType, true))
                            // StructField(Column Name, Data Type, Nullable, Metadata[Optional])
                
    peopleDF = spark.createDataFrame(peopleRDD, peopleSchema)
    
    
    // Json File --> DF
    peopleDF = spark.read.json("data_path"+"/test.json")
    
    
    // Text File --> DF
    // people.txt
    // A, 29
    // B, 30
    // C, 19
    
    peopleRDD = sc.textFile("data_path"+"/people.txt")
    val sepPeopleRDD = peopleRDD.map(line => line.split(',')).map(x => Row(x(0), x(1).trim.toInt))
    peopleDF = spark.createDataFrame(sepPeopleRDD, peopleSchema)

     

    데이터 프레임 연산

    • 데이터 프레임은 구조화된 형태의 데이터를 명령어 체인을 이용하여 연산 할  수 있고, SQL을 이용할 수도 있다.
    // DataFrame 스키마 조회
    df.printSchema()
    
    
    // DataFrame 조회
    // 컬럼 데이터에 대한 연산을 하고 싶을때는 '$' 기호를 이용한다.
    df.select("column").show()
    df.select($"name", $"age" + 1).show() // name, age 순으로 age에 값을 1 더하여 조회
    
    
    // show 함수 설정 [show() 함수로 DF 출력 시 데이터의 길이와 컬럼 사이즈를 제한]
    def show(numRow: Int, truncate: Boolean):
    	unit = println(showString(numRows, truncate))
       
    show(10, false)
    show(100, true)
    
    
    // DataFrame 필터링
    df.filter($"age" > 21).show()
    df.select($"name", $"age").filter($"age" > 20).show()
    
    
    // DataFrame groupBy
    df.groupBy("age").count().show()
    
    
    // DataFrame 컬럼 추가
    df.withColumn("isAdult", when($"age" > 19, True).otherwise(False)).show()
    
    
    // SQL 사용한 DataFrame 연산
    
    // DataFrame --> View
    df.createOrReplaceTempView("people")
    
    spark.sql("SELECT * FROM people WHERE isAdult = True").show()
    spark.sql("SELECT isAdult, count(isAdult) FROM people GROUP BY isAdult).show()

     


    Dataset

    • 데이터셋은 RDD와 유사하지만 객체를 직렬화 할 때 Java Serialization이나 kyro를 사용하지 않고, Spark의 Encoder를 이용하여 RDD 보다 속도가 빠르다.

     

    데이터셋 초기화

    // 내부 데이터를 이용한 초기화
    
    val seq = Seq(
        ("David", 150),
        ("White", 200),
        ("Paul", 170)
    )
    
    val peopleDS = seq.toDS() // Default Column = _# --> peopleDS.columns = [_1, _2]
    peopleDS.select("_1").show()
    
    
    // Case Class를 이용한 초기화
    case class People(name: String, salary: Int)
    seq = Seq(
        People("David", 150),
        People("White", 200),
        People("Paul", 170)
    )
    peopleDS = seq.toDS() // peopleDS.columns = [name, salary]
    
    
    // RDD --> Dataset
    // RDD를 Dataset으로 변환하기 위해서는 RDD --> DataFrame --> Dataset 과정을 거쳐야한다.
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    
    val peopleRDD = sc.textFile("people.txt")
    val peopleSchema = new StructType().add(StructField("name",   StringType, true)).add(StructField("age", IntegerType, true))
    val sepPeopleRdd = peopleRDD.map(line => line.split(",")).map(x => Row(x(0), x(1).trim.toInt))
    val peopleDF = spark.createDataFrame(sepPeopleRdd, peopleSchema)
    
    case class People(name: String, age: Long)
    peopleDS = peopleDF.as[People] // DataFrame --> Dataset

     

    데이터셋 연산

    case class People(name: String, salary: Int)
    seq = Seq(
        People("David", 150),
        People("White", 200),
        People("Paul", 170)
    )
    peopleDS = seq.toDS()
    
    
    // Dataset 스키마 출력
    peopleDS.printSchema()
    peopleDS.show()
    
    
    // Dataset 조회 [ Dataframe과 동일 ]
    peopleDS.select("name").show()
    peopleDS.select($"name", $"salary" + 1).show()
    
    
    // Dataset 필터링
    peopleDS.filter("salary is not null").show()
    peopleDS.select($"name", $"salary").filter($"salary" > 170).show()
    
    
    // Groupby
    peopleDS.filter("salary is not null").groupBy("salary").count().show()
    
    
    // map 연산
    peopleDS.filter("salary is not null").map(p => p.name + ' / ' + p.salary).show()
    
    
    
    // SQL을 이용한 데이터 조회
    peopleDS.createOrReplaceTempView("people")
    spark.sql(
        "SELECT name, count(*) AS cnt
        FROM people
        WHERE age IS NOT NULL 
        GROUP BY name" 
    ).show()

     


    저장 / 불러오기

    데이터 저장

    val peopleDF = spark.read.json("/user/people.json")
    case class People(name: String, age: Long)
    val peopleDS = peopleDF.as[People]
    
    peopleDS.write.save("/user/ds")
    peopleDF.write.save("/user/df")
    
    peopleDS.select("name").write.format("json").save("/user/ds_1") // Save to json format
    peopleDS.select("name").write.format("csv").save("/user/ds_1") // Save to csv format
    
    peopleDS.select("name").write.format("json").option("compression", "snappy").save("/user/ds_1")
    peopleDS.select("name").write.format("json").option("compression", "gzip").save("/user/ds_1")
    
    // Hive Metastore에 연결되어 있다면 Metastore의 정보를 이용해서 저장
    peopleDF.select("name", "age").write.saveAsTable("people")
    
    // 버켓팅
    peopleDF.write.bucketBy(42, "name")
      .sortBy("age")
      .saveAsTable("people_bucketed")
    
    // 파티셔닝
    peopleDF.write.partitionBy("age")
      .format("json")
      .save("/user/people/")
    
    // 파티셔닝, 버켓팅 
    peopleDF.write.partitionBy("age")
      .bucketBy(42, "name")
      .saveAsTable("people_p_b")
      
    // 버켓팅, 파티셔닝의 경우 Hive에서 등장하는 개념

     

    저장 모드 (SaveMode)

    설정 비고
    SaveMode.ErrorIfExists 파일이 있으면 에러 처리
    SaveMode.Append 다른 이름으로 파일 추가
    SaveMode.Overwrite 기존 파일을 삭제하고 추가
    SaveMode.Ignore 파일이 있으면 저장하지 않고, 에러 처리도 하지 않음
    import org.apache.spark.sql._
    
    val peopleDF = spark.read.json("/user/people.json")
    peopleDF.select("name", "age").write.mode(SaveMode.Overwrite).save("/user/people/")

     

    데이터 불러오기

    val peopleDF = spark.read.format("json").load("/user/ds_1/")
    
    peopleDF = spark.read.json("/user/ds_1/")
    
    peopleDF = spark.read.format("csv")
                    .option("sep", ",")
                    .option("inferSchema", "true")
                    .option("header", "true").load("/user/people.csv")

    'AI' 카테고리의 다른 글

    Spark : Config  (0) 2022.08.01
    Spark : RDD  (0) 2022.07.27
    Spark : 구조  (0) 2022.07.26
    Spark : Basic  (0) 2022.07.26
    MLOps 입력값 드리프트  (0) 2022.07.13

    댓글

Designed by Tistory.