Skip to content Skip to sidebar Skip to footer

Initializing A Worker With Arguments Using Celery

I'm having issues finding something that seems like it would be relatively simple to me. I'm using Celery 3.1 with Python 3 and am wanting to initialize my workers with arguments

Solution 1:

I would suggest using an abstract task base class and caching the requests.session.

From the Celery docs:

A task is not instantiated for every request, but is registered in the task registry as a global instance.

This means that the __init__ constructor will only be called once per process, and that the task class is semantically closer to an Actor.

This can also be useful to cache resources...

import requests
from celery import Task

classAPITask(Task):
    """API requests task class."""

    abstract = True# the cached requests.session object
    _session = Nonedef__init__(self):
        # since this class is instantiated once, use this method# to initialize and cache resources like a requests.session# or use a property like the example below which will create# a requests.session only the first time it's accessed    @propertydefsession(self):
        if self._session isNone:
            # store the session object for the first time
            session = requests.Session()
            session.auth = ('user', 'pass')

            self._session = session

        return self._session

Now when you create the tasks that will make API requests:

@app.task(base=APITask, bind=True)defcall_api(self, url):
    # self will refer to the task instance (because we're using bind=True)
    self.session.get(url)

Also you can pass the API authentication options using the app.task decorator as an extra argument which will be set on the __dict__ of the task, for example:

# pass a custom auth argument@app.task(base=APITask, bind=True, auth=('user', 'pass'))defcall_api(self, url):
    pass

And make the base class use the passed authentication options:

classAPITask(Task):
    """API requests task class."""

    abstract = True# the cached requests.session object
    _session = None# the API authentication
   auth = ()

    @propertydefsession(self):
        if self._session isNone:
            # store the session object for the first time
            session = requests.Session()
            # use the authentication that was passed to the task
            session.auth = self.auth

            self._session = session

        return self._session

You can read more on the Celery docs site:

Now back to your original question which is passing extra arguments to the worker from the command line:

There is a section about this in the Celery docs Adding new command-line options, here's an example of passing a username and a password to the worker from the command line:

$ celery worker -A appname --username user --password pass

The code:

from celery import bootsteps
from celery.binimport Option


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.')
)

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.')
)


classCustomArgs(bootsteps.Step):

    def__init__(self, worker, api_username, api_password, **options):
        # store the api authentication
        APITask.auth = (api_username, api_password)


app.steps['worker'].add(CustomArgs)

Solution 2:

I would think you could call the script you wrote using command line arguments. Something like the following:

my_script.py username password

Inside your script, you can have your main function wrapped in an @celery.task or @app.task decorator.

import sys

from celery import Celery

cel = Celery() # put whatever config info you need in here@celery.taskdefmain():
    username, password = sys.argv[1], sys.argv[2]

Something like that should get you started. Be sure to also check out Python's argparse for more sophisticated argument parsing.

Post a Comment for "Initializing A Worker With Arguments Using Celery"