1. ホーム
  2. java-8

[解決済み] Java 8 のストリームと RxJava の observable の違い

2022-04-28 22:36:09

質問

Java 8 のストリームは、RxJava の observable に似ていますか?

Java 8 ストリームの定義。

のクラスは、新しい java.util.stream パッケージは、ストリームAPIを提供し は、要素のストリームに対する関数型の操作をサポートしています。

どうやって解決するの?

簡単な答え

どのシーケンス/ストリーム処理ライブラリも、パイプライン構築のための非常によく似たAPIを提供しています。違いは、マルチスレッドを扱うAPIとパイプラインを構成するAPIです。

長い回答

RxJavaはStreamとはかなり違います。JDKの中で、最も近いのは rx.Observable は、おそらく java.util.stream.Collector Stream + CompletableFuture コンボ(これは、余分なモナド層を扱うという代償を伴います。 Stream<CompletableFuture<T>>CompletableFuture<Stream<T>> ).

ObservableとStreamには大きな違いがあります。

  • ストリームはプルベース、Observablesはプッシュベースです。これは抽象的すぎると思われるかもしれませんが、非常に具体的で重要な結果をもたらします。
  • ストリームは一度しか使えないが、Observableは何度でも購読できる。
  • Stream#parallel() はシーケンスをパーティションに分割する。 Observable#subscribeOn()Observable#observeOn() をエミュレートするのは厄介です。 Stream#parallel() の動作は、Observableでは、かつて .parallel() メソッドがありますが、このメソッドは非常に多くの混乱を引き起こしたため .parallel() のサポートは別のリポジトリに移されました。 ReactiveX/RxJavaParallel: RxJavaの実験的な並列拡張機能 . 詳細は 別解 .
  • Stream#parallel() は、オプションの Scheduler を受け付ける多くの RxJava メソッドと異なり、使用するスレッドプールを指定することができません。そのため すべて ストリームインスタンスは、同じフォークジョインプールを使用します。 .parallel() は、あなたのプログラムの他のモジュールでの動作に誤って影響を与える可能性があります。
  • ストリームは、以下のような時間に関する操作が欠けています。 Observable#interval() , Observable#window() これは、ストリームがプルベースであり、上流が以下のことを制御できないためです。 いつ を使用して、次の要素を下流に送信します。
  • ストリームはRxJavaと比較して、制限された操作のセットを提供します。例えば、ストリームはカットオフ操作( takeWhile() , takeUntil() を使用した回避策です。 Stream#anyMatch() 端末操作なので、1ストリームに1回以上使用することはできません。
  • JDK 8の時点では Stream#zip() の操作で、時々かなり便利です。
  • ストリームは自分で構築するのが難しいが、Observableは様々な方法で構築することができる。 EDITです。 コメントにもあるように、Streamを構成する方法はあります。しかし、非端子短絡がないので、例えば、ファイル中の行のStreamを簡単に生成することはできない(JDKでは Files#lines()BufferedReader#lines() のように、IteratorからStreamを構築することで同様のシナリオを管理することができます)。
  • Observableは、リソース管理機能( Observable#using() IO ストリームやミューテックスをこれでラップし、ユーザがリソースの解放を忘れないようにすることができます - それはサブスクリプション終了時に自動的に破棄されます; ストリームには onClose(Runnable) メソッドがありますが、手動で呼び出すか、try-with-resourcesで呼び出す必要があります。例えば、以下のことを念頭に置く必要があります。 Files#lines() なければならない は try-with-resources ブロックで囲まれています。
  • Observablesは終始同期している(Streamsも同様かどうかは実際に確認しなかった)。このため、基本的な操作がスレッドセーフかどうかを考える必要はありませんが(バグがない限り、答えは常に「イエス」です)、同期に関連するオーバーヘッドは、あなたのコードがそれを必要とするかどうかに関係なく存在することになります。

ラウンドアップ

RxJavaはStreamsと大きく異なります。RxJavaの代替となるのは リアクティブストリーム 例えば、Akkaの関連する部分です。

更新情報

にデフォルトでないフォークジョインプールを使用するトリックがあります。 Stream#parallel をご覧ください。 Java 8 の並列ストリームにおけるカスタムスレッドプール .

更新情報

上記はすべてRxJava 1.xでの経験に基づいています。 RxJava 2.xはこちら この回答は古くなっている可能性があります。