Celery for Internal API in SOA infrastructure

Posted:   |  More posts about python celery   |   Source

I should admit that I'm very slow. PyCon Russia in Yekaterinburg has ended almost a month ago, and only now I manage to publish the transcript of my talk.

Anyway, presentation from Slideshare is here.

Talk transcript is below.

What's the deal?

Suppose, you have a guy storming to you and saying: "I've just known about internets. It's sooo cool. I want you to create a clone of Facebook, ebay, internet bank, whatever. Quick. Now. I don't know yet what my customers will want from this service exactly. Timeline, integration with third party services, something else. I don't know how how many users will I have, if any. But I'm sure I will prosper. So maybe will you. Here is the pile of money for you to start off".

If for some reasons you agreed, you must think about the applications architecture. You must create something scalable and extensible. The best decision for most cases will be the service-oriented architecture, when we split our product to a number of relatively independent services interacting with each other with help of specific protocol.

The external part of the application is wrapped to a web-interface and shown to users. How it works behind the scenes, is usually isn't exposed to user and user isn't interested to details.

And the question is: how to organize inter-module communication better? On one hand, we must ensure that components are isolated and independent. On the other hand, our goal is to make communication as transparent to developers as possible.

There are protocols which were invented especially to solve this problem. Protocol names only make me feel scared. DCOM, which is too microsofish. CORBA which is too enterprisy. SOAP with XML inside. You can use REST-based interfaces, and so on.

I propose to consider Celery in this role and to see what happens from it.

Celery for API. Naive approach

From the point of view of the application developer, Celery is a library which helps you execute the code on the remote host, and then gets back the result or exception as easy as it would do locally. Actually, it's RPC on steroids, where steroids is asynchronous model, based on task queue usage. Clients push messages to the queue, workers pop them and execute them.

This is a classic way of using celery. It's a worker code.

from celery import Celery
celery = Celery(broker='redis://', backend='redis://')

def add(x, y):
    return x + y

We create an object of class Celery. It's an app. We export arbitrary function with the celery.task decorator

The role of decorator is dual:

  • For client, it turns a function to a fully fledged object with tons of useful methods, delay and apply_async among them. The goal of client is not to execute the function, but to create the task and to push it to the queue.
  • For celery worker the decorator registers the task and uses the function code as a task handler.

Then the client makes a task and enqueue it, by calling the task-name.delay method.

from worker import add
result = add.delay(1, 1).get()

The very same approach, which is mostly used to create separate "ad-hoc" tasks, can be used to create API for SOA, but it's not very convenient.


Because this approach makes a developer:

  • to install all dependencies of all components of the system on all nodes. It's even more complicated, when a worker installation requires extra settings for configuration, as with Django
  • to import tasks along with their implementation and all their dependencies. It make the process grow in size, and so on.

Moreover, you cannot outsource some parts of your job without revealing the whole code to the 3rd party developer.

And after all. This approach makes William Occam sad. It's always a bad idea to irritate him, because, you know, of a razor he has.

What can we do to make the situation better? I see at least three ways to fix it

Celery for API. Variant 1. Step one level down

In fact, all the broker passes to the queue, is function name and a list of parameters. And the celery object has a send_task method to make it.

from celery import Celery
celery = Celery(broker='redis://',  backend='redis://')
celery.send_task('add', args=(1, 1)).get()

It's good, but it's not very convenient. And you lose the ability to execute canvas.

Celery for API. Variant 2. Create pseudo-tasks

To remind you, that a task decorator converts a function to the Task object. We may create the same object manually. All we have to do is to define its "name" property right after creation.

from celery import Celery
celery = Celery(broker='redis://', backend='redis://')

add = celery.Task()
add.name = 'add'

# just a simple delay
add.delay(1, 1).get()
# chain of tasks
chain = add.s(1, 1) | add.s(1) | add.s(1)
# return 4

As seen from example, this "pseudo-task" can execute all functions of the ordinary task, and also can participate in chains, groups, etc.

Celery for API. Variant 3. Introspection

One minor inconvenience remains. You have to create tasks manually. But it turns out that we can overcome this inconvenience too, because celery provides a feature of introspection to us.

You can use celery inspect command set to get the data. The very same can be done from Celery API.

Especially for the pycon I wrote the module celery-api, which does the introspection.

It's available at http://github.com/imankulov/celery-api.

It asks the worker, which queues it listens, and which tasks it knows how to execute. Then it builds a tree of attributes, where leaf nodes are task objects.

It's important to note, that because the module asks workers, if there is no worker is launched, no task will be found.

API routing

When you send a task via the API, any API, you always specify its destination, a node which must execute it. A typical case to make it work is via DNS hostname. But what should play a role of such an identifier in our celery-based infrastructure?

My obvious proposal is to use queue names.

Well, in order to launch the endpoint, we should 1) define its queue name 2) define a list of tasks it can execute. Name of the queue is defined when the worker is launched. A list of tasks can be defined with the CELERY_IMPORT option, or directly by importing all required modules from the main.

We may push the task to a queue explicitly, by defining a queue parameter in the apply_async method.

add.apply_async(queue='foo', args=(1, 1))

It's not very handy. We repeat ourselves, and using delay instead of apply_async is more convenient.

Alternatively, we may use the CELERY_ROUTES instead. There we define a list of classes to perform the routing.

CELERY_ROUTES = ('myapp.routers.MyRouter', )

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        return {
            'exchange': '...',
            'exchange_type': '...',
            'routing_key': '...',

Every element of CELERY_ROUTES is a class name. The class should have at least one method: route_for_task. This method accepts the name of the task, args and kwargs, but it returns... something else instead of the queue which we would expect.

As you see from the example, instead of clean and obvious queue name, the route_for_task returns exchange, exchange_type and a routing_key. What does it means, and how it's converted to a queue name?

Below is the contents of the tasks, when it's in the queue.

    "body": "<base64 encoded string with task name and args>",
    "properties": {
        "body_encoding": "base64",
        "delivery_info": {
            "priority": 0, "routing_key": "foo", "exchange": "foo"
       "delivery_mode": 2,
       "delivery_tag": UUID

The name of the task, along with its arguments is stored in the body, but currently we're mostly interested in element named delivery_info.

As you see, there is no information about the queue in the task. Instead, there is a routing key and exchange. Those who are familiar with AMQP protocol, got it already...

Matching between these two parameters and queue name is defined by the internal configuration of the broker. Inside every broker has one or more exchanges.

A message is passed to the exchange and has a routing key. Exchange is connected with several queues, and has rules which match routing key and the queue name. In the simplest case (Direct Exchange) it's just a one-to-one table. A message will be passed to one of the workers listening for the queue.

Broker configuration is also described in the celery configuration. Below there is an example of such a configuration.


    Queue('foo', routing_key='foo'),
    Queue('foo', routing_key='bar'),
    Queue('foo', routing_key='baz'),

There is one exchange of direct type, having three queues connected. For the sake of simplicity, queue names are the same as routing keys, but generally speaking they may be different.


In short about the advantages of Celery as the API media:

  1. The protocol is high level. It takes care of serialization, routing, exception handling and so forth.
  2. Tons of brokers are supported. You may switch between brokers with no significant changes in your code.
  3. Tons of features you were afraid ro dream of from the simple API media: asynchronous execution, parallel execution, asynchronous parallel execution, delayed execution, throttling, automatic retrying of failed tasks, etc.

There are some drawback, for sure.

  1. Celery is python-centric and it's supposed that it will be used in the python-based infrastructure. Sure, you may use subprocess or jython, but as for me, it doesn't look very natural.
  2. Increasing system complexity. If your application is not very big and complex, extra burden to support extra library can be annoying. Celery application is quite complex, non-trivial, and the documentation about its innards is scarce.

Final warning note

Never ever expose Celery broker to the world and don't try to use it for external API purposes. Celery is not conceived with this in mind, and it doesn't have corresponding code to ensure you're safe in this situation.

Comments powered by Disqus
Contents © 2013 Roman Imankulov - Powered by Nikola