
Dynamically Create A List Of Shared Arrays Using Python Multiprocessing

I'd like to share several numpy arrays between different child processes with Python's multiprocessing module. I'd like the arrays to be separately lockable, and I'd like the number of arrays to be determined at run time.
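
For orientation, a minimal sketch of the idea (the helper names make_shared_buffers and as_numpy are just illustrative, not from the answer below): multiprocessing.Array already bundles a lock with each shared buffer, so a plain Python list of them, built in a loop, gives a run-time-determined number of separately lockable arrays.

import ctypes
import multiprocessing as mp
import numpy as np

def make_shared_buffers(num_buffers, shape):
    # Each mp.Array carries its own lock, so the buffers are
    # separately lockable; the list length is chosen at run time.
    return [mp.Array(ctypes.c_uint16, int(np.prod(shape)))
            for _ in range(num_buffers)]

def as_numpy(shared_array, shape):
    # View the shared memory as a numpy array without copying.
    return np.frombuffer(shared_array.get_obj(),
                         dtype=np.uint16).reshape(shape)

Typical use holds the per-array lock around access, e.g. with buffers[i].get_lock(): as_numpy(buffers[i], shape).fill(0), which is the same pattern the answer below uses in child_process.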

Solution 1:

Turns out this was easier than I thought! Following J.F. Sebastian's encouragement, here's my crack at an answer:

import time
import ctypes
import logging
import Queue
import multiprocessing as mp
import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    data_pipeline = Image_Data_Pipeline(
        num_data_buffers=5,
        buffer_shape=(60, 256, 512))
    start = time.clock()
    data_pipeline.load_buffers(data_pipeline.num_data_buffers)
    end = time.clock()
    data_pipeline.close()
    print "Elapsed time:", end - start


class Image_Data_Pipeline:
    def __init__(self, num_data_buffers, buffer_shape):
        """
        Allocate a bunch of 16-bit buffers for image data
        """
        self.num_data_buffers = num_data_buffers
        self.buffer_shape = buffer_shape
        pix_per_buf = np.prod(buffer_shape)
        self.data_buffers = [mp.Array(ctypes.c_uint16, pix_per_buf)
                             for b in range(num_data_buffers)]
        self.idle_data_buffers = range(num_data_buffers)

        """
        Launch the child processes that make up the pipeline
        """
        self.camera = Data_Pipeline_Process(
            target=child_process, name='Camera',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape)
        self.display_prep = Data_Pipeline_Process(
            target=child_process, name='Display Prep',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.camera.output_queue)
        self.file_saving = Data_Pipeline_Process(
            target=child_process, name='File Saving',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.display_prep.output_queue)
        return None

    def load_buffers(self, N, timeout=0):
        """
        Feed the pipe!
        """
        for i in range(N):
            self.camera.input_queue.put(self.idle_data_buffers.pop())

        """
        Wait for the buffers to idle. Here would be a fine place to
        feed them back to the pipeline, too.
        """
        while True:
            try:
                self.idle_data_buffers.append(
                    self.file_saving.output_queue.get_nowait())
                info("Buffer %i idle" % (self.idle_data_buffers[-1]))
            except Queue.Empty:
                time.sleep(0.01)
            if len(self.idle_data_buffers) >= self.num_data_buffers:
                break
        return None

    def close(self):
        self.camera.input_queue.put(None)
        self.display_prep.input_queue.put(None)
        self.file_saving.input_queue.put(None)
        self.camera.child.join()
        self.display_prep.child.join()
        self.file_saving.child.join()


class Data_Pipeline_Process:
    def __init__(
        self,
        target,
        name,
        data_buffers,
        buffer_shape,
        input_queue=None,
        output_queue=None,
        ):
        if input_queue is None:
            self.input_queue = mp.Queue()
        else:
            self.input_queue = input_queue

        if output_queue is None:
            self.output_queue = mp.Queue()
        else:
            self.output_queue = output_queue

        self.command_pipe = mp.Pipe()  # For later, we'll send instrument commands

        self.child = mp.Process(
            target=target,
            args=(name, data_buffers, buffer_shape,
                  self.input_queue, self.output_queue, self.command_pipe),
            name=name)
        self.child.start()
        return None


def child_process(
    name,
    data_buffers,
    buffer_shape,
    input_queue,
    output_queue,
    command_pipe):
    if name == 'Display Prep':
        display_buffer = np.empty(buffer_shape, dtype=np.uint16)
    while True:
        try:
            process_me = input_queue.get_nowait()
        except Queue.Empty:
            time.sleep(0.01)
            continue
        if process_me is None:
            break  # We're done
        else:
            info("start buffer %i" % (process_me))
            with data_buffers[process_me].get_lock():
                a = np.frombuffer(data_buffers[process_me].get_obj(),
                                  dtype=np.uint16)
                if name == 'Camera':
                    """
                    Fill the buffer with data (eventually, from the
                    camera; dummy data for now)
                    """
                    a.fill(1)
                elif name == 'Display Prep':
                    """
                    Process the 16-bit image into a display-ready
                    8-bit image. For now, just copy the data to a
                    similar buffer.
                    """
                    display_buffer[:] = a.reshape(buffer_shape)
                elif name == 'File Saving':
                    """
                    Save the data to disk.
                    """
                    a.tofile('out.raw')
            info("end buffer %i" % (process_me))
            output_queue.put(process_me)
    return None


if __name__ == '__main__':
    main()

Background: This is the skeleton of a data-acquisition pipeline. I want to acquire data at a very high rate, process it for on-screen display, and save it to disk. I don't ever want display rate or disk rate to limit acquisition, which is why I think using separate child processes in individual processing loops is appropriate.
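
Note that load_buffers above only sends each buffer through the pipeline once and then waits for everything to idle. For continuous acquisition you would recirculate the buffers instead, as the comment in load_buffers hints. One possible sketch of such a method for Image_Data_Pipeline (run_continuously is hypothetical, not part of the code above):

    def run_continuously(self, num_passes):
        """
        Hypothetical sketch: keep the pipe full by sending idle buffers
        back to the Camera as soon as File Saving releases them.
        """
        sent, in_flight = 0, 0
        while self.idle_data_buffers and sent < num_passes:
            self.camera.input_queue.put(self.idle_data_buffers.pop())
            sent += 1
            in_flight += 1
        while in_flight > 0:
            try:
                idle = self.file_saving.output_queue.get_nowait()
            except Queue.Empty:
                time.sleep(0.01)
                continue
            in_flight -= 1
            if sent < num_passes:
                self.camera.input_queue.put(idle)  # recirculate immediately
                sent += 1
                in_flight += 1
            else:
                self.idle_data_buffers.append(idle)
        return None

This keeps the Camera process busy for num_passes buffers in total and hands every buffer back to idle_data_buffers before returning, so close() can still be called afterwards.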

Here's typical output of the dummy program:

C:\code\instrument_control>c:\Python27\python.exe test.py
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[[INFO/Camera] child process calling self.run()
INFO/Display Prep] child process calling self.run()
[INFO/Camera] start buffer 4
[INFO/File Saving] child process calling self.run()
[INFO/Camera] end buffer 4
[INFO/Camera] start buffer 3
[INFO/Camera] end buffer 3
[INFO/Camera] start buffer 2
[INFO/Display Prep] start buffer 4
[INFO/Camera] end buffer 2
[INFO/Camera] start buffer 1
[INFO/Camera] end buffer 1
[INFO/Camera] start buffer 0
[INFO/Camera] end buffer 0
[INFO/Display Prep] end buffer 4
[INFO/Display Prep] start buffer 3
[INFO/File Saving] start buffer 4
[INFO/Display Prep] end buffer 3
[INFO/Display Prep] start buffer 2
[INFO/File Saving] end buffer 4
[INFO/File Saving] start buffer 3
[INFO/MainProcess] Buffer 4 idle
[INFO/Display Prep] end buffer 2
[INFO/Display Prep] start buffer 1
[INFO/File Saving] end buffer 3
[INFO/File Saving] start buffer 2
[INFO/MainProcess] Buffer 3 idle
[INFO/Display Prep] end buffer 1
[INFO/Display Prep] start buffer 0
[INFO/File Saving] end buffer 2
[INFO/File Saving] start buffer 1
[[INFO/MainProcess] Buffer 2 idle
INFO/Display Prep] end buffer 0
[INFO/File Saving] end buffer 1
[INFO/File Saving] start buffer 0
[INFO/MainProcess] Buffer 1 idle
[INFO/File Saving] end buffer 0
[INFO/MainProcess] Buffer 0 idle
[INFO/Camera] process shutting down
[INFO/Camera] process exiting with exitcode 0
[INFO/Display Prep] process shutting down
[INFO/File Saving] process shutting down
[INFO/Display Prep] process exiting with exitcode 0
[INFO/File Saving] process exiting with exitcode 0
Elapsed time: 0.263240348548
[INFO/MainProcess] process shutting down

C:\code\instrument_control>

It seems to do what I want: the data gets processed for display and saved to disk without interfering with the acquisition rate.
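
One caveat for readers today: the listing is Python 2 (note the c:\Python27 interpreter above), so a Python 3 port needs a few mechanical changes: import queue and catch queue.Empty instead of Queue.Empty, wrap range(num_data_buffers) in list() so pop() works, use the print() function, and replace time.clock() with time.perf_counter(). The shared-array core itself should carry over unchanged; a minimal Python 3 check of that part, offered as a sketch rather than a tested port:

import ctypes
import multiprocessing as mp
import numpy as np

if __name__ == '__main__':
    shape = (60, 256, 512)
    buf = mp.Array(ctypes.c_uint16, int(np.prod(shape)))
    with buf.get_lock():
        # View the locked shared memory as a numpy array and fill it.
        a = np.frombuffer(buf.get_obj(), dtype=np.uint16).reshape(shape)
        a.fill(1)
    print("Buffer sum:", int(a.sum()))  # expect 60 * 256 * 512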
