Note: This article applies to Python 2 environments.

Background

Recently, I’ve been writing a program to do some system administration work in Python. The tasks in my program need to run in parallel. Because of Python’s GIL, I decided to use multi-processing instead of multi-threading, and quite naturally chose the multiprocessing module.

The program structure looks roughly like this: the main process creates a pool (call it A) of worker processes; each worker process in A creates a sub-pool of worker processes, and the real work is done by the innermost worker processes.

Problem

To describe the problem more clearly, I have simplified my code as follows (you can download it from here):

#!/usr/bin/python2.7
# -*- coding: UTF-8 -*-


import multiprocessing
import multiprocessing.pool


def create_process_pool(index):
    print index
    li = range(3)
    pool = multiprocessing.Pool(processes=len(li))
    for sub_index in li:
        pool.apply_async(print_process_index, (index, sub_index))
    pool.close()
    pool.join()


def print_process_index(index, sub_index):
    print "%d-%d" % (index, sub_index)


li = range(3)
pool = multiprocessing.Pool(processes=len(li))
pool.map(create_process_pool, li)
# pool.map_async(create_process_pool, li)
# for index in li:
#     pool.apply(create_process_pool, [index])
#     pool.apply_async(create_process_pool, [index])
pool.close()
pool.join()


However, whenever I try to create a sub-pool inside a worker process of pool A, I get this error:

Traceback (most recent call last):
  File "./multiprocessing_test.bug.py", line 25, in <module>
    pool.map(create_process_pool, li)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
AssertionError: daemonic processes are not allowed to have children


How I fixed the problem

After a few rounds of Googling, I found this. Here is the explanation of the error:

The multiprocessing.pool.Pool class creates the worker processes in its __init__ method, makes them daemonic and starts them, and it is not possible to re-set their daemon attribute to False before they are started (and afterwards it's not allowed anymore). But you can create your own sub-class of multiprocessing.pool.Pool (multiprocessing.Pool is just a wrapper function) and substitute your own multiprocessing.Process sub-class, which is always non-daemonic, to be used for the worker processes.

Here is the fixed code (you can download it from here):

#!/usr/bin/python2.7
# -*- coding: UTF-8 -*-


import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NoDaemonProcessPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess


def create_process_pool(index):
    print index
    li = range(3)
    pool = multiprocessing.Pool(processes=len(li))
    for sub_index in li:
        pool.apply_async(print_process_index, (index, sub_index))
    pool.close()
    pool.join()


def print_process_index(index, sub_index):
    print "%d-%d" % (index, sub_index)


li = range(3)
# pool = multiprocessing.Pool(processes=len(li))
pool = NoDaemonProcessPool(processes=len(li))
pool.map(create_process_pool, li)
# pool.map_async(create_process_pool, li)
# for index in li:
#     pool.apply(create_process_pool, [index])
#     pool.apply_async(create_process_pool, [index])
pool.close()
pool.join()


Now everything works as expected. 🙂
