
Python: How To Parallelize A Simple Loop With MPI

I need to rewrite a simple for loop with MPI because each step is time consuming. Let's say I have a list containing several np.array objects and I want to apply some computation to each array.

Solution 1:

For simplicity, let us assume that the master process (the process with rank = 0) is the one that reads the entire file from disk into memory. This problem can be solved knowing only the following MPI routines: Get_size(), Get_rank(), scatter, and gather.

The Get_size():

Returns the number of processes in the communicator. It will return the same number to every process.

The Get_rank():

Determines the rank of the calling process in the communicator.

In MPI, each process is assigned a rank that varies from 0 to N - 1, where N is the total number of processes running.
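For instance, a minimal mpi4py sketch that only queries the communicator (run it with something like mpiexec -n 4 python program.py; the file name is just an example):

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()  # total number of processes, same value on every rank
rank = comm.Get_rank()  # ID of the calling process, from 0 to size - 1

print(f"I am rank {rank} of {size}")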

The scatter:

MPI_Scatter involves a designated root process sending data to all processes in a communicator. The primary difference between MPI_Bcast and MPI_Scatter is small but important. MPI_Bcast sends the same piece of data to all processes while MPI_Scatter sends chunks of an array to different processes.

and the gather:

MPI_Gather is the inverse of MPI_Scatter. Instead of spreading elements from one process to many processes, MPI_Gather takes elements from many processes and gathers them to one single process.
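A tiny round trip that contrasts the three collectives (the lowercase bcast/scatter/gather methods are the mpi4py variants for generic Python objects; the integer payload here is purely illustrative):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

payload = list(range(size)) if rank == 0 else None

whole = comm.bcast(payload, root=0)     # bcast: every rank receives the whole list
piece = comm.scatter(payload, root=0)   # scatter: rank i receives only payload[i]
back = comm.gather(piece * 10, root=0)  # gather: root collects one item from each rank

if rank == 0:
    print(back)  # e.g. [0, 10, 20, 30] with 4 processes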

Obviously, you should first follow a tutorial and read the MPI documentation to understand its parallel programming model and its routines. Otherwise, you will find it very hard to understand how it all works. That being said, your code could look like the following:

from mpi4py import MPI

def myFun(x):
    return x + 2  # simple example, the real one would be complicated

comm = MPI.COMM_WORLD
rank = comm.Get_rank()  # get your process ID
size = comm.Get_size()  # total number of processes
data = None             # init the data (non-root ranks pass None to scatter)

if rank == 0:  # The master is the only process that reads the file
    data = ...  # something read from file, split into `size` chunks (one per rank)

# Divide the data among processes
data = comm.scatter(data, root=0)

result = []
for item in data:
    result.append(myFun(item))

# Send the results back to the master process
newData = comm.gather(result, root=0)

 

In this way, each process works (in parallel) on only a certain chunk of the data. After finishing its work, each process sends its chunk of results back to the master process (i.e., comm.gather(result, root=0)). This is just a toy example; it is now up to you to improve it according to your testing environment and code.
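Note that on the root, the gathered newData is a list of per-rank result lists, so you will likely want to flatten it. A minimal sketch, assuming the variables from the snippet above:

if rank == 0:
    # newData looks like [[rank 0 results], [rank 1 results], ...]
    flat = [item for chunk in newData for item in chunk]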


Solution 2:

You could either go the low-level MPI way, as shown in @dreamcrash's answer, or you could go for a more Pythonic solution that uses an executor pool very similar to the one provided by the standard Python multiprocessing module.

First, you need to turn your code into a more functional-style one by noticing that you are actually doing a map operation, which applies myFun to each element of dat:

import numpy as np

def myFun(x):
    return x + 2  # simple example, the real one would be complicated

dat = [
    np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
]  # real data would be much larger

result = map(myFun, dat)

map here runs sequentially in one Python interpreter process (and, in Python 3, it returns a lazy iterator, so wrap it in list() if you need all the results at once).

To run that map in parallel with the multiprocessing module, you only need to instantiate a Pool object and then call its map() method in place of the Python map() function:

from multiprocessing import Pool
import numpy as np

def myFun(x):
    return x + 2  # simple example, the real one would be complicated

if __name__ == '__main__':
    dat = [
        np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
    ]  # real data would be much larger

    with Pool() as pool:
        result = pool.map(myFun, dat)

Here, Pool() creates a new executor pool with as many interpreter processes as there are logical CPUs as seen by the OS. Calling the map() method of the pool runs the mapping in parallel by sending items to the different processes in the pool and waiting for completion. Since the worker processes import the Python script as a module, it is important to have the code that was previously at the top level moved under the if __name__ == '__main__': conditional so it doesn't run in the workers too.
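If you want to control the number of workers or reduce scheduling overhead for many small items, both are exposed by the standard API. A minimal sketch; the values 4 and 10 are arbitrary:

    with Pool(processes=4) as pool:                   # use exactly 4 worker processes
        result = pool.map(myFun, dat, chunksize=10)   # ship items to workers in batches of 10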

Using multiprocessing.Pool() is very convenient because it requires only a slight change of the original code and the module handles for you all the work scheduling and the required data movement to and from the worker processes. The problem with multiprocessing is that it only works on a single host. Fortunately, mpi4py provides a similar interface through the mpi4py.futures.MPIPoolExecutor class:

from mpi4py.futures import MPIPoolExecutor
import numpy as np

def myFun(x):
    return x + 2  # simple example, the real one would be complicated

if __name__ == '__main__':
    dat = [
        np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
    ]  # real data would be much larger

    with MPIPoolExecutor() as pool:
        result = pool.map(myFun, dat)

Like with the Pool object from the multiprocessing module, the MPI pool executor handles for you all the work scheduling and data movement.

There are two ways to run the MPI program. The first one starts the script as an MPI singleton and then uses the MPI process control facility to spawn a child MPI job with all the pool workers:

mpiexec -n 1 python program.py

You also need to specify the MPI universe size (the total number of MPI ranks in both the main and all child jobs). The specific way of doing so differs between the implementations, so you need to consult your implementation's manual.

The second option is to launch directly the desired number of MPI ranks and have them execute the mpi4py.futures module itself with the script name as argument:

mpiexec -n 24 python -m mpi4py.futures program.py

Keep in mind that no matter which way you launch the script, one MPI rank will be reserved for the controller and will not be running mapping tasks. You are aiming at running on 24 hosts, so you should have plenty of CPU cores and can probably afford to have one reserved. Or you could instruct MPI to oversubscribe the first host with one more rank.
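With Open MPI, for example, oversubscription can be allowed explicitly; the flag below is Open MPI specific, and other implementations have their own mechanisms:

mpiexec --oversubscribe -n 25 python -m mpi4py.futures program.py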

One thing to note with both multiprocessing.Pool and mpi4py.futures.MPIPoolExecutor is that the map() method guarantees the order of the items in the output array, but it doesn't guarantee the order in which the different items are evaluated. This shouldn't be a problem in most cases.
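If you do need to handle results as soon as they finish rather than in input order, the executor also supports the standard submit/as_completed pattern from concurrent.futures, with which mpi4py's futures are designed to interoperate. A minimal sketch:

from concurrent.futures import as_completed
from mpi4py.futures import MPIPoolExecutor
import numpy as np

def myFun(x):
    return x + 2

if __name__ == '__main__':
    dat = [np.random.rand(3, 2) for _ in range(4)]
    with MPIPoolExecutor() as pool:
        futures = [pool.submit(myFun, item) for item in dat]
        for fut in as_completed(futures):  # yields futures as they finish, not in input order
            print(fut.result())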


A word of advice. If your data is actually chunks read from a file, you may be tempted to do something like this:

if __name__ == '__main__':
    data = read_chunks()
    with MPIPoolExecutor() as p:
        result = p.map(myFun, data)

Don't do that. Instead, if possible, e.g., if enabled by the presence of a shared (and hopefully parallel) filesystem, delegate the reading to the workers:

NUM_CHUNKS = 100

def myFun(chunk_num):
    # You may need to pass the value of NUM_CHUNKS to read_chunk()
    # for it to be able to seek to the right position in the file
    data = read_chunk(NUM_CHUNKS, chunk_num)
    return ...

if __name__ == '__main__':
    chunk_nums = range(NUM_CHUNKS)  # 100 chunks
    with MPIPoolExecutor() as p:
        result = p.map(myFun, chunk_nums)
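For illustration only, a hypothetical read_chunk() could split a flat binary file of float64 values by byte offsets; the file name 'data.bin' and the fixed-record-size assumption are made up for this sketch:

import os
import numpy as np

def read_chunk(num_chunks, chunk_num, path='data.bin'):
    # hypothetical helper: divide the file into num_chunks nearly equal pieces
    # and read only piece number chunk_num (each value is a float64, 8 bytes)
    n_values = os.path.getsize(path) // 8
    per_chunk = n_values // num_chunks
    offset = chunk_num * per_chunk
    count = per_chunk if chunk_num < num_chunks - 1 else n_values - offset
    return np.fromfile(path, dtype=np.float64, count=count, offset=offset * 8)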
