Python协程和猴子补丁

协程基础

协程概念

协程(Coroutines)是一种特殊的软件构造,是一种用户态的轻量级线程。它允许程序在执行过程中暂停并恢复执行,而不会丢失当前的执行上下文。与线程和进程不同,协程在单个线程中运行,通过调度机制实现并发,降低了上下文切换的开销,提高了程序的执行效率。协程通常用于处理I/O密集型任务,如网络请求、文件读写等。在 Python 中协程就是一个可以暂停执行的函数,听起来和生成器的概念一样。

生成器与yield的原理

生成器(Generators)是Python中实现协程的一种方式,它通过内置的yield关键字来暂停和恢复执行。当函数遇到yield时,会暂停执行并返回一个值,下次调用时会从上次暂停的地方继续执行。yield实际上是一个特殊的return语句,它会保存当前的状态(包括局部变量和执行上下文),当再次调用时,这些状态会被恢复。

协程与多线程/多进程的区别

  • 多线程:线程是操作系统层面的并行执行单位,线程间通信需要锁等同步机制,上下文切换开销大,适合CPU密集型任务。
  • 多进程:进程是独立的执行环境,拥有自己的内存空间,适合I/O密集型任务,但创建和销毁进程开销大。
  • 协程:协程在单线程中通过控制流切换实现并发,没有线程切换开销,但资源占用相对较少,适合I/O等待任务。

协程的生命周期与状态转换

  • 创建:函数定义为生成器,使用yield关键字。
  • 启动:通过调用生成器实例的next()send()方法开始执行,直到遇到yield
  • 暂停:遇到yield时,函数暂停,保存当前状态。
  • 恢复:通过send()方法传入值,函数从上次暂停的地方继续执行。
  • 结束:当没有更多yield可执行,或遇到return语句时,协程结束

实例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# coding: utf-8

def coroutine_work():
print ('->start work')

while True:
rc = (yield)
print('->work', rc)


def coroutine_eat():
print('->start eat')

while True:
rc = (yield)
print('->eat', rc)


cw = coroutine_work()
ce = coroutine_eat()
next(cw) # next启动work协程
next(ce) # next启动eat协程
cw.send('1')
ce.send('1')
cw.send('2')
ce.send('2')
ce.close()
cw.close()

输出:

1
2
3
4
5
6
->start work
->start eat
->work 1
->eat 1
->work 2
->eat 2

实例2:使用协程计算移动平均值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def averager():
total = 0.0
count = 0
avg = None

while True:
num = yield avg
total += num
count += 1
avg = total/count

# run
ag = averager()
# 预激协程
print(next(ag)) # None

print(ag.send(10)) # 10
print(ag.send(20)) # 15

输出:

1
2
3
None
10.0
15.0

解释:

  1. 调用 next(ag) 函数后,协程会向前执行到 yield 表达式,产出 average 变量的初始值——None。
  2. 此时,协程在 yield 表达式处暂停。
  3. 使用 send() 激活协程,把发送的值赋给 num,并计算出 avg 的值,avd算完之后会被返回。
  4. 使用 print 打印出 yield 返回的数据。

实例3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time


def producer():
while True:
time.sleep(1)
print("+++++ 1个包子", time.strftime("%X"))
yield

def consumer():
while True:
next(prd)
print("----- 1个包子", time.strftime("%X"))

if __name__ == "__main__":
prd = producer()
consumer()

输出:

1
2
3
4
5
6
7
8
9
10
+++++ 1个包子 17:34:05
----- 1个包子 17:34:05
+++++ 1个包子 17:34:06
----- 1个包子 17:34:06
+++++ 1个包子 17:34:07
----- 1个包子 17:34:07
+++++ 1个包子 17:34:08
----- 1个包子 17:34:08
+++++ 1个包子 17:34:09
----- 1个包子 17:34:09

解释:上述代码中定义了两个生成器函数,producer 函数用于生产包子,consumer 函数用于消费包子。在 consumer 函数中,使用 next 函数来获取 producer 函数中 yield 语句产出的值,从而实现了两个函数的交替执行。

yield from获取协程的返回值

为了得到返回值,协程必须正常终止;然后生成器对象会抛出StopIteration 异常,异常对象的 value 属性保存着返回的值

yield from 结构会在内部自动捕获 StopIteration 异常。对 yield from 结构来说,解释器不仅会捕获 StopIteration 异常,还会把value 属性的值变成 yield from 表达式的值。

1
2
3
4
5
6
7
8
9
10
def gen():
for c in 'AB':
yield c

print(list(gen()))

def gen_new():
yield from 'AB'

print(list(gen_new()))

输出:

1
2
['A', 'B']
['A', 'B']

yield from x 表达式对 x 对象所做的第一件事是,调用 iter(x),从中获取迭代器,因此,x 可以是任何可迭代的对象,这只是 yield from 最基础的用法,感觉yield可以看作一个return。

使用yield完成协程任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 在函数中有yield,函数就变成了生成器,就可以使用next()方法
def task1():
for i in range(3):
print('A' + str(i))
yield
time.sleep(1)


def task2():
for i in range(3):
print('B' + str(i))
yield
time.sleep(2)


if __name__ == '__main__':
g1 = task1()
g2 = task2()

while True:
try:
next(g1)
next(g2)
except:
break
1
2
3
4
5
6
A0
B0
A1
B1
A2
B2

使用greenlet完成协程任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 使用greenlet完成协程任务
import time
from greenlet import greenlet


def a(): # 任务A
for i in range(5):
print('A' + str(i))
gb.switch()
time.sleep(0.1)


def b(): # 任务B
for i in range(5):
print('B' + str(i))
gc.switch()
time.sleep(0.1)


def c(): # 任务C
for i in range(5):
print('C' + str(i))
ga.switch()
time.sleep(0.1)


if __name__ == '__main__':
ga = greenlet(a)
gb = greenlet(b)
gc = greenlet(c)

ga.switch()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
A0
B0
C0
A1
B1
C1
A2
B2
C2
A3
B3
C3
A4
B4
C4

猴子补丁

概念

猴子补丁是一种在运行时动态修改模块、类、函数或对象行为的编程技术。它通常通过替换或扩展原有代码中的函数、方法或属性来实现。由于这种修改是在运行时进行的,因此无需修改原始代码或重新编译。

原理

Python的动态类型系统和解释执行的特性使得猴子补丁成为可能。在Python中,几乎所有的东西都是对象,包括函数、方法、类等。这些对象都存储在模块的全局命名空间中,我们可以通过修改这些命名空间中的对象来改变程序的行为。

举个例子很容易懂

third_party:

1
2
3
4
5
6
7
8
9
def step1():
print("step1")

def step2():
print("step2")

def run():
step1()
step2()

my_code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# my_code.py
import mock
import third_party


def step3():
print('step3')


def my_run():
third_party.step1()
step3()
third_party.step2()


print('invoke3 ...')
third_party.run()
print("============")
print('invoke1 ...')
with mock.patch.object(third_party, 'run', my_run):
third_party.run()
print("============")
print('invoke2 ...')
third_party.run()

1
2
3
4
5
6
7
8
9
10
11
12
invoke3 ...
step1
step2
============
invoke1 ...
step1
step3
step2
============
invoke2 ...
step1
step2

gevent和猴子补丁

greenlet已经实现了协程,但是这个人工切换,是不是觉得太麻烦了,不要着急,python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent

geventPython 的一个并发框架,基于 greenlet 实现,使用了 epoll 事件监听机制以及诸多其他优化而变得高效。其基本思想就是一个 greenlet 就是一个协程,当 greenlet 遇到 IO 操作时,比如访问网络,就会自动切换到其他的 greenlet,等待 IO 完成再切换回来继续执行。gevent 可以帮我们自动实现这个协程切换的过程。

由于IO操作非常耗时,经常使程序处于等待状态,有了gevent我们自动切换协程,就保证总有greenlet在运行,而不是等待IO

猴子补丁,将程序中用到的耗时操作的代码,换为gevent中自己实现的模块,如time.sleep(0.1),gevent中也有sleep()方法

一种在运行时候动态修改类,函数的功能的属性编程技巧

  1. 在运行时候替换方法,属性

  2. 在不修改第三方代码情况下,增加原来不支持对象

  3. 运行时为内存中对象增加PATCH,不是在源代码磁盘中增加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import gevent
from gevent import monkey

# 有耗时操作时需要
monkey.patch_all() # 打补丁,将程序中用到的耗时操作的代码,换为gevent中自己实现的模块


def a(): # 任务A
for i in range(5):
print('A' + str(i))
time.sleep(0.1)


def b(): # 任务B
for i in range(5):
print('B' + str(i))
time.sleep(0.1)


def c(): # 任务C
for i in range(5):
print('C' + str(i))
time.sleep(0.1)


if __name__ == '__main__':
g1 = gevent.spawn(a)
g2 = gevent.spawn(b)
g3 = gevent.spawn(c)

g1.join()
g2.join()
g3.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
A0
B0
C0
A1
B1
C1
A2
B2
C2
A3
B3
C3
A4
B4
C4

由于使用了 monkey.patch_all 对耗时操作进行了替换,gevent 能够在这三个协程之间自动切换执行,而不是按照顺序依次等待每个耗时操作完成。这样可以实现并发效果,提高程序的执行效率。

例如,可能的输出顺序不是严格按照 A0A1A2 ….. B0B1 ….. C0C1 ….. ,而是 A0B0C0A1 等交错进行,具体的输出顺序取决于 gevent 的调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import requests
import gevent
from gevent import monkey

monkey.patch_all()


def download(url):
response = requests.get(url) # 耗时操作
content = response.text
print('下载了{}的数据,长度:{}'.format(url, len(content)))


if __name__ == '__main__':
urls = ['http://www.163.com', 'http://www.qq.com', 'http://www.baidu.com']
g1 = gevent.spawn(download, urls[0])
g2 = gevent.spawn(download, urls[1])
g3 = gevent.spawn(download, urls[2])

g1.join()
g2.join()
g3.join()

# gevent.joinall(g1, g2, g3)
1
2
3
下载了http://www.qq.com的数据,长度:2382
下载了http://www.163.com的数据,长度:2388
下载了http://www.baidu.com的数据,长度:2381
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent,time

def f1():
for i in range(5):
print('function:@@@f1 | NUM: @@@',i)

# 此处阻塞,gevent会帮我们切换到其他协程去↓
gevent.sleep(0)

def f2():
for i in range(5):
print('function:@@@f2 | NUM: @@@',i)

# 此处阻塞,gevent会帮我们切换到其他协程去↑
gevent.sleep(0)

# 创建两个协程对象,分别去执行两个函数
xc1=gevent.spawn(f1)
xc2=gevent.spawn(f2)

# 将协程们交给gevent去执行
gevent.joinall([xc1,xc2])
1
2
3
4
5
6
7
8
9
10
function:@@@f1 | NUM: @@@ 0
function:@@@f2 | NUM: @@@ 0
function:@@@f1 | NUM: @@@ 1
function:@@@f2 | NUM: @@@ 1
function:@@@f1 | NUM: @@@ 2
function:@@@f2 | NUM: @@@ 2
function:@@@f1 | NUM: @@@ 3
function:@@@f2 | NUM: @@@ 3
function:@@@f1 | NUM: @@@ 4
function:@@@f2 | NUM: @@@ 4

如上,当 gevent 帮我们执行两个协程的时候,首先 xc1 执行到 gevent.sleep(0)时发生阻塞,此时,gevent 帮我们将切换到 xc2xc2 执行到 gevent.sleep(0)时又发生了阻塞,此时,gevent 又帮我们将切换到 xc1 去执行。

gevent 无法捕获的耗时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent,time

def f1():
for i in range(5):
print('function:@@@f1 | NUM: @@@',i)

# 注意这里
time.sleep(0.1)

def f2():
for i in range(5):
print('function:@@@f2 | NUM: @@@',i)

# 注意这里
time.sleep(0.1)

# 创建两个协程对象,分别去执行两个函数
xc1=gevent.spawn(f1)
xc2=gevent.spawn(f2)

# 将协程们交给gevent去执行
gevent.joinall([xc1,xc2])
1
2
3
4
5
6
7
8
9
10
function:@@@f1 | NUM: @@@ 0
function:@@@f1 | NUM: @@@ 1
function:@@@f1 | NUM: @@@ 2
function:@@@f1 | NUM: @@@ 3
function:@@@f1 | NUM: @@@ 4
function:@@@f2 | NUM: @@@ 0
function:@@@f2 | NUM: @@@ 1
function:@@@f2 | NUM: @@@ 2
function:@@@f2 | NUM: @@@ 3
function:@@@f2 | NUM: @@@ 4

如上,你会发现,time.sleep(0.1)耗费的时间,gevent 无法捕捉,导致代码是串行的,虽然我们创建了协程,但是并没有起到异步的作用。

怎么办呢?猴子补丁。

对于无法捕获的耗时,gevent 为我们提供了猴子补丁,当我们为我们的程序打了猴子补丁,那么当我们的程序遇到任何耗时的操作,gevent 都会帮我们去自动切换协程,从而实现异步高并发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import gevent,time
from gevent import monkey;

monkey.patch_all()#这里很重要

def f1():
for i in range(5):
print('function:@@@f1 | NUM: @@@',i)

# 注意这里
time.sleep(0.1)

def f2():
for i in range(5):
print('function:@@@f2 | NUM: @@@',i)

# 注意这里
time.sleep(0.1)

# 创建两个协程对象,分别去执行两个函数
xc1=gevent.spawn(f1)
xc2=gevent.spawn(f2)

# 将协程们交给gevent去执行
gevent.joinall([xc1,xc2])
1
2
3
4
5
6
7
8
9
10
function:@@@f1 | NUM: @@@ 0
function:@@@f2 | NUM: @@@ 0
function:@@@f1 | NUM: @@@ 1
function:@@@f2 | NUM: @@@ 1
function:@@@f1 | NUM: @@@ 2
function:@@@f2 | NUM: @@@ 2
function:@@@f1 | NUM: @@@ 3
function:@@@f2 | NUM: @@@ 3
function:@@@f1 | NUM: @@@ 4
function:@@@f2 | NUM: @@@ 4

Python协程和猴子补丁
http://example.com/2024/07/13/Python协程与猴子补丁/
Author
Posted on
July 13, 2024
Licensed under