Celery 源码分析(九): Timer & Eventloop
补充 Timer & Eventloop
前面我们详细的讲了一个任务的执行周期,并且还讲到了一个任务在执行的过程中与其他组件的联系。但是有两个组件我们只是一笔带过了,那就是Eventloop
和Timer
组件。
之所以把这篇文章作为补充,而不是在Consumer子组件那一章一起写了,最主要的原因是,timer
的一个比较经典的应用场景是延时任务的执行,而Eventloop
是实现延时任务的一个重要的部分。如果我们对于Task
的消费流程一知半解的话,要理解这快的内容是有一点点麻烦的。直接进入正题:
前面我们分析了一个完整Task任务的生命周期,从delay
开始,从task_message_handler
结束,但是对celery比较熟悉的人可能会敏感的发现,在那一篇文章中,我们实验使用的task都是立即执行的,但是问题是,celery也同样提高了一个额外的参数countdown
, 允许我们指定这个任务在一段时间后才会执行。而这样的任务显然Worker收到之后显然是不能直接就消费的,不然延时
的效果何在? 也不能直接在主进程slepp
等待这个任务到时间了才执行。
现在对于实现这样一种特殊的任务,我们从生产者和消费者的角度各自出发,会产生两种截然不同的方案:
- 从生产者的角度出发: 此刻Worker的处理逻辑不变,收到消息立即消费。但是生产者的逻辑就有一定的变化了。当生产者发现一个带
countdown
参数的延时任务产生时,我并不直接就往任务队列里面扔,而是我维护一个延时任务队列 ,我先扔延时队列里面去,然后时间到了扔到任务队列里面去。确实也可以。 - 从消费者也就是
worker
到角度出发,生产者的处理逻辑不变,生产任务直接丢到任务队列里面。但是worker
的处理逻辑变了,当worker收到一个延时任务时,注意,我并不直接消费它。而是把它扔到worker维护的一个延时任务队列里面,当任务时间到的时候,再去执行。
本质上实现的思路是一样的,只是实现的位置不同。
第二个问题就是,定时任务,那不就是Beat
的工作吗,能这么想的朋友肯定是把延时任务
和定时任务
搞混了,定时任务是每隔一段时间,这个任务就会被重复执行一次。而延时任务,本质上是和普通的任务没什么区别,都是由用户发起的,只是时间上稍微往后延长了那么一点点,但是不管延期多久,这个任务只会执行一次,它并不是一个周期性质的东西。也就是我们没有办法用Beat
去实现这样的逻辑。
搞清楚这个概念之后,我们只需要确认Celery的延时任务是在消费者端实现的还是在生产者端实现的就行了。如何判别, 排除法,我们先看看生产者相关的逻辑:
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
"""省略部分代码逻辑"""
app = self._get_app()
if app.conf.task_always_eager:
with app.producer_or_acquire(producer) as eager_producer:
serializer = options.get('serializer')
if serializer is None:
if eager_producer.serializer:
serializer = eager_producer.serializer
else:
serializer = app.conf.task_serializer
body = args, kwargs
content_type, content_encoding, data = serialization.dumps(
body, serializer,
)
args, kwargs = serialization.loads(
data, content_type, content_encoding,
accept=[content_type]
)
with denied_join_result():
return self.apply(args, kwargs, task_id=task_id or uuid(),
link=link, link_error=link_error, **options)
else:
return app.send_task(
self.name, args, kwargs, task_id=task_id, producer=producer,
link=link, link_error=link_error, result_cls=self.AsyncResult,
shadow=shadow, task_type=self,
**options
)
def send_task(self, name, args=None, kwargs=None, countdown=None,
eta=None, task_id=None, producer=None, connection=None,
router=None, result_cls=None, expires=None,
publisher=None, link=None, link_error=None,
add_to_parent=True, group_id=None, retries=0, chord=None,
reply_to=None, time_limit=None, soft_time_limit=None,
root_id=None, parent_id=None, route_name=None,
parent = have_parent = None
amqp = self.amqp
task_id = task_id or uuid()
producer = producer or publisher # XXX compat
router = router or amqp.router
conf = self.conf
if conf.task_always_eager: # pragma: no cover
warnings.warn(AlwaysEagerIgnored(
'task_always_eager has no effect on send_task',
), stacklevel=2)
ignored_result = options.pop('ignore_result', False)
options = router.route(
options, route_name or name, args, kwargs, task_type)
if not root_id or not parent_id:
parent = self.current_worker_task
if parent:
if not root_id:
root_id = parent.request.root_id or parent.request.id
if not parent_id:
parent_id = parent.request.id
if conf.task_inherit_parent_priority:
options.setdefault('priority',
parent.request.delivery_info.get('priority'))
message = amqp.create_task_message(
task_id, name, args, kwargs, countdown, eta, group_id,
expires, retries, chord,
maybe_list(link), maybe_list(link_error),
reply_to or self.oid, time_limit, soft_time_limit,
self.conf.task_send_sent_event,
root_id, parent_id, shadow, chain,
argsrepr=options.get('argsrepr'),
kwargsrepr=options.get('kwargsrepr'),
)
"""省略部分代码逻辑"""
return result
我们发现生产者貌似啥也没干,拿到直接就send_task
发送出去了, 事实上经过调试确实发现,延时任务发出去worker
很快就收到了, 所以这也就说明了,延时任务的方案是在worker
内部实现的。
现在我们需要准备一个延时任务, 并让延时五分钟执行,这样我们比较容易观察。
add.apply_async(args=[1,2], countdown=300)
然后继续回到我们Worker对于Task的处理逻辑上去:
def task_message_handler(message, body, ack, reject, callbacks,
to_timestamp=to_timestamp):
"""省略部分代码逻辑"""
req = Req(
message,
on_ack=ack, on_reject=reject, app=app, hostname=hostname,
eventer=eventer, task=task, connection_errors=connection_errors,
body=body, headers=headers, decoded=decoded, utc=utc,
)
if _does_info:
info('Received task: %s', req)
if (req.expires or req.id in revoked_tasks) and req.revoked():
return
signals.task_received.send(sender=consumer, request=req)
if task_sends_events:
send_event(
'task-received',
uuid=req.id, name=req.name,
args=req.argsrepr, kwargs=req.kwargsrepr,
root_id=req.root_id, parent_id=req.parent_id,
retries=req.request_dict.get('retries', 0),
eta=req.eta and req.eta.isoformat(),
expires=req.expires and req.expires.isoformat(),
)
# 注意这里,千万要注意这里
bucket = None
eta = None
if req.eta:
try:
if req.utc:
eta = to_timestamp(to_system_tz(req.eta))
else:
eta = to_timestamp(req.eta, app.timezone)
except (OverflowError, ValueError) as exc:
error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
req.eta, exc, req.info(safe=True), exc_info=True)
req.reject(requeue=False)
if rate_limits_enabled:
bucket = get_bucket(task.name)
if eta and bucket:
consumer.qos.increment_eventually()
return call_at(eta, limit_post_eta, (req, bucket, 1),
priority=6)
# 常规的定时任务会走这快的逻辑
if eta:
consumer.qos.increment_eventually()
call_at(eta, apply_eta_task, (req,), priority=6)
return task_message_handler
if bucket:
return limit_task(req, bucket, 1)
task_reserved(req)
if callbacks:
[callback(req) for callback in callbacks]
handle(req)
return task_message_handler
经过调试我们发现,req eta:==> 2022-02-08 20:19:28.130377+08:00
刚好就是五分钟以后的时间。然后我们发现它把时间转成时间戳以后,就调用了call_at(eta, apply_eta_task, (req,), priority=6)
这个函数。
通过看它的上下文,我们发现call_at
指向的是: call_at = consumer.timer.call_at
所以,当woker收到一个五分钟之后执行的任务的时候,并没有选择直接处理(handle(req)
),而是直接丢给timer
了。
到这里,我们终于把timer
的概念正式的引入了我们任务的消费中。
下一步就很简单,我们需要知道这个timer
具体的指向是哪个类。根据经验,这个需要看Timer
组件的初始化逻辑。
class Timer(bootsteps.Step):
"""Timer bootstep."""
def create(self, w):
if w.use_eventloop:
# does not use dedicated timer thread.
w.timer = _Timer(max_interval=10.0)
else:
if not w.timer_cls:
# Default Timer is set by the pool, as for example, the
# eventlet pool needs a custom timer implementation.
w.timer_cls = w.pool_cls.Timer
w.timer = self.instantiate(w.timer_cls,
max_interval=w.timer_precision,
on_error=self.on_timer_error,
on_tick=self.on_timer_tick)
我们发现,如果开启了 use_eventloop
, 就使用 from kombu.asynchronous.timer import Timer as _Timer
如果没开启,用户也没指定,则使用w.pool_cls.Timer
, 通过前面的知识,我们很容易就可以定位到celery.utils.timer2.Timer
默认的话是开 use_eventloop
的,所以使用的是kombu.asynchronous.timer.Timer
注意这个参数,max_interval
, 是不是非常的熟悉? 是吧,随着代码的深入你会越来越感到熟悉的。
点进到kombu.asynchronous.timer.Timer
的call_at
函数:
def call_at(self, eta, fun, args=(), kwargs=None, priority=0):
kwargs = {} if not kwargs else kwargs
return self.enter_at(self.Entry(fun, args, kwargs), eta, priority)
def enter_at(self, entry, eta=None, priority=0, time=monotonic):
"""Enter function into the scheduler.
Arguments:
entry (~kombu.asynchronous.timer.Entry): Item to enter.
eta (datetime.datetime): Scheduled time.
priority (int): Unused.
"""
if eta is None:
eta = time()
if isinstance(eta, datetime):
try:
eta = to_timestamp(eta)
except Exception as exc:
if not self.handle_error(exc):
raise
return
return self._enter(eta, priority, entry)
def _enter(self, eta, priority, entry, push=heapq.heappush):
push(self._queue, scheduled(eta, priority, entry))
return entry
我说什么来着,我说什么来着,果然诶,扔进去了一个队列里面。看来我们在开头的猜测是初步吻合的,看下有没有执行任务相关的函数。
def apply_entry(self, entry):
"""
任务就是在这里执行的
"""
try:
entry()
except Exception as exc:
if not self.handle_error(exc):
logger.error('Error in timer: %r', exc, exc_info=True)
def __iter__(self, min=min, nowfun=monotonic,
pop=heapq.heappop, push=heapq.heappush):
# 看到重写了一个迭代器的函数
max_interval = self.max_interval
queue = self._queue
while 1:
if queue:
eventA = queue[0]
now, eta = nowfun(), eventA[0]
if now < eta:
yield min(eta - now, max_interval), None
else:
eventB = pop(queue)
if eventB is eventA:
entry = eventA[2]
if not entry.canceled:
yield None, entry
continue
else:
push(queue, eventB)
else:
yield None, None
大致看了下,这快的逻辑其实非常的简单,yield 后面第一个参数是下次执行的时间的间隔,最大为max_interval
, 第二个是 任务的实例. 当返回时间间隔的时候,说明任务还没到执行时间呢,等着吧,所以任务实例是None
, 当到时间的时候,直接就返回任务执行了,这个时候时间间隔是None
,任务是entry
。
妙哇,这不就是celery beat
的逻辑吗,实际上差不多, 因为这里的队列实际上是一个有序队列pop=heapq.heappop
但是我们别忘了一件事, celery的Schedule
是由Service
驱动的,但是到现在我们仍然不知道,谁在外层调用这个Timer
, 答案就是Eventloop
, Eventloop
在这个场景中,正是充当了Service
的作用。
接下来我们去看Eventloop
相关的初始化逻辑, 别找Eventloop了,还记得我们在Celery基础架构那一章的一句介绍吗:
Hub: Eventloop 的封装对象,
所以我们应该去找Hub的初始化逻辑,不找不知道,一找吓一跳。果然和timer
有关系
class Hub(bootsteps.StartStopStep):
"""Worker starts the event loop."""
requires = (Timer,)
"""
忽略部分代码
"""
def create(self, w):
w.hub = get_event_loop()
if w.hub is None:
required_hub = getattr(w._conninfo, 'requires_hub', None)
#. 重点关注这一句
w.hub = set_event_loop((
required_hub if required_hub else _Hub)(w.timer))
self._patch_thread_primitives(w)
return self
点进去_Hub
对象,看看里面有没有什么具体的逻辑: 位置: kombu.asynchronous.hub.Hub
:
有意思的地方来了,发现了一个叫create_loop
的函数:
def create_loop(self,
generator=generator, sleep=sleep, min=min, next=next,
Empty=Empty, StopIteration=StopIteration,
KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
traceback.print_stack()
readers, writers = self.readers, self.writers
poll = self.poller.poll
fire_timers = self.fire_timers
hub_remove = self.remove
scheduled = self.timer._queue
consolidate = self.consolidate
consolidate_callback = self.consolidate_callback
on_tick = self.on_tick
propagate = self.propagate_errors
while 1:
todo = self._ready
self._ready = set()
for tick_callback in on_tick:
tick_callback()
for item in todo:
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
# print('[[[HUB]]]: %s' % (self.repr_active(),))
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
# print('[EVENTS]: %s' % (self.repr_events(events),))
except ValueError: # Issue celery/#882
return
"""
忽略部分代码
"""
else:
# 没有链接之后的重试逻辑
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
大家只需要注意一个地方 fire_timers
,
fire_timers
里面则是执行了用户的任务.
def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
propagate=()):
timer = self.timer
delay = None
if timer and timer._queue:
# max_timers 每次消费几个
for i in range(max_timers):
delay, entry = next(self.scheduler)
if entry is None:
break
try:
# 这里执行了任务
entry()
except propagate:
raise
except (MemoryError, AssertionError):
raise
except OSError as exc:
if exc.errno == errno.ENOMEM:
raise
logger.error('Error in timer: %r', exc, exc_info=1)
except Exception as exc:
logger.error('Error in timer: %r', exc, exc_info=1)
return min(delay or min_delay, max_delay)
而至于Hub
的create_loop
什么时候被执行的,则是在 Consumer
的子组件Eventloop
的初始化逻辑里面。
ef asynloop(obj, connection, consumer, blueprint, hub, qos,
heartbeat, clock, hbrate=2.0):
"""Non-blocking event loop."""
loop = hub.create_loop()
同样的,这个eventloop的最大作用更多的也就是帮我们把Worker
阻塞住,别退出了。
这个系列写到这里,本身应该大多数的内容都已经讲清楚了,但貌似总感觉缺了点什么,我们知道Celery
向Kombu
注册了一个回调函数,然后Kombu
收到消息之后就扔给Celery
处理。但是,回调
是何时发生的? 在哪里执行的回调? 对此我们一无所知,而且为什么放到这一章节呢?和Eventloop 或者 Timer 有什么关系?
为了解开这些疑问,于是我继续往后看,让我们继续回到asynloop
这个函数中,再一次看下这个函数的内容是什么。
def asynloop(obj, connection, consumer, blueprint, hub, qos,
heartbeat, clock, hbrate=2.0):
RUN = bootsteps.RUN
update_qos = qos.update
errors = connection.connection_errors
on_task_received = obj.create_task_handler()
_enable_amqheartbeats(hub.timer, connection, rate=hbrate)
consumer.on_message = on_task_received
obj.controller.register_with_event_loop(hub)
obj.register_with_event_loop(hub)
consumer.consume()
obj.on_ready()
if not obj.restart_count and not obj.pool.did_start_ok():
raise WorkerLostError('Could not start worker processes')
if connection.transport.driver_type == 'amqp':
hub.call_soon(_quick_drain, connection)
hub.propagate_errors = errors
loop = hub.create_loop()
try:
# 这个大循环就是为了防止 celery 进程退出
while blueprint.state == RUN and obj.connection:
state.maybe_shutdown()
if qos.prev != qos.value:
update_qos()
try:
next(loop)
except StopIteration:
loop = hub.create_loop()
finally:
try:
hub.reset()
except Exception as exc: # pylint: disable=broad-except
logger.exception(
'Error cleaning up after event loop: %r', exc)
现在我们已经非常清楚正是在asynloop
这个函数里面,我们绑定了我们的回调方法。也正是在这里我们执行了create_loop
的操作。之前我的猜测是,底层的kombu
起了一个新的进程or线程
去监听消息队列,有消息后进行回调,但是我发现并不是这样,我发现:
on_task_received
和 asynloop
同属一个线程ID
, 这说明他们是在同一个上下文中进行的。有了这个重要的线索,结合代码上下文,最终可以得出一个结论那就是回调函数实际上是在 create_loop
中执行的。具体在哪里呢? 通过堆栈信息,我们最终定位到了:
def create_loop(self,
generator=generator, sleep=sleep, min=min, next=next,
Empty=Empty, StopIteration=StopIteration,
KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
readers, writers = self.readers, self.writers
poll = self.poller.poll
fire_timers = self.fire_timers
hub_remove = self.remove
scheduled = self.timer._queue
consolidate = self.consolidate
consolidate_callback = self.consolidate_callback
on_tick = self.on_tick
propagate = self.propagate_errors
while 1:
todo = self._ready
self._ready = set()
for tick_callback in on_tick:
tick_callback()
for item in todo:
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
# print('[[[HUB]]]: %s' % (self.repr_active(),))
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
# print('[EVENTS]: %s' % (self.repr_events(events),))
except ValueError: # Issue celery/#882
return
for fd, event in events or ():
general_error = False
if fd in consolidate and \
writers.get(fd) is None:
to_consolidate.append(fd)
continue
cb = cbargs = None
if event & READ:
try:
cb, cbargs = readers[fd]
except KeyError:
self.remove_reader(fd)
continue
elif event & WRITE:
try:
cb, cbargs = writers[fd]
except KeyError:
self.remove_writer(fd)
continue
elif event & ERR:
general_error = True
else:
logger.info(W_UNKNOWN_EVENT, event, fd)
general_error = True
if general_error:
try:
cb, cbargs = (readers.get(fd) or
writers.get(fd))
except TypeError:
pass
if cb is None:
self.remove(fd)
continue
if isinstance(cb, generator):
try:
next(cb)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
hub_remove(fd)
except StopIteration:
pass
except Exception:
hub_remove(fd)
raise
else:
try:
# 这个位置,注意这个位置
cb(*cbargs)
except Empty:
pass
if to_consolidate:
consolidate_callback(to_consolidate)
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
cb(*cbargs)
这行代码是最关键的,我们发现,在处理定时任务的同时,也把消息的接受回调在个函数完成了。就是: 收到消息,回调,发现是个延时任务,再回来 create_loop函数处理,真不愧是个loop
。延时任务陷入循环了这是。
Kombu 消息回调过程:
Kombu 基本概念:
Conection
: kumbu 对 AMQP 连接到封装Chanel
: 是 AMQP 对 MQ 的操作的封装;TransPort
: Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行
我们简单梳理一下if readers or writers:
这行代码之后的逻辑,大概就是获取 到我们对应的消息队列的 readers
和 writers
。应该就是对应的读和写。然后呢拿到对应的文件句柄fb
。当然这个文件句柄什么的我只是猜的,确实没有什么注释告诉我fd代表的含义和类型。然后根据事件类型去拿对应的cb
。这个cb啥意思我是真的猜不出来了,应该不是callback
的意思。然后执行这个cb
对象。 大概就是这样,至于具体的类型,需要调试确认一下:
events: [(12, 1), (18, 1), (20, 1)] 其中: 1表示是一个read事件
cb: <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7fed6f9c5630>>
value: 12
进入on_readable
, 注意,现在我们已经进入到另外一个框架了。
def on_readable(self, fileno):
chan, type = self._fd_to_chan[fileno]
if chan.qos.can_consume():
chan.handlers[type]()
# 这里的 chan 指的就是我们对应的 Chanel了 `kombu.transport.redis.Channel`
self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
这里呢,根据 type
的不同讲操作路由到对应的函数里面,当有新的任务过来的时候,type
的值等于BRPOP
。我们进入到这个方法看看:
def _brpop_read(self, **options):
try:
try:
dest__item = self.client.parse_response(self.client.connection,
'BRPOP',
**options)
# dest_item (b'default', b'{"body": "gAJLAUsChnEAfXEBfXECKFgJAAAAY2FsbGJhY2tzcQNOWAgAAABlcnJiYWNrc3EETlgFAAAAY2hhaW5xBU5YBQAAAGNob3JkcQZOdYdxBy4=", "content-encoding": "binary", "content-type": "application/x-python-serialize", "headers": {"lang": "py", "task": "itsm.ticket.tasks.add", "id": "41db8a03-6e46-4d72-847c-531b6950d02e", "shadow": null, "eta": null, "expires": null, "group": null, "retries": 0, "timelimit": [null, null], "root_id": "41db8a03-6e46-4d72-847c-531b6950d02e", "parent_id": null, "argsrepr": "(1, 2)", "kwargsrepr": "{}", "origin": "gen59658@MARKHAN-MB0"}, "properties": {"correlation_id": "41db8a03-6e46-4d72-847c-531b6950d02e", "reply_to": "8e55fb0b-6071-335b-ac59-c1e545ce3e7d", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "default"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "12d20e40-da7a-474d-9680-5b4e93882a20"}}')
except self.connection_errors:
self.client.connection.disconnect()
raise
if dest__item:
dest, item = dest__item
dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
self._queue_cycle.rotate(dest)
self.connection._deliver(loads(bytes_to_str(item)), dest)
return True
else:
raise Empty()
finally:
self._in_poll = None
注意,self.connection._deliver(loads(bytes_to_str(item)), dest)
这一行代码,这行代码就是开始回调的入口了。
kombu.transport.virtual.base.Transport._deliver
, 此刻我们的消息再次进入到了Transport
, 我们之前说过Chanel
是对MQ的操作的封装。此刻这里的callback
则是对应的是``<function Channel.basic_consume.._callback>
def _deliver(self, message, queue):
if not queue:
raise KeyError(
'Received message without destination queue: {0}'.format(
message))
try:
callback = self._callbacks[queue]
except KeyError:
logger.warning(W_NO_CONSUMERS, queue)
self._reject_inbound_message(message)
else:
callback(message)
点进去callback
函数看看:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
我们发现最后手调用到了callback
,当然这里的callback
仍然不是我们之前注册的回调函数, 而是:
<bound method Consumer._receive_callback
。位置是, kombu.messaging.Consumer._receive_callback
def _receive_callback(self, message):
accept = self.accept
on_m, channel, decoded = self.on_message, self.channel, None
try:
m2p = getattr(channel, 'message_to_python', None)
if m2p:
message = m2p(message)
if accept is not None:
message.accept = accept
if message.errors:
return message._reraise_error(self.on_decode_error)
decoded = None if on_m else message.decode()
except Exception as exc:
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)
else:
return on_m(message) if on_m else self.receive(decoded, message)
现在让我们回到之前注册回调函数的位置:
consumer.on_message = on_task_received
obj.controller.register_with_event_loop(hub)
obj.register_with_event_loop(hub)
consumer.consume()
obj.on_ready()
注意到这个on_message
了吗? 在最后的on_m(message)
实际上就是回调了我们的on_task_received
到此,消息就完成了一个完美的闭环
转载自:https://juejin.cn/post/7352792335904423963