Python Multiprocessing: Processing Files with Manager and Consumer


Using multiprocessing in Python is a bit tricky. Sometimes, when we use a plain Queue() together with join(), the program just hangs.
To make it more stable, we can use a Manager and a Consumer with Python multiprocessing. Remember, a Queue created through a multiprocessing Manager behaves better than a plain Queue(). Why? The Python documentation warns:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread()), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue. See Programming guidelines.
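
To make that concrete, here is a minimal sketch (my own illustration, not from the documentation) of the situation the warning describes. A child process buffers more data than the pipe can hold; joining it is safe with a manager queue, while the same pattern with a plain multiprocessing.Queue() risks a deadlock:

import multiprocessing

def worker(q):
    # Put far more data than a pipe buffer would hold.
    for i in range(100000):
        q.put("x" * 100)

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    q = manager.Queue()   # items live in the manager process, not in a pipe buffer
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()              # safe here; with multiprocessing.Queue() this join could deadlock
    print(q.qsize())      # 100000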

Here is the problem:
I need to read all the HTML files inside a folder and process them, and doing it sequentially is too slow.
That means I should be able to open and read the HTML files in parallel.

We can use multiprocessing to solve this problem. The first thing we will build is the pair of queues that carry all input and output. Here is the implementation using the manager's Queue():

manager = multiprocessing.Manager()
task_queue = manager.Queue()
todo_queue = manager.Queue()

Then we use a Consumer (a multiprocessing.Process subclass, defined in the full listing below) instead of a bare Process to wrap the queues:

consumers_finished = manager.dict()
consumers = [Consumer(todo_queue, task_queue, consumers_finished) for i in range(3)]
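
The pool size is hardcoded to three here. If you want it to follow the machine instead, multiprocessing.cpu_count() is a reasonable default (my suggestion, not something the original code does):

cpu_core = multiprocessing.cpu_count()   # e.g. 4 on a quad-core machine
consumers = [Consumer(todo_queue, task_queue, consumers_finished) for i in range(cpu_core)]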

Then we iterate over the consumers, mark each one as not finished, and start it:

for consumer in consumers:
    consumers_finished[consumer.name] = False
    consumer.start()

In this example, I list all the HTML files in the current folder and put them into todo_queue:

for html in os.listdir("."):
    if html.endswith(".html"):
        todo_queue.put(html)
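
The glob module can do the same filtering for you; this is just an alternative, not what the rest of the post uses:

import glob

for html in glob.glob("*.html"):
    todo_queue.put(html)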

Then we join all the consumers:

for consumer in consumers:
    consumer.join()

At the end, we collect the results from the task queue, reading until we hit the "STOP" sentinel:

for r in iter(task_queue.get, "STOP"):
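
The two-argument form of iter() may look unusual: iter(callable, sentinel) keeps calling the callable until it returns the sentinel. A quick standalone illustration of the idiom (my own, unrelated to the worker code):

from queue import Queue

q = Queue()
for item in ("a.html", "b.html", "STOP"):
    q.put(item)

for result in iter(q.get, "STOP"):   # calls q.get() until it returns "STOP"
    print(result)                    # prints a.html, then b.html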

Here is the full implementation:

import os
import multiprocessing
from queue import Empty

class Consumer(multiprocessing.Process):
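    """A worker process that pulls file names from todo_queue, processes
    them, and puts the results on task_queue."""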

    def __init__(self, todo_queue, task_queue, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.todo_queue = todo_queue
        self.task_queue = task_queue
        self.consumers_finished = consumers_finished

    def run(self):
        # Keep running until every consumer has reported the todo queue as empty.
        while not all(self.consumers_finished.values()):
            try:
                task = self.todo_queue.get(False)
                self.consumers_finished[self.name] = False
            except Empty:
                # Nothing left for this consumer: mark it as finished. Only the
                # consumer that sees everyone finished puts the "STOP" sentinel,
                # so no result can end up behind the sentinel in task_queue.
                self.consumers_finished[self.name] = True
                if all(self.consumers_finished.values()):
                    self.task_queue.put("STOP")
            else:
                task_result = self.process_data(task)
                self.task_queue.put(task_result)

    def process_data(self, html):
        print("Processing %s" % html)
        return html

class Starter(object):
    def start(self):
        manager = multiprocessing.Manager()
        task_queue = manager.Queue()
        todo_queue = manager.Queue()
        consumers_finished = manager.dict()
        cpu_core = 3
        consumers = [Consumer(todo_queue, task_queue, consumers_finished) for i in range(cpu_core)]

        for html in os.listdir("."):
            if html.endswith(".html"):
                todo_queue.put(html)

        for consumer in consumers:
            consumers_finished[consumer.name] = False
            consumer.start()

        for consumer in consumers:
            consumer.join()

        for r in iter(task_queue.get, "STOP"):
            print(r)

if __name__ == "__main__":
    s = Starter()
    s.start()
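
In this listing, process_data only prints the file name and returns it; in a real run you would actually read (and probably parse) the file there. A minimal sketch of what that could look like, where the subclass name and the file-reading step are my additions rather than part of the original code:

class HtmlSizeConsumer(Consumer):

    def process_data(self, html):
        # Read the file and return something small that describes it.
        with open(html) as f:
            content = f.read()
        print("Processing %s (%d bytes)" % (html, len(content)))
        return (html, len(content))

Swapping HtmlSizeConsumer for Consumer in Starter.start() is enough to use it, since everything else only talks to the queues.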
