1. ホーム
  2. scala

[解決済み] DataFrameのパーティショニングはどのように定義するのですか?

2022-05-16 01:06:22

質問

Spark 1.4.0からSpark SQLとDataFramesを使い始めました。 ScalaでDataFramesにカスタムパーティショナーを定義したいのですが、その方法がわかりません。

私が作業しているデータテーブルの1つは、以下の例のようなアカウントごとのトランザクションのリストを含んでいます。

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

少なくとも最初は、計算のほとんどはアカウント内のトランザクション間で行われます。 そのため、あるアカウントのすべてのトランザクションが同じ Spark パーティションにあるように、データをパーティション化したいと思います。

しかし、これを定義する方法が見当たりません。 DataFrame クラスには 'repartition(Int)' というメソッドがあり、作成するパーティションの数を指定することができます。 しかし、RDDに指定できるような、DataFrameのカスタムパーティショナーを定義するためのメソッドは見当たりません。

ソースデータは Parquet で保存されています。 DataFrame を Parquet に書き込むときに、パーティション分割する列を指定できることがわかりましたので、おそらく Parquet に 'Account' 列でデータをパーティション分割するように指示することができます。 しかし、何百万ものアカウントがある可能性があり、私が正しく理解していれば、Parquet はアカウントごとに個別のディレクトリを作成することになるため、合理的な解決策とは思えませんでした。

Spark にこの DataFrame をパーティショニングさせ、Account のすべてのデータが同じパーティションにあるようにする方法はありますか?

どのように解決するのですか?

Spark 2.3.0を使用しています。

SPARK-22614 は、レンジパーティショニングを公開します。

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 は、外部フォーマットパーティショニングを データソース API v2 .

スパーク >= 1.6.0

Spark >= 1.6では、クエリやキャッシュにカラムによるパーティショニングを使用することが可能です。参照してください。 SPARK-11410 スパーク-4849 を使用して repartition メソッドを使用しています。

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

とは異なり RDDs スパーク Dataset (を含む Dataset[Row] を含む DataFrame ) は、今のところカスタムパーティショナーを使用することができません。人工的なパーティショニングカラムを作成することで一般的に対処できますが、同じ柔軟性を与えることはできません。

Spark 1.6.0 を参照してください。

できることの一つは、入力データをあらかじめパーティション分けしてから DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

以降 DataFrame からの生成は RDD からの生成は、既存のパーティションレイアウトを保持する単純なマップフェーズのみを必要とします*。

assert(df.rdd.partitions == partitioned.partitions)

同じように、既存の DataFrame :

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

というわけで、不可能ではなさそうです。問題は、それが全く意味をなさないかどうかです。私は、ほとんどの場合、それは意味がないと主張します。

  1. 再パーティショニングは高価な処理です。典型的なシナリオでは、ほとんどのデータをシリアル化し、シャッフルし、デシリアライズする必要があります。一方、事前にパーティショニングされたデータから恩恵を受けることができる操作の数は比較的少なく、内部 API がこの特性を活用するように設計されていない場合は、さらに制限されます。

    • はいくつかのシナリオでジョインしますが、内部サポートが必要になります。
    • マッチングパーティショナーによるウィンドウ関数呼び出し。上記と同じで、1つのウィンドウ定義に限定されます。しかし、すでに内部でパーティショニングされているので、事前のパーティショニングは冗長になるかもしれません。
    • による単純な集計。 GROUP BY - を使うことで、一時的なバッファのメモリ使用量を減らすことができます**が、全体的なコストはかなり高くなります。多かれ少なかれ groupByKey.mapValues(_.reduce) (現在の動作) と reduceByKey (パーティション分割前)の比較。実際に役に立つとは思えません。
    • によるデータ圧縮は SqlContext.cacheTable . ランレングスエンコーディングを使用しているようなので OrderedRDDFunctions.repartitionAndSortWithinPartitions を適用すると圧縮率が向上する可能性があります。
  2. パフォーマンスは、キーの分布に大きく依存します。分布が偏っている場合、リソースの利用が最適化されないことになります。最悪の場合、ジョブを終了することはまったく不可能になります。

  3. 高レベルの宣言的なAPIを使用することの要点は、低レベルの実装の詳細から自分自身を分離することです。すでに @dwysakowicz ロミ・クンツマン の仕事であり、最適化は 触媒オプティマイザ . それはかなり洗練された獣であり、私は本当にあなたがその内部にはるかに深くダイビングせずに簡単にそれを改善することができます疑う。

関連する概念

JDBCソースでのパーティショニング :

JDBC データソースのサポート predicates 引数 . 以下のように使用することができます。

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

これは、述語ごとに1つのJDBCパーティションを作成します。もし、個々の述語を使って作られたセットが不連続でない場合、結果のテーブルに重複が見られることに注意してください。

partitionBy メソッドで DataFrameWriter :

スパーク DataFrameWriter 提供 partitionBy このメソッドは、書き込み時にデータを分割するために使用されます。これは、提供されたカラムのセットを使用して、書き込み時にデータを分離します。

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

これは、キーに基づくクエリのためのreadでの述語のプッシュダウンを可能にします。

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

と同じですが,これは DataFrame.repartition . 特に集合体のような

val cnts = df1.groupBy($"k").sum()

が必要な場合でも TungstenExchange :

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy メソッドで DataFrameWriter (Spark >= 2.0)を参照してください。

bucketBy と同じような用途があります。 partitionBy と同様の用途がありますが、テーブルに対してのみ有効です ( saveAsTable ). バケット情報は結合を最適化するために使用することができます。

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>


* 以下のとおりです。 パーティション・レイアウト データの分布だけを意味します。 partitioned RDDにはもはやパーティショナーはない。 ** 初期投影がないと仮定します。集計が列の小さなサブセットのみをカバーする場合、おそらく何の利益もありません。