1. ホーム
  2. python

[解決済み] (py)sparkのすべてのデータフレームの非存在化

2022-02-10 15:14:17

質問

私は、現在の状態を永続化したいいくつかのポイントがあるスパークアプリケーションです。これは通常、大きなステップの後、または複数回使用したい状態をキャッシュすることです。データフレームで2回目のキャッシュを呼び出すと、新しいコピーがメモリにキャッシュされるようです。私のアプリケーションでは、これはスケールアップする際のメモリの問題につながります。私の現在のテストでは、あるデータフレームは最大で100MB程度ですが、中間結果の累積サイズは、エグゼキュータに割り当てられたメモリを超えて大きくなっていきます。この挙動を示す小さな例を以下に示します。

cache_test.py:

from pyspark import SparkContext, HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csvです。

1,2,3
4,5,6
7,8,9

アプリケーションのUIを見ると、新しいカラムのあるデータフレームに加え、元のデータフレームのコピーがあります。元のコピーを削除するには df.unpersist() withColumn の行の前にあります。これは、キャッシュされた中間結果を削除するために推奨される方法でしょうか(つまり、すべての cache() ).

また、キャッシュされたオブジェクトをすべてパージすることは可能でしょうか。私のアプリケーションでは、単にすべてのメモリをパージして次のファイルに移ることができる自然なブレイクポイントがあります。入力ファイルごとに新しいsparkアプリケーションを作成することなく、これを実行したいのです。

よろしくお願いします。

どのように解決するのですか?

Spark 2.x

を使用することができます。 Catalog.clearCache :

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate
...
spark.catalog.clearCache()

スパーク1.x

を使用することができます。 SQLContext.clearCache というメソッドがあります。

インメモリキャッシュから、キャッシュされたすべてのテーブルを削除します。

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()