
Fixing The Issue Where Upon Calculating How Frequently A Function Is Called During Multiprocessing It Returns A Negative Value

I have a function foo() which might be accessed by multiple worker processes concurrently. This function blocks until an output is ready and then returns it. A sample foo is below.

Solution 1:

Update

For good measure, you should probably ensure that foo is updating the call_rate dictionary under a Lock instance to handle concurrent access now that you are running multiple processes. But the real problem is that the value last_call needs to be maintained per process and cannot be shared among the processes.

This solution uses a managed class, WorkerManager, that keeps track of all the created processes as long as method init_process is called for each process, passing its process id immediately after it is started, as in the code below. Then all a worker function has to do is call method update_statistics, passing the wait time for each request it processes. A call to get_statistics returns the statistics.

import random
import time
from time import sleep
from multiprocessing import Process, Lock, current_process
from multiprocessing.managers import BaseManager

class WorkerManager:
    def __init__(self):
        self._total_calls = 0
        self._total_time = 0.0
        self._rate = 0.0
        self._lock = Lock()
        self._call_times = {}

    def init_process(self, pid):
        self._call_times[pid] = time.time()

    def update_statistics(self, pid, wait_time):
        now = time.time()
        time_elapsed = now - self._call_times[pid]
        execution_time = time_elapsed - wait_time
        self._call_times[pid] = now
        with self._lock:
            self._total_calls += 1
            self._total_time += execution_time
            self._rate = self._total_time / (self._total_calls * len(self._call_times))

    def get_statistics(self):
        return {'rate': self._rate, 'total_time': self._total_time, 'total_calls': self._total_calls}

class WorkerManagerManager(BaseManager):
    pass

WorkerManagerManager.register('WorkerManager', WorkerManager)


def foo(worker_manager):
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'

    wait_time = time.time() - enter_time

    pid = current_process().pid
    worker_manager.update_statistics(pid, wait_time)

    return output

def worker(worker_manager, num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(worker_manager)

if __name__ == '__main__':
    with WorkerManagerManager() as m:
        worker_manager = m.WorkerManager()
        processes = [Process(target=worker, args=(worker_manager, 3)) for _ in range(3)]
        for p in processes:
            p.start()
            worker_manager.init_process(p.pid)
        for p in processes:
            p.join()
        statistics = worker_manager.get_statistics()
        print('foo called once every {}s'.format(statistics['rate']))

Prints:

foo called once every 0.34751895621970846s

How to use a process pool

And if you want to use a process pool, this is how you might use a pool size of 3 to submit 6 tasks:

import random
import time
from time import sleep
from multiprocessing import Pool, Lock, current_process
from multiprocessing.managers import BaseManager
from functools import partial

class WorkerManager:
    def __init__(self):
        self._total_calls = 0
        self._total_time = 0.0
        self._rate = 0.0
        self._lock = Lock()
        self._call_times = {}

    def init_process(self, pid):
        self._call_times[pid] = time.time()

    def update_statistics(self, pid, wait_time):
        now = time.time()
        time_elapsed = now - self._call_times[pid]
        execution_time = time_elapsed - wait_time
        self._call_times[pid] = now
        with self._lock:
            self._total_calls += 1
            self._total_time += execution_time
            self._rate = self._total_time / (self._total_calls * len(self._call_times))

    def get_statistics(self):
        return {'rate': self._rate, 'total_time': self._total_time, 'total_calls': self._total_calls}

class WorkerManagerManager(BaseManager):
    pass

WorkerManagerManager.register('WorkerManager', WorkerManager)


def pool_init(worker_manager):
    worker_manager.init_process(current_process().pid)

def foo(worker_manager):
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'

    wait_time = time.time() - enter_time

    pid = current_process().pid
    worker_manager.update_statistics(pid, wait_time)

    return output

def worker(worker_manager, num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(worker_manager)

if __name__ == '__main__':
    with WorkerManagerManager() as m:
        worker_manager = m.WorkerManager()
        pool = Pool(3, initializer=pool_init, initargs=(worker_manager,))
        # run 6 tasks
        pool.map(partial(worker, worker_manager), range(6))
        statistics = worker_manager.get_statistics()
        print('foo called once every {}s'.format(statistics['rate']))

Prints:

foo called once every 0.333592324786716s

Solution 2:

I found a way to do it without asking for the number of workers running:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock

def foo(call_rate, lock):
    # Shift this to the start of the function
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'

    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    # Output: 0.354s
    print('foo called once every {}s'.format(call_rate['rate']))

I will explain why this works. In the original code, the last call time was recorded AFTER the function had blocked. This meant that the time spent in the function needed to be subtracted. But, as @Booboo had already pointed out in the comment to their answer, this was problematic because there may be multiple workers running and we can't just subtract the waiting time EACH worker spends in the function.
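To make the failure mode concrete, here is a minimal sketch with hard-coded timestamps (buggy_update is a hypothetical stand-in for the original bookkeeping, not code from the question): when two calls overlap, only 2s of wall time elapses, but each worker subtracts its own 2s of blocking time, driving the shared total negative.

```python
call_rate = {'total_time': 0.0, 'last_call': 0.0, 'total_calls': 0}

def buggy_update(exit_time, wait_time):
    # last_call is recorded AFTER blocking, so the blocking time must be
    # subtracted back out -- and EVERY worker subtracts its own wait time
    call_rate['total_time'] += exit_time - call_rate['last_call']
    call_rate['last_call'] = exit_time
    call_rate['total_calls'] += 1
    call_rate['total_time'] -= wait_time  # the problematic subtraction

# Two workers enter foo() at t=0, both block for 2s, both exit at t=2
buggy_update(exit_time=2.0, wait_time=2.0)  # worker A: total_time -> 0.0
buggy_update(exit_time=2.0, wait_time=2.0)  # worker B: total_time -> -2.0

print(call_rate['total_time'])  # -2.0, hence a negative rate
```

Only 2 seconds passed in total, yet 4 seconds were subtracted; with more overlapping workers the over-subtraction only gets worse.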

A simple workaround is to record the last call time at the start of the function, before the time spent within the function has been added. But that alone doesn't solve the broader problem, because the next time foo() is called from the worker, it will include the time spent within the function during the last call, leaving us at square one again. But this, and I don't know why I didn't see this before, can be fixed very simply by adding this line just before the function exits:

call_rate['last_call'] = time.time()

This makes sure that when the function exits, the last call time is refreshed so that it seems the worker did not spend any time in the function at all. This approach does not require subtracting anything, and that's why it works.

I did a test where I ran this 10 times and calculated some statistics using the code below:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics


def foo(call_rate, lock):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    # Mimic blocking of function
    sleep(2)

    output = 'result of some logic'

    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(10):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Average is : {}".format(statistics.mean(avgs)))

This outputs:

Highest is : 0.35980285538567436
Lowest is : 0.3536567423078749
Average is : 0.356808172331916

As a 'proof' that the above code does ignore the time spent within the function, you can make the function block for a larger time, say 15s, and the output will still be approximately the same.
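That claim is easy to check even in a single process. The following sketch (my own, not part of the answer) reuses the same dictionary logic with a fixed 0.2s gap between calls while the block time grows on every call; the measured rate stays near 0.2s regardless.

```python
import time
from time import sleep

# Same bookkeeping as the answer above, single process for simplicity
call_rate = {'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0}

def foo(block_time):
    call_rate['total_time'] += time.time() - call_rate['last_call']
    call_rate['total_calls'] += 1
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    sleep(block_time)                     # mimic blocking of varying length
    call_rate['last_call'] = time.time()  # refresh on exit: block time ignored

for block_time in (0.1, 0.5, 1.0):  # block for longer and longer
    sleep(0.2)                      # fixed 0.2s gap between calls
    foo(block_time)

print(call_rate['rate'])  # ~0.2s regardless of the block times
```

Dropping the refresh-on-exit line makes the rate track the block times instead of the gap between calls.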

Update

The reason the frequency is not 0.3s when the function blocks for a varying time has to do with when the workers enter and exit foo(). Consider the code below, where two workers are run once, each executing foo() twice, and call_rate is printed on every enter and exit of foo() along with a unique id identifying the worker:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics
import string

def foo(call_rate, lock, id):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
        print("{} entered, call rate {}".format(id, call_rate))
    # Mimic blocking of function
    sleep(1)

    output = 'result of some logic'

    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
        print("{} exited, call rate {}".format(id, call_rate))
    return output


def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def worker(num, call_rate, lock):
    id = id_generator()
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock, id)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 2 worker processes that run foo() twice
    for i in range(2):
        w.append(Process(target=worker, args=(2, call_rate, lock, )))
        w[i].start()
    for i in range(2):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(1):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Average is : {}".format(statistics.mean(avgs)))

Note that in this code, foo() always blocks for 1s. The rate should be close to 0.5s since there are two workers present. Running this code:

Output #1:

XEC6AU entered, call rate {'rate': 1.1851444244384766, 'total_time': 1.1851444244384766, 'last_call': 1624950732.381014, 'total_calls': 1}
O43FUI entered, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950732.4325447, 'total_calls': 2}
XEC6AU exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4327667, 'total_calls': 2}
O43FUI exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4484024, 'total_calls': 2}
XEC6AU entered, call rate {'rate': 0.7401185035705566, 'total_time': 2.22035551071167, 'last_call': 1624950734.433083, 'total_calls': 3}
O43FUI entered, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950734.4487064, 'total_calls': 4}
XEC6AU exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4333804, 'total_calls': 4}
O43FUI exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4958992, 'total_calls': 4}
Highest is : 0.558994710445404
Lowest is : 0.558994710445404
Average is : 0.558994710445404

The rate is ~0.5s, which is to be expected. Notice how both workers enter and exit the function simultaneously. Now, after changing the function blocking time from 1s to random.randint(1, 10), this is what I get:

Output #2

NHXAKF entered, call rate {'rate': 1.1722326278686523, 'total_time': 1.1722326278686523, 'last_call': 1624886294.4630196, 'total_calls': 1}
R2DD8H entered, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886294.478649, 'total_calls': 2}
NHXAKF exited, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886300.4648588, 'total_calls': 2}
NHXAKF entered, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886301.465171, 'total_calls': 3}
R2DD8H exited, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886302.4811018, 'total_calls': 3}
R2DD8H entered, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886303.4813821, 'total_calls': 4}
NHXAKF exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886304.4660738, 'total_calls': 4}
R2DD8H exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886307.4826, 'total_calls': 4}
Highest is : 0.7971136569976807
Lowest is : 0.7971136569976807
Average is : 0.7971136569976807

The rate, unlike before, is almost 0.8s. Moreover, the two workers are no longer entering and exiting the function together. This is of course due to one blocking for a longer time than the other. Because they are no longer in sync, they wait their 1s at separate times instead of together inside the worker() function. You can even see this in call_rate['total_time']: for Output #1, where the workers are in sync, it is ~2s, while for Output #2 it is ~3s. Hence the difference in rates. So 0.8s is the true rate of the workers calling foo() in this scenario, not the assumed 0.5s. Multiplying the rate by the number of processes would miss this nuance.
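That arithmetic can be replayed with simulated timestamps instead of real processes (simulate, synced, and desynced below are hypothetical names for this sketch). In the synced scenario both workers enter at t=1 and t=3; in the desynced scenario worker B blocks 3s instead of 1s, which inflates total_time from 2s to 3s over the same four calls.

```python
def simulate(events):
    # events: list of ('enter'|'exit', timestamp) in chronological order,
    # applying the same bookkeeping as foo() above
    state = {'total_time': 0.0, 'last_call': 0.0, 'total_calls': 0}
    for kind, t in events:
        if kind == 'enter':
            state['total_time'] += t - state['last_call']
            state['last_call'] = t
            state['total_calls'] += 1
        else:  # exit refreshes last_call so the block time is ignored
            state['last_call'] = t
    return state['total_time'] / state['total_calls']

# Both workers in sync: enter at t=1, exit at t=2, enter at t=3, exit at t=4
synced = [('enter', 1), ('enter', 1), ('exit', 2), ('exit', 2),
          ('enter', 3), ('enter', 3), ('exit', 4), ('exit', 4)]

# Worker B blocks 3s instead of 1s, so the entries and exits interleave:
# A: enter 1 / exit 2, enter 3 / exit 4;  B: enter 1 / exit 4, enter 5 / exit 8
desynced = [('enter', 1), ('enter', 1), ('exit', 2), ('enter', 3),
            ('exit', 4), ('exit', 4), ('enter', 5), ('exit', 8)]

print(simulate(synced))    # 0.5: total_time 2.0 over 4 calls
print(simulate(desynced))  # 0.75: total_time 3.0 over 4 calls
```

The desynced rate is higher purely because the workers' 1s waits in worker() no longer overlap, exactly as observed in Output #2.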
