
Python: 'before' And 'after' For Multiprocessing Workers

Update: Here is a more specific example. Suppose I want to compile some statistical data from a sizable set of files: I can make a generator (line for line in fileinput.input(file

Solution 1:

So you are basically building a histogram. This can easily be parallelized, because histograms can be merged without complication. The problem is trivially parallelizable, or "embarrassingly parallel": you do not need to worry about communication among workers.

Just split your data set into multiple chunks, let your workers work on these chunks independently, collect the histogram of each worker, and then merge the histograms.
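The merge step can be sketched in a few lines. This is a minimal illustration, assuming each worker returns a collections.Counter; the helper name merge_histograms and the three sample chunks are made up for the example.

```python
from collections import Counter


def merge_histograms(histograms):
    """Merge an iterable of Counter objects into one new Counter."""
    merged = Counter()
    for hist in histograms:
        merged.update(hist)  # Counter.update adds counts key by key
    return merged


# Hypothetical per-chunk results, as three workers might return them.
chunk_a = Counter({"wolf": 2, "cat": 1})
chunk_b = Counter({"wolf": 1, "blume": 1})
chunk_c = Counter({"eisenbahn": 1})

merged = merge_histograms([chunk_a, chunk_b, chunk_c])
print(merged["wolf"])  # 3
```

Because adding counts is commutative, the order in which the workers finish does not affect the merged result.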

In practice, this problem is best solved by letting each worker read and process its own file. That is, a "task" could be a file name. You should not pickle file contents and send them between processes through pipes. Let each worker process retrieve the bulk data directly from the files. Otherwise your architecture spends too much time on inter-process communication instead of doing real work.
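To get a rough feel for the difference, one can compare how many bytes would have to travel through the pipe in each case. This is only a sketch: the file name and the stand-in contents are hypothetical, and pickled_size is a helper invented for the illustration.

```python
import pickle


def pickled_size(obj):
    """Return the number of bytes needed to pickle `obj`."""
    return len(pickle.dumps(obj))


# Hypothetical task descriptions for the same piece of work.
task_path = "data0.txt"              # send only the file name
task_contents = "wolf\n" * 100000    # stand-in for the file's contents

print(pickled_size(task_path))
print(pickled_size(task_contents))
```

The file name pickles to a few dozen bytes, while the contents pickle to at least as many bytes as the data itself, and that cost is paid for every task.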

Do you need an example or can you figure this out yourself?

Edit: example implementation

I have a number of data files with file names in this format: data0.txt, data1.txt, ... .

Example contents:

wolf
wolf
cat
blume
eisenbahn

The goal is to create a histogram over the words contained in the data files. This is the code:

from multiprocessing import Pool
from collections import Counter
import glob


def build_histogram(filepath):
    """This function is run by a worker process.
    The `filepath` argument is communicated to the worker
    through a pipe. The return value of this function is
    communicated to the manager through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            hist[line.strip()] += 1
    return hist


def main():
    """This function runs in the manager (main) process."""

    # Collect paths to data files.
    datafile_paths = glob.glob("data*.txt")

    # Create a pool of worker processes and distribute work.
    # The input to worker processes (function argument) as well
    # as the output by worker processes is transmitted through
    # pipes, behind the scenes.
    pool = Pool(processes=3)
    histograms = pool.map(build_histogram, datafile_paths)

    # Properly shut down the pool of worker processes, and
    # wait until all of them have finished.
    pool.close()
    pool.join()

    # Merge sub-histograms. Do not create too many intermediate
    # objects: update a single Counter in place with each
    # sub-histogram. Starting from an empty Counter also covers
    # the case where no data files were found.
    # Relevant docs: collections.Counter.update
    merged_hist = Counter()
    for h in histograms:
        merged_hist.update(h)

    for word, count in merged_hist.items():
        print("%s: %s" % (word, count))


if __name__ == "__main__":
    main()

Test output:

python countwords.py
eisenbahn: 12
auto: 6
cat: 1
katze: 10
stadt: 1
wolf: 3
zug: 4
blume: 5
herbert: 14
destruction: 4
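A possible variation, not part of the answer above: instead of collecting all sub-histograms in a list, merge each one as soon as its worker delivers it. The sketch below assumes Python 3 and uses Pool.imap_unordered; the function name count_words is made up for the example.

```python
from collections import Counter
from multiprocessing import Pool
import glob


def build_histogram(filepath):
    """Count stripped lines in one file; runs in a worker process."""
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            hist[line.strip()] += 1
    return hist


def count_words(paths, processes=3):
    """Merge per-file histograms as they arrive from the workers."""
    merged = Counter()
    if not paths:
        return merged  # nothing to do, so do not start a pool
    with Pool(processes=processes) as pool:
        # imap_unordered yields each result as soon as it is ready,
        # in whatever order the workers finish.
        for hist in pool.imap_unordered(build_histogram, paths):
            merged.update(hist)
    return merged


if __name__ == "__main__":
    for word, count in count_words(glob.glob("data*.txt")).items():
        print("%s: %s" % (word, count))
```

Only one sub-histogram is held in the manager process at a time, which may matter when there are many files with large vocabularies.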

Solution 2:

I had to modify the original pool.py to get what I want (the trouble was that worker is defined as a plain function, so there is nothing to subclass), but it's not so bad, and probably better than writing a new pool entirely.

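# Replacement for the module-level worker() function in multiprocessing/pool.py.
# It relies on names already available in that module (util,
# ExceptionWithTraceback, MaybeEncodingError).
class worker(object):
    def __init__(self, inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
                 wrap_exception=False, finalizer=None, finargs=()):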
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        self.completed = 0
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
        if initializer is not None:
            initializer(self, *initargs)

        def run(self):
            while maxtasks is None or (maxtasks and self.completed < maxtasks):
                try:
                    task = get()
                except (EOFError, OSError):
                    util.debug('worker got EOFError or OSError -- exiting')
                    break

                if task is None:
                    util.debug('worker got sentinel -- exiting')
                    break

                job, i, func, args, kwds = task
                try:
                    result = (True, func(*args, **kwds))
                except Exception as e:
                    if wrap_exception:
                        e = ExceptionWithTraceback(e, e.__traceback__)
                    result = (False, e)
                try:
                    put((job, i, result))
                except Exception as e:
                    wrapped = MaybeEncodingError(e, result[1])
                    util.debug("Possible encoding error while sending result: %s" % (
                        wrapped))
                    put((job, i, (False, wrapped)))
                self.completed += 1
            if finalizer:
                finalizer(self, *finargs)
            util.debug('worker exiting after %d tasks' % self.completed)
        run(self)
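The control flow of this patched worker (initializer once, then the task loop, then finalizer once) can be tried out in a single process without touching pool.py. The following is a simplified stand-in, not the code above: run_worker_loop is a made-up name, plain queue.Queue objects replace the pool's pipes, tasks are reduced to (job, func, args), and the hooks are called without the self argument.

```python
import queue


def run_worker_loop(inqueue, outqueue, initializer=None, initargs=(),
                    finalizer=None, finargs=()):
    """Single-process stand-in for the patched worker loop.

    Calls `initializer` once before the first task and `finalizer`
    once after the sentinel (None) arrives. Returns the number of
    completed tasks.
    """
    completed = 0
    if initializer is not None:
        initializer(*initargs)
    while True:
        task = inqueue.get()
        if task is None:  # sentinel: no more work
            break
        job, func, args = task
        try:
            result = (True, func(*args))
        except Exception as exc:
            result = (False, exc)
        outqueue.put((job, result))
        completed += 1
    if finalizer is not None:
        finalizer(*finargs)
    return completed


events = []
tasks, results = queue.Queue(), queue.Queue()
tasks.put((0, pow, (2, 5)))
tasks.put((1, len, ("wolf",)))
tasks.put(None)

done = run_worker_loop(
    tasks, results,
    initializer=events.append, initargs=("before",),
    finalizer=events.append, finargs=("after",),
)
print(done, events)
```

Here the "before" hook runs ahead of any task and the "after" hook runs only once the sentinel has been seen, which is the ordering the modified pool.py is meant to provide for each worker process.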
