
Trying To Implement 2 "threads" Using `asyncio` Module

I've played around with threading before in Python, but decided to give the asyncio module a try, especially since you can cancel a running task, which seemed like a nice detail. H

Solution 1:

Why the current code is not working:

  • You're running the event loop until self._supervisor() completes. self._supervisor() creates a task (which happens immediately) and then returns right away, so the event loop stops.

  • You're trying to run the event loop until _supervisor completes, but how and when are you going to start the server? I think the event loop should keep running until the server is stopped. _supervisor or anything else can be added as a task on the same event loop. aiohttp already has a function that starts the server and the event loop, web.run_app, but we can do it manually.
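The first point can be reproduced without aiohttp. The following is a minimal sketch, not the original code: `worker`, `supervisor` and `supervisor_that_waits` are hypothetical names, and it assumes Python 3.7+ so that `asyncio.run` can stand in for `loop.run_until_complete` (it also cancels leftover tasks on exit).

```python
import asyncio

progress = []  # records how far the worker got


async def worker():
    # Stand-in for a long-running downloader loop.
    for step in range(3):
        progress.append(step)
        await asyncio.sleep(0.01)


async def supervisor():
    # Creating the task only schedules it; nothing waits for it here,
    # so this coroutine finishes almost immediately.
    asyncio.ensure_future(worker())


async def supervisor_that_waits():
    # Awaiting the task keeps the event loop running until the worker is done.
    await asyncio.ensure_future(worker())


def run(coro_func):
    """Run coro_func to completion and return the steps the worker reached."""
    progress.clear()
    asyncio.run(coro_func())  # returns as soon as coro_func() itself completes
    return list(progress)


steps_without_wait = run(supervisor)
steps_with_wait = run(supervisor_that_waits)
print("without waiting:", steps_without_wait)
print("with waiting:   ", steps_with_wait)
```

In the first run the worker is cut off before it finishes, because the loop only lives as long as the coroutine it was asked to complete.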

Your questions:

  1. Your server will run until you stop it. You can start and stop different coroutines while the server is running.

  2. You need only one event loop for all your coroutines.

  3. I think you don't need a supervisor.

  4. A more general point: asyncio lets you run different functions concurrently in a single thread of a single process. That's why asyncio is so cool and fast. Some of your synchronous, thread-based code can be rewritten using asyncio and its coroutines. Moreover, asyncio can interact with threads and processes, which is useful if you still need them: here's an example.
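As a rough illustration of asyncio working together with a thread, the sketch below hands a blocking function to the loop's default thread pool with `loop.run_in_executor` while a coroutine keeps running on the event loop. `blocking_io` and `ticker` are made-up names for this example, and it assumes Python 3.7+.

```python
import asyncio
import time


def blocking_io(seconds):
    # Ordinary synchronous function; calling it directly from a coroutine
    # would block the whole event loop for its duration.
    time.sleep(seconds)
    return "blocking call finished"


async def ticker(count):
    # A normal coroutine that keeps making progress while the thread is busy.
    ticks = 0
    for _ in range(count):
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    thread_future = loop.run_in_executor(None, blocking_io, 0.05)
    # Wait for the thread and the coroutine together.
    return await asyncio.gather(thread_future, ticker(3))


result, ticks = asyncio.run(main())
print(result, ticks)
```

The blocking call runs in a worker thread, so the single event loop stays free for the other coroutines.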

Useful notes:

  • It's better to use the term "coroutine" instead of "thread" when talking about asyncio: coroutines are not threads.
  • If you use Python 3.5 or later, you can use the async/await syntax instead of coroutine/yield from.
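For comparison, here is a small sketch of a cancellable background coroutine written with async/await. It is not the rewritten server, only the cancel-then-await pattern in the newer syntax; `downloader` and `log` are illustrative names, and `asyncio.run` assumes Python 3.7+.

```python
import asyncio
from contextlib import suppress

log = []  # one entry per simulated download


async def downloader():
    # `await` here plays the role of `yield from` in a generator-based coroutine.
    while True:
        log.append("download")
        await asyncio.sleep(0.01)


async def main():
    task = asyncio.ensure_future(downloader())
    await asyncio.sleep(0.03)   # let the downloader run a few iterations
    task.cancel()               # only requests cancellation
    with suppress(asyncio.CancelledError):
        await task              # wait until the task has actually stopped
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled, len(log))
```

Awaiting the task after `cancel()` gives it the chance to receive `CancelledError` and finish before the caller moves on.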

I rewrote your code to show the idea. To check it: run the program and watch the console, open http://localhost:8080/stop and watch the console, open http://localhost:8080/start and watch the console, then press CTRL+C.

import asyncio
import random
from contextlib import suppress

from aiohttp import web


class aiotest():
    def __init__(self):
        self._webapp = None
        self._d_task = None
        self.init_server()

    # SERVER:
    def init_server(self):
        app = web.Application()
        app.router.add_route('GET', '/start', self.start)
        app.router.add_route('GET', '/stop', self.stop)
        app.router.add_route('GET', '/kill_server', self.kill_server)
        self._webapp = app

    def run_server(self):
        # Create server:
        loop = asyncio.get_event_loop()
        handler = self._webapp.make_handler()
        f = loop.create_server(handler, '0.0.0.0', 8080)
        srv = loop.run_until_complete(f)
        try:
            # Start downloader at server start:
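            # I'm using controllers here and below to be short,
            # but it's better to split controller and start func.
            # ensure_future replaces asyncio.async, which is a syntax
            # error on Python 3.7+ because async is a reserved word.
            asyncio.ensure_future(self.start(None))
            # Start server: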
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Stop downloader when server stopped:
            loop.run_until_complete(self.stop(None))
            # Cleanup resources:
            srv.close()
            loop.run_until_complete(srv.wait_closed())
            loop.run_until_complete(self._webapp.shutdown())
            loop.run_until_complete(handler.finish_connections(60.0))
            loop.run_until_complete(self._webapp.cleanup())
        loop.close()

    @asyncio.coroutine
    def kill_server(self, request):
        print('Server killing...')
        loop = asyncio.get_event_loop()
        loop.stop()
        return web.Response(body=b"Server killed")

    # DOWNLOADER
    @asyncio.coroutine
    def start(self, request):
        if self._d_task is None:
            print('Downloader starting...')
            # ensure_future replaces asyncio.async (a syntax error on Python 3.7+)
            self._d_task = asyncio.ensure_future(self._downloader())
            return web.Response(body=b"Downloader started")
        else:
            return web.Response(body=b"Downloader already started")

    @asyncio.coroutine
    def stop(self, request):
        if (self._d_task is not None) and (not self._d_task.cancelled()):
            print('Downloader stopping...')
            self._d_task.cancel()
            # cancel() only tells the task it should be cancelled;
            # await it so the task is able to handle CancelledError
            with suppress(asyncio.CancelledError):
                yield from self._d_task
            self._d_task = None
            return web.Response(body=b"Downloader stopped")
        else:
            return web.Response(body=b"Downloader already stopped or stopping")

    @asyncio.coroutine
    def _downloader(self):
        while True:
            print('Downloading and verifying file...')
            # Dummy sleep - to be replaced by actual code
            yield from asyncio.sleep(random.randint(1, 2))
            # Wait a predefined nr of seconds between downloads
            yield from asyncio.sleep(1)


if __name__ == '__main__':
    t = aiotest()
    t.run_server()
