
Python Parallelising "async For"

I have the following method in my Tornado handler: async def get(self): url = 'url here' try: async for batch in downloader.fetch(url): self.w

Solution 1:

The question asks: "How can I modify what I am doing in order to be able to yield from multiple coroutines in parallel?"

You need a function that merges two async sequences into one, iterating over both in parallel and yielding elements from one or the other, as they become available. While such a function is not included in the current standard library, you can find one in the aiostream package.

You can also write your own merge function, as shown in this answer:

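import asyncio

async def merge(*iterables):
    # Map each async iterator to its pending __anext__() task (None = not scheduled yet).
    iter_next = {it.__aiter__(): None for it in iterables}
    try:
        while iter_next:
            for it, it_next in iter_next.items():
                if it_next is None:
                    # Schedule the next item and remember which iterator it came from.
                    fut = asyncio.ensure_future(it.__anext__())
                    fut._orig_iter = it
                    iter_next[it] = fut
            # Wake up as soon as any iterator has produced an item.
            done, _ = await asyncio.wait(iter_next.values(),
                                         return_when=asyncio.FIRST_COMPLETED)
            for fut in done:
                iter_next[fut._orig_iter] = None
                try:
                    ret = fut.result()
                except StopAsyncIteration:
                    # This iterator is exhausted; stop tracking it.
                    del iter_next[fut._orig_iter]
                    continue
                yield ret
    finally:
        # Cancel tasks still pending if the consumer stops early or an error occurs.
        for fut in iter_next.values():
            if fut is not None:
                fut.cancel()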

Using that function, the loop would look like this:

async for batch in merge(downloader.fetch(url1), downloader.fetch(url2)):
    ...
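
As an alternative to the merge function above, the same idea can be sketched with an asyncio.Queue: one task per source pushes items into a shared queue, and the consumer yields them in arrival order. This is a different technique from the one in the answer, offered only as a minimal sketch; the names merge_via_queue, drain, ticker and main are hypothetical and chosen for illustration.

```python
import asyncio

_DONE = object()  # sentinel: one source has finished


async def merge_via_queue(*iterables):
    """Yield items from several async iterables as they become available."""
    queue = asyncio.Queue()

    async def drain(iterable):
        # Forward every item of one source into the shared queue.
        try:
            async for item in iterable:
                await queue.put((item, None))
        except Exception as exc:
            # Hand the error to the consumer instead of losing it in the task.
            await queue.put((None, exc))
        finally:
            await queue.put((_DONE, None))

    tasks = [asyncio.ensure_future(drain(it)) for it in iterables]
    remaining = len(tasks)
    try:
        while remaining:
            item, exc = await queue.get()
            if exc is not None:
                raise exc
            if item is _DONE:
                remaining -= 1
                continue
            yield item
    finally:
        # Stop any source still running if the consumer exits early.
        for task in tasks:
            task.cancel()


async def ticker(name, delay, count):
    # Hypothetical source: yields name0, name1, ... with a pause before each.
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def main():
    merged = merge_via_queue(ticker("a", 0.02, 3), ticker("b", 0.05, 2))
    return [item async for item in merged]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because both sources run as separate tasks, items from the faster source arrive between items of the slower one instead of waiting for it to finish. The queue here is unbounded; passing a maxsize to asyncio.Queue would add backpressure if the consumer is slower than the producers.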

Solution 2:

Edit: As noted in the comments, the method below does not run the given coroutines in parallel; chain consumes its iterables one after another.

Check out the aitertools library.

import asyncio
import aitertools

async def f1():
    await asyncio.sleep(5)
    yield 1

async def f2():
    await asyncio.sleep(6)
    yield 2

async def iter_funcs():
    async for x in aitertools.chain(f2(), f1()):
        print(x)

if __name__ == '__main__':
    # asyncio.run creates and closes the event loop (Python 3.7+).
    asyncio.run(iter_funcs())

It seems that the functions being iterated must be coroutines (here, async generator functions).
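
To see why chaining is not parallel, here is a small standard-library sketch. The chain function below is a hand-written stand-in that exhausts each source before starting the next; it is an assumption about how a chain-style helper behaves, not the aitertools implementation. The helper names delayed and timed_chain are hypothetical, and the delays are shortened so the example finishes quickly.

```python
import asyncio
import time


async def delayed(value, delay):
    # Hypothetical source: wait, then yield a single value.
    await asyncio.sleep(delay)
    yield value


async def chain(*iterables):
    # Stand-in for a chain-style helper: finish one source before
    # touching the next, so the waits add up.
    for iterable in iterables:
        async for item in iterable:
            yield item


async def timed_chain():
    start = time.monotonic()
    items = [x async for x in chain(delayed(2, 0.06), delayed(1, 0.05))]
    return items, time.monotonic() - start


if __name__ == "__main__":
    items, elapsed = asyncio.run(timed_chain())
    print(items, f"{elapsed:.2f}s")
```

The items come out in argument order and the elapsed time is roughly the sum of the two delays, whereas a merge that runs both sources concurrently would take about as long as the slower one.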

