flynn.gg

Christopher Flynn

Integrating Celery tasks with Slack

2017-10-26

A few months ago at work I set up an instance running Celery, a distributed task queue written in Python. The instance handles cron jobs for operational and analytics tasks in our applications’ data pipeline, automated report generation, and data science analysis.

Celery also has a companion Python package known as Flower (as in monitoring the flow of tasks), a web tool for monitoring Celery workers and the tasks they process. It’s a fantastic package with a clean UI, and it even comes with support for GitHub OAuth. I’ve been using Flower (screenshots) alongside Celery since setting it up, but I’ve been looking for a better way to monitor the flow of jobs.

I started playing around with the Slack API recently and realized that I could create an integration that lets Celery post to a #celery Slack channel. Surprisingly, the process was relatively painless. Here’s how I did it.

Create the Slack application

The first step is to create the Slack application, which can be accomplished by following the steps here. I called mine Celery and attached it to our Slack organization.

You’ll be taken to a page which shows Basic Information about your Celery app, including credentials, display information, and authorization settings. For this application we’ll need to create an Incoming Webhook, which will allow us to write messages to a Slack channel using HTTP POST requests.

In Slack, create a new channel called #celery. On the app info page under Features and Functionality, click Incoming Webhooks. On the webhooks page, click the button to Add New Webhook to Workspace. You’ll be prompted to select a Slack channel (#celery) to associate with the webhook. The resulting webhook URL should look something like this, with random values in place of the Xs, Ys, and Zs:

https://hooks.slack.com/services/XXXXXXXXX/YYYYYYYYY/ZZZZZZZZZZZZZZZZZZZZZZZZ

You should keep this address a secret (i.e. don’t commit it to your codebase). This is where we’ll be sending our POST requests to write to the #celery channel.
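One way to keep the webhook out of the codebase is to read it from an environment variable. The following is a minimal sketch, not part of the original setup: the variable name SLACK_WEBHOOK_URL and the helper names are assumptions, and it uses only the standard library so that it stands alone.

```python
import json
import os
import urllib.request


def load_webhook_url(environ=None):
    """Read the webhook URL from the environment.

    The variable name SLACK_WEBHOOK_URL is an assumption for this sketch.
    """
    environ = os.environ if environ is None else environ
    url = environ.get('SLACK_WEBHOOK_URL')
    if not url:
        raise RuntimeError('SLACK_WEBHOOK_URL is not set')
    return url


def build_request(url, text):
    """Build (but do not send) the HTTP POST request for a plain text message."""
    body = json.dumps({'text': text}).encode('utf-8')
    return urllib.request.Request(
        url,
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def post_message(text):
    """Send `text` to the channel tied to the webhook and return the HTTP status."""
    request = build_request(load_webhook_url(), text)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

With the variable exported in the shell that runs the worker, calling post_message('Hello from Celery!') should write a message to the #celery channel.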

Patching Celery’s Task objects

If you’ve created a standard Celery project, then you probably have a celery.py file that looks (not exactly) like this:

"""The celery app."""
from celery import Celery


app = Celery('schedule')
config_object = 'celery.config'
app.config_from_object(config_object)


if __name__ == '__main__':
    app.start()

What we want to do here is have Celery post the results of each executed task to the #celery channel. After doing some digging I found that Celery’s Task objects have on_success() and on_failure() handlers (and a few others) that do nothing by default. We can patch these methods of the Task class by overriding them when Celery starts up in celery.py.
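To see why patching works, here is a small illustration that does not use Celery at all. The Task class below is a hypothetical stand-in, and record_success is a made-up handler; the point is only that a plain function assigned to a class attribute is called with the instance as self, just like a method defined in the class body.

```python
class Task:
    """Stand-in for Celery's Task class (hypothetical, for illustration only)."""

    name = 'demo.add'

    def on_success(self, retval, task_id, args, kwargs):
        """Default handler: does nothing."""
        return None


def record_success(self, retval, task_id, args, kwargs):
    """Replacement handler; `self` is the task instance once patched in."""
    return '{name} [{tid}] returned {ret}'.format(
        name=self.name, tid=task_id, ret=retval)


# Assigning the function to the class attribute replaces the handler
# for every instance of Task, including ones created later.
Task.on_success = record_success

task = Task()
result = task.on_success(3, 'abc123', (1, 2), {})
print(result)
```

The same assignment applied to the real Task class is what the patch in celery.py relies on.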

The following snippet includes a utility function that posts to Slack using the requests library, plus replacement callback functions for task success and failure. More details on the message and attachments API can be found here.

import requests
from celery import Celery


# This should really be imported from a secret file instead of written out.
SLACK_WEBHOOK = 'https://hooks.slack.com/services/XXXXXXXXX/YYYYYYYYY/ZZZZZZZZZZZZZZZZZZZZZZZZ'


def post_to_slack(text, attachment=None):
    """Post a message to the slack channel."""
    payload = {'text': text}
    if attachment is not None:
        payload.update(attachment)
    # A timeout keeps a slow Slack response from blocking the worker.
    result = requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
    return result


def slack_on_success_callback(self, retval, task_id, args, kwargs):
    """Post a success to slack as a callback for tasks."""
    message = self.name + ' success.'
    success = '\n'.join([
        'Name: *' + self.name + '*',
        'Task ID: ' + task_id,
        'args: ' + str(args),
        'kwargs: ' + str(kwargs),
        'Return: ' + str(retval),
    ])
    attachment = {
        "attachments": [
            {
                "fallback": message,
                "color": "#36a64f",
                "text": success,
                "title": message,
                "title_link": 'https://flower.example.com/task/{tid}'.format(tid=task_id),
                "mrkdwn_in": ["text"]
            }
        ],
        "text": ''
    }
    post_to_slack('', attachment)


def slack_on_failure_callback(self, exc, task_id, args, kwargs, einfo):
    """Post a failure message to slack as a callback for tasks."""
    message = self.name + ' failure.'
    failure = '\n'.join([
        'Name: *' + self.name + '*',
        'Task ID: ' + task_id,
        'args: ' + str(args),
        'kwargs: ' + str(kwargs),
        'Exception: ' + str(exc),
        'Info: ' + str(einfo),
    ])
    attachment = {
        "attachments": [
            {
                "fallback": message,
                "color": "#D00001",
                "text": failure,
                "title": message,
                "title_link": 'https://flower.example.com/task/{tid}'.format(tid=task_id),
                "mrkdwn_in": ["text"]
            }
        ],
        "text": ''
    }
    post_to_slack('', attachment)


app = Celery('schedule')

config_object = 'celery.config'
app.config_from_object(config_object)

# Patch the Task handlers with our Slack post functions
app.Task.on_success = slack_on_success_callback
app.Task.on_failure = slack_on_failure_callback


if __name__ == '__main__':
    app.start()
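The two callbacks above build nearly identical payloads, so the shared part could be factored into one helper. This is a sketch of one possible refactoring, not the code I actually run; build_attachment and its parameters are names made up for the example.

```python
FLOWER_TASK_URL = 'https://flower.example.com/task/{tid}'


def build_attachment(task_name, status, color, task_id, fields):
    """Build the Slack payload shared by the success and failure callbacks.

    `fields` is a list of (label, value) pairs appended after the
    task name and ID, e.g. [('args', args), ('Return', retval)].
    """
    message = '{} {}.'.format(task_name, status)
    lines = [
        'Name: *{}*'.format(task_name),
        'Task ID: {}'.format(task_id),
    ]
    lines.extend('{}: {}'.format(label, value) for label, value in fields)
    return {
        'attachments': [
            {
                'fallback': message,
                'color': color,
                'text': '\n'.join(lines),
                'title': message,
                'title_link': FLOWER_TASK_URL.format(tid=task_id),
                'mrkdwn_in': ['text'],
            }
        ],
        'text': '',
    }


# Example: the payload a success callback would pass to post_to_slack().
payload = build_attachment(
    'tasks.add', 'success', '#36a64f', 'abc123',
    [('args', (1, 2)), ('kwargs', {}), ('Return', 3)],
)
```

Each callback would then shrink to a single build_attachment() call with its own status, color, and extra fields, followed by post_to_slack('', payload).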

The results

The results are exceptional and actually quite similar to the Jenkins CI Slack integration. Additionally, you can set up highlight words in Slack that provide personal channel notifications if you want to hear about certain tasks. Alternatively, you could add @user notifications to the return value of your tasks to be notified individually. Here is a sample screenshot of the results.

[Screenshot: celeryslack]
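For the @user idea, a task could prefix its return value with a mention. The sketch below assumes Slack's documented mention syntax of a user ID wrapped as <@USER_ID>; the helper name and the ID U012AB3CD are hypothetical, and whether a mention inside attachment text triggers a notification is worth checking in your own workspace.

```python
def with_mentions(text, user_ids):
    """Prefix `text` with a Slack mention for each user ID in `user_ids`."""
    mentions = ' '.join('<@{}>'.format(uid) for uid in user_ids)
    return '{} {}'.format(mentions, text) if mentions else text


# Example: a value a task might return so that it shows up in the 'Return' line.
retval = with_mentions('Weekly report generated.', ['U012AB3CD'])
print(retval)
```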

Further reading

Celery

Flower

Slack
