大家好,我是肖恩,源码解析每周见
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
本文是是celery源码解析的第五篇,在前4篇里分别介绍了vine, py-amqp和kombu:
- 神器 celery 源码解析- vine实现Promise功能
- 神器 celery 源码解析- py-amqp实现AMQP协议
- 神器 celery 源码解析- kombu,一个python实现的消息库
- 神器 celery 源码解析- kombu的企业级算法
基本扫清celery的基础库后,我们正式进入celery的源码解析,本文包括下面几个部分:
- celery应用示例
- celery项目概述
- worker启动流程跟踪
- client启动流程跟踪
- celery的app
- worker模式启动流程
- 小结
celery应用示例
启动celery之前,我们先使用docker启动一个redis服务,作为broker:
1 | arduino复制代码$ docker run -p 6379:6379 --name redis -d redis:6.2.3-alpine |
使用telnet监控redis服务,观测任务调度情况:
1 | sql复制代码$ telnet 127.0.0.1 6379 |
下面是我们的celery服务代码 myapp.py
:
1 | python复制代码# myapp.py |
打开一个新的终端,使用下面的命令启动celery的worker服务:
1 | ruby复制代码$ python myapp.py worker -l DEBUG |
正常情况下,可以看到worker正常启动。启动的时候会显示一些banner信息,包括AMQP的实现协议,任务等:
1 | sql复制代码$ celery -A myapp worker -l DEBUG |
再开启一个终端窗口,作为client执行下面的代码, 可以看到add函数正确的执行,获取到计算 16+16 的结果 32。注意: 这个过程是远程执行的,使用的是delay方法,函数的打印print("add", x, y)
并没有输出:
1 | python复制代码$ python |
在celery的worker服务窗口,可以看到类似下面的输出。收到一个执行任务 myapp.add 的请求, 请求的uuid是 5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b ,参数数组是 [16, 16] 正常执行后返回结果32。
1 | css复制代码[2021-11-11 20:13:48,040: INFO/MainProcess] Task myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b] received |
在redis的monitor窗口,也可以可以看到类似的输出,展示了过程中一些对redis的操作命令:
1 | swift复制代码+1636632828.304020 [0 172.16.0.117:51127] "SUBSCRIBE" "celery-task-meta-5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b" |
我们再一次回顾下图,对比一下示例,加强理解:
- 我们先启动一个celery的worker服务作为消费者
- 再启动一个窗口作为生产者执行task
- 使用redis作为broker,负责生产者和消费者之间的消息通讯
- 最终生成者的task,作为消息发送到远程的消费者上执行,执行的结果又通过网络回传给生产者
上面示例展示了celery作为一个分布式任务调度系统的执行过程,本地的任务调用,通过AMQP协议的包装,作为消息发送到远程的消费者执行。
celery项目概述
解析celery采用的代码版本5.0.5
, 主要模块结构:
模块 | 描述 |
---|---|
app | celery的app实现 |
apps | celery服务的三种主要模式,worker,beat和multi |
backends | 任务结果存储 |
bin | 命令行工具实现 |
concurrency | 各种并发实现,包括线程,gevent,asyncpool等 |
events | 事件实现 |
worker | 服务启动环节实现 |
beat.py&&schedules.py | 定时和调度实现 |
result.py | 任务结果实现 |
signals.py | 一些信号定义 |
status.py | 一些状态定义 |
从项目结构看,模块较多,功能复杂。不过我们已经搞定了vine, py-amqp和kombu三个库,接下来只需要理解worker,beat和multi三种服务模型,就可以较好的了解celery这个分布式系统如何构建。
worker启动流程跟踪
worker的启动命令 celery -A myapp worker -l DEBUG
使celery作为一个模块,入口在main文件的main函数:
1 | python复制代码# ch23-celery/celery-5.0.5/celery/__main__.py |
celery命令作为主命令,加载celery-app的同时,还会启动worker子命令:
1 | python复制代码# ch23-celery/celery-5.0.5/celery/bin/celery.py |
在worker子命令中创建worker并启动:
1 | ini复制代码# ch23-celery/celery-5.0.5/celery/bin/worker.py |
下面是创建worker的方式,创一个 celery.apps.worker:Worker 对象:
1 | ruby复制代码# ch23-celery/celery-5.0.5/celery/app/base.py |
服务启动过程中,调用链路如下:
1 | lua复制代码 +----------+ |
在这个服务启动过程中,创建了celery-application和worker-application两个应用程序。至于具体的启动流程,我们暂时跳过,先看看客户端的流程。
client启动流程分析
示例client的启动过程包括下面4步:
1 创建celery-application,
2 创建task
3 调用task的delay方法执行任务得到一个异步结果
4 最后使用异步结果的get方法获取真实结果
task是通过app创建的装饰器创建的Promise对象:
1 | python复制代码# ch23-celery/celery-5.0.5/celery/app/base.py |
task实际上是一个由Task基类动态创建的子类:
1 | kotlin复制代码def _task_from_fun(self, fun, name=None, base=None, bind=False, **options): |
任务的执行使用app的send_task方法进行:
1 | ruby复制代码# ch23-celery/celery-5.0.5/celery/app/task.py |
可以看到,client作为生产者启动任务,也需要创建celery-application,下面我们就先看celery-application的实现。
celery的app两大功能
Celery的构造函数:
1 | ini复制代码class Celery: |
可以看到celery类提供了一些默认模块类的名称,可以根据这些类名动态创建对象。app对象任务的处理使用一个队列作为pending状态的任务容器,使用TaskRegistry来管理任务的注册。
任务通过task装饰器,记录到celery的TaskRegistry中:
1 | ruby复制代码def task(self, *args, **opts): |
celery另外一个核心功能是提供到broker的连接:
1 | python复制代码def _connection(self, url, userid=None, password=None, |
AMQP的实现,是依赖kombu提供的AMQP协议封装:
1 | python复制代码from kombu import Connection, Consumer, Exchange, Producer, Queue, pools |
然后使用我们熟悉的Queue,Consumer,Producer进行消息的生成和消费:
1 | ini复制代码def Queues(self, queues, create_missing=None, |
celery-app的两大功能,管理task和管理AMQP连接,我们有一个大概的了解。
worker模式启动流程
worker模式启动在WorkController中,将服务分成不同的阶段,然后将各个阶段组装成一个叫做蓝图(Blueprint)的方式进行管理:
1 | python复制代码class WorkController: |
启动蓝图:
1 | python复制代码def start(self): |
启动步骤,比较简单,大概代码如下:
1 | python复制代码class StepType(type): |
从Step大概可以看出:
- 每个步骤,可以有依赖requires
- 每个步骤,可以有具体的动作instantiate
- 步骤具有树状的父子结构,可以自动创建上级步骤
比如一个消费者步骤, 依赖Connection步骤。启动的时候对Connection进行消费。两者代码如下:
1 | python复制代码class ConsumerStep(StartStopStep): |
在Blueprint中创建和管理这些step:
1 | python复制代码class Blueprint: |
启动Blueprint:
1 | python复制代码def start(self, parent): |
通过将启动过程拆分成多个step单元,然后组合单元构建成graph,逐一启动。
小结
本篇我们正式学习了一下celery的使用流程,了解celery如果使用redis作为broker,利用服务作为消费者,使用客户端作为生成者,完成一次远程任务的执行。简单探索worker服务模式的启动流程,重点分析celery-application的管理task和管理连接两大功能实现。
小技巧
celery中展示了一种动态创建类和对象的方法:
1 | kotlin复制代码task = type(fun.__name__, (Task,), dict({ |
通过type函数创了一个动态的task子类,然后执行 () 实例化一个task子对象。
参考链接
本文转载自: 掘金