本文概述
- 什么是Celery?
- 增进食欲
- Celery的第一步
- 方案1-报表生成和导出
- 方案2-通过电子邮件报告服务器500错误
- 超越默认Celery任务
- 方案3-每个任务的文件记录
- 方案4-范围感知任务
- 总结
现代Web应用程序及其底层系统比以往任何时候都更快, 响应更快。但是, 在许多情况下, 你希望将繁重的任务的执行工作转移到整个系统体系结构的其他部分, 而不是将其处理在主线程上。识别此类任务就像检查它们是否属于以下类别之一一样简单:
- 定期任务-你计划安排在特定时间或间隔运行的任务, 例如, 每月生成报告或每天运行两次的网络抓取工具。
- 第三方任务-Web应用程序必须快速为用户提供服务, 而不必等待页面加载时完成其他操作, 例如, 发送电子邮件或通知或传播内部工具的更新(例如, 收集数据以进行A / B测试或系统日志记录) )。
- 长期运行的作业-资源昂贵的作业, 用户在计算结果时需要等待, 例如, 复杂的工作流程执行(DAG工作流程), 图形生成, 类似Map-Reduce的任务以及媒体内容(视频, 音频)。
执行后台任务的直接解决方案是在单独的线程或进程中运行它。 Python是一种高级的Turing完整编程语言, 不幸的是, 它没有提供与Erlang, Go, Java, Scala或Akka匹配的规模的内置并发。这些基于Tony Hoare的通信顺序过程(CSP)。另一方面, Python线程是通过全局解释器锁(GIL)进行协调和调度的, 它可以防止多个本机线程一次执行Python字节码。摆脱GIL是Python开发人员中很多讨论的话题, 但这不是本文的重点。尽管你可以在srcminier Marcus McCurdy的Python Multithreading Tutorial中阅读它, 但使用Python进行并行编程是过时的。因此, 一致地设计进程之间的通信是一个容易出错的过程, 并且会导致代码耦合和不良的系统可维护性, 更不用说它会对可伸缩性产生负面影响。此外, Python进程是操作系统(OS)下的常规进程, 并且与整个Python标准库一起, 它变得非常繁重。随着应用程序中进程数量的增加, 从一个这样的进程切换到另一个进程变得很耗时。
为了更好地了解Python的并发性, 请观看David Beazley在PyCon’15上的精彩演讲。
更好的解决方案是为分布式队列或其称为publish-subscribe的同级范例提供服务。如图1所示, 有两种类型的应用程序, 其中一种称为发布者, 发送消息, 另一种称为订阅者, 接收消息。这两个代理不直接相互交互, 甚至彼此都不知道。发布者将消息发送到中央队列或代理, 订阅者从该代理接收感兴趣的消息。此方法有两个主要优点:
- 可伸缩性-代理商无需在网络中彼此了解。他们是按主题集中的。因此, 这意味着彼此可以以异步方式继续正常运行, 而与彼此无关。
- 松耦合-每个代理代表其在系统(服务, 模块)中的部分。由于它们是松散耦合的, 因此每个都可以单独扩展到数据中心之外。
有许多消息传递系统支持此类范例并提供纯净的API, 该API由TCP或HTTP协议驱动, 例如JMS, RabbitMQ, Redis Pub / Sub, Apache ActiveMQ等。
图1:发布-订阅范例
什么是Celery?
Celery是Python世界中最受欢迎的后台工作经理之一。 Celery与RabbitMQ或Redis等多个消息代理兼容, 并且可以同时充当生产者和消费者。
Celery是基于分布式消息传递的异步任务队列/作业队列。它专注于实时操作, 但也支持调度。执行单元(称为任务)使用多处理, Eventlet或gevent在一个或多个工作服务器上同时执行。任务可以异步执行(在后台)或同步执行(等待直到就绪)。 –Celery项目
要开始使用Celery, 只需遵循官方文档中的分步指南即可。
本文的重点是使你更好地了解Celery可以涵盖哪些用例。在本文中, 我们不仅将展示有趣的示例, 还将尝试学习如何将Celery应用于实际任务, 例如后台邮件, 报告生成, 日志记录和错误报告。我将分享测试仿真以外任务的方式, 最后, 我将提供一些技巧, 这些技巧没有(很好)记录在正式文档中, 这花了我大量时间进行自我研究。
如果你以前没有使用Celery的经验, 建议你先按照官方教程进行尝试。
增进食欲
如果本文引起你的兴趣, 并希望立即投入到代码中, 请访问此GitHub存储库以获取本文中使用的代码。那里的README文件将为你提供运行示例文件的快速而肮脏的方法。
Celery的第一步
首先, 我们将通过一系列实用示例向读者展示Celery如何简单而优雅地解决看似不重要的任务。所有示例都将在Django框架内展示;但是, 它们大多数都可以轻松移植到其他Python框架(Flask, Pyramid)。
项目布局由Cookiecutter Django生成;但是, 在我看来, 我只保留了一些依赖关系, 这些依赖关系有助于这些用例的开发和准备。我还删除了此帖子和应用程序不必要的模块, 以减少噪音并使代码更易于理解。
- celery_uncovered/
- celery_uncovered/__init__.py
- celery_uncovered/{toyex, tricks, advex}
- celery_uncovered/celery.py
- config/settings/{base, local, test}.py
- config/urls.py
- manage.py
- celery_uncovered / {toyex, tricks, advex}包含我们将在本文中介绍的不同应用程序。每个应用程序都包含一组示例, 这些示例按所需的Celery理解级别组织。
- celery_uncovered / celery.py定义一个Celery实例。
文件:celery_uncovered / celery.py:
from __future__ import absolute_import
import os
from celery import Celery, signals
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')
app = Celery('celery_uncovered')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
然后我们需要确保Celery将与Django一起启动。因此, 我们将应用程序导入celery_uncovered / __ init__.py。
文件:celery_uncovered / __ init__.py:
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # noqa
__all__ = ['celery_app']
__version__ = '0.0.1'
__version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')])
config / settings是我们的应用程序和Celery的配置源。根据执行环境的不同, Django将启动相应的设置:local.py用于开发或test.py用于测试。如果需要, 你还可以定义自己的环境, 方法是创建一个新的python模块(例如prod.py)。 Celery配置以CELERY_为前缀。在这篇文章中, 我将RabbitMQ配置为代理, 并将SQLite配置为bac-end。
文件:config / local.py:
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest:[email protected]:5672//')
CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite'
方案1-报表生成和导出
我们将介绍的第一种情况是报告的生成和导出。在此示例中, 你将学习如何定义一个生成CSV报告的任务, 并定期以celerybeat对其进行计划。
用例描述:在每个选定的时间段(天, 周, 月)从GitHub提取500个最热的存储库, 按主题分组, 然后将结果导出到CSV文件。
如果我们提供HTTP服务, 该服务将执行通过单击标记为” Generate Report”的按钮触发的此功能, 则应用程序将停止并等待任务完成, 然后再发送HTTP响应。这是不好的。我们希望我们的网络应用程序速度更快, 并且我们不希望用户在后端计算结果时等待。与其等待结果产生, 我们不如通过Celery中已注册的队列将任务排队到工作进程中, 并使用task_id响应前端。然后, 前端将使用task_id以异步方式(例如AJAX)查询任务结果, 并使用户随时了解任务进度。最后, 当该过程完成时, 结果可以作为文件通过HTTP下载。
实施细节
首先, 让我们将流程分解为最小的单位并创建管道:
- 提取程序是负责从GitHub服务获取存储库的工作人员。
- 聚合器是负责将结果合并到一个列表中的工作程序。
- Importer是负责在GitHub中生成最热门存储库的CSV报告的工作人员。
图2:使用Celery和Python的工人管道
提取存储库是使用GitHub Search API GET / search / repositories的HTTP请求。但是, 应处理的GitHub API服务有一个局限性:API每个请求最多返回100个存储库, 而不是500个。我们可以一次发送五个请求, 但我们不想让用户等待因为HTTP请求是一个I / O绑定操作, 所以可以用于五个单独的请求。相反, 我们可以使用适当的page参数执行五个并发的HTTP请求。因此页面将在[1..5]范围内。让我们在toyex / tasks.py模块中定义一个名为fetch_hot_repos / 3-> list的任务:
档案:celery_uncovered / toyex / local.py
@shared_task
def fetch_hot_repos(since, per_page, page):
payload = {
'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH}
headers = {'Accept': 'application/vnd.github.v3+json'}
connect_timeout, read_timeout = 5.0, 30.0
r = requests.get(
'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout))
items = r.json()[u'items']
return items
因此, fetch_hot_repos创建对GitHub API的请求, 并通过存储库列表响应用户。它接收三个参数, 这些参数将定义我们的请求有效负载:
- 自— —在创建日期筛选存储库。
- per_page —每个请求要返回的结果数(限制为100)。
- page —请求的页码(在[1..5]范围内)。
注意:为了使用GitHub Search API, 你将需要OAuth令牌来通过身份验证检查。就我们而言, 它保存在GITHUB_OAUTH下的设置中。
接下来, 我们需要定义一个主任务, 该任务将负责汇总结果并将其导出到CSV文件中:produce_hot_repo_report_task / 2-> filepath:
档案:celery_uncovered / toyex / local.py
@shared_task
def produce_hot_repo_report(period, ref_date=None):
# 1. parse date
ref_date_str = strf_date(period, ref_date=ref_date)
# 2. fetch and join
fetch_jobs = group([
fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5)
])
# 3. group by language and
# 4. create csv
return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get()
@shared_task
def build_report_task(results, ref_date):
all_repos = []
for repos in results:
all_repos += [Repository(repo) for repo in repos]
# 3. group by language
grouped_repos = {}
for repo in all_repos:
if repo.language in grouped_repos:
grouped_repos[repo.language].append(repo.name)
else:
grouped_repos[repo.language] = [repo.name]
# 4. create csv
lines = []
for lang in sorted(grouped_repos.keys()):
lines.append([lang] + grouped_repos[lang])
filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date)
return make_csv(filename, lines)
该任务使用celery.canvas.group执行五个并发调用fetch_hot_repos / 3。等待这些结果, 然后将其缩减为存储库对象列表。然后, 我们的结果集按主题分组, 最后导出到MEDIA_ROOT /目录下生成的CSV文件中。
为了定期安排任务, 你可能需要在配置文件中的计划列表中添加一个条目:
文件:config / local.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'produce-csv-reports': {
'task': 'celery_uncovered.toyex.tasks.produce_hot_repo_report_task', 'schedule': crontab(minute=0, hour=0) # midnight, 'args': ('today', )
}, }
尝试一下
为了启动并测试任务的工作方式, 首先我们需要启动Celery流程:
$ celery -A celery_uncovered worker -l info
接下来, 我们需要创建celery_uncovered / media /目录。然后, 你将能够通过Shell或Celerybeat测试其功能:
贝壳:
from datetime import date
from celery_uncovered.toyex.tasks import produce_hot_repo_report_task
produce_hot_repo_report_task.delay('today').get(timeout=5)
Celery
# Start celerybeat with the following command
$ celery -A celery_uncovered beat -l info
你可以在MEDIA_ROOT /目录下观看结果。
方案2-通过电子邮件报告服务器500错误
Celery最常见的用例之一是发送电子邮件通知。电子邮件通知是一种脱机I / O绑定操作, 它利用本地SMTP服务器或第三方SES。有许多用例涉及发送电子邮件, 对于大多数用例, 用户无需等待此过程完成就可以接收HTTP响应。因此, 最好在后台执行此类任务并立即响应用户。
用例说明:通过Celery向管理员电子邮件报告50X错误。
Python和Django具有执行系统日志记录的必要背景。我不会详细介绍Python记录的实际工作方式。但是, 如果你以前从未尝试过或需要复习, 请阅读内置日志记录模块的文档。你绝对希望在你的生产环境中使用它。 Django有一个称为AdminEmailHandler的特殊记录器处理程序, 可将收到的每条日志消息发送给管理员。
实施细节
主要思想是扩展AdminEmailHandler类的send_mail方法, 使其可以通过Celery发送邮件。可以如下图所示进行:
图3:使用Celery和Python处理管理员电子邮件
首先, 我们需要设置一个名为report_error_task的任务, 以提供的主题和消息调用mail_admins:
档案:celery_uncovered / toyex / tasks.py
@shared_task
def report_error_task(subject, message, *args, **kwargs):
mail_admins(subject, message, *args, **kwargs)
接下来, 我们实际上扩展了AdminEmailHandler, 以便它在内部仅调用定义的Celery任务:
档案:celery_uncovered / toyex / admin_email.py
from django.utils.log import AdminEmailHandler
from celery_uncovered.handlers.tasks import report_error_task
class CeleryHandler(AdminEmailHandler):
def send_mail(self, subject, message, *args, **kwargs):
report_error_task.delay(subject, message, *args, **kwargs)
最后, 我们需要设置日志记录。在Django中登录的配置非常简单。你需要重写LOGGING, 以便日志引擎使用新定义的处理程序启动:
文件config / settings / local.py
LOGGING = {
'version': 1, 'disable_existing_loggers': False, ..., 'handlers': {
...
'mail_admins': {
'level': 'ERROR', 'filters': ['require_debug_true'], 'class': 'celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler'
}
}, 'loggers': {
'django': {
'handlers': ['console', 'mail_admins'], 'level': 'INFO', }, ...
}
}
请注意, 我有意设置处理程序过滤器require_debug_true, 以便在应用程序以调试模式运行时测试此功能。
尝试一下
为了测试它, 我准备了一个Django视图, 该视图在localhost:8000 / report-error处执行”零除法”操作。你还需要启动MailHog Docker容器以测试电子邮件是否已实际发送。
$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog
$ CELERY_TASKSK_ALWAYS_EAGER=False python manage.py runserver
$ # with your browser navigate to [http://localhost:8000](http://localhost:8000)
$ # now check your outgoing emails by vising web UI [http://localhost:8025](http://localhost:8025)
额外细节
作为邮件测试工具, 我设置了MailHog并将Django邮件配置为将其用于SMTP传递。有许多方法可以部署和运行MailHog。我决定使用Docker容器。你可以在相应的自述文件中找到详细信息:
档案:docker / mailhog / README.md
$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:latest
$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog
$ # navigate with your browser to localhost:8025
要将应用程序配置为使用MailHog, 你需要在配置中添加以下几行:
文件:config / settings / local.py
EMAIL_BACKEND = env('DJANGO_EMAIL_BACKEND', default='django.core.mail.backends.smtp.EmailBackend')
EMAIL_PORT = 1025
EMAIL_HOST = env('EMAIL_HOST', default='mailhog')
超越默认Celery任务
Celery任务可以通过任何可调用函数创建。默认情况下, 所有用户定义的任务都将celery.app.task.Task注入为父(抽象)类。此类包含以下功能:异步运行任务(通过网络将其传递给Celery工作者)或同步运行(出于测试目的), 创建签名和许多其他实用程序。在下一个示例中, 我们将尝试扩展Celery.app.task.Task, 然后将其用作基类, 以便为我们的任务添加一些有用的行为。
方案3-每个任务的文件记录
在我的一个项目中, 我正在开发一个应用程序, 该应用程序为最终用户提供了类似提取, 转换, 加载(ETL)的工具, 该工具能够提取然后过滤大量分层数据。后端分为两个模块:
- 用Celery编排数据处理管道
- 使用Go进行数据处理
Celery部署了一个Celerybeat实例和40多名工人。构成管道和业务流程活动的任务有二十多种。每个此类任务都可能在某个时候失败。所有这些故障都被转储到每个工作程序的系统日志中。在某个时候, 调试和维护Celery层变得不方便。最终, 我们决定将任务日志隔离到特定于任务的文件中。
用例描述:扩展Celery, 以便每个任务将其标准输出和错误记录到文件中
Celery为Python应用程序提供了对其内部功能的强大控制。它附带了一个熟悉的信号框架。使用Celery的应用程序可以订阅其中一些应用程序, 以增强某些操作的行为。我们将利用任务级别的信号来详细跟踪各个任务生命周期。 Celery始终带有日志记录后端, 我们将从中受益, 同时仅在几个地方稍加覆盖即可实现我们的目标。
实施细节
Celery已经支持按任务记录。要保存到文件, 必须将日志输出分派到正确的位置。在我们的情况下, 任务的正确位置是与任务名称匹配的文件。在Celery实例上, 我们将使用动态推断的日志处理程序覆盖内置日志配置。可以订阅celeryd_after_setup信号, 然后在此处配置系统日志记录:
档案:celery_uncovered / toyex / celery_conf.py
@signals.celeryd_after_setup.connect
def configure_task_logging(instance=None, **kwargs):
tasks = instance.app.tasks.keys()
LOGS_DIR = settings.ROOT_DIR.path('logs')
if not os.path.exists(str(LOGS_DIR)):
os.makedirs(str(LOGS_DIR))
print 'dir created'
default_handler = {
'level': 'DEBUG', 'filters': None, 'class': 'logging.FileHandler', 'filename': ''
}
default_logger = {
'handlers': [], 'level': 'DEBUG', 'propogate': True
}
LOG_CONFIG = {
'version': 1, # 'incremental': True, 'disable_existing_loggers': False, 'handlers': {}, 'loggers': {}
}
for task in tasks:
task = str(task)
if not task.startswith('celery_uncovered.'):
continue
task_handler = copy_dict(default_handler)
task_handler['filename'] = str(LOGS_DIR.path(task + ".log"))
task_logger = copy_dict(default_logger)
task_logger['handlers'] = [task]
LOG_CONFIG['handlers'][task] = task_handler
LOG_CONFIG['loggers'][task] = task_logger
logging.config.dictConfig(LOG_CONFIG)
请注意, 对于在Celery应用程序中注册的每个任务, 我们正在使用其处理程序构建一个相应的记录器。每个处理程序的类型为logging.FileHandler, 因此, 每个此类实例都将文件名作为输入。你需要执行的所有操作是将该模块导入文件末尾的celery_uncovered / celery.py中:
import celery_uncovered.tricks.celery_conf
通过调用get_task_logger(task_name)可以接收特定的任务记录器。为了概括每个任务的这种行为, 有必要使用一些实用程序方法稍微扩展celery.current_app.Task:
档案:celery_uncovered / tricks / celery_ext.py
class LoggingTask(current_app.Task):
abstract = True
ignore_result = False
@property
def logger(self):
logger = get_task_logger(self.name)
return logger
def log_msg(self, msg, *msg_args):
self.logger.debug(msg, *msg_args)
现在, 在调用task.log_msg(” Hello, 我的名字是:%s”, task.request.id)的情况下, 日志输出将被路由到任务名称下的相应文件。
尝试一下
为了启动并测试此任务的工作方式, 请首先启动Celery流程:
$ celery -A celery_uncovered worker -l info
然后, 你将能够通过Shell测试功能:
from datetime import date
from celery_uncovered.tricks.tasks import add
add.delay(1, 3)
最后, 要查看结果, 请导航到celery_uncovered / logs目录, 然后打开名为celery_uncovered.tricks.tasks.add.log的相应日志文件。多次运行此任务后, 你可能会看到类似以下的内容:
Result of 1 + 2 = 3
Result of 1 + 2 = 3
...
方案4-范围感知任务
让我们想象一个基于Celery和Django构建的面向国际用户的Python应用程序。用户可以设置使用你的应用程序所使用的语言(区域设置)。
你必须设计一个多语言, 可识别区域设置的电子邮件通知系统。要发送电子邮件通知, 你已经注册了一个特殊的Celery任务, 该任务由特定队列处理。此任务接收一些关键参数作为输入和当前用户的语言环境, 以便将电子邮件以用户选择的语言发送。
现在想象我们有很多这样的任务, 但是每个任务都接受一个locale参数。在这种情况下, 在更高的抽象水平上解决它会更好吗?在这里, 我们看到了如何做到这一点。
用例描述:自动从一个执行上下文继承范围, 并将其作为参数注入到当前执行上下文中。
实施细节
同样, 与任务记录一样, 我们想要扩展基本任务类celery.current_app.Task并重写一些负责调用任务的方法。出于演示目的, 我将重写celery.current_app.Task :: apply_async方法。该模块还有其他任务, 可以帮助你产生功能全面的替代产品。
档案:celery_uncovered / tricks / celery_ext.py
class ScopeBasedTask(current_app.Task):
abstract = True
ignore_result = False
default_locale_id = DEFAULT_LOCALE_ID
scope_args = ('locale_id', )
def __init__(self, *args, **kwargs):
super(ScopeBasedTask, self).__init__(*args, **kwargs)
self.set_locale(locale=kwargs.get('locale_id', None))
def set_locale(self, scenario_id=None):
self.locale_id = self.default_locale_id
if locale_id:
self.locale_id = locale_id
else:
self.locale_id = get_current_locale().id
def apply_async(self, args=None, kwargs=None, **other_kwargs):
self.inject_scope_args(kwargs)
return super(ScopeBasedTask, self).apply_async(args=args, kwargs=kwargs, **other_kwargs)
def __call__(self, *args, **kwargs):
task_rv = super(ScopeBasedTask, self).__call__(*args, **kwargs)
return task_rv
def inject_scope_args(self, kwargs):
for arg in self.scope_args:
if arg not in kwargs:
kwargs[arg] = getattr(self, arg)
关键提示是默认情况下将当前语言环境作为键值参数传递给调用任务。如果使用某个区域设置作为参数调用了任务, 则该任务不变。
尝试一下
为了测试此功能, 让我们定义一个ScopeBasedTask类型的虚拟任务。它通过语言环境ID定位文件, 并以JSON读取其内容:
文件:celery_uncovered / tricks / tasks.py
@shared_task(bind=True, base=ScopeBasedTask)
def read_scenario_file_task(self, **kwargs):
fixture_parts = ["locales", "sc_%i.json" %
kwargs['scenario_id']]
return read_fixture(*fixture_parts)
现在, 你需要做的是重复启动Celery, 启动Shell以及在不同情况下测试此任务执行的步骤。灯具位于celery_uncovered / tricks / fixtures / locales /目录下。
总结
这篇文章旨在从不同角度探讨Celery。我在常规示例中演示了Celery, 例如邮件和报告生成, 以及一些有趣的利基业务用例的共享技巧。 Celery建立在数据驱动的理念之上, 你的团队可以通过将其引入系统堆栈来简化他们的生活。如果你具有基本的Python经验, 则开发基于Celery的服务并不是很复杂, 并且应该可以很快地掌握它。默认配置足以满足大多数用途, 但是如果需要, 它们可以非常灵活。
我们的团队选择使用Celery作为后台任务和长期运行任务的业务流程后端。我们将其广泛用于各种用例, 本文中仅提及其中一些。我们每天都会摄取和分析千兆字节的数据, 但这只是水平缩放技术的开始。
评论前必须登录!
注册