前言
本著 「路漫漫其修遠(yuǎn)兮, 吾將上下而求索」 的精神。終于要開始深入研究 Python 中 asyncio 的源碼實(shí)現(xiàn)啦。
本文章可能篇幅較長(zhǎng),因?yàn)槭侵鹦蟹治?asyncio 的實(shí)現(xiàn),也需要讀者具有一定的 asyncio 編碼經(jīng)驗(yàn)和功底,推薦剛開始踏上 Python 異步編程之旅的朋友們可以先從官方文檔入手,由淺入深步步為營(yíng)。
若在讀的你對(duì)此感興趣,那么很開心能與你分享我的學(xué)習(xí)成果。
本次源碼分析將在 Python 3.11.3 的版本上進(jìn)行探索。
PS: 筆者功力有限,若有不足之處還望及時(shí)指正,因?yàn)槭侵鹦蟹治鏊赃^(guò)程稍顯枯燥。
更建議屏幕前的你打開 source code 跟隨整篇文章花費(fèi)一定的時(shí)間一起研究,盡信書不如無(wú)書,對(duì)此文持以質(zhì)疑的態(tài)度去閱讀將有更大的收獲。
全局代碼
在 Python 中,當(dāng)一個(gè)模塊被導(dǎo)入時(shí),Python 解釋器會(huì)執(zhí)行該模塊中的全局代碼。
而全局代碼則是指在模塊中未被封裝在函數(shù)或類中的代碼,它們會(huì)在模塊被導(dǎo)入時(shí)率先執(zhí)行。
這意味著全局代碼可以包括變量的初始化、函數(shù)的定義、類的定義、條件語(yǔ)句、循環(huán)等。這些代碼在模塊被導(dǎo)入時(shí)執(zhí)行,用于設(shè)置模塊的初始狀態(tài)或執(zhí)行一些必要的操作。
查看源碼時(shí),一定不要忽略全局代碼。
PS: 一個(gè)小技巧,查看全局代碼最好的辦法就是將所有的 fold 都先收起來(lái),vim 中使用 zM 快捷鍵即可。
導(dǎo)入模塊
研究任何一個(gè)模塊,我們需先從 import 開始,因?yàn)槟抢锏拇a會(huì)率先執(zhí)行:
import asyncio
點(diǎn)進(jìn) asyncio 模塊之后,可以發(fā)現(xiàn)它的入口文件 __init__ 篇幅是較為簡(jiǎn)短的:
import sys
from .base_events import *
from .coroutines import *
from .events import *
from .exceptions import *
from .futures import *
from .locks import *
from .protocols import *
from .runners import *
from .queues import *
from .streams import *
from .subprocess import *
from .tasks import *
from .taskgroups import *
from .timeouts import *
from .threads import *
from .transports import *
# __all__ 指的是 from asyncio import *
# 時(shí) * 所包含的資源
__all__ = (base_events.__all__ +
coroutines.__all__ +
events.__all__ +
exceptions.__all__ +
futures.__all__ +
locks.__all__ +
protocols.__all__ +
runners.__all__ +
queues.__all__ +
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
threads.__all__ +
timeouts.__all__ +
transports.__all__)
# 若是 win32 平臺(tái), 則添加 windows_events 中的 __all__
if sys.platform == 'win32':
from .windows_events import *
__all__ += windows_events.__all__
# 若是 unix 平臺(tái), 則添加 windows_events 中的 __all__
else:
from .unix_events import *
__all__ += unix_events.__all__
base_events
base_events 是在 asyncio 入口文件中第一個(gè)被 import 的模塊,提供了一些基本的類和設(shè)置項(xiàng),如 BaseEventLoop 以及 Server 等等 ...
base_events 中全局執(zhí)行的代碼不多,以下是其導(dǎo)入的 build-in package:
import collections
import collections.abc
import concurrent.futures
import functools
import heapq
import itertools
import os
import socket
import stat
import subprocess
import threading
import time
import traceback
import sys
import warnings
import weakref
try:
import ssl
except ImportError: # pragma: no cover
ssl = None
自定義的 package:
from . import constants
from . import coroutines
from . import events
from . import exceptions
from . import futures
from . import protocols
from . import sslproto
from . import staggered
from . import tasks
from . import transports
from . import trsock
from .log import logger
關(guān)注幾個(gè)有用的信息點(diǎn):
# 該模塊只允許通過(guò) * 導(dǎo)入 BaseEventLoop 以及 Server 類
__all__ = 'BaseEventLoop','Server',
# 定義異步事件循環(huán)中允許的最小計(jì)劃定時(shí)器句柄數(shù)
# loop.call_later() 以及 loop.call_at() 都是在創(chuàng)建定時(shí)器句柄
# 當(dāng)計(jì)劃定時(shí)器句柄的數(shù)量低于該值,事件循環(huán)可能會(huì)采取一些優(yōu)化措施
# 例如減少時(shí)間片的分配或合并定時(shí)器句柄,以提高性能和效率
_MIN_SCHEDULED_TIMER_HANDLES = 100
# 定義了被取消的定時(shí)器句柄數(shù)量與總計(jì)劃定時(shí)器句柄數(shù)量之間的最小比例
# 如果取消的定時(shí)器句柄數(shù)量超過(guò)了計(jì)劃定時(shí)器句柄數(shù)量的這個(gè)比例
# 事件循環(huán)可能會(huì)采取一些優(yōu)化措施,例如重新分配或重新排序定時(shí)器句柄列表,以提高性能和效率
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
# 一個(gè)布爾值, 用來(lái)判斷當(dāng)前 socket 是否支持 IPV6
_HAS_IPv6 = hasattr(socket, 'AF_INET6')
# 事件循環(huán) SELECT 時(shí)的等待事件
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
除此之外,還有關(guān)于 socket 部分的:
# 當(dāng)前的 socket 模塊是否具有非延遲特性
if hasattr(socket, 'TCP_NODELAY'):
def _set_nodelay(sock):
if (sock.family in {socket.AF_INET, socket.AF_INET6} and
sock.type == socket.SOCK_STREAM and
sock.proto == socket.IPPROTO_TCP):
# 啟用 tcp 協(xié)議非延遲特性,即禁用 Nagle 算法
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
def _set_nodelay(sock):
pass
constants
constants 是在 base_events 中第一個(gè)被 import 的。其作用是定義一些通過(guò) asyncio 進(jìn)行網(wǎng)絡(luò)編程時(shí)的常量數(shù)據(jù)。
它的源碼雖然簡(jiǎn)單但涉及知識(shí)面較廣,基本是與網(wǎng)絡(luò)編程相關(guān)的,若想深入研究還需下一陣苦功夫:
import enum
# 在使用 asyncio 進(jìn)行網(wǎng)絡(luò)編程時(shí)
# 寫入操作多次失?。ㄈ珂溄觼G失的情況下)記錄一條 warning 日志
LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
# 在使用 asyncio 進(jìn)行網(wǎng)絡(luò)編程時(shí)
# 若對(duì)方斷聯(lián)、則在重試 accept 之前等待的秒數(shù)
# 參見 selector_events._accept_connection() 具體實(shí)現(xiàn)
ACCEPT_RETRY_DELAY = 1
# 在 asyncio debug 模式下需捕獲的堆棧條目數(shù)量(數(shù)量越大,運(yùn)行越慢)
# 旨在方便的為開發(fā)人員追蹤問題,例如找出事件循環(huán)中的協(xié)程或回調(diào)的調(diào)用路徑
DEBUG_STACK_DEPTH = 10
# 在使用 asyncio 進(jìn)行網(wǎng)絡(luò)編程時(shí)
# SSL/TSL 加密通信握手時(shí)可能會(huì)產(chǎn)生斷聯(lián)或失敗
# 而該常量是指等待 SSL 握手完成的秒數(shù),他和 Nginx 的 timeout 匹配
SSL_HANDSHAKE_TIMEOUT = 60.0
# 在使用 asyncio 進(jìn)行網(wǎng)絡(luò)編程時(shí)
# 等待 SSL 關(guān)閉完成的秒數(shù)
# 如 asyncio.start_server() 方法以及 asyncio.start_tls() 方法
# 在鏈接關(guān)閉后,會(huì)使用 n 秒來(lái)進(jìn)行確認(rèn)對(duì)方已經(jīng)成功關(guān)閉了鏈接
# 若在 n 秒內(nèi)未得到確認(rèn),則引發(fā) TimeoutError
SSL_SHUTDOWN_TIMEOUT = 30.0
# 在使用 loop.sendfile() 方法傳輸文件時(shí)
# 后備緩沖區(qū)的大?。ㄓ行┪募到y(tǒng)不支持零拷貝,因此需要一個(gè)緩沖區(qū))
SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 256
# 當(dāng)在 SSL/TSL 握手期間,若 read 的內(nèi)核緩沖區(qū)數(shù)據(jù)大小
# 超過(guò)了下面設(shè)定的值,則會(huì)等待其內(nèi)核緩沖區(qū)大小降低后
# 再次進(jìn)行 read
FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB
# 同上,只不過(guò)這個(gè)是寫入的上限流量閾值
FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB
class _SendfileMode(enum.Enum):
UNSUPPORTED = enum.auto()
TRY_NATIVE = enum.auto()
FALLBACK = enum.auto()
coroutines
coroutines 是在 base_events 中第二個(gè)被 import 的。其作用是提供一些布爾的判定接口,如判斷對(duì)象是否是 coroutine、當(dāng)前是否是 debug 模式等等。
其全局代碼不多,暫可不必太過(guò)關(guān)注:
# 該模塊只允許通過(guò) * 導(dǎo)入 iscoroutinefunction 以及 iscoroutine 函數(shù)
__all__ = 'iscoroutinefunction', 'iscoroutine'
# ...
_is_coroutine = object()
# 優(yōu)先檢查原生協(xié)程以加快速度
# asyncio.iscoroutine
_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
collections.abc.Coroutine)
_iscoroutine_typecache = set()
events
events 是在 base_events 中第三個(gè)被 import 的。其作為是定義一些與事件循環(huán)相關(guān)的高級(jí)接口或定義一些事件循環(huán)的抽象基類供內(nèi)部或開發(fā)者使用。
注意他在這里還 import 了自定義模塊 format_helpers,但是 format_helpers 中并未有任何運(yùn)行的全局代碼,所以后面直接略過(guò)了:
from . import format_helpers
以下是它的全局代碼:
# __all__ 中難免會(huì)看到一些熟悉的身影
# 比如 get_event_loop get_running_loop 等等
__all__ = (
'AbstractEventLoopPolicy',
'AbstractEventLoop', 'AbstractServer',
'Handle', 'TimerHandle',
'get_event_loop_policy', 'set_event_loop_policy',
'get_event_loop', 'set_event_loop', 'new_event_loop',
'get_child_watcher', 'set_child_watcher',
'_set_running_loop', 'get_running_loop',
'_get_running_loop',
)
# ...
# 該變量有 2 個(gè)作用
# 分別是決定如何創(chuàng)建和獲取事件循環(huán)對(duì)象
# - 比如一個(gè)線程一個(gè)事件循環(huán)
# - 或者一個(gè)任務(wù)一個(gè)事件循環(huán)
# 再者就是獲取事件循環(huán),通過(guò) get_event_loop_policy 方法即可拿到該變量
_event_loop_policy = None
# 一把線程鎖、用于保護(hù)事件循環(huán)策略的實(shí)例化
_lock = threading.Lock()
# ...
class _RunningLoop(threading.local):
loop_pid = (None, None)
# 這個(gè)好像是獲取以及設(shè)置當(dāng)前的 running loop,由 _get_running_loop 使用。
# 它將 loop 和 pid 進(jìn)行綁定
_running_loop = _RunningLoop()
# 為了一些測(cè)試而取的以 _py 開始的別名
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
_py__get_event_loop = _get_event_loop
try:
# 純注釋翻譯:
# get_event_loop() 是最常調(diào)用的方法之一
# 異步函數(shù)。純 Python 實(shí)現(xiàn)是
# 大約比 C 加速慢 4 倍。
# PS: C 語(yǔ)言的部分就先暫時(shí)不看了
from _asyncio import (_get_running_loop, _set_running_loop,
get_running_loop, get_event_loop, _get_event_loop)
except ImportError:
pass
else:
# 為了一些測(cè)試而取的以 _c 開始的別名
_c__get_running_loop = _get_running_loop
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
_c__get_event_loop = _get_event_loop
exceptions
exceptions 是在 base_events 中第四個(gè)被 import 的。其主要作用是定義了一些異常類:
__all__ = ('BrokenBarrierError',
'CancelledError', 'InvalidStateError', 'TimeoutError',
'IncompleteReadError', 'LimitOverrunError',
'SendfileNotAvailableError')
有些異常類中實(shí)現(xiàn)了 __reduce__() 方法。該方法允許自定義對(duì)象在被序列化或持久化過(guò)程中的狀態(tài)和重建方式。
示例:
import pickle
from typing import Any
def ser_fn(name):
return name
class Example:
def __init__(self, name) -> None:
self.name = name
def __reduce__(self) -> str | tuple[Any, ...]:
"""
反序列化時(shí),將調(diào)用 ser_fn 并且傳入?yún)?shù)
下面注釋的第一個(gè)例子是重新實(shí)例化一下
第二個(gè)例子是更直觀的演示該方法的作用
"""
# return (__class__, (self.name, ))
return (ser_fn, ("反序列化結(jié)果", ))
if __name__ == "__main__":
obj = Example("instance")
serializer = pickle.dumps(obj)
deserializer = pickle.loads(serializer)
print(deserializer)
# 反序列化結(jié)果
futures
futures 是在 base_events 中第五個(gè)被 import 的。其作用是定義了 asyncio 中未來(lái)對(duì)象的實(shí)現(xiàn)方式。
在看其全局代碼之前,首先推薦閱讀官方文檔:
該 futures 和 collections 的 futures 有些許區(qū)別,futures 也算是 Python 異步編程中比較難以理解的一個(gè)點(diǎn),后續(xù)有機(jī)會(huì)再和大家詳細(xì)探討。
futures 文件中導(dǎo)入了 base_futures 自定義模塊,但 base_futures 中暫時(shí)沒有值得關(guān)注的點(diǎn),所以先在此略過(guò):
from . import base_futures
# 下面 3 個(gè)都已經(jīng)粗略看過(guò)一次
from . import events
from . import exceptions
from . import format_helpers
其全局代碼如下:
# 一個(gè)函數(shù),用于判斷對(duì)象是否是一個(gè)未來(lái)對(duì)象
isfuture = base_futures.isfuture
# 用于表明未來(lái)對(duì)象的當(dāng)前一些狀態(tài)的標(biāo)志
# 分別是 等待執(zhí)行、取消執(zhí)行、完成執(zhí)行
_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED
# 棧的調(diào)試 LOG 級(jí)別
STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
# ...
class Future:
pass
_PyFuture = Future
# ...
try:
import _asyncio
except ImportError:
pass
else:
Future = _CFuture = _asyncio.Future
protocols
protocols 是在 base_events 中第六個(gè)被 import 的。其作用主要是定義一些內(nèi)部協(xié)議。
如 '緩沖區(qū)控制流協(xié)議'、'接口數(shù)據(jù)報(bào)協(xié)議'、'子進(jìn)程調(diào)用接口協(xié)議' 等等。
暫先關(guān)注其 __all__ 即可:
__all__ = (
'BaseProtocol', 'Protocol', 'DatagramProtocol',
'SubprocessProtocol', 'BufferedProtocol',
)
sslproto
sslproto 是在 base_events 中第七個(gè)被 import 的。其作用是定義和具體實(shí)現(xiàn) SSL/TLS 協(xié)議。
同默認(rèn)的 socket 模塊不同,asyncio 所提供的流式傳輸是已經(jīng)實(shí)現(xiàn)好了 SSL/TLS 協(xié)議功能的。
下面先對(duì) SSL/TLS 做一個(gè)簡(jiǎn)短的介紹。
SSL/TLS 是一個(gè)獨(dú)立的協(xié)議,其功能主要用于網(wǎng)絡(luò)通信的加密和安全,如數(shù)據(jù)加密、身份認(rèn)證等等。
TLS 的前身為 SSL 協(xié)議,是 SSL 的現(xiàn)代版和改進(jìn)版。
在 sslproto 中也導(dǎo)入了一些標(biāo)準(zhǔn)庫(kù)以及自定義的模塊:
import collections
import enum
import warnings
try:
import ssl
except ImportError: # pragma: no cover
ssl = None
# 自定義模塊
from . import constants
from . import exceptions
from . import protocols
# 下面 2 個(gè)還沒看過(guò),transports 可以大概瞅瞅,但 log 沒必要看
# 他本質(zhì)就是使用 logging 模塊獲得一個(gè) log 對(duì)象
# 名字就是當(dāng)前的 package name,即為 asyncio
from . import transports
from .log import logger
其全局代碼如下:
# ...
if ssl is not None:
SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError)
除此之外,它還定義了一些類,如 'SSL 協(xié)議'、'應(yīng)用協(xié)議狀態(tài)' 等等,這里不做細(xì)述。
transports
transports 在 sslproto 文件中被導(dǎo)入,主要定義一些傳輸類。
如 '讀傳輸'、'寫傳輸'、'數(shù)據(jù)報(bào)傳輸'、'子進(jìn)程接口傳輸' 等等,和 protocols 中的協(xié)議類關(guān)系較為密切。
一般來(lái)說(shuō)若要基于 asyncio 進(jìn)行二次開發(fā),如開發(fā) http 協(xié)議的 web 服務(wù)程序等等,才會(huì)關(guān)注到這里。
其下很多代碼看不到具體實(shí)現(xiàn),直接看其 __all__ 變量吧:
__all__ = (
'BaseTransport', 'ReadTransport', 'WriteTransport',
'Transport', 'DatagramTransport', 'SubprocessTransport',
)
staggered
staggered 是在 base_events 中第八個(gè)被 import 的。其作用是如何支持正在運(yùn)行的協(xié)程在時(shí)間點(diǎn)中錯(cuò)開(主要針對(duì) socket 網(wǎng)絡(luò)編程)。
他實(shí)現(xiàn)了一個(gè)協(xié)程函數(shù) staggered_race 以及導(dǎo)入了一些內(nèi)部或自定義模塊:
__all__ = 'staggered_race',
import contextlib
import typing
from . import events
from . import exceptions as exceptions_mod
from . import locks
from . import tasks
async def staggered_race(...):
pass
locks
locks 在 staggered 中被導(dǎo)入,其作用是實(shí)現(xiàn)了一些協(xié)程鎖。
官方文檔:協(xié)程同步
具體有 '同步鎖'、'事件鎖'、'條件鎖'、'信號(hào)量鎖'、'有界信號(hào)量鎖'、'屏障鎖'。相比于 threading 少了 '遞歸鎖' 和一些其他的鎖。
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
'BoundedSemaphore', 'Barrier')
mixins
mixins 在 locks 中被導(dǎo)入,其作用是提供一些工具集功能。
代碼量較少:
import threading
from . import events
# 實(shí)現(xiàn)一把全局的線程同步鎖
_global_lock = threading.Lock()
class _LoopBoundMixin:
_loop = None
def _get_loop(self):
loop = events._get_running_loop()
if self._loop is None:
with _global_lock:
if self._loop is None:
self._loop = loop
if loop is not self._loop:
raise RuntimeError(f'{self!r} is bound to a different event loop')
return loop
tasks
tasks 在 locks 中被導(dǎo)入,其作用是定義 Task 對(duì)象、提供一些管理 Task 對(duì)象的高級(jí)接口。
# 有很多熟悉的高級(jí)接口,均來(lái)自于 tasks 模塊
__all__ = (
'Task', 'create_task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
'current_task', 'all_tasks',
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
下面是它導(dǎo)入的內(nèi)置模塊和第三方模塊:
import concurrent.futures
import contextvars
import functools
import inspect
import itertools
import types
import warnings
import weakref
from types import GenericAlias
# 除了 base_tasks 其他都已經(jīng)全部 load 掉了
from . import base_tasks
from . import coroutines
from . import events
from . import exceptions
from . import futures
from .coroutines import _is_coroutine
其全局代碼有:
# 生成新的 task 時(shí)的命名計(jì)數(shù)器
# 這里不采用 +=1 的操作是因?yàn)閰f(xié)程并非線程安全
# 通過(guò)迭代器不斷的向后計(jì)數(shù),可以完美的保證線程安全(Ps: GET 新技能)
_task_name_counter = itertools.count(1).__next__
# ...
_PyTask = Task
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
# 包含所有正在活動(dòng)的任務(wù)
_all_tasks = weakref.WeakSet()
# 一個(gè)字典,包含當(dāng)前正在活動(dòng)的任務(wù) {loop: task}
_current_tasks = {}
# ...
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
try:
from _asyncio import (_register_task, _unregister_task,
_enter_task, _leave_task,
_all_tasks, _current_tasks)
except ImportError:
pass
else:
_c_register_task = _register_task
_c_unregister_task = _unregister_task
_c_enter_task = _enter_task
_c_leave_task = _leave_task
trsock
trsock 是在 base_events 中第九個(gè)被 import 的。其作用是實(shí)現(xiàn)了一個(gè) '傳輸套接字' 的類。
具體是對(duì)模塊內(nèi)的,暫不深究。
runners
runners 是在 asyncio 入口文件中第八個(gè)被 import 的。其作用為定義 asyncio 的入口方法 run 以及定義管理事件循環(huán)聲明周期類的 'Runner'。
__all__ = ('Runner', 'run')
Runner 功能為 Python 3.11 新功能。
queues
queues 是在 asyncio 入口文件中第九個(gè)被 import 的。其作用是定義一些用于協(xié)程信息同步的隊(duì)列。
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
streams
streams 是在 asyncio 入口文件中第十個(gè)被 import 的。其作用是定義流式傳輸相關(guān)的具體實(shí)現(xiàn)類,如 '可讀流'、'可寫流' 等等。
__all__ = (
'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
'open_connection', 'start_server')
如果是在 Unix 平臺(tái)下,則 __all__ 會(huì)新增一些內(nèi)容:
if hasattr(socket, 'AF_UNIX'):
__all__ += ('open_unix_connection', 'start_unix_server')
# 讀寫流操作的緩沖區(qū)大小為 64kb
_DEFAULT_LIMIT = 2 ** 16
該文件與 transports 關(guān)系較為密切。
subprocess
subprocess 是在 asyncio 入口文件中第十一個(gè)被 import 的。其作用是定義子進(jìn)程通信相關(guān)的類,如 'SubprocessProtocol' 和 'Protocol' 等等。
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
# ...
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
taskgroups
taskgroups 是在 asyncio 入口文件中中第十二個(gè)被 import 的。其作用是定義了任務(wù)組。
__all__ = ["TaskGroup"]
此功能為 Python 3.11 新功能。
timeouts
timeouts 是在 asyncio 入口文件中中第十三個(gè)被 import 的。其作用是定義了超時(shí)相關(guān)的類和函數(shù)。
__all__ = (
"Timeout",
"timeout",
"timeout_at",
)
class _State(enum.Enum):
CREATED = "created"
ENTERED = "active"
EXPIRING = "expiring"
EXPIRED = "expired"
EXITED = "finished"
threads
threads 是在 asyncio 入口文件中第十四個(gè)被 import 的。其作用是定義了函數(shù) to_thread。
__all__ = "to_thread",
async def to_thread(func, /, *args, **kwargs):
pass
模塊導(dǎo)入關(guān)系圖
整個(gè) asyncio 模塊的初始化模塊導(dǎo)入關(guān)系圖如下:
由 asyncio.run 引發(fā)的故事
asyncio.run() 作為目前 Python 較為推崇的協(xié)程起始方式。研究其內(nèi)部啟動(dòng)順序及執(zhí)行順序是十分有必要的。
在 Python 3.11 版本后 asyncio 新增了多種協(xié)程起始方式,但 asyncio.run 的地位依舊不容置疑。
如果后續(xù)有機(jī)會(huì),我們可以再繼續(xù)探討研究 Python 3.7 之前的協(xié)程起始方式。
事件循環(huán)的初始化過(guò)程
函數(shù)基本介紹
asyncio.run() 位于 asyncio.runners 文件中,其函數(shù)簽名如下:
def run(main, *, debug=None):
pass
如同官方文檔所說(shuō),該方法如果在沒有 Runner 參與的情況下,應(yīng)當(dāng)只調(diào)用一次。
在 Python 3.11 版本后,新加入的 Runner 類使其源碼發(fā)生了一定的變化,但其內(nèi)部邏輯總是萬(wàn)變不離其宗的。
def run(main, *, debug=None):
# 若當(dāng)前的線程已經(jīng)存在一個(gè)正在運(yùn)行的事件循環(huán)、則拋出異常
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
with Runner(debug=debug) as runner:
return runner.run(main)
_get_running_loop
源碼如下:
# ...
class _RunningLoop(threading.local):
loop_pid = (None, None)
# ...
_running_loop = _RunningLoop()
# ...
def _get_running_loop():
running_loop, pid = _running_loop.loop_pid
# 這里條件不滿足,所以返回的必然是 None
if running_loop is not None and pid == os.getpid():
return running_loop
對(duì)于了解過(guò) threading.local 源代碼的同學(xué)這里應(yīng)該比較好理解。
Ps: threading.local 所實(shí)現(xiàn)的功能是讓每一個(gè)線程能夠存儲(chǔ)自己獨(dú)有的數(shù)據(jù),其原理
大致是維護(hù)一個(gè) global dict,其結(jié)構(gòu)為 {"threading_ident": "data"}
Runner 類
繼續(xù)回到 asyncio.run() 函數(shù)中,可以發(fā)現(xiàn)它 with 了 Runner 類:
def run(main, *, debug=None):
# ...
with Runner(debug=debug) as runner:
return runner.run(main)
先看 Runner 的 __init__() 方法,再看其 __enter__ 方法。
class _State(enum.Enum):
CREATED = "created"
INITIALIZED = "initialized"
CLOSED = "closed"
class Runner:
def __init__(self, *, debug=None, loop_factory=None):
self._state = _State.CREATED
self._debug = debug
self._loop_factory = loop_factory
self._loop = None
self._context = None
self._interrupt_count = 0
self._set_event_loop = False
def __enter__(self):
self._lazy_init()
return self
def _lazy_init(self):
# 如果是關(guān)閉狀態(tài),則拋出異常
if self._state is _State.CLOSED:
raise RuntimeError("Runner is closed")
# 如果是初始化狀態(tài),則返回
if self._state is _State.INITIALIZED:
return
# 如果 loop 工廠函數(shù)是 None
if self._loop_factory is None:
# 創(chuàng)建一個(gè)新的 loop
self._loop = events.new_event_loop()
if not self._set_event_loop:
events.set_event_loop(self._loop)
self._set_event_loop = True
else:
self._loop = self._loop_factory()
if self._debug is not None:
self._loop.set_debug(self._debug)
self._context = contextvars.copy_context()
self._state = _State.INITIALIZED
events.new_event_loop
events.new_event_loop 的源碼如下,他通過(guò)拿到當(dāng)前事件循環(huán)策略來(lái)得到一個(gè)新的事件循環(huán):
# ...
_event_loop_policy = None
_lock = threading.Lock()
# ...
def new_event_loop():
return get_event_loop_policy().new_event_loop()
# ...
def get_event_loop_policy():
if _event_loop_policy is None:
_init_event_loop_policy()
return _event_loop_policy
# ...
def _init_event_loop_policy():
global _event_loop_policy
# 思考點(diǎn):
# 這里為何要加線程鎖?
# 是為了避免多線程多事件循環(huán)狀態(tài)下 _event_loop_policy 的
# 數(shù)據(jù)同步問題嗎?防止同時(shí)多次運(yùn)行 DefaultEventLoopPolicy 實(shí)例化嗎?
with _lock:
if _event_loop_policy is None:
from . import DefaultEventLoopPolicy
_event_loop_policy = DefaultEventLoopPolicy()
_UnixDefaultEventLoopPolicy
接下來(lái)我們要繼續(xù)看 DefaultEventLoopPolicy 的代碼實(shí)現(xiàn),它位于 unix_events 文件中。
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
_loop_factory = None
class _Local(threading.local):
_loop = None
_set_called = False
def __init__(self):
# 2. 為當(dāng)前線程生成了一個(gè)獨(dú)立的 threading location
self._local = self._Local()
def new_event_loop(self):
# 3. 實(shí)例化 _UnixSelectorEventLoop
return self._loop_factory()
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory = _UnixSelectorEventLoop
# 1. 初始化類
def __init__(self):
super().__init__()
self._watcher = None
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
_UnixSelectorEventLoop
繼續(xù)看 _UnixSelectorEventLoop 的實(shí)例化過(guò)程:
# ---- coroutines ----
def _is_debug_mode():
return sys.flags.dev_mode or (not sys.flags.ignore_environment and
bool(os.environ.get('PYTHONASYNCIODEBUG')))
# ---- base_events ----
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
# 3. 實(shí)例化對(duì)象字典填充
self._timer_cancelled_count = 0
self._closed = False
self._stopping = False
# deque 雙端隊(duì)列
self._ready = collections.deque()
self._scheduled = []
# 默認(rèn)執(zhí)行器
self._default_executor = None
self._internal_fds = 0
self._thread_id = None
# 1e-09
self._clock_resolution = time.get_clock_info('monotonic').resolution
# 默認(rèn)異常處理程序
self._exception_handler = None
self.set_debug(coroutines._is_debug_mode())
self.slow_callback_duration = 0.1
self._current_handle = None
self._task_factory = None
self._coroutine_origin_tracking_enabled = False
self._coroutine_origin_tracking_saved_depth = None
self._asyncgens = weakref.WeakSet()
self._asyncgens_shutdown_called = False
self._executor_shutdown_called = False
# ---- selector_events ----
class BaseSelectorEventLoop(base_events.BaseEventLoop):
def __init__(self, selector=None):
# 2. 繼續(xù)調(diào)用父類 __init__ 方法,填充實(shí)例化對(duì)象的 __dict__ 字典
super().__init__()
# 3. 判斷 selector 是否為 None
if selector is None:
# 得到一個(gè)默認(rèn)的 io 復(fù)用選擇器
# select poll epoll
selector = selectors.DefaultSelector()
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
# 4. 調(diào)用 _make_self_pipe 方法
self._make_self_pipe()
# 10. 通過(guò) weakref 創(chuàng)建出 1 個(gè)弱引用映射類
self._transports = weakref.WeakValueDictionary()
def _make_self_pipe(self):
# 5. 創(chuàng)建 1 個(gè)非阻塞的 socket 對(duì)象
self._ssock, self._csock = socket.socketpair()
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
# 6. 調(diào)用 address,傳入當(dāng)前 sock 對(duì)象的文件描述符
self._add_reader(self._ssock.fileno(), self._read_from_self)
def _add_reader(self, fd, callback, *args):
# 7. 檢查當(dāng)前類是否是關(guān)閉狀態(tài)
self._check_closed()
# 9. 實(shí)例化注冊(cè)一個(gè) handle,注意這里的 callback 是
# self._read_from_self, args 為 ()
handle = events.Handle(callback, args, self, None)
try:
# 第一次運(yùn)行這里會(huì)報(bào)錯(cuò),返回當(dāng)前文件對(duì)象注冊(cè)的 SelectorKey
key = self._selector.get_key(fd)
except KeyError:
# 若報(bào)錯(cuò)則注冊(cè)一個(gè)讀事件,將 handle 放入
self._selector.register(fd, selectors.EVENT_READ,
(handle, None))
else:
# 如果是第二次運(yùn)行這個(gè)方法,則拿到 event
# 疑問點(diǎn)(register 時(shí)放入的是 handle 和 None)
# 為何出來(lái)就成了可讀流和可寫流?其實(shí)是事件循環(huán)開啟后的一系列處理
# 可參照 事件循環(huán)的 sele._selector.select 以及 BaseSelectorEventLoop._process_events() 方法
# 結(jié)果中的 reader 表示可讀流的事件處理器對(duì)象,而 writer 為 None
mask, (reader, writer) = key.events, key.data
# 修改 fd 的注冊(cè)事件
# select 中 1 是讀事件,2 是寫事件。按位或后的結(jié)果總是較大值
# 或兩者的和
self._selector.modify(fd, mask | selectors.EVENT_READ,
(handle, writer))
# 如果沒有可讀流,則關(guān)閉,說(shuō)明 except 那里沒有注冊(cè)好 handle 或者被 unregister 掉了
if reader is not None:
reader.cancel()
return handle
def _check_closed(self):
# 8. 若是關(guān)閉狀態(tài)則直接拋出異常
if self._closed:
raise RuntimeError('Event loop is closed')
# ---- unix_events ----
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
def __init__(self, selector=None):
# 1. 調(diào)用父類進(jìn)行實(shí)例化數(shù)據(jù)填充,構(gòu)建 __dict__ 字典
super().__init__(selector)
self._signal_handlers = {}
events.Handle
events.Handle 類的源碼如下,這個(gè) Handle 類是 asyncio 中各類任務(wù)的上層封裝,十分重要:
# handle = events.Handle(callback, args, self, None)
# callback = BaseSelectorEventLoop._read_from_self
# self = _UnixSelectorEventLoop instance
class Handle:
__slots__ = ('_callback', '_args', '_cancelled', '_loop',
'_source_traceback', '_repr', '__weakref__',
'_context')
def __init__(self, callback, args, loop, context=None):
# 若當(dāng)前上下文為空,則 copy 當(dāng)前上下文
if context is None:
context = contextvars.copy_context()
self._context = context
# loop 就是 _UnixSelectorEventLoop 的實(shí)例化對(duì)象
self._loop = loop
self._callback = callback
# ()
self._args = args
self._cancelled = False
self._repr = None
# 先不看 debug 模式
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
else:
self._source_traceback = None
event.set_event_loop
至此,_loop_factory 已經(jīng)全部走完了。實(shí)際上也沒干太特別的事情,就創(chuàng)建了一個(gè) DefaultSelector 以及實(shí)例化了一個(gè) socket 對(duì)象并注冊(cè)進(jìn)了 DefaultSelector 中。
我們要接著看 Runner:
class _State(enum.Enum):
CREATED = "created"
INITIALIZED = "initialized"
CLOSED = "closed"
class Runner:
def __init__(self, *, debug=None, loop_factory=None):
self._state = _State.CREATED
self._debug = debug
self._loop_factory = loop_factory
self._loop = None
self._context = None
self._interrupt_count = 0
self._set_event_loop = False
def __enter__(self):
self._lazy_init()
return self
def _lazy_init(self):
# 如果是關(guān)閉狀態(tài),則拋出異常
if self._state is _State.CLOSED:
raise RuntimeError("Runner is closed")
# 如果是初始化狀態(tài),則返回
if self._state is _State.INITIALIZED:
return
# 如果 loop 工廠函數(shù)是 None
if self._loop_factory is None:
# 創(chuàng)建一個(gè)新的 loop,這里的返回對(duì)象就是 _UnixSelectorEventLoop 的實(shí)例化對(duì)象
self._loop = events.new_event_loop()
if not self._set_event_loop:
events.set_event_loop(self._loop)
self._set_event_loop = True
else:
self._loop = self._loop_factory()
if self._debug is not None:
self._loop.set_debug(self._debug)
self._context = contextvars.copy_context()
self._state = _State.INITIALIZED
events.set_event_loop 源碼:
def get_event_loop_policy():
if _event_loop_policy is None:
_init_event_loop_policy()
# 1. 應(yīng)該走這里,實(shí)際上 _event_loop_policy 就是 _UnixDefaultEventLoopPolicy 的實(shí)例對(duì)象
return _event_loop_policy
def set_event_loop(loop):
# 2. 運(yùn)行 _UnixDefaultEventLoopPolicy 實(shí)例對(duì)象的 set_event_loop
get_event_loop_policy().set_event_loop(loop)
_UnixDefaultEventLoopPolicy 的 set_event_loop 方法:
# ---- events ----
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
_loop_factory = None
class _Local(threading.local):
_loop = None
_set_called = False
def __init__(self):
self._local = self._Local()
def set_event_loop(self, loop):
# 2. 通過(guò) threading lock 設(shè)置標(biāo)志位置
self._local._set_called = True
if loop is not None and not isinstance(loop, AbstractEventLoop):
raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'")
self._local._loop = loop
# ---- unix_events ----
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory = _UnixSelectorEventLoop
# ...
def set_event_loop(self, loop):
# 這個(gè) loop 是 _UnixSelectorEventLoop 的實(shí)例化對(duì)象
# 1. super 父類的同名方法
super().set_event_loop(loop)
# 3. 實(shí)例化的時(shí)候這里是 None, 不會(huì)運(yùn)行下面的條件
if (self._watcher is not None and
threading.current_thread() is threading.main_thread()):
self._watcher.attach_loop(loop)
至此 Runner._lazy_init 應(yīng)該全部走完了:
class _State(enum.Enum):
CREATED = "created"
INITIALIZED = "initialized"
CLOSED = "closed"
class Runner:
def __init__(self, *, debug=None, loop_factory=None):
self._state = _State.CREATED
self._debug = debug
self._loop_factory = loop_factory
self._loop = None
self._context = None
self._interrupt_count = 0
self._set_event_loop = False
def __enter__(self):
self._lazy_init()
return self
def _lazy_init(self):
# 如果是關(guān)閉狀態(tài),則拋出異常
if self._state is _State.CLOSED:
raise RuntimeError("Runner is closed")
# 如果是初始化狀態(tài),則返回
if self._state is _State.INITIALIZED:
return
# 如果 loop 工廠函數(shù)是 None
if self._loop_factory is None:
# 創(chuàng)建一個(gè)新的 loop,這里的返回對(duì)象就是 _UnixSelectorEventLoop 的實(shí)例化對(duì)象
self._loop = events.new_event_loop()
# 設(shè)置新的標(biāo)志位,代表事件循環(huán)已經(jīng)初始化成功
if not self._set_event_loop:
events.set_event_loop(self._loop)
self._set_event_loop = True
# 不會(huì)走這里
else:
self._loop = self._loop_factory()
if self._debug is not None:
self._loop.set_debug(self._debug)
# copy 當(dāng)前上下文
self._context = contextvars.copy_context()
# 修改狀態(tài)
self._state = _State.INITIALIZED
注意,此時(shí)事件循環(huán)已經(jīng)初始化完成了,但還沒有正式啟動(dòng)。
事件循環(huán)初始化流程圖
以下是事件循環(huán)的初始化流程圖:
事件循環(huán)的啟動(dòng)和任務(wù)的執(zhí)行
在上面我們大概看了一下事件循環(huán)的初始化。接下來(lái)應(yīng)該走到 runner.run() 方法中看他如何運(yùn)行事件循環(huán)。
def run(main, *, debug=None):
# ...
with Runner(debug=debug) as runner:
return runner.run(main)
runner.run
源代碼如下:
class _RunningLoop(threading.local):
loop_pid = (None, None)
# ...
_running_loop = _RunningLoop()
# ...
def _get_running_loop():
running_loop, pid = _running_loop.loop_pid
if running_loop is not None and pid == os.getpid():
return running_loop
# -------------
class Runner:
def __init__(self, *, debug=None, loop_factory=None):
self._state = _State.CREATED # INITIALIZED
self._debug = debug
self._loop_factory = loop_factory # _UnixSelectorEventLoop 實(shí)例對(duì)象
self._loop = None
self._context = None # dict
self._interrupt_count = 0
self._set_event_loop = False # True
def _lazy_init(self):
if self._state is _State.CLOSED:
raise RuntimeError("Runner is closed")
# 2. 直接返回
if self._state is _State.INITIALIZED:
return
# ...
def run(self, coro, *, context=None):
# 若不是一個(gè)協(xié)程函數(shù),則拋出異常
if not coroutines.iscoroutine(coro):
raise ValueError("a coroutine was expected, got {!r}".format(coro))
# 若 event loop 已經(jīng)運(yùn)行了,則拋出異常
# 這里還沒有運(yùn)行
if events._get_running_loop() is not None:
raise RuntimeError(
"Runner.run() cannot be called from a running event loop")
# 1. 運(yùn)行 _lazy_init
self._lazy_init()
# 3. 不是 None
if context is None:
context = self._context
# 4. 創(chuàng)建協(xié)程并發(fā)任務(wù)
task = self._loop.create_task(coro, context=context)
# .. 后面再看
if (threading.current_thread() is threading.main_thread()
and signal.getsignal(signal.SIGINT) is signal.default_int_handler
):
sigint_handler = functools.partial(self._on_sigint, main_task=task)
try:
signal.signal(signal.SIGINT, sigint_handler)
except ValueError:
sigint_handler = None
else:
sigint_handler = None
self._interrupt_count = 0
try:
return self._loop.run_until_complete(task)
except exceptions.CancelledError:
if self._interrupt_count > 0:
uncancel = getattr(task, "uncancel", None)
if uncancel is not None and uncancel() == 0:
raise KeyboardInterrupt()
raise # CancelledError
finally:
if (sigint_handler is not None
and signal.getsignal(signal.SIGINT) is sigint_handler
):
signal.signal(signal.SIGINT, signal.default_int_handler)
self._loop.create_task
_UnixSelectorEventLoop 和其父類 BaseSelectorEventLoop 本身沒有實(shí)現(xiàn) create_task() 方法,是在其超類 BaseEventLoop 所實(shí)現(xiàn)。
BaseEventLoop.create_task() 實(shí)際上就是 asyncio.create_task() 方法的底層。
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._timer_cancelled_count = 0
self._closed = False
self._stopping = False
self._ready = collections.deque()
self._scheduled = []
self._default_executor = None
self._internal_fds = 0
self._thread_id = None
self._clock_resolution = time.get_clock_info('monotonic').resolution
self._exception_handler = None
self.set_debug(coroutines._is_debug_mode())
self.slow_callback_duration = 0.1
self._current_handle = None
self._task_factory = None
self._coroutine_origin_tracking_enabled = False
self._coroutine_origin_tracking_saved_depth = None
self._asyncgens = weakref.WeakSet()
self._asyncgens_shutdown_called = False
self._executor_shutdown_called = False
def create_task(self, coro, *, name=None, context=None):
"""Schedule a coroutine object.
Return a task object.
"""
# 先檢查是否關(guān)閉,返回的結(jié)果必定是 False
self._check_closed()
# 任務(wù)工廠為 None
if self._task_factory is None:
task = tasks.Task(coro, loop=self, name=name, context=context)
if task._source_traceback:
del task._source_traceback[-1]
# 若通過(guò) asyncio.get_running_loop().set_task_factory() 設(shè)置了任務(wù)工廠函數(shù)的話
# 那么就運(yùn)行 else 的代碼塊
else:
if context is None:
# Use legacy API if context is not needed
task = self._task_factory(self, coro)
else:
task = self._task_factory(self, coro, context=context)
tasks._set_task_name(task, name)
return task
def _check_closed(self):
# 若是關(guān)閉狀態(tài)則直接拋出異常
if self._closed:
raise RuntimeError('Event loop is closed')
def set_task_factory(self, factory):
if factory is not None and not callable(factory):
raise TypeError('task factory must be a callable or None')
self._task_factory = factory
tasks.Task
tasks.Task 的源碼如下:
# ----- futures -----
class Future:
_state = _PENDING
_result = None
_exception = None
_loop = None
_source_traceback = None
_cancel_message = None
_cancelled_exc = None
_asyncio_future_blocking = False
__log_traceback = False
def __init__(self, *, loop=None):
# 2. loop 傳入的不是 None、所以這里直接走 else
if loop is None:
self._loop = events._get_event_loop()
else:
self._loop = loop
self._callbacks = []
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
# ...
_PyFuture = Future
# ----- tasks -----
_task_name_counter = itertools.count(1).__next__
# ...
class Task(futures._PyFuture):
_log_destroy_pending = True
def __init__(self, coro, *, loop=None, name=None, context=None):
# 1. 運(yùn)行 super 也就是 Future 的 __init__ 方法
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
# 若不是一個(gè) coroutine 則拋出異常
if not coroutines.iscoroutine(coro):
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")
# 若沒有指定 name 則生成一個(gè) name
if name is None:
self._name = f'Task-{_task_name_counter()}'
else:
self._name = str(name)
self._num_cancels_requested = 0
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
if context is None:
self._context = contextvars.copy_context()
else:
self._context = context
# 運(yùn)行 _UnixSelectorEventLoop 的 call_soon 方法
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
BaseEventLoop.call_soon
_UnixSelectorEventLoop 未實(shí)現(xiàn) call_soon() 方法,而是在其超類 BaseEventLoop 中實(shí)現(xiàn):
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
# ...
# deque 雙端隊(duì)列
self._ready = collections.deque()
def _call_soon(self, callback, args, context):
# handle 的源碼可參照上面初始化 event loop 時(shí)的操作
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
# 將 handle 放入 _ready 中
self._ready.append(handle)
return handle
def call_soon(self, callback, *args, context=None):
# callback: Task.__step 方法
# args: ()
# context: dict
self._check_closed()
# 不走 debug,沒必要細(xì)看
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
_register_task
在 loop.call_soon() 執(zhí)行執(zhí)行完畢后,Task 的 __init__() 方法最后會(huì)運(yùn)行 _register_task() 方法。
# 包含所有活動(dòng)任務(wù)的 WeakSet。
_all_tasks = weakref.WeakSet()
def _register_task(task):
"""在 asyncio 中注冊(cè)一個(gè)由循環(huán)執(zhí)行的新任務(wù)。"""
_all_tasks.add(task)
runner.run
現(xiàn)在,讓我們繼續(xù)回到 runner.run() 方法中。
class Runner:
def __init__(self, *, debug=None, loop_factory=None):
self._state = _State.CREATED # INITIALIZED
self._debug = debug
self._loop_factory = loop_factory # _UnixSelectorEventLoop 實(shí)例對(duì)象
self._loop = None
self._context = None # dict
self._interrupt_count = 0
self._set_event_loop = False # True
def run(self, coro, *, context=None):
if not coroutines.iscoroutine(coro):
raise ValueError("a coroutine was expected, got {!r}".format(coro))
if events._get_running_loop() is not None:
raise RuntimeError(
"Runner.run() cannot be called from a running event loop")
self._lazy_init()
if context is None:
context = self._context
task = self._loop.create_task(coro, context=context)
# 如果當(dāng)前線程是主線程并且當(dāng)前使用了 SIGNAL 的默認(rèn)處理程序結(jié)果是 True
# 這里是 ctrl + c 終止程序的信號(hào)
if (threading.current_thread() is threading.main_thread()
and signal.getsignal(signal.SIGINT) is signal.default_int_handler
):
# 則信號(hào)處理程序設(shè)置為 self._on_sigint 程序, 并將主任務(wù)傳遞進(jìn)去
sigint_handler = functools.partial(self._on_sigint, main_task=task)
# 嘗試設(shè)置當(dāng)前的信號(hào)處理程序
try:
signal.signal(signal.SIGINT, sigint_handler)
except ValueError:
sigint_handler = None
else:
sigint_handler = None
self._interrupt_count = 0
try:
# 核心代碼
return self._loop.run_until_complete(task)
except exceptions.CancelledError:
# 異常處理邏輯
uncancel = getattr(task, "uncancel", None)
if uncancel is not None and uncancel() == 0:
raise KeyboardInterrupt()
raise # CancelledError
finally:
# 解綁 ctrl+c 的信號(hào)處理
if (sigint_handler is not None
and signal.getsignal(signal.SIGINT) is sigint_handler
):
signal.signal(signal.SIGINT, signal.default_int_handler)
def _on_sigint(self, signum, frame, main_task):
# 主線程里 +1
self._interrupt_count += 1
if self._interrupt_count == 1 and not main_task.done():
# 取消主任務(wù)
main_task.cancel()
self._loop.call_soon_threadsafe(lambda: None)
return
raise KeyboardInterrupt()
BaseEventLoop.run_until_complete
_UnixSelectorEventLoop 并未實(shí)現(xiàn) run_until_complete() 方法。而是由其超類 BaseEventLoop 所實(shí)現(xiàn)。
BaseEventLoop.run_until_complete() 源碼如下:
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._timer_cancelled_count = 0
self._closed = False
self._stopping = False
self._ready = collections.deque() # 應(yīng)該塞了一個(gè) handle
self._scheduled = []
self._default_executor = None
self._internal_fds = 0
def _check_closed(self):
if self._closed:
raise RuntimeError('Event loop is closed')
def is_running(self):
"""Returns True if the event loop is running."""
return (self._thread_id is not None)
def _check_running(self):
if self.is_running():
raise RuntimeError('This event loop is already running')
if events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')
def run_until_complete(self, future):
# future 就是 main coroutine 的入口函數(shù)的 task
# 1. 未關(guān)閉
self._check_closed()
# 2. self._thread_id 現(xiàn)在是 None,所以這里不會(huì)報(bào)錯(cuò)
self._check_running()
# False
new_task = not futures.isfuture(future)
# 3. 將 task 傳入,返回一個(gè) future 對(duì)象
future = tasks.ensure_future(future, loop=self)
if new_task:
future._log_destroy_pending = False
# 5. 給 main coroutine 的入口函數(shù)的 task 添加一個(gè)回調(diào)函數(shù)
future.add_done_callback(_run_until_complete_cb)
try:
# 6. 開始運(yùn)行
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
future.exception()
raise
finally:
# 執(zhí)行完成后,解綁毀回調(diào)函數(shù)
future.remove_done_callback(_run_until_complete_cb)
# 若報(bào)錯(cuò),則代表事件循環(huán)關(guān)閉了
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
# 返回未來(lái)對(duì)象的結(jié)果
return future.result()
# ---- tasks ----
def ensure_future(coro_or_future, *, loop=None):
return _ensure_future(coro_or_future, loop=loop)
def _ensure_future(coro_or_future, *, loop=None):
# 4. 保證是 future
# True
if futures.isfuture(coro_or_future):
# False
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
'the one specified as the loop argument')
# 直接 return
return coro_or_future
# 如果不是一個(gè) coro 或者 future、則進(jìn)行其他處理
called_wrap_awaitable = False
if not coroutines.iscoroutine(coro_or_future):
if inspect.isawaitable(coro_or_future):
coro_or_future = _wrap_awaitable(coro_or_future)
called_wrap_awaitable = True
else:
raise TypeError('An asyncio.Future, a coroutine or an awaitable '
'is required')
if loop is None:
loop = events._get_event_loop(stacklevel=4)
try:
return loop.create_task(coro_or_future)
except RuntimeError:
if not called_wrap_awaitable:
coro_or_future.close()
raise
BaseEventLoop.run_forever
BaseEventLoop.run_forever() 的源碼如下:
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._timer_cancelled_count = 0
self._closed = False
self._stopping = False
self._ready = collections.deque() # 應(yīng)該塞了一個(gè) handle
self._scheduled = []
self._default_executor = None
self._internal_fds = 0
self._thread_id = 111 # 當(dāng)前線程 id
def _check_closed(self):
if self._closed:
raise RuntimeError('Event loop is closed')
def is_running(self):
"""Returns True if the event loop is running."""
return (self._thread_id is not None)
def _check_running(self):
if self.is_running():
raise RuntimeError('This event loop is already running')
if events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')
def run_forever(self):
"""Run until stop() is called."""
# 1. 未關(guān)閉
self._check_closed()
# 2. 未運(yùn)行
self._check_running()
# 3. 不重要
self._set_coroutine_origin_tracking(self._debug)
# 4. 獲取舊的異步生成器鉤子
old_agen_hooks = sys.get_asyncgen_hooks()
try:
# 5. 將當(dāng)前事件循環(huán)的 _thread_id 給賦值
self._thread_id = threading.get_ident()
# 6. 設(shè)置異步生成器的鉤子函數(shù)
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
# 7. 設(shè)置正在運(yùn)行的 loop
events._set_running_loop(self)
# 8. 調(diào)用 _run_once
while True:
self._run_once()
# 9. 如果 _stopping 為 True、則跳出
if self._stopping:
break
finally:
# 恢復(fù)標(biāo)志位、恢復(fù)生成器鉤子函數(shù)
self._stopping = False
self._thread_id = None
events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*old_agen_hooks)
def _asyncgen_firstiter_hook(self, agen):
# 在之前調(diào)用
if self._asyncgens_shutdown_called:
warnings.warn(
f"asynchronous generator {agen!r} was scheduled after "
f"loop.shutdown_asyncgens() call",
ResourceWarning, source=self)
self._asyncgens.add(agen)
def _asyncgen_finalizer_hook(self, agen):
# 在之后調(diào)用
self._asyncgens.discard(agen)
if not self.is_closed():
self.call_soon_threadsafe(self.create_task, agen.aclose())
# ---- events ----
def _set_running_loop(loop):
_running_loop.loop_pid = (loop, os.getpid())
BaseEventLoop._run_once
BaseEventLoop._run_once() 方法源碼如下:
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._timer_cancelled_count = 0
self._closed = False
self._stopping = False
self._ready = collections.deque() # 應(yīng)該塞了 1 個(gè) handle
self._scheduled = []
self._default_executor = None
self._internal_fds = 0
self._thread_id = 111 # 當(dāng)前線程 id
def _run_once(self):
sched_count = len(self._scheduled)
# 1. 判斷當(dāng)前需要調(diào)度的數(shù)量,是否大于 _MIN_SCHEDULED_TIMER_HANDLES
# 并且已取消的計(jì)時(shí)器句柄數(shù)量除以需要調(diào)度的數(shù)量大于 _MIN_SCHEDULED_TIMER_HANDLES
# 這里的條件肯定是不滿足的
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# 2. 這里的 while 循環(huán)也跑不起來(lái)的,因?yàn)?self._scheduled 是 []
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
# 3. timeout 這里應(yīng)該是滿足條件的,置 0
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
# 4. BaseSelectorEventLoop 子類中有這個(gè) _selector,這里直接開啟監(jiān)聽。
# 這里監(jiān)聽的對(duì)象只有 1 個(gè) socket 對(duì)象,由于沒有事件觸發(fā),所以這里會(huì)直接跳過(guò)
event_list = self._selector.select(timeout)
# BaseEventLoop._process_events
self._process_events(event_list)
event_list = None
end_time = self.time() + self._clock_resolution
# 5. 不會(huì)進(jìn)行循環(huán)
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# 6. self._ready 的長(zhǎng)度應(yīng)該為 1,里面放了一個(gè) handle
ntodo = len(self._ready)
for i in range(ntodo):
# 彈出第一個(gè) handle,若沒取消則運(yùn)行其 _run 方法
handle = self._ready.popleft()
if handle._cancelled:
continue
# 若開啟了調(diào)試模式,則還需要記錄時(shí)間
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
# 運(yùn)行其 _run 方法
handle._run()
handle = None
handle._run
handle 對(duì)象是用戶所創(chuàng)建的任務(wù)對(duì)象的抽象層。
因?yàn)?Task 內(nèi)部實(shí)際上是調(diào)用了 loop.call_soon() 方法將 coroutine 放在 Task 對(duì)象中,而 Task 對(duì)象的 __step() 方法又將作為 callback 封裝給 handle. 并 register task 至 _all_tasks 這個(gè) WeakSet 中。
換而言之、事件循環(huán)總是通過(guò) _ready 隊(duì)列拿到不同的 handle,并通過(guò) handle 來(lái)執(zhí)行最初的 coroutine 任務(wù)。
以下是 handle._run() 方法的源碼:
class Handle:
def __init__(self, callback, args, loop, context=None):
if context is None:
context = contextvars.copy_context()
self._context = context # Task 對(duì)象創(chuàng)建時(shí)的上下文環(huán)境
self._loop = loop # 當(dāng)前的 event loop
self._callback = callback # 就是 Task 對(duì)象的 __step
self._args = args
self._cancelled = False
self._repr = None
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
else:
self._source_traceback = None
def cancel(self):
if not self._cancelled:
self._cancelled = True
if self._loop.get_debug():
self._repr = repr(self)
self._callback = None
self._args = None
def cancelled(self):
return self._cancelled
def _run(self):
try:
# 運(yùn)行 Task 對(duì)象的 __step 方法
self._context.run(self._callback, *self._args)
# 若有異常,則交由默認(rèn)的異常處理函數(shù)進(jìn)行處理
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
cb = format_helpers._format_callback_source(
self._callback, self._args)
msg = f'Exception in callback {cb}'
context = {
'message': msg,
'exception': exc,
'handle': self,
}
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
self = None # 發(fā)生異常時(shí)需要中斷循環(huán)。
Task.__step
Task.__step() 中的邏輯是如何運(yùn)行傳入的協(xié)程函數(shù):
# 包含所有正在活動(dòng)的任務(wù)
_all_tasks = weakref.WeakSet()
# 一個(gè)字典,包含當(dāng)前正在活動(dòng)的任務(wù) {loop: task}
_current_tasks = {}
def _enter_task(loop, task):
# 4. 為當(dāng)前的 loop 添加活動(dòng)任務(wù)
# 若當(dāng)前 loop 已經(jīng)有一個(gè)活動(dòng)任務(wù),則拋出 RuntimeError
current_task = _current_tasks.get(loop)
if current_task is not None:
raise RuntimeError(f"Cannot enter into task {task!r} while another "
f"task {current_task!r} is being executed.")
_current_tasks[loop] = task
def _leave_task(loop, task):
# 10. 取消活動(dòng)任務(wù)
current_task = _current_tasks.get(loop)
if current_task is not task:
raise RuntimeError(f"Leaving task {task!r} does not match "
f"the current task {current_task!r}.")
del _current_tasks[loop]
class Task(futures._PyFuture):
_log_destroy_pending = True
def __init__(self, coro, *, loop=None, name=None, context=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
if not coroutines.iscoroutine(coro):
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")
if name is None:
self._name = f'Task-{_task_name_counter()}'
else:
self._name = str(name)
self._num_cancels_requested = 0
self._must_cancel = False
self._fut_waiter = None
# 當(dāng)前運(yùn)行時(shí)來(lái)看,這里應(yīng)該是入口函數(shù)的 coroutine
# 即為 asyncio.run(main()) 的 main()
self._coro = coro
if context is None:
self._context = contextvars.copy_context()
else:
self._context = context
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
def __step(self, exc=None):
# 1. 若當(dāng)前任務(wù)已經(jīng) done 掉則拋出異常(這里的異常會(huì)被 handle._run 捕獲的)
if self.done():
raise exceptions.InvalidStateError(
f'_step(): already done: {self!r}, {exc!r}')
# 2. 若需要取消,且 exc 不是 CancelledError 類型的異常,則創(chuàng)建一個(gè)取消任務(wù)
# 實(shí)際上就是將 exc 賦值成一個(gè) CancelledError 的對(duì)象
if self._must_cancel:
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None
# 3. 調(diào)用 _enter_task() 函數(shù)
_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
# 主動(dòng)啟動(dòng)協(xié)程對(duì)象
if exc is None:
# 我們直接使用 `send` 方法,因?yàn)閰f(xié)程
# 沒有 __iter__ 和 __next__ 方法。
result = coro.send(None)
else:
# 如果有 exc 則通過(guò) throw 向協(xié)程函數(shù)內(nèi)部拋出異常
result = coro.throw(exc)
except StopIteration as exc:
# 4. 若協(xié)程函數(shù)執(zhí)行完畢則判斷是否需要取消
if self._must_cancel:
# 通過(guò)調(diào)度后嘗試取消任務(wù)(下次事件循環(huán)過(guò)程中觸發(fā))
self._must_cancel = False
super().cancel(msg=self._cancel_message)
else:
# 設(shè)置結(jié)果
super().set_result(exc.value)
except exceptions.CancelledError as exc:
# 5. 保存原始異常,以便我們稍后將其鏈接起來(lái)
self._cancelled_exc = exc
# 通過(guò)調(diào)度后嘗試取消任務(wù)(下次事件循環(huán)過(guò)程中觸發(fā))
super().cancel() # I.e., Future.cancel(self).
except (KeyboardInterrupt, SystemExit) as exc:
# 6. 如果是 <c-c> 或者系統(tǒng)推出,則設(shè)置異常任務(wù),立即觸發(fā)
super().set_exception(exc)
raise
except BaseException as exc:
# 7. 若是其他基本異常,則設(shè)置異常任務(wù),立即觸發(fā)
super().set_exception(exc)
else:
# 8. 沒有異常,對(duì)協(xié)程結(jié)果開始進(jìn)行判定
# 首先,查看 result 是否具有 _asyncio_future_blocking 屬性
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# 如果 result 對(duì)象所屬的事件循環(huán)與當(dāng)前任務(wù)的事件循環(huán)不一致
# 則拋出 RuntimeError 異常(下次事件循環(huán)過(guò)程中觸發(fā))
if futures._get_loop(result) is not self._loop:
new_exc = RuntimeError(
f'Task {self!r} got Future '
f'{result!r} attached to a different loop')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
# 如果 blocking 為 True
elif blocking:
# 如果返回的結(jié)果就是 Task 本身, 則引發(fā) RuntimeError
# (下次事件循環(huán)過(guò)程中觸發(fā))
if result is self:
new_exc = RuntimeError(
f'Task cannot await on itself: {self!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
# 將 self.__wakeup 設(shè)置為 result 對(duì)象的回調(diào)函數(shù)
# 并將 result 對(duì)象作為等待者保存在 _fut_waiter 屬性中
# 如果此時(shí)任務(wù)需要取消,并且成功取消了等待者,則將 _must_cancel 標(biāo)志設(shè)置為 False。
else:
result._asyncio_future_blocking = False
result.add_done_callback(
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(
msg=self._cancel_message):
self._must_cancel = False
# 如果 blocking 值為 False
# 則拋出 RuntimeError 異常(下次事件循環(huán)過(guò)程中觸發(fā))
else:
new_exc = RuntimeError(
f'yield was used instead of yield from '
f'in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
# 如果結(jié)果對(duì)象 result 為 None
# 表示協(xié)程使用了 yield 語(yǔ)句,它調(diào)度一個(gè)新的事件循環(huán)迭代,即再次調(diào)用 __step 方法。
# 直到 StopIteration 被觸發(fā)后,協(xié)程函數(shù)才真正運(yùn)行完畢
elif result is None:
self._loop.call_soon(self.__step, context=self._context)
# 如果結(jié)果對(duì)象 result 是一個(gè)生成器對(duì)象
# 則拋出 RuntimeError 異常,表示協(xié)程在生成器中使用了錯(cuò)誤的語(yǔ)法。
# (下次事件循環(huán)過(guò)程中觸發(fā))
elif inspect.isgenerator(result):
# Yielding a generator is just wrong.
new_exc = RuntimeError(
f'yield was used instead of yield from for '
f'generator in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
# 對(duì)于其他類型的結(jié)果對(duì)象,拋出 RuntimeError 異常,表示協(xié)程產(chǎn)生了無(wú)效的
# 結(jié)果(下次事件循環(huán)過(guò)程中觸發(fā))
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
# 9. 最后,使用 _leave_task 取消活動(dòng)任務(wù)
_leave_task(self._loop, self)
# 發(fā)生異常,需要中斷循環(huán)
self = None
關(guān)于回調(diào)函數(shù)的處理
眾所周知,無(wú)論是 task 對(duì)象還是 future 未來(lái)對(duì)象,我們都可以通過(guò) add_done_callback() 方法來(lái)為其新增一個(gè)回調(diào)函數(shù)。
那么在上面 task.__step() 方法運(yùn)行的過(guò)程中,回調(diào)函數(shù)是在何時(shí)運(yùn)行呢?
先從 add_done_callback() 方法看起,它其實(shí)是由 Task 類的父類 Future 實(shí)現(xiàn):
class Future:
_state = _PENDING
_result = None
_exception = None
_loop = None
_source_traceback = None
_cancel_message = None
_cancelled_exc = None
def __init__(self, *, loop=None):
"""
if loop is None:
self._loop = events._get_event_loop()
else:
self._loop = loop
self._callbacks = []
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
# ...
def add_done_callback(self, fn, *, context=None):
# 若當(dāng)前對(duì)象的狀態(tài)不是 peding
# 則將 callback 放在下次事件循環(huán)中運(yùn)行
if self._state != _PENDING:
self._loop.call_soon(fn, self, context=context)
else:
# 否則,將回調(diào)函數(shù)放在列表中
if context is None:
context = contextvars.copy_context()
self._callbacks.append((fn, context))
調(diào)用 callback 的方法是由 Future 實(shí)現(xiàn),方法名為 __schedule_callbacks,源碼如下:
def __schedule_callbacks(self):
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
# 循環(huán)所有回調(diào)函數(shù)、統(tǒng)一將其安排在下一次循環(huán)中按順序執(zhí)行
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
接下來(lái)我們只需要找到在那些方法中會(huì)調(diào)用 __schedule_callbacks 就知道了其執(zhí)行時(shí)機(jī),以下方法均為 Future 類提供:
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED # 修改任務(wù)狀態(tài)
self.__schedule_callbacks()
def set_exception(self, exception):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
if isinstance(exception, type):
exception = exception()
if type(exception) is StopIteration:
raise TypeError("StopIteration interacts badly with generators "
"and cannot be raised into a Future")
self._exception = exception
self._exception_tb = exception.__traceback__
self._state = _FINISHED # 修改任務(wù)狀態(tài)
self.__schedule_callbacks()
self.__log_traceback = True
def cancel(self, msg=None):
self.__log_traceback = False
if self._state != _PENDING:
return False
self._state = _CANCELLED # 修改任務(wù)狀態(tài)
self._cancel_message = msg
self.__schedule_callbacks()
return True
以此可見,回調(diào)函數(shù)的執(zhí)行會(huì)放在事件循環(huán)的就緒隊(duì)列中,如果 task 或者 future 的 callback 在執(zhí)行過(guò)程中擁有較長(zhǎng)的阻塞時(shí)長(zhǎng)時(shí),將會(huì)阻塞整個(gè)事件循環(huán)!
除此之外,每一次 callback 的執(zhí)行必須是在當(dāng)前主任務(wù)運(yùn)行完畢后執(zhí)行。舉個(gè)例子:
ready = [task1, task2, task3]
若第一個(gè) task 有 callback, 則其 callback 會(huì)放在最后:
ready = [task2, task3, task1_cb]
callback 運(yùn)行前必須先運(yùn)行 task2 和 task3。
主協(xié)程任務(wù)的結(jié)束
當(dāng)主協(xié)程任務(wù)結(jié)束后,所有的子協(xié)程任務(wù)也會(huì)結(jié)束掉。這是為什么呢?我們繼續(xù)從源碼角度進(jìn)行分析。
首先在 BaseEventLoop.run_until_complete() 方法中,_ready 隊(duì)列會(huì)在下次循環(huán)中添加 1 個(gè) callback:
future.add_done_callback(_run_until_complete_cb)
def add_done_callback(self, fn, *, context=None):
if self._state != _PENDING:
self._loop.call_soon(fn, self, context=context)
else:
# 主協(xié)程任務(wù)的狀態(tài)此時(shí)應(yīng)該是 peding
# 所以他只會(huì)在主協(xié)程任務(wù)結(jié)束后將回調(diào)添加到 ready 隊(duì)列中
if context is None:
context = contextvars.copy_context()
self._callbacks.append((fn, context))
當(dāng)主協(xié)程任務(wù)在 BaseEventLoop.__step() 方法中被運(yùn)行 set_result()、set_exception()、或者 cancel() 任意一個(gè)時(shí),base_events._run_until_complete_cb() 都會(huì)被添加進(jìn) _ready 隊(duì)列中。
而 base_events._run_until_complete_cb() 方法的實(shí)現(xiàn)如下:
def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
if isinstance(exc, (SystemExit, KeyboardInterrupt)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
futures._get_loop(fut).stop()
事件循環(huán)的 stop() 方法實(shí)現(xiàn)、直接看 BaseEventLoop.stop() 即可,因?yàn)?_UnixSelectorEventLoop 包括 BaseSelectorEventLoop 都未實(shí)現(xiàn)該方法:
def stop(self):
self._stopping = True
最后再回過(guò)頭看 BaseEventLoop.run_forever() 方法,是不是明了了些?:
def run_forever(self):
# ...
try:
self._thread_id = threading.get_ident()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
break
# ...
總結(jié)、在主協(xié)程任務(wù)運(yùn)行時(shí),其 callback 方法 base_events._run_until_complete_cb() 并不會(huì)馬上添加至 ready 隊(duì)列中。
一但主協(xié)程任務(wù)運(yùn)行完畢(調(diào)用 cancel()、set_result()、set_exception())時(shí),callback 會(huì)立即添加到 ready 隊(duì)列中。
這意味著事件循環(huán)即將結(jié)束,但在 callback 之前的子任務(wù)還可以繼續(xù)運(yùn)行,一旦當(dāng) callback 執(zhí)行完畢,那么就意味著事件循環(huán)被關(guān)閉掉了。BaseEventLoop._run_once() 方法也不會(huì)繼續(xù)運(yùn)行。至此整個(gè)事件循環(huán)的生命周期才真正結(jié)束。
事件循環(huán)啟動(dòng)和任務(wù)執(zhí)行流程圖
基本的事件循環(huán)啟動(dòng)和任務(wù)執(zhí)行流程圖如下:
本章結(jié)語(yǔ)
由于平時(shí)要忙工作什么的,算下來(lái)這篇文章總共花了我大概小半個(gè)月時(shí)間,不過(guò)算起來(lái)收獲頗豐。
至少筆者在讀完 asyncio 事件循環(huán)后,也有了一些新的感悟:
- 每一個(gè)事件循環(huán)都有一個(gè) sock 對(duì)象和一個(gè)系統(tǒng)選擇器,這也是 loop.create_server() 方法的基礎(chǔ),在每次運(yùn)行 BaseEventLoop._run_once() 方法時(shí)都會(huì)去檢測(cè)一遍系統(tǒng)選擇器有沒有準(zhǔn)備好的事件描述符,若有則運(yùn)行其他邏輯(當(dāng)然這部分還沒有深入研究,但大體上是不會(huì)錯(cuò)的)
- 事件循環(huán)中有很多對(duì) loop 的操作,如 new_event_loop()、set_event_loop()、get_event_loop()、get_running_loop() 等等,通過(guò)源碼閱讀可以更好的清楚他們的作用
- 清楚了 create_task() 以及 call_soon() 方法的關(guān)系,明白了 Task 對(duì)象和 Future 對(duì)象以及 Handle 對(duì)象的關(guān)系
- 知道了事件循環(huán)是定序執(zhí)行子任務(wù)的,也知道了回調(diào)函數(shù)的添加以及執(zhí)行時(shí)機(jī),更重要的是明白了事件循環(huán)是如何實(shí)現(xiàn)的
- 知曉了一些鉤子函數(shù)的真實(shí)作用,如 set_task_factory() 等等
其實(shí) asyncio 不止單單一個(gè)事件循環(huán)、除此之外還有 socket、流式傳輸、各種鎖的應(yīng)用等等,事件循環(huán)只能說(shuō)是 asyncio 中的基礎(chǔ)。
最后的最后,希望大家繼續(xù)努力,保持學(xué)習(xí),不忘初心。
還是開篇那句話 「路漫漫其修遠(yuǎn)兮, 吾將上下而求索」與諸君共勉之。