Celery source code analysis: worker initialization (part 1)



Environment for this article: Python 3.5.2, Celery 4.0.2, Django 1.10.x.

Using Celery with Django

First, create a Django project in an environment where Django is installed:

(venv) wuzideMacBook-Air:workpy wuzi$ django-admin startproject celery_django

Then enter the celery_django directory and create a celery.py file inside the celery_django configuration package, with the following contents:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django.settings')

app = Celery('celery_django')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

and add the following to the __init__.py file in the same package:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Now create an app using the Django command:

(venv) wuzideMacBook-Air:celery_django wuzi$ python manage.py startapp celery_app

Then create a tasks.py file in the celery_app directory with the following contents:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

At this point the Celery/Django configuration is complete. If RabbitMQ is installed locally and running with its default configuration, the Celery project can be started as-is; if not, a local Redis instance can serve as the message broker instead, in which case add the following settings to celery_django/settings.py:

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/7'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6'

Now start the worker from a terminal:

(venv) wuzideMacBook-Air:celery_django wuzi$ celery -A celery_django worker

 -------------- celery@wuzideMacBook-Air.local v4.0.0 (latentcall)
---- **** -----
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2018-07-07 03:21:14
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celery_django:0x109b19278
- ** ---------- .> transport:   redis://127.0.0.1:6379/7
- ** ---------- .> results:     redis://127.0.0.1:6379/6
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

Meanwhile, open a shell inside the Django project (e.g. python manage.py shell):

In [1]: from celery_app.tasks import *

In [2]: add.delay(1,2)
Out[2]: <AsyncResult: 9465e723-1027-45d9-9b38-8a5d94457c3a>

In [3]: ast=_

In [4]: ast.get()
Out[4]: 3

With that, the combined Django/Celery setup is up and running.
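As a quick illustration of using these tasks from application code, the snippet below enqueues add from a hypothetical Django view (the view name and URL wiring are illustrative and not part of the project above):

# Hypothetical view -- illustrative only
from django.http import JsonResponse
from celery_app.tasks import add

def enqueue_add(request):
    result = add.delay(1, 2)  # push the task onto the broker; returns an AsyncResult
    return JsonResponse({'task_id': result.id})  # the result backend can be polled later via result.get()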

Starting the Celery worker

Following the analysis in the previous post, the command-line arguments here are -A celery_django worker. When the CeleryCommand class reaches its execute function, it instantiates the command class:

return cls(
    app=self.app, on_error=self.on_error,
    no_color=self.no_color, quiet=self.quiet,
    on_usage_error=partial(self.on_usage_error, command=command),
).run_from_argv(self.prog_name, argv[1:], command=argv[0])  # instantiate the class, then call run_from_argv on it

The class instantiated here is the worker command class. As for how self.app gets set: when CeleryCommand calls the parent class's execute_from_commandline function, that invokes the setup_app_from_commandline method, which reaches the following code:

if self.respects_app_option:           # set to False for the multi command; True otherwise
    if app:                            # an app value was passed on the command line
        self.app = self.find_app(app)  # load and return the app instance
    elif self.app is None:
        ...

Here app is the celery_django value passed on the command line, so find_app is called to locate the instantiated app in the project:

def find_app(self, app):
    from celery.app.utils import find_app
    return find_app(app, symbol_by_name=self.symbol_by_name)

def symbol_by_name(self, name, imp=imports.import_from_cwd):
    return imports.symbol_by_name(name, imp=imp)

This calls find_app in celery.app.utils, passing the Command class's own symbol_by_name method as the symbol_by_name argument:

def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
    """Find app by name."""
    from .base import Celery

    try:
        sym = symbol_by_name(app, imp=imp)  # import the module named by app
    except AttributeError:
        # last part was not an attribute, but a module
        sym = imp(app)
    if isinstance(sym, ModuleType) and ':' not in app:  # got a module and app contains no ':'
        try:
            found = sym.app  # try the module's `app` attribute
            if isinstance(found, ModuleType):
                raise AttributeError()
        except AttributeError:
            try:
                found = sym.celery  # try the `celery` attribute
                if isinstance(found, ModuleType):  # a module rather than an app instance is an error
                    raise AttributeError()
            except AttributeError:
                if getattr(sym, '__path__', None):
                    try:
                        return find_app(
                            '{0}.celery'.format(app),
                            symbol_by_name=symbol_by_name, imp=imp,
                        )  # retry the lookup as app.celery
                    except ImportError:
                        pass
                for suspect in values(vars(sym)):
                    if isinstance(suspect, Celery):
                        return suspect
                raise
            else:
                return found
        else:
            return found
    return sym

The found this function returns is the Celery instance app we defined in celery_django/celery.py, so self.app on CeleryCommand is now that app instance.
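As a sanity check, find_app can be exercised directly; the session below is hypothetical and assumes the celery_django project is on sys.path with DJANGO_SETTINGS_MODULE set:

import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django.settings')

from celery.app.utils import find_app

# 'celery_django' is a package whose `celery` attribute is a module, so
# find_app falls through to 'celery_django.celery' and returns its `app`.
app = find_app('celery_django')
print(app.main)  # -> 'celery_django'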

Now let's go back and look at run_from_argv, which runs after the worker command is instantiated:

def run_from_argv(self, prog_name, argv=None, command=None):
    argv = [x for x in argv if x not in self.removed_flags]  # drop any removed flags from the arguments
    command = sys.argv[0] if command is None else command    # the command being executed
    argv = sys.argv[1:] if argv is None else argv            # the command-line arguments
    # parse options before detaching so errors can be handled.
    options, args = self.prepare_args(
        *self.parse_options(prog_name, argv, command))       # the parsed argument values
    self.maybe_detach([command] + argv)                      # check whether to detach into the background
    return self(*args, **options)                            # invoke __call__

Since no daemonization flag was passed, self(...) is executed, which invokes __call__ on the parent Command class:

def __call__(self, *args, **kwargs):
    random.seed()  # maybe we were forked.
    self.verify_args(args)
    try:
        ret = self.run(*args, **kwargs)
        return ret if ret is not None else EX_OK
    except self.UsageError as exc:
        self.on_usage_error(exc)
        return exc.status
    except self.Error as exc:
        self.on_error(exc)
        return exc.status

This function ultimately calls self.run; since the worker command defines its own run method, that override is what gets invoked:

def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
        loglevel=None, logfile=None, pidfile=None, statedb=None,
        **kwargs):
    maybe_drop_privileges(uid=uid, gid=gid)
    # Pools like eventlet/gevent needs to patch libs as early
    # as possible.
    pool_cls = (concurrency.get_implementation(pool_cls) or
                self.app.conf.worker_pool)  # resolve the pool class
    if self.app.IS_WINDOWS and kwargs.get('beat'):  # embedded beat cannot run on Windows
        self.die('-B option does not work on Windows.  '
                 'Please run celery beat as a separate service.')
    hostname = self.host_format(default_nodename(hostname))  # build the hostname
    if loglevel:  # a log level was configured
        try:
            loglevel = mlevel(loglevel)
        except KeyError:  # pragma: no cover
            self.die('Unknown level {0!r}.  Please use one of {1}.'.format(
                loglevel, '|'.join(
                    l for l in LOG_LEVELS if isinstance(l, string_t))))

    worker = self.app.Worker(
        hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
        logfile=logfile,  # node format handled by celery.app.log.setup
        pidfile=self.node_format(pidfile, hostname),
        statedb=self.node_format(statedb, hostname),
        **kwargs)  # instantiate a worker from app.Worker
    worker.start()           # start the worker
    return worker.exitcode   # return the worker's exit code

Inside this function, app.Worker is instantiated; app here is the Celery instance defined earlier in celery_django/celery.py:

@cached_property
def Worker(self):
    """Worker application.

    See Also:
        :class:`~@Worker`.
    """
    return self.subclass_with_self('celery.apps.worker:Worker')

subclass_with_self uses Python's type to dynamically generate a subclass with the app attribute already filled in:

def subclass_with_self(self, Class, name=None, attribute='app',
                       reverse=None, keep_reduce=False, **kw):
    """Subclass an app-compatible class.

    App-compatible means that the class has a class attribute that
    provides the default app it should use, for example:
    ``class Foo: app = None``.

    Arguments:
        Class (type): The app-compatible class to subclass.
        name (str): Custom name for the target class.
        attribute (str): Name of the attribute holding the app,
            Default is 'app'.
        reverse (str): Reverse path to this object used for pickling
            purposes. For example, to get ``app.AsyncResult``,
            use ``"AsyncResult"``.
        keep_reduce (bool): If enabled a custom ``__reduce__``
            implementation won't be provided.
    """
    Class = symbol_by_name(Class)                     # import the class
    reverse = reverse if reverse else Class.__name__  # fall back to the class name if not given

    def __reduce__(self):  # called during pickling
        return _unpickle_appattr, (reverse, self.__reduce_args__())

    attrs = dict(
        {attribute: self},             # default the app attribute to self
        __module__=Class.__module__,
        __doc__=Class.__doc__,
        **kw)                          # fill in the attributes
    if not keep_reduce:
        attrs['__reduce__'] = __reduce__  # by default attach __reduce__ to the generated class

    return type(bytes_if_py2(name or Class.__name__), (Class,), attrs)  # generate the subclass with type()
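The core trick is just a three-argument type() call. Below is a stripped-down, standalone imitation (not Celery code) showing how an "app-compatible" class gets an app baked in:

class App:
    pass

class Foo:
    app = None  # app-compatible: a class attribute holding the default app

def subclass_with_app(app, Class, attribute='app'):
    attrs = {attribute: app, '__module__': Class.__module__, '__doc__': Class.__doc__}
    return type(Class.__name__, (Class,), attrs)  # type(name, bases, attrs)

app = App()
BoundFoo = subclass_with_app(app, Foo)
assert BoundFoo.app is app          # instances see the bound app by default
assert issubclass(BoundFoo, Foo)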

What we get back is an app-bound subclass of celery.apps.worker:Worker; run then instantiates it and calls its start method. Before looking at start, let's first analyze how this Worker class is initialized:

def __init__(self, app=None, hostname=None, **kwargs):
    self.app = app or self.app                  # set the app attribute
    self.hostname = default_nodename(hostname)  # generate the node's hostname
    self.app.loader.init_worker()               # call app.loader's init_worker() method
    self.on_before_init(**kwargs)               # pre-initialization hook
    self.setup_defaults(**kwargs)               # fill in the default values
    self.on_after_init(**kwargs)

    self.setup_instance(**self.prepare_args(**kwargs))  # set up the instance

This calls the init_worker method of app.loader. Here app.loader is the loader attribute set when Celery was initialized, which defaults to celery.loaders.app:AppLoader:

@cached_property
def loader(self):
    """Current loader instance."""
    return get_loader_cls(self.loader_cls)(app=self)

...

def get_loader_cls(loader):
    """Get loader class by name/alias."""
    return symbol_by_name(loader, LOADER_ALIASES, imp=import_from_cwd)

So the loader instance is an AppLoader, and its init_worker method is then called:

def import_default_modules(self):
    signals.import_modules.send(sender=self.app)
    return [
        self.import_task_module(m) for m in (
            tuple(self.builtin_modules) +
            tuple(maybe_list(self.app.conf.imports)) +
            tuple(maybe_list(self.app.conf.include))
        )  # import the modules the project requires
    ]

def init_worker(self):
    if not self.worker_initialized:     # only run if not done before
        self.worker_initialized = True  # mark as initialized
        self.import_default_modules()   # import the default modules
        self.on_worker_init()

Its main job is to import the modules listed in the app's configuration.
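For reference, the imports and include lists consulted here come from configuration. With the namespace='CELERY' setup shown earlier, hypothetical settings.py entries would look like this (module names are illustrative):

CELERY_IMPORTS = ('celery_app.tasks',)  # surfaces as app.conf.imports
CELERY_INCLUDE = ('celery_app.tasks',)  # surfaces as app.conf.include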

Next, let's continue with the self.setup_defaults method from the Worker initialization:

def setup_defaults(self, concurrency=None, loglevel='WARN', logfile=None,
                   task_events=None, pool=None, consumer_cls=None,
                   timer_cls=None, timer_precision=None,
                   autoscaler_cls=None,
                   pool_putlocks=None,
                   pool_restarts=None,
                   optimization=None, O=None,  # O maps to -O=fair
                   statedb=None,
                   time_limit=None,
                   soft_time_limit=None,
                   scheduler=None,
                   pool_cls=None,              # XXX use pool
                   state_db=None,              # XXX use statedb
                   task_time_limit=None,       # XXX use time_limit
                   task_soft_time_limit=None,  # XXX use soft_time_limit
                   scheduler_cls=None,         # XXX use scheduler
                   schedule_filename=None,
                   max_tasks_per_child=None,
                   prefetch_multiplier=None, disable_rate_limits=None,
                   worker_lost_wait=None,
                   max_memory_per_child=None, **_kw):
    either = self.app.either  # read from the config, falling back to the given default
    self.loglevel = loglevel  # the log level
    self.logfile = logfile    # the log file

    self.concurrency = either('worker_concurrency', concurrency)       # number of worker processes
    self.task_events = either('worker_send_task_events', task_events)  # whether to send task events
    self.pool_cls = either('worker_pool', pool, pool_cls)              # worker pool class
    self.consumer_cls = either('worker_consumer', consumer_cls)        # consumer class
    self.timer_cls = either('worker_timer', timer_cls)                 # timer class
    self.timer_precision = either(
        'worker_timer_precision', timer_precision,
    )
    self.optimization = optimization or O                              # optimization profile
    self.autoscaler_cls = either('worker_autoscaler', autoscaler_cls)
    self.pool_putlocks = either('worker_pool_putlocks', pool_putlocks)
    self.pool_restarts = either('worker_pool_restarts', pool_restarts)
    self.statedb = either('worker_state_db', statedb, state_db)        # worker state database
    self.schedule_filename = either(
        'beat_schedule_filename', schedule_filename,
    )  # beat schedule file
    self.scheduler = either('beat_scheduler', scheduler, scheduler_cls)  # the scheduler class
    self.time_limit = either(
        'task_time_limit', time_limit, task_time_limit)  # hard time limit
    self.soft_time_limit = either(
        'task_soft_time_limit', soft_time_limit, task_soft_time_limit,
    )
    self.max_tasks_per_child = either(
        'worker_max_tasks_per_child', max_tasks_per_child,
    )  # maximum number of tasks each child process may handle
    self.max_memory_per_child = either(
        'worker_max_memory_per_child', max_memory_per_child,
    )  # maximum memory per child process
    self.prefetch_multiplier = int(either(
        'worker_prefetch_multiplier', prefetch_multiplier,
    ))
    self.disable_rate_limits = either(
        'worker_disable_rate_limits', disable_rate_limits,
    )
    self.worker_lost_wait = either('worker_lost_wait', worker_lost_wait)
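The either helper used throughout is worth pausing on; its behavior is roughly "the configured value wins, otherwise take the first usable fallback". A hedged sketch of that semantics (not the actual implementation):

def either(conf, config_key, *defaults):
    # sketch: prefer the configured value, else the first non-None fallback
    value = conf.get(config_key)
    if value is not None:
        return value
    return next((d for d in defaults if d is not None), None)

conf = {'worker_concurrency': 8, 'worker_pool': None}
assert either(conf, 'worker_concurrency', None) == 8        # config value wins
assert either(conf, 'worker_pool', 'prefork') == 'prefork'  # falls back to the default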

This assigns values to everything the running worker needs. Once it returns, self.setup_instance is executed next:

def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
                   include=None, use_eventloop=None, exclude_queues=None,
                   **kwargs):
    self.pidfile = pidfile                     # pidfile
    self.setup_queues(queues, exclude_queues)  # select which queues to consume from, and which not
    self.setup_includes(str_to_list(include))  # collect all task modules

    # Set default concurrency
    if not self.concurrency:                   # no value configured
        try:
            self.concurrency = cpu_count()     # default to the number of CPUs
        except NotImplementedError:
            self.concurrency = 2               # fall back to 2 if that fails

    # Options
    self.loglevel = mlevel(self.loglevel)      # set the log level
    self.ready_callback = ready_callback or self.on_consumer_ready  # set the ready callback

    # this connection won't establish, only used for params
    self._conninfo = self.app.connection_for_read()
    self.use_eventloop = (
        self.should_use_eventloop() if use_eventloop is None
        else use_eventloop
    )                                          # decide whether to use an event loop
    self.options = kwargs

    signals.worker_init.send(sender=self)      # send the worker_init signal

    # Initialize bootsteps
    self.pool_cls = _concurrency.get_implementation(self.pool_cls)  # resolve the pool class
    self.steps = []                            # the steps to run
    self.on_init_blueprint()
    self.blueprint = self.Blueprint(
        steps=self.app.steps['worker'],
        on_start=self.on_start,
        on_close=self.on_close,
        on_stopped=self.on_stopped,
    )                                          # initialize the blueprint
    self.blueprint.apply(self, **kwargs)       # call apply on the initialized Blueprint
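Note the signals.worker_init.send(sender=self) call above: any connected handler runs at exactly this point, before the bootsteps are built. A hypothetical handler:

from celery.signals import worker_init

@worker_init.connect
def announce(sender=None, **kwargs):
    # sender is the worker instance being set up
    print('worker_init fired for:', sender.hostname)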

The work done by setup_queues and setup_includes is shown below:

def setup_queues(self, include, exclude=None):
    include = str_to_list(include)
    exclude = str_to_list(exclude)
    try:
        self.app.amqp.queues.select(include)    # select the queues to consume from
    except KeyError as exc:
        raise ImproperlyConfigured(
            SELECT_UNKNOWN_QUEUE.strip().format(include, exc))
    try:
        self.app.amqp.queues.deselect(exclude)  # skip tasks in the excluded queues
    except KeyError as exc:
        raise ImproperlyConfigured(
            DESELECT_UNKNOWN_QUEUE.strip().format(exclude, exc))
    if self.app.conf.worker_direct:
        self.app.amqp.queues.select_add(worker_direct(self.hostname))  # also consume this worker's direct queue

def setup_includes(self, includes):
    # Update celery_include to have all known task modules, so that we
    # ensure all task modules are imported in case an execv happens.
    prev = tuple(self.app.conf.include)  # the task modules from the config
    if includes:
        prev += tuple(includes)
        [self.app.loader.import_task_module(m) for m in includes]  # import each included task module
    self.include = includes
    task_modules = {task.__class__.__module__
                    for task in values(self.app.tasks)}   # modules of the already-registered tasks
    self.app.conf.include = tuple(set(prev) | task_modules)  # deduplicate and reset include
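On the command line these two hooks correspond to the queue-selection options; assuming the Celery 4 worker flags, -Q/--queues feeds the include argument of setup_queues and -X/--exclude-queues its exclude, so a hypothetical invocation looks like:

(venv) wuzideMacBook-Air:celery_django wuzi$ celery -A celery_django worker -Q celery,priority -X slow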

After those two functions run, initialization of the Blueprint class begins. This Blueprint class is defined inside the WorkController class (which the Worker in celery.apps.worker inherits from):

class Blueprint(bootsteps.Blueprint):
    """Worker bootstep blueprint."""

    name = 'Worker'
    default_steps = {
        'celery.worker.components:Hub',
        'celery.worker.components:Pool',
        'celery.worker.components:Beat',
        'celery.worker.components:Timer',
        'celery.worker.components:StateDB',
        'celery.worker.components:Consumer',
        'celery.worker.autoscale:WorkerComponent',
    }

Once this class has been instantiated, its apply method is called:

def apply(self, parent, **kwargs):
    """Apply the steps in this blueprint to an object.

    This will apply the ``__init__`` and ``include`` methods
    of each step, with the object as argument::

        step = Step(obj)
        ...
        step.include(obj)

    For :class:`StartStopStep` the services created
    will also be added to the objects ``steps`` attribute.
    """
    self._debug('Preparing bootsteps.')
    order = self.order = []
    steps = self.steps = self.claim_steps()  # collect the defined steps

    self._debug('Building graph...')
    for S in self._finalize_steps(steps):    # yields the steps in dependency order
        step = S(parent, **kwargs)           # instantiate the step
        steps[step.name] = step              # key the dict by step.name
        order.append(step)                   # append to the order list
    self._debug('New boot order: {%s}',
                ', '.join(s.alias for s in self.order))
    for step in order:                       # iterate over the ordered steps
        step.include(parent)                 # call each step's include method
    return self

The self.claim_steps method looks like this:

def claim_steps(self):
    return dict(self.load_step(step) for step in self.types)  # import each class in types; map name -> class
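load_step itself is tiny; roughly, it resolves the dotted path and keys the result by the step's generated name (a sketch of what it does):

def load_step(self, step):
    step = symbol_by_name(step)  # e.g. 'celery.worker.components:Hub' -> the Hub class
    return step.name, step       # name is filled in by the StepType metaclass, shown below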

self.types is assigned during initialization:

self.types = set(steps or []) | set(self.default_steps)

When this Blueprint is initialized, app.steps['worker'] is empty by default, so types reduces to the default_steps attribute — the default_steps defined on the Blueprint class inside WorkController:

default_steps = {
    'celery.worker.components:Hub',
    'celery.worker.components:Pool',
    'celery.worker.components:Beat',
    'celery.worker.components:Timer',
    'celery.worker.components:StateDB',
    'celery.worker.components:Consumer',
    'celery.worker.autoscale:WorkerComponent',
}
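These steps declare their mutual dependencies through a requires attribute, so they must be instantiated in dependency order. As a rough illustration of what such an ordering pass has to accomplish (the real code builds a full dependency graph; this is only a sketch and ignores cycles):

def boot_order(steps):
    seen, order = set(), []

    def visit(step):
        if step in seen:
            return
        seen.add(step)
        for dep in getattr(step, 'requires', ()):  # e.g. Hub.requires = (Timer,)
            visit(dep)          # dependencies first
        order.append(step)      # then the step itself

    for step in steps:
        visit(step)
    return order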

These step classes do depend on one another; the Hub class, for example:

class Hub(bootsteps.StartStopStep):
    """Worker starts the event loop."""

    requires = (Timer,)

The Hub class depends on the Timer class, so the job of _finalize_steps is to bring the depended-upon classes in first (we may analyze that function in a later post). Returning to the order list: it holds the step classes once all dependencies have been resolved, and every one of these step classes inherits, directly or indirectly, from bootsteps.Step:

@with_metaclass(StepType)
class Step(object):
    ...

This class uses a metaclass; let's look at StepType:

class StepType(type):
    """Meta-class for steps."""

    name = None
    requires = None

    def __new__(cls, name, bases, attrs):
        module = attrs.get('__module__')  # look up the __module__ attribute
        qname = '{0}.{1}'.format(module, name) if module else name  # module.name if __module__ exists, else name
        attrs.update(
            __qualname__=qname,
            name=attrs.get('name') or qname,
        )  # update name and __qualname__ on the class being created
        return super(StepType, cls).__new__(cls, name, bases, attrs)

    def __str__(self):
        return bytes_if_py2(self.name)

    def __repr__(self):
        return bytes_if_py2('step:{0.name}{{{0.requires!r}}}'.format(self))
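A tiny standalone demo of the same technique (hypothetical, not Celery code) makes the effect concrete — the metaclass injects name at class-creation time:

class NamingMeta(type):
    def __new__(cls, name, bases, attrs):
        module = attrs.get('__module__')
        qname = '{0}.{1}'.format(module, name) if module else name
        attrs['name'] = attrs.get('name') or qname  # inject before the class object exists
        return super().__new__(cls, name, bases, attrs)

class MyStep(metaclass=NamingMeta):
    pass

print(MyStep.name)  # e.g. '__main__.MyStep'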

This is Python metaclass programming at work: by intervening while the class object is being created, the metaclass controls the attributes of every step class. The blueprint then calls each Step's include method:

def _should_include(self, parent):
    if self.include_if(parent):
        return True, self.create(parent)
    return False, None

def include(self, parent):
    return self._should_include(parent)[0]

For steps that inherit from StartStopStep instead, the include method called is:

def include(self, parent):
    inc, ret = self._should_include(parent)
    if inc:
        self.obj = ret
        parent.steps.append(self)
    return inc

Now let's look at the steps one by one, starting with the Timer class:

class Timer(bootsteps.Step):
    """Timer bootstep."""

    def create(self, w):
        if w.use_eventloop:  # does the Worker use an event loop?
            # does not use dedicated timer thread.
            w.timer = _Timer(max_interval=10.0)  # use kombu's timer directly
        else:
            if not w.timer_cls:  # no timer_cls in the configuration
                # Default Timer is set by the pool, as for example, the
                # eventlet pool needs a custom timer implementation.
                w.timer_cls = w.pool_cls.Timer  # use the pool's Timer
            w.timer = self.instantiate(w.timer_cls,
                                       max_interval=w.timer_precision,
                                       on_error=self.on_timer_error,
                                       on_tick=self.on_timer_tick)  # import the class and instantiate it

The Hub class is analyzed as follows:

class Hub(bootsteps.StartStopStep):
    """Worker starts the event loop."""

    requires = (Timer,)

    def __init__(self, w, **kwargs):
        w.hub = None
        super(Hub, self).__init__(w, **kwargs)

    def include_if(self, w):
        return w.use_eventloop  # only included when the Worker uses an event loop

    def create(self, w):
        w.hub = get_event_loop()  # fetch the current event loop
        if w.hub is None:         # none registered yet
            required_hub = getattr(w._conninfo, 'requires_hub', None)  # the connection's requires_hub attribute
            w.hub = set_event_loop((
                required_hub if required_hub else _Hub)(w.timer))  # create and register the loop
        self._patch_thread_primitives(w)
        return self
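get_event_loop and set_event_loop here come from kombu and manage a process-global loop object; their semantics are roughly the following (a sketch, not kombu's actual code):

_default_loop = None

def get_event_loop():
    return _default_loop  # None until a loop has been registered

def set_event_loop(loop):
    global _default_loop
    _default_loop = loop   # register the loop and hand it back
    return loop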

The remaining step classes are left for the next post; that concludes the analysis for this installment.

Summary

This post walked through using Celery together with Django and analyzed the startup of the Celery worker. Since the startup flow is fairly involved, we only got as far as the Blueprint's apply step, and several component classes remain unexamined; the next post will continue tracing the execution from there.

