1. ホーム
  2. データベース
  3. エムエスエル

Spark SQL 2.4.8 データフレームを操作するための2つの方法

2022-01-18 02:36:17

I. テストデータ

7369,SMITH,CLERK,7902,1980/12/17,800,20
7499,allen,salesman,7698,1981/2/20,1600,300,30
7521,ward,salesman,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,20
7654,martin,salesman,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,20
7839,KING,PRESIDENT,1981/11/17,5000,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,20
7900,james,clerk,7698,1981/12/3,9500,30
7902,FORD,ANALYST,7566,1981/12/3,3000,20
7934,MILLER,CLERK,7782,1982/1/23,1300,10

II. データフレームの作成

方法1.DSL操作

    {を使用します。 SparkContextとSparkSessionのオブジェクトのインスタンス化 {を使用します。 データの構造情報を定義するStructType型を使ってスキーマを構築する。 {を使用します。 SparkContextオブジェクトを通してファイルを読み込み、RDDを生成する。 {を使用します。 RDD[String]をRDD[Row]に変換する。 {を使用します。 SparkSessionオブジェクトからdataframeを作成する {を使用します。 コード全体は以下の通りです。
package com.scala.demo.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

object Demo01 {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkContext and SparkSession objects
    val sc = new SparkContext(new SparkConf().setAppName("Demo01").setMaster("local[2]"))
    val sparkSession = SparkSession.builder().getOrCreate()

    // 2. Use StructType to define Schema
    val mySchema = StructType(List(
      StructField("empno", DataTypes.IntegerType, false),
      StructField("ename", DataTypes.StringType, false),
      StructField("job", DataTypes.StringType, false),
      StructField("mgr", DataTypes.StringType, false),
      StructField("hiredate", DataTypes.StringType, false),
      StructField("sal", DataTypes.IntegerType, false),
      StructField("comm", DataTypes.StringType, false),
      StructField("deptno", DataTypes.IntegerType, false)
    ))
    // 3. Read the data
    val empRDD = sc.textFile("file:///D:\\TestDatas\\\emp.csv")

    // 4. Map it to a ROW object
    val rowRDD = empRDD.map(line => {
      val strings = line.split(",")
      Row(strings(0).toInt, strings(1), strings(2), strings(3), strings(4), strings(5).toInt, strings(6), strings(7).toInt)
    })

    // 5. Create a DataFrame
    val dataFrame = sparkSession.createDataFrame(rowRDD, mySchema)

    // 6. Display the content DSL
	dataFrame.groupBy("deptno").sum("sal").as("result").sort("sum(sal)").show()
  }
}

結果は次のようになります。

方法2:SQL操作

  • SparkContextとSparkSessionのオブジェクトのインスタンス化
  • データの構造情報を定義するケースクラス Emp sample クラスの作成
  • {を使用します。 SparkContextオブジェクトを通してファイルを読み込み、RDD[String]を生成します。
  • RDD[String]をRDD[Emp]に変換する。
  • {を使用します。 スパーク暗黙変換関数を導入する(必須)
  • RDD[Emp]をDataFrameに変換する
  • {を使用します。 DataFrameをビューやテンポラリテーブルとして登録する {を使用します。 SparkSessionオブジェクトのsql関数を呼び出してsql文を記述する {を使用します。 リソースを停止する
  • 具体的なコードは以下の通りです。
package com.scala.demo.sql

import org.apache.spark.rdd.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

// 0. data analysis
// 7499,allen,salesman,7698,1981/2/20,1600,300,30
// 1. Define the Emp sample class
case class Emp(empNo:Int,empName:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptNo:Int)

object Demo02 {
  def main(args: Array[String]): Unit = {
    // 2. Read the data and map it to a Row object
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo02"))
    val mapRdd = sc.textFile("file:///D:\\TestDatas\\emp.csv")
      .map(_.split(","))

    val rowRDD:RDD[Emp] = mapRdd.map(line => Emp(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt))

    // 3. Create the dataframe
    val spark = SparkSession.builder().getOrCreate()
    // Introduce the spark implicit conversion function
    import spark.implicits._
    // Convert the RDD to a Dataframe
    val dataFrame = rowRDD.toDF

    // 4.2 sql statement operations
    // 1. Register the dataframe as a temporary table
    dataFrame.createOrReplaceTempView("emp")
    // 2. write sql statements to perform the operation
    spark.sql("select deptNo,sum(sal) as total from emp group by deptNo order by total desc").show()

    // Close the resource
    spark.stop()
    sc.stop()
  }
}

結果は次のようになります。

今回はSpark SQL 2.4.8のDataframeの2つの操作方法について紹介しましたが、より関連するSpark SQL操作Dataframeの内容はBinaryDevelopの過去の記事を検索するか、以下の関連記事を引き続き閲覧してください。