1. ホーム
  2. java

[解決済み] Java 8 の並列ストリームにおけるカスタムスレッドプール

2022-03-20 21:37:32

質問内容

Java 8 のカスタムスレッドプールを指定することは可能ですか? 並列ストリーム ? どこにも見当たらないのですが。

サーバー・アプリケーションがあり、並列ストリームを使いたいとします。しかし、アプリケーションは大規模でマルチスレッドであるため、それを区分けしたいと思います。アプリケーションのあるモジュールで実行速度の遅いタスクが、他のモジュールのタスクをブロックするようなことはしたくありません。

モジュールごとに異なるスレッドプールを使用できないのであれば、現実世界のほとんどの状況で並列ストリームを安全に使用できないことを意味します。

次のような例で試してみてください。CPUに負荷のかかるタスクがいくつかあり、別々のスレッドで実行されています。 このタスクは並列ストリームを利用しています。最初のタスクは壊れているので、各ステップに 1 秒かかります (スレッドスリープでシミュレート)。問題は、他のスレッドが、壊れたタスクの終了を待って、立ち往生してしまうことです。これは意図的な例ですが、サーブレットアプリで、誰かが長時間実行するタスクを共有フォークジョインプールにサブミットした場合を想像してみてください。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

解決方法は?

実は、並列処理を特定のフォークジョインプールで実行する方法があるんです。フォークジョインプール内のタスクとして実行すると、そこに留まって共通のものを使わないのです。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

この仕掛けは ForkJoinTask.fork は次のように指定します: "現在のタスクが実行されているプールでこのタスクを非同期に実行するように手配します (該当する場合)。 ForkJoinPool.commonPool() そうでない場合は inForkJoinPool() "。