1. ホーム
  2. scala

[解決済み】Spark Exponential Moving Averageについて

2022-02-06 06:09:05

質問

ID、日付、価格を持つ時系列価格データのデータフレームがあります。

価格カラムの指数移動平均を計算し、データフレームに新しいカラムとして追加する必要があります。

以前からSparkの窓関数は使っていて、今回のユースケースには合っているように見えたのですが、EMAの計算式を考えると

EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)

ここで

multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now

実際に列の上をウィンドウで移動しながら、列の前の計算された値にアクセスするにはどうしたらいいか、ちょっと混乱しました。 単純な移動平均では、ウィンドウ内の要素を平均化しながら新しい列を計算すればよいので、簡単です。

var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))

しかし、EMAの場合は、各ステップで前の計算値が必要なので、少し複雑になるようです。

も見てみました。 Pysparkの加重移動平均 しかし、Spark/Scala用のアプローチが必要で、10日または30日のEMAのために。

何かアイデアはありますか?

解決方法は?

最後に、指数移動平均がpandasのdataframeでどのように実装されているかを分析しました。上で説明した再帰的な式の他に、SQLやwindow関数で実装するのが難しい(再帰的なので)別の式があります。 課題追跡システム :

y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
       ((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).

これを踏まえて、さらにSpark実装のヘルプとして こちら というのとほぼ同等の実装になりました。 pandas_dataframe.ewm(span=window_size).mean() .

def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
  val window = Window.partitionBy(partitionColumn)
  val exponentialMovingAveragePrefix = "_EMA_"

  val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
    val alpha = 2.0 / (windowSize + 1)
    val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
      accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
    }
    (adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
  })
  dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
    .withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
    .drop("row_nr")
}

(指数移動平均を計算する列の型は Double と仮定しています。)

他の方の参考になれば幸いです。