1. ホーム
  2. python

[解決済み] Pythonでマルチプロセッシングキューを使うには?

2022-07-07 02:25:26

質問

Pythonのマルチプロセッシングキューがどのように動作するのか、またどのように実装するのか理解するのに大変苦労しています。共有ファイルからデータにアクセスする2つのPythonモジュールがあるとします。これらの2つのモジュールをライターとリーダーと呼びましょう。私の計画は、リーダーとライターの両方が2つの別々のマルチプロセッシングキューにリクエストを入れ、そして3番目のプロセスがループでこれらのリクエストをポップして、そのように実行することです。

私の主な問題は、multiprocessing.queue を正しく実装する方法を本当に知らないことです。それらは別々のキューになるので、各プロセスのオブジェクトを本当にインスタンス化することはできません。

どのように解決するのですか?

私の主な問題は、multiprocessing.queue を正しく実装する方法が本当にわからないことです。各プロセスが個別のキューになるため、各プロセスのオブジェクトを本当にインスタンス化することはできませんが、すべてのプロセスが共有キュー(またはこの場合はキュー)に関連していることを確認する方法はあります。

これは、リーダとライタがひとつのキューを共有する簡単な例です... ライターが整数の束をリーダーに送ります。ライターが数を使い果たすと「DONE」を送り、リーダーに読み込みループから抜け出すことを知らせます。

リーダープロセスはいくつでも生成できる

from multiprocessing import Process, Queue
import time
import sys


def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break


def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue.  A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")


def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###    you can spawn as many reader_p() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=((qq),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs


if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert num_of_reader_procs > 0
        assert num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")