1. ホーム
  2. scala

[解決済み] scala.concurrent.Promiseのユースケースは何ですか?

2023-01-29 01:58:57

質問

私が読んでいるのは SIP-14 を読んでいて、そのコンセプトは Future の概念は完璧に理にかなっており、理解しやすいものです。しかし、以下の2つの疑問があります。 Promise :

  1. SIPでは Depending on the implementation, it may be the case that p.future == p . これはどうしたらいいのでしょうか?これは FuturePromise は2つの異なるタイプではないのですか?

  2. どのような場合に Promise ? 例として producer and consumer のコードです。

    import scala.concurrent.{ future, promise }
    val p = promise[T]
    val f = p.future
    
    val producer = future {
        val r = produceSomething()
        p success r
        continueDoingSomethingUnrelated()
    }
    val consumer = future {
        startDoingSomething()
        f onSuccess {
            case r => doSomethingWithResult()
        }
    }
    
    

は読みやすいですが、本当にこのように書く必要があるのでしょうか?ということで、Futureのみ、Promiseは無しで実装してみました。

val f = future {
   produceSomething()
}

val producer = future {
   continueDoingSomethingUnrelated()
}

startDoingSomething()

val consumer = future {
  f onSuccess {
    case r => doSomethingWithResult()
  }
}

与えられた例との違い、Promiseが必要な理由は何でしょうか?

どのように解決するのですか?

PromiseとFutureは補完的な概念です。Futureは将来のいつかに取り出される値で、そのイベントが発生したときにそれを使って何かをすることができます。したがって、これは計算の読み取りまたは出力エンドポイントであり、そこから値を取得するものです。

プロミスは類似しており、計算の書き込み側です。プロミスは計算結果を格納する場所であり、そのプロミスからプロミスに格納された結果を読み出すために使用される未来を取得します。プロミスを完了させると、失敗でも成功でも、関連するFutureに付けられたすべての動作が開始されます。

最初の質問についてですが、どうしてプロミスpに対して p.future == p . これは単項目バッファのようなもので、最初は空っぽで、後から一つの値を保存して、それが永遠にその中身となるコンテナだと想像できます。さて、あなたの視点によって、これは約束であり未来でもあります。バッファに値を書き込もうとする人にとっては約束です。バッファに値が書き込まれるのを待っている人にとっては未来です。

具体的には,Scala の concurrent API の場合,Promise trait を見てみると ここで を見ると、Promiseコンパニオンオブジェクトのメソッドがどのように実装されているかが分かります。

object Promise {

  /** Creates a promise object which can be completed with a value.
   *  
   *  @tparam T       the type of the value in the promise
   *  @return         the newly created `Promise` object
   */
  def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()

  /** Creates an already completed Promise with the specified exception.
   *  
   *  @tparam T       the type of the value in the promise
   *  @return         the newly created `Promise` object
   */
  def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception))

  /** Creates an already completed Promise with the specified result.
   *  
   *  @tparam T       the type of the value in the promise
   *  @return         the newly created `Promise` object
   */
  def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Success(result))

}

さて,これらのプロミス,DefaultPromiseとKeptPromiseの実装は,以下のようになります. にあります. . これらは両方とも,たまたま同じ名前を持つ基本的な小さな特徴を拡張していますが,別のパッケージに置かれています.

private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
  def future: this.type = this
}

で何を意味しているのかがわかります。 p.future == p .

DefaultPromise は私が上で言及したバッファで、一方 KeptPromise は作成時から値が入れられたバッファです。

あなたの例についてですが、あなたがそこで使っているfutureブロックは、実は裏側でプロミスを作っているのです。の定義を見てみましょう。 future ここで :

def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)

メソッドの連鎖をたどっていくと、最終的に impl.Future :

private[concurrent] object Future {
  class PromiseCompletingRunnable[T](body: => T) extends Runnable {
    val promise = new Promise.DefaultPromise[T]()

    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
  }

  def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.execute(runnable)
    runnable.promise.future
  }
}

このように、プロデューサーブロックから得られる結果はプロミスに注ぎ込まれます。

後日編集 :

実戦での使用について。ほとんどの場合、プロミスを直接扱うことはないでしょう。非同期計算を行うライブラリを使うのであれば、そのライブラリのメソッドから返されるfutureを扱うだけです。この場合,プロミスはライブラリによって生成され,あなたはそれらのメソッドが行うことのリーディングエンドで作業しているだけです.

しかし、もしあなた自身の非同期APIを実装する必要があるならば、それらを使って作業を始めなければなりません。 例えば、Netty の上に非同期 HTTP クライアントを実装する必要があるとします。その場合、あなたのコードは次のようになります。

    def makeHTTPCall(request: Request): Future[Response] = {
        val p = Promise[Response]
        registerOnCompleteCallback(buffer => {
            val response = makeResponse(buffer)
            p success response
        })
        p.future
    }