Using multiprocessing in Python is a bit tricky. Sometimes, when we use a plain Queue() together with join(), the program just hangs.
To make it more stable, we can use a Manager together with a Consumer class (a subclass of Process, defined in the full code below). Remember, a Queue created by a multiprocessing Manager is safer than a plain Queue(). Why? The Python documentation explains:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread()), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue. See Programming guidelines.
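To make the warning concrete, here is a minimal sketch of the "join before draining" pattern using a manager queue. The names producer and run_with_manager_queue, and the item count and size, are my own assumptions for illustration. The child puts about 1 MB of data on the queue, and the parent joins it before reading anything. With a manager queue the items are stored in the manager's server process, so the child can exit; the same pattern with a plain multiprocessing.Queue() may hang, as the warning describes.

```python
import multiprocessing


def producer(queue, count, size):
    # Child process: put `count` strings of `size` characters on the queue.
    for _ in range(count):
        queue.put("x" * size)


def run_with_manager_queue(count=1000, size=1000):
    # Join the child BEFORE draining the queue. The manager queue keeps the
    # items in the manager's server process, so the child is free to exit.
    with multiprocessing.Manager() as manager:
        queue = manager.Queue()
        child = multiprocessing.Process(target=producer,
                                        args=(queue, count, size))
        child.start()
        child.join()
        items = [queue.get() for _ in range(count)]
    return items


if __name__ == "__main__":
    print(len(run_with_manager_queue()))
```

The price of this safety is speed: every put() and get() on a manager queue is a round trip to the manager process.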
Here is the problem:
I need to read all the HTML files inside a folder and process them in parallel, because processing them one by one is too slow.
That means I should be able to open and read several HTML files at the same time.
We can use multiprocessing to solve this problem. The first thing we will build is the queues.
They carry all the input (todo_queue) and output (task_queue). Here is the implementation using the manager's Queue():
```python
manager = multiprocessing.Manager()
task_queue = manager.Queue()
todo_queue = manager.Queue()
```
Then we use Consumer instead of a bare Process to wrap the queues:
```python
consumers_finished = manager.dict()
consumers = [Consumer(todo_queue, task_queue, consumers_finished) for i in range(3)]
```
Then we iterate over the consumers, mark each one as not finished, and start it:
```python
for consumer in consumers:
    consumers_finished[consumer.name] = False
    consumer.start()
```
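In this example, I list all the HTML files in the current folder and put them into todo_queue. Note that the full code fills todo_queue before starting the consumers. If the consumers start against an empty queue, they can all mark themselves as finished and exit before any file is queued.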
```python
for html in os.listdir("."):
    if ".html" in html:
        todo_queue.put(html)
```
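One detail worth a small sketch: the test `".html" in html` matches the substring anywhere in the name, so a file such as notes.html.bak would also be queued. A stricter filter, assuming you only want names that end in .html, could look like this (list_html_files is a hypothetical helper, not part of the original code):

```python
def list_html_files(names):
    # Keep only names that END with ".html"; a substring test would also
    # accept names like "notes.html.bak".
    return [name for name in names if name.endswith(".html")]


names = ["index.html", "notes.html.bak", "readme.txt", "about.html"]
print(list_html_files(names))  # ['index.html', 'about.html']
```

In the real script you would pass os.listdir(".") instead of the hard-coded list.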
We then wait for all the consumers to finish by joining them:
```python
for consumer in consumers:
    consumer.join()
```
At the end, we read the results from task_queue until we reach the "STOP" sentinel:
```python
for r in iter(task_queue.get, "STOP"):
    print(r)
```
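If the two-argument form of iter() is unfamiliar, here is a small sketch of how it behaves, using a plain in-process queue.Queue so that it runs without any child processes. The helper name drain_until_sentinel is hypothetical.

```python
import queue


def drain_until_sentinel(q, sentinel="STOP"):
    # iter(callable, sentinel) calls q.get() repeatedly and stops as soon as
    # a returned value equals the sentinel; the sentinel itself is not yielded.
    return list(iter(q.get, sentinel))


q = queue.Queue()
for item in ["a.html", "b.html", "STOP", "c.html"]:
    q.put(item)

print(drain_until_sentinel(q))  # ['a.html', 'b.html']
print(q.get())                  # c.html (it was left behind in the queue)
```

The second print shows the catch: anything queued after the first "STOP" is never read by the loop, so the sentinel must only be put on the queue after the last real result.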
Here is the full implementation:
```python
import os
import multiprocessing
from queue import Empty  # Python 2 spelled this `from Queue import Empty`


class Consumer(multiprocessing.Process):

    def __init__(self, todo_queue, task_queue, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.todo_queue = todo_queue
        self.task_queue = task_queue
        self.consumers_finished = consumers_finished

    def run(self):
        # Loop until every consumer has found todo_queue empty.
        while not all(self.consumers_finished.values()):
            try:
                task = self.todo_queue.get(False)
                self.consumers_finished[self.name] = False
            except Empty:
                self.consumers_finished[self.name] = True
            else:
                task_result = self.process_data(task)
                self.task_queue.put(task_result)
        # Send the sentinel once, after the loop, so that no "STOP" can
        # land in task_queue ahead of another consumer's result.
        self.task_queue.put("STOP")

    def process_data(self, html):
        print("Processing %s" % html)
        return html


class Starter(object):

    def start(self):
        manager = multiprocessing.Manager()
        task_queue = manager.Queue()
        todo_queue = manager.Queue()
        consumers_finished = manager.dict()
        cpu_core = 3
        consumers = [Consumer(todo_queue, task_queue, consumers_finished)
                     for i in range(cpu_core)]

        # Fill the queue before any consumer starts.
        for html in os.listdir("."):
            if ".html" in html:
                todo_queue.put(html)

        # Register every flag first, then start the consumers.
        for consumer in consumers:
            consumers_finished[consumer.name] = False
        for consumer in consumers:
            consumer.start()

        for consumer in consumers:
            consumer.join()

        # All results were queued before the first "STOP".
        for r in iter(task_queue.get, "STOP"):
            print(r)


if __name__ == "__main__":
    s = Starter()
    s.start()
```
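In the code above, process_data is only a placeholder that returns the file name. As a rough sketch of what a real version might do, the following hypothetical function opens an HTML file and extracts its title with the standard library's html.parser. The TitleParser class, the UTF-8 encoding, and the (path, title) return value are all my own assumptions, not part of the original code.

```python
from html.parser import HTMLParser


class TitleParser(HTMLParser):
    # Collects the text found inside the <title> element.

    def __init__(self):
        super().__init__()
        self.in_title = False
        self.title = ""

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self.in_title = True

    def handle_endtag(self, tag):
        if tag == "title":
            self.in_title = False

    def handle_data(self, data):
        if self.in_title:
            self.title += data


def process_data(path):
    # Read the file (assumed to be UTF-8) and return (path, title).
    with open(path, encoding="utf-8", errors="replace") as handle:
        parser = TitleParser()
        parser.feed(handle.read())
        parser.close()
    return path, parser.title.strip()
```

Whatever the function returns must be picklable, because the result travels through the manager queue back to the parent process.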