0%

Python 并发编程札记 (Part 2)

Python 并发编程笔记的第二篇长文,主要重点在于解释进程和线程之间的关系与区别,另外还会涉及到 Python 的多进程、多线程编程以及协程在网络编程中的应用。

进程和线程

进程

根据《深入理解计算机系统》的描述,进程是计算机科学中最重要和最成功的概念之一,“它给人一种假象,好像系统上只有这个程序在运行,让它独占的使用处理器、主存储、和 I/O 设备”。

进程(process)和程序的概念,其实和 Docker 里面的容器跟镜像类似,一个是动态的概念,一个是静态的概念。当我们双击打开一个 .doc 文件就启动了一个 Word 进程去处理它。每个进程都拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。操作系统管理其上所有进程的执行,并为这些进程合理地分配时间。

进程一般由 程序数据集进程控制块 三部分组成。

  • 程序 用来描述进程要完成哪些功能以及如何完成
  • 数据集 则是程序在执行过程中所需要使用的资源
  • 进程控制块 用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

进程间通信(IPC,InterProcess Communication)一般有以下的方式:

  1. 管道(pipe)以及具名管道(named pipe)
  2. 共享内存(shared memory)
  3. 套接字(socket)
  4. 信号(signal)
  5. 消息队列(message queue)

经典的三态模型下,进程的声明周期具有三种状态(这从侧面反映出进程是动态的这一概念),分别是 运行态(running)、就绪态(ready)以及 等待态(wait)/阻塞态(blocked)。在部分系统中还会存在 挂起状态,而在实际系统中为了便于管理,通常还会有 创建态(new)以及 终止态(exit)。

线程

线程(thread),有时会被称作轻量级进程(light weight process,LWP,与之对应进程就是 heavy weight process,HWP),因为在 Linux 下线程是通过进程来模拟的。进程可以理解为线程的集合,每个进程至少有一个线程,即主线程。同一进程下的线程共享相同的地址空间和其他资源,除此之外,线程还会有自己的栈和栈指针、程序计数器等寄存器。

我们常说,线程是 CPU 调度和分派的基本单位,换言之,线程是操作系统直接支持的执行单元。

线程也有它自己的状态,新建 new等待 wait就绪 ready运行 run 以及 阻塞 block 五种状态。它有一个指令指针,用于记录当前运行的上下文。当其他线程运行时,它可以被抢占(中断)和临时挂起(也称为睡眠)—— 这种做法叫做让步(yielding)。

进程和线程的关系和区别

  1. 进程偏向于内存管理,是资源分配的最小单位;线程倾向于 CPU 运行,是 CPU 调度和分派的基本单位;
  2. 进程拥有自己独立的地址空间,而线程没有,因此线程必须依赖进程而存在;
  3. 线程的出现主要是因为进行进程调度的时候,创建、撤销和切换进程的开销比较大,和往告诉发展的 CPU 不匹配,因此出现这个“轻量级进程”来提高性能。
  4. 一个进程崩溃后,在保护模式下不会对其他进程产生影响,但是一个线程崩溃整个进程都死掉 —— 所以多进程要比多线程健壮。

阮一峰的博客有一篇图文并茂的 blog 来阐述进程和线程之间的关系,可以简述为 “工厂——车间——工人” 的形式,适合向别人浅白易懂的解释他们的关系。

个人理解

进程和线程的存在都是为了 CPU 时间片这个“大厨”而产生的,而这个“大厨”特殊在它是自备案台的。进程调度相当于我要用这个“大厨”处理若干道菜式,那么原材料和厨具这些都是需要在处理每一道菜之前全部提供到给“大厨”的(类似于内存资源等的分配)。而每一道菜都有它们各自的工序步骤(划分为多个线程),我们可以利用这个“大厨”处理各种如拣菜,制作酱料等的工序,这些工序虽然需时有快有慢,厨具材料都在案台上,“大厨”挪一下身子就好处理另一道工序;而如果要干另外一道菜,“大厨”就需要换个案台,把前一个案台的东西封起来,再准备另一道菜的工具材料,这样依赖开销就大了,“大厨”也就不能专心做菜了。

多进程编程

我们可能已经知道 Linux 提供了 fork() 调用来创建进程,这个可以用 os 模块来调用,但对于 Windows 平台,这就比较乏力了。但是 Python 是跨平台的语言,自然在标准库里集成了多进程的支持,那个支持就是 multiprocessing 库。

multiprocessing 标准库简介

当我们使用 help(multiprocessing) 来打印 multiprocessing 库的帮助信息会发现,这个库类似于我们下面将要介绍的 threading 库,但它操作对象是 process 而不是 thread。这个库尽可能提供大部分类似功能的 API,同时它也提供了一个包含的库 multiprocessing.dummy 来实现对 threading 库的简单封装。

实例

1. 创建新进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process

def func(a, b):
import time
time.sleep(10)
print(a + b)

if __name__ == '__main__':
p = Process(target=func, args=(1, 2,))
p.start()
print(p.is_alive())
# 输出 True
p.join()
# 输出 3
print(p.is_alive())
# 输出 False

这里有 Process 的方法:

  • start —— 启动一个进程
  • join —— 阻塞当前进程,直到调用 join 方法的进程结束后再继续执行当前进程
  • is_alive —— 判断进程是否存在

2. run 的使用

runProcess 类上的方法,如果没有在初始化 Process 时指定 target,那么它默认会执行 run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# coding: utf-8
from multiprocessing import Process

def func(a, b):
print(a + b)
return a + b

# 使用方法一: 直接指定 run 方法
p1 = Process()
p1.run = func
p1.run(1, 2)
# 输出 3

# 使用方法二: 在子类中 override 了 run 方法
class SubProcess(Process):
def run(self, *args, **kwargs):
print(sum(args))

p2 = SubProcess()
p2.run(1, 2, 3)
# 输出 6

startrun 方法的区别

我们可以留意到 Processrun 方法的源码:

1
2
3
4
5
6
def run(self):
'''
Method to be run in sub-process; can be overridden in sub-class
'''
if self._target:
self._target(*self._args, **self._kwargs)

如果我们调用 Process 或 继承 Process 的子类的 run 方法的话,实际上相当于调用一个可调用对象(callable object),这个过程不涉及任何进程相关的内容,类似于调用一个普通的方法,因此你会留意到如果在 run 方法体里面有延迟操作,它是表现为阻塞当前进程的;

而在 start 方法的源码中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def start(self):
'''
Start child process
'''
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
assert not _current_process._config.get('daemon'), \
'daemonic processes are not allowed to have children'
_cleanup()
self._popen = self._Popen(self)
self._sentinel = self._popen.sentinel
# Avoid a refcycle if the target function holds an indirect
# reference to the process object (see bpo-30775)
del self._target, self._args, self._kwargs
_children.add(self)

如何理解这段代码呢?首先,Process_Popen 封装了子进程的操作,同时用 _popen 代表子进程。start 方法会进行三次断言:

  1. 不能重复 start 一个进程
  2. 只能 start 当前进程自己创建的子进程
  3. daemon 进程不能有子进程(因此也不能 start)

然后 _cleanup() 清理当前已完成的进程,之后操作子进程 _Popen

因此 start 方法才是进程相关的方法,我们启动进程只能使用 start 方法。对于 run 方法的用法,一般都会把 run 覆写为无参的可调用对象,在调用 start 方法时被调用。

3. 守护进程 daemon

守护进程,顾名思义,一般是一个持久运行的进程,随着主进程的结束而结束,因此有“守护”之意。

注意(其实从上面 start 源码可以知道),守护进程是不允许创建子进程的。因为如果主进程退出了,守护进程跟着结束,它的子进程就会成为孤儿进程残留于系统中。

设置一个进程为守护进程,只需要设置它的 daemon 属性,它是一个布尔值(注意这个属性必须在 start 方法之前设置)。它的默认值会继承于创建它的进程。

1
2
p = Process()
p.daemon = True

守护进程也等待其退出,我们只需要在 join 中传入浮点数值 n,等待 n 秒之后就取消等待。

4. 终止进程

让进程退出,我们通常使用信号的方式(posion pill)来让其退出,但是我们也希望有一种手段可以强制性(force)地终结进程,terminate 就是这样的方法。

1
2
3
4
p = Process(target=time.sleep, args=(1000,))
p.start()
p.join(3)
p.terminate()

注意对子进程 terminate ,其后代子进程不会被终止 —— 这会导致它们成为“孤儿进程”。

5. 进程间同步

一般来说,多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。一旦需要在进程间共享资源,就不得不面对进程间的同步与互斥(这两个问题几乎和资源分配彼此相生),multiprocessing 也提供了类似于 threading 的同步原语的实现:

模块 描述
Lock 互斥锁
RLock 可重入的互斥锁(同一个进程可以多次获取,同时不会造成阻塞)
Semaphore 信号量
BoundedSemaphore 有边界的信号量
Barrier 进程阻拦
Event 同步事件
Condition 条件变量

这部分内容和 threading 中的大同小异,具体将在 threading 中的线程同步一节给出示例。

5. IPC 进程间通信

我们这里提及有关 Python multiprocessing 提供的 IPC 方式主要有两种,一种是 管道 Pipe,一种是 队列 Queue注意,进程间通信所传递的对象,都必须是可序列化的

当然对于进程间通信你也可以采用 共享资源 的方式,配合上一个小节提供的进程同步原语来控制同步与互斥,但我们在多进程开发中一般都是避免由自己设计共享资源的 IPC 方案。

Connection 对象

Connection 对象允许发送和接收可序列化对象以及字符串,它们通常是使用 Pipe 模块创建的。

方法 说明
send(obj) 发送一个对象 obj 到管道另一端的 Connection 对象,后者可以使用 recv 获取这个对象。注意非常大的可序列化对象可能会抛出 ValueError 异常
recv() Connection 对象可以获得管道另一端通过 send 方法发送过来的对象。(调用这个方法之后)会一直阻塞直到获取到对象。如果没有获取到对象而且另一端被关闭了会抛出 EOFError 异常
send_bytes(buffer[, offset[, size]]) 把一个 bytes-like object 类字节对象作为完整的信息以字节数据的形式发送
recv_bytes([maxlength]) / recv_bytes_into(buffer[, offset]) 以字节数据的形式获取信息,前者和 recv 类似,后者会把信息读取到一个类子节对象中并返回读取的字节数
close() 关闭一个 Connection 对象,在 GC 时会被自行调用
1
2
3
4
5
6
7
8
9
# 这里演示的是官方网站的一段例子
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'Hello World')
>>> a.recv_bytes()
b'Hello World'

Pipe

调用 Pipe 方法会返回一个 Connection 对象的元组,代表管道的两端。

Pipe 可以时单向(half-duplex),也可以时双向的(duplex),默认是 双向 的。在创建管道时可以指定创建单向的管道:Pipe(duplex=False),得到的二元组前者只读,后者只写。单向管道只允许一端输入,而双向的允许两端输入。

一个进程可以从 Pipe 的一端发送对象,然后被 Pipe 另一端的进程接收,这两个进程需要有共同的父进程或其中一个是父进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Pipe 进程间通信的简单例子
from multiprocessing import Process, Pipe
def f(conn):
"""
:type conn: multiprocessing.Connection
"""
conn.send([42, None, 'Hello'])
conn.close()

if __name__ == '__main__':
parent, child = Pipe(duplex=False)
p = Process(target=f, args=(child,))
p.start()
print(parent.recv())
p.join()
# output [42, None, 'Hello']

Queue

multiprocessing.Queue 和 queue.Queue 很类似,也是一种 FIFO 的结构,除了 task_donejoin 方法之外,queue.Queue 的方法它都具有。调用 Queue([maxsize]) 方法会返回一个进程共享队列(由一个 pipe 管道以及少量的 lock 锁和 semaphores 信号量实现)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Queue 进程间通信的简单例子
from multiprocessing import Process, Queue
import time, random

def producer(q):
"""
模拟生产者
:type q: multiprocessing.Queue
"""
for value in ['apple', 'banana', 'orange', 'pineapple']:
q.put(value)
print("Produce %s" % value)
time.sleep(random.random())

def consummer(q):
""""
模拟消费者
:type q: multiprocessing.Queue
"""
while True:
value = q.get(True)
print("Get %s." % value)

if __name__ == "__main__":
q = Queue(3)
p = Process(target=producer, args=(q,))
c = Process(target=consummer, args=(q,))
# 启动进程
p.start()
c.start()
p.join()
# 没有采用信号量,所以需要手动终止消费者进程
c.terminate()

multiprocessing.Queue 还有另外两个变种,分别是 multiprocessing.SimpleQueue 和 multiprocessing.JoinableQueue

6. 进程池

multiprocessing 提供了 Pool 类来实现进程池。进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池中的序列没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

我们使用 Pool() 创建进程实例,其中它的参数列表:

  • processes —— 表示进程池内部的工作进程数目
  • initializer 和 initargs 搭配使用,如果 initializer 不为空,则对于每个调用 start 方法的进程都会调用 initializer(*initargs)
  • maxtasksperchild —— 用于指定进程序列中的进程在完成若干个任务之后会退出并被新的进程所刷新取代。默认为 None 的情况下进程池会和它其中的进程共存
  • context —— 可以用来指定进程序列中的进程使用什么样的上下文环境

对于 Pool 类的实例,我们有以下方法:

方法 说明
apply(func[, args[, kwds]]) 对指定的 func 使用 args 入参和关键字参数列表 kwds,在线程池的一个工作进程中运行,它会一直阻塞直到函数返回结果,也就是说进程池的任务是排队执行的
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) apply 方法的一个变种,以异步回调的方式实现,返回的是一个 AsyncResult 对象。这个方法会直接把 func 返回的结果直接应用于单一入参的可调用对象 callback 上(如果指定了 callback);如果调用 func 失败,则会把异常实例作为入参作用于 error_callback(如果指定了 error_callback)。注意回调函数们都应该立即返回否则处理结果所在的线程会一直阻塞
map(func, iterable[, chunksize]) map() 函数的并行版本,它会把可迭代对象 iterable 划分成若干块,然后提交给进程池,让其作为独立的任务运行。chunksize 可以被指定为要划分的块数
map_async(func, iterable[, chunksize[, callback[, error_callback]]]) map 方法的一个变种,和 map 的关系类似于 applyapply_async 的关系
imap(func, iterable[, chunksize]) map() 函数的延迟加载版本,返回的时一个 迭代器,可以在其上使用 next 方法。对于非常大的可迭代对象使用一个比较大的 chunksize 可以比默认为 1 时完成得快得多
imap_unordered(func, iterable[, chunksize]) 类似于 imap 方法,区别在于处理可迭代对象的顺序是不固定得(乱序执行),除非进程池只分配了一个工作进程(才会“正确”顺序执行)
close() 阻止更多的任务提交到进程池序列中,当所有工作进程的任务都完成之后会直接退出
terminate() 马上停止工作进程并不进行任何扫尾工作。GC 回收进程池对象时会马上调用这个方法
join() 等待工作进程全部退出,在使用 join 之前必须使用 closeterminate

※ AsyncResult 对象

由方法 Pool.apply_async() 和 Pool.map_async() 方法返回,具有以下方法属性:

方法 说明
get([timeout]) 当 func 结果返回时可以通过这个方法获得返回值。如果 timeout 不为 None 且在 timeout 秒内得不到返回值,会报错 TimeoutError
wait([timeout]) 等待返回结果或者经过 timeout 秒
ready() 返回布尔值,指示 func 的调用是否已经结束
successful() 返回布尔值,指示 func 的调用是否没有异常地结束

官方文档的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool
import time

def func(x: int)->int:
return x*(x+1)

if __name__ == "__main__":
with Pool(processes=10) as pool:
result = pool.apply_async(func, (8349, ))
print(result.get(timeout=2))
# output 69714150

print(pool.map(func, range(10)))
# output [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]

it = pool.imap(func, range(10))
print(next(it))
# output 0
print(it.next(timeout=10))
# output 2

多线程编程

对比于多进程,我们更常见的一种编程模式是多线程编程。Python 的线程是 Posix Thread,而不是所谓的 “轻量级进程”。它在标准库中提供了两个模块: threadthreadingthreading 是对底层 thread 模块的封装,我们一般只需要使用 threading

但对于 CPython 来说,多线程一般不能发挥它应有的效果(IO 密集型除外),主要原因还在于 GIL (全局解释锁)的存在。

GIL

接触过 Python 的开发者大概都听说过 GIL 的存在,也有不少的博客论述了为什么这么设计,这种设计导致了什么缺陷以及为什么不能解决这种缺陷。

该锁的存在保证在同一个时间只能有一个线程执行任务,也就是多线程并不是真正的并行,只是交替得执行。假如有10个线程运行在 10 核 CPU 上,当前工作的也只能是一个 CPU 上的线程。

关于 GIL 的设计具体可以查看 Python 源码中的 ceval.c 文件:

1
static PyThread_type_lock interpreter_lock = 0; /* 这是 GIL */

对于 Python 虚拟机的执行来说,GIL 一般有以下的介入流程:

  1. 设置 GIL
  2. 切换进一个线程去运行
  3. 执行下面操作之一:
    • 指定数量的字节码指令。
    • 线程主动让出控制权(可以调用 time.sleep(0) 来完成)
  4. 把线程设置回睡眠状态(切换出线程)
  5. 解锁 GIL
  6. 重复上述步骤

当调用外部代码(即任意 C/C++ 扩展的内置函数)时,GIL 会保持锁定,直到函数执行结束(因为在这期间没有 Python 字节码计数)。编写扩展函数的程序员有能力解锁 GIL。

threading 标准库简介

threading 模块在 _thread (Python2 为 thread 模块)之上提供了更高级别的操作线程的 API 接口,以提供对多线程编程的支持。同时这个模块也会提供了简单的锁(如 mutexes 或 binary semaphores)来实现同步原语,以控制多线程的资源共享。

实例

1. 创建新线程

创建线程有两种做法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 用 Thread 新建线程实例,传入可调用对象 callable object
import threading, time, random

def func(m):
print("worker %s" % m)
time.sleep(random.random() * 10)
return

if __name__ == "__main__":
threads = []
for i in range(10):
threads.append(threading.Thread(target=func, args=(str(i), )))
for t in threads:
# t.setDaemon(True)
t.start()
# t.join()

print("Finished")
# 输出类似于 “worker 3” 的文本,顺序随机

另一种思路是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 继承 Thread 子类作为工作线程类,覆写 __init__ 以及 run 方法,再实例化工作线程
import threading, time, random

class Worker(threading.Thread):
def __init__(self, name, Id):
super(Worker, self).__init__()
self.threadId = Id
self.name = name

def run(self):
print("Enter %d-%s" % (self.threadId, self.name))
time.sleep(random.randint(1, 20))
return


if __name__ == "__main__":
threads = []
for i in range(5):
threads.append(Worker("Thread %s" % chr(i+97), i + 1))

for t in threads:
t.setDaemon(False)
t.start()
# t.join()

print("Finish")
# 输出类似 “Enter 1-Thread a” 的文本,其中顺序不固定

其中:

  • start() —— 启动一个线程。
  • setDaemon(True) / t.daemon = True —— 将一个线程设置为守护线程在后台运行,当主线程结束后,守护进程也跟随结束。前者是旧的调用方式。这个设置必须在 start 方法前调用。Python 程序的运行会在没有任何非守护进程存活时结束。
  • join() —— 可以对任何一个线程调用 join 方法,调用方线程会阻塞 block,直到被调用线程终止才会继续执行。

2. 线程同步 —— Lock/RLock

在 python 中“锁”是最低级别的同步原语,直接由 _thread 的扩展模块提供。原语锁有两个状态: 锁定非锁定。同时 Lock 对象也有两个方法 acquire()release()。当锁处于“非锁定”状态时调用 acquire 会将其锁定并马上返回继续执行,而处于“锁定”状态调用时则会阻塞调用的线程直到其他线程调用 release 将其解锁,然后继续正常“非锁定”状态的 acquire 操作,方法的返回值为布尔类型,指示是否成功获取锁。release 只能用于处于“锁定”状态的 Lock 对象,如果不是会抛出 RuntimeError 异常;方法没有返回值。

Lock 同样支持上下文管理器协议(即具有 __enter____exit__ 方法)。

RLock 表示可重入锁原语对象,可重入锁必须由获取他的线程释放。一个线程可以多次获取同一个可重入锁(调用 acquire)而不进入阻塞态。这是为了避免 Lock 可能会导致的“死锁”问题:由于开发人员的疏忽,出现同一个子进程对同一把锁 Lock 连续 acquire 两次,因为第一次没有 release 导致挂起造成“死锁”。RLock 的实现思路是:维护一个计数器 counter 记录线程的 acquire 次数,每一次 acquire 时会有一个 release 操作与之对应执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Lock 使用示例,以操作 apples 共享资源为基础
import threading
import time
import random

apples = 5
lock = threading.RLock()

def func():
global lock, apples
while apples > 0:
lock.acquire()
if random.randint(1, 3) % 2:
apples -= 1
print("Remove an apple")
else:
apples += 1
print("Add an apple")
lock.release()
# 等价于
# with lock:
# ...
time.sleep(random.random() * 3)


if __name__ == "__main__":
threads = []
for i in range(3):
threads.append(threading.Thread(target=func))
for t in threads:
t.start()

3. 线程同步 —— Event

这是线程间通信来实现同步的一种最简单的办法:一个线程发送一个事件,由另一个线程等待响应这个事件。

一个事件对象管理一个内部标志,这个标记可以通过 set 设置为 True,通过 clear 设置为 False,wait 方法会阻塞当前线程直到标志为 True。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 使用 Event 的示例
import threading, time, random

def sender(apples, event):
while apples > 0:
count = random.randint(1, 3)
if count % 2:
apples -= 1
print("Remove an apple")
else:
event.set()
time.sleep(random.random() * 3)


def receiver(e):
while e.wait(10):
print("Get a signal!")
e.clear()


if __name__ == "__main__":
apples = 5
e = threading.Event()
thread1 = threading.Thread(target=sender, args=(apples, e))
thread2 = threading.Thread(target=receiver, args=(e,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("Finish")

4. 线程同步 —— Barrier

Barrier 是 Python 3.2 新增的类型,用于实现指定数量线程的“共同进退” —— 指定的每一个线程都可以尝试通过调用 Barrire 对象的 wait 方法去通过“栅栏” barrier,然后进入阻塞状态,直到全部线程都调用了 wati 方法。此时这些线程同时释放并运行。

类似于赛马比赛,每一匹马(相当于线程)入闸准备(调用 wait),直到所有马都准备好,一起起跑(同时释放线程)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 官网提供的例子,用于同步一个客户端和一个服务端线程
b = Barrier(2, timeout=5) # 构造 Barrier 对象,控制两个线程,超时时间为 5s

def server():
start_server()
# 阻塞当前线程,直到另一个(客户端)线程也阻塞等待,继续执行,建立连接
b.wait()
while True:
connection = accept_connection()
process_server_connection(connection)


def client():
b.wait()
# 阻塞当前线程,直到另一个(服务端)线程也阻塞等待,继续执行,建立连接
while True:
connection = make_connection()
process_client_connection(connection)

Barrier 对象具有以下方法:

方法 说明
Barrier(parties, action=None, timeout=None) 构造函数,其中 parties 代表要控制的线程数, action 是一个可调用对象,用于在线程释放时优先被调用,timeout 指定超时时间
wait(timeout=None) 让线程尝试通过 barrier,方法会返回 0 到 parties-1 的值给到不同的线程,可以用它来做一些内部处理
reset() 重置 barrier,使其归零,任何因 barrier 而阻塞的线程都会得到 BrokernBarrierError 异常
abort() 将 barrier 置为 broken 状态,任何尝试调用 wait 通过 barrier 的线程不好意思都会吃一个 BrokenBarrierError 异常,这是“避免一个线程需要终止导致 Barrier 中其他线程死锁”的一个做法(也可以用超时来实现)

5. 线程同步 —— Condition

Condition 对象内部维护一个锁(默认是 RLock,也可以再构造函数中传入),可以把条件变量理解为更高级的锁机制。它具有如 acquirewaitnotify 等方法,其中 acquirerelease 是通过操作内部的锁实现的,其他方法 waitwait_fornotify 以及 notify_all 等方法都必须在线程获取到锁才能调用,否则抛出 RuntimeError 异常。

方法 说明
wait(timeout=None) 线程挂起,直到 Condition 对象收到一个 notify 通知或者超时才会被唤醒继续执行。wait 会释放 Lock
wait_for(predicate, timeout=None) 参数 predicate 是一个返回布尔值的可调用对象,这个方法实际上相当于一直调用 wait 方法,直到 predicate 返回 True 或超时。它具有返回值,返回值为 predicate 最后的返回值,如果是超时,返回 False
notify(n=1) 通知其他线程,那些被挂起的线程收到通知后会继续运行,默认是唤醒 1 个等待 Condition 变量的线程(n=1)
notify_all() 通知所有 wait 状态的线程

6. 线程同步 —— Semaphore 和 BoundedSemaphore

Semaphore (信号量)是由 Dijkstra 发明的(使用 PV 原语代替 acquire 以及 release)。一个信号量管理着一个内部计数器,对其执行 acquire 就会递增,执行 release 就会递减。计数器永远在 0 之上计数,当调用 acquire 方法发现计数器归零了,则线程阻塞,等待其他线程调用 release 方法。

Semaphore 对象也支持上下文管理协议(即 __enter__ 方法和 __exit__ 方法)。其中有以下方法:

  • Semaphore(value=1):返回一个 Semaphore 对象,value 值用于指定计数器的大小,默认为 1,则相当于一个 RLock。
  • acquire(blocking=True,timeout=None):获取 Semaphore 对象,方法对于计数器大于 0,将其减一并返回;等于 0 则阻塞;同时如果有多条线程等待解锁,release 调用只会随机唤醒其中一条线程。blocking 默认为 True,如果为 False,则获取失败不阻塞,返回 False;
  • release():释放 Semaphore 对象,使计数器加一,可以唤醒处于等待状态的线程

BoundedSemaphore (带边界信号量)对象会检查内部计数器的值,并保证它不会大于初始值,如果超了,就引发一个 ValueError。它的设计是基于这样一条原则:因为信号量大多数时候是用来保护有限资源的,因此多次“release”是一个不当的行为

7. 线程局部数据

Thread 中有一个 local 类,用于管理 thread-local (线程局部)的数据。对于同一个 local,线程无法访问其他线程设置的属性;自己设置的属性不会被其他线程设置的同名属性替换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading

local = threading.local()

local.tname = 'main'


def func(name):
local.tname = name
print(local.tname)


t = threading.Thread(target=func, args=('thread1',))
t.start()
t.join()

print(local.tname)

# output:
# thread1
# main

8. 定时执行

Timer 对象是一个定时器,负责在一定时间后执行某些动作。因为 TimerThread 的子类,因此可以用来创建自定义线程。它同样通过 start 方法启动,通过 cancel 方法停止定时器(只能在动作发生前停止)。

注意,定时器执行动作前的时间间隔可能和用户指定的间隔不完全相同

1
2
3
4
5
6
def hello():
print("hello world")

t = Timer(30.0, hello)
t.start()
# 30s 后输出 hello world

协程

协程 Coroutine 又称微线程,纤程。

协程和我们的子程序(也叫做函数,在面向过程编程中)类似又有点不一样,函数是由明确的入口,一次返回,执行顺序也是明确固定的,而协程在执行过程中可以中断换到别的函数上执行,在适当的时候再返回。

对其较为简单的理解时:在一个线程中的某个函数,可以在任何地方保存当前函数的一些变量信息然后切换到另一个函数中执行。对比线程来说,协程上下文切换成本低廉(只是 CPU 上下文的切换),同时是由我们自己在程序中决定切换的次数以及切换的时机,跟线程切换主要交给操作系统处理的方式不一样。

Python 中的协程

协程的概念不是 Python 提出的,但由于 GIL 的存在,Python 的多线程不尽人意,因此 Python 借鉴了 Icon 语言,比较早的融入这个概念(从 Python 2.2 时期的生成器改造开始)。

由于协程是在单独的线程中运行的,因此在 Python 中要充分利用 CPU 多核,可以使用 多进程 multiprocessing + 协程 的方式

鉴于 Python 协程的发展历史比较长久,大概分为了三个发展阶段:

  1. 通过改造生成器的 yieldsend 实现协程
  2. yield from (Python 3.3) 到标准库 asyncio.coroutine (Python 3.4)提供
  3. Python 3.5 加入关键字 asyncawait

yield 与 send

yield 这个关键字在 Python 中很特别,因为它只能出现在函数 function 内(即在 def 定义的代码块内),当 yield 出现在函数内,这个函数就是一个 生成器

有人曾提议混淆使用 def 不利于区分正常函数和生成器,应该使用新的关键字如 gen,不过 Guido 并不同意。

生成器 generator 也是一个可迭代对象,但它和 Iterator 不一样的是,它提供了 send 方法让外界往生成器内部发送数据(后面在 Python 2.5 还提供了 throwclose 方法,前者用于在生成器内部 yield 处抛出异常,后者用于终止生成器)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 简单协程的例子
def simple_gen():
print("-> 启动生成器")
x = yield
while x != Ellipsis:
print("-> 生成器获得值 %d" % x)
x = yield
else:
print("-> 获得哨符,终止生成器")
return "Finish"
# return 的值作为 StopIteration 异常的说明,生成器没有办法用 return 返回值



my_gen = simple_gen()
# 协程激活
next(my_gen)
# output: -> 启动生成器
my_gen.send(14)
# output: -> 生成器获得值 14
my_gen.send(Ellipsis)
# output: -> 获得哨符,终止生成器
# 同时由于 yield 之后没有其他生成操作,即生成器没有要再返回的值,抛出 `StopIteration` 异常

※ 协程状态

协程状态可以用 inspect.getgeneratorstate() 函数获得,有以下的可能返回值(字符串):

返回值 说明
GEN_CREATED 等待执行
GEN_RUNNING 解释器正在执行。(只有在多线程应用中才会看到这个状态。此外,生成器对象在自己身上调用此函数也会出现)
GEN_SUSPENDED yield 表达式处暂停
GEN_CLOSED 执行结束

※ 预激活协程

注意到我们的例子有一步 预激活协程 的操作(即让协程执行到第一个 yield 表达式,准备好作为活跃协程使用)。简单来说,我们在 send 之前,必定有一个 next 操作。为了简化使用,我们通常会使用装饰器:

1
2
3
4
5
6
7
8
9
10
11
# 预激活协程装饰器
from functools import wraps
def coroutine(func):
'''装饰器:向前执行到第一个 yield 表达式,预激 func'''
@wraps(func)
# 把被装饰的生成器函数替换成这里的 primer 函数;
# 调用 primer 函数时,返回预激活后的生成器
def primer(*args, **kwargs):
gen = func(*args, **kwargs) # 调用被装饰的函数,获取生成器对象
next(gen) # 预激生成器
return gen # 返回生成器

yield from

yield from 是全新的语言结构,和 yield 不一样。在其他语言中,类似的结构会以 await 关键字定义,它主要表达这样一个意思:在生成器 gen 中使用 yield from subgen() 时,subgen 会获得控制权,把产出的值传给 gen 的调用方,即调用方可以直接控制 subgen。与此同时,gen 会阻塞,等待 subgen 终止。因此我们可以把 gen 理解为管道一样的东西.

yield from 主要功能是打开了双向通道,把调用方和内层的子生成器 subgen 连接起来。

在 PEP 380 中定义了专门的属于来描述 yield from:

  • 委托生成器 gen —— 包含 yield from <iterable> 表达式的生成器函数;
  • 子生成器 subgen —— 从 yield from 表达式中 <iterable> 部分获取的生成器,即 PEP 380 标题说的“子生成器”(subgenerator)。注意,iterable 可迭代对象可以是生成器,也可以是迭代器
  • 调用方 —— PEP 380 使用“调用方”这个术语指代调用委托生成器的客户端代码。

yield from iterable 本质上等于 for item in iterable: yield item 的缩写版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 使用 yield from 示例:计算分组平均值
from collections import namedtuple

Result = namedtuple("Result", "Count Average")


def calc_anverage():
total = 0.0
count = 0
average = 0.0
while True:
item = yield
if item is None:
break
total += item
count += 1
average = total / count
return Result(count, average)


def gen(results, key):
while True:
results[key] = yield from calc_anverage()


def main():
data = {
"csv": [10.2, 11.0, 12.1, 3.4, 4.5, 12.4, 13.1, 10.0, 14.4],
"html": [11.3, 4.5, 9.9, 3.6, 22.1, 14.5, 16.1]
}
results = {}
for key in data:
generator = gen(results, key)
next(generator)
for value in data[key]:
generator.send(value)
# 显式指定生成器终止:输入哨符
generator.send(None)

print(results)


if __name__ == "__main__":
main()
# output: {'csv': Result(Count=9, Average=10.122222222222222), 'html': Result(Count=7, Average=11.714285714285714)}

asyncio 模块

上面的关于 yield from 的例子都是我们自己手工去切换协程的,利用 asyncio 模块我们可以利用外部的事件循环。asyncio 是一个基于事件循环 eventloop 实现的异步 I/O 的模块。和 C# 当中的 TPL 库的实现非常类似。

asyncio 建构在协程之上,不过采用的协程定义更为严格:在 asyncio 库中,协程(通常) 使用 @asyncio.coroutine 装饰器装饰,而且始终使用 yield from 结构驱动,而不通过直接在协程上调用 .send(...) 方法驱动。当然,在 asyncio 库的底层,协程使用 next(...) 函数和 .send(...) 方法驱动,不过在用户代码中只使用 yield from 结构驱动协程运行。

先从一个简单的倒计时示例入手 asyncio 库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


@asyncio.coroutine
def countdown(n):
# 将普通函数包装为协程
i = 0
while i < n:
print("-> 还剩 %d 秒" % (n - i))
# asyncio.sleep()也是一个 coroutine
yield from asyncio.sleep(1)
i += 1


if __name__ == "__main__":
# 创建这个线程的事件循环
loop = asyncio.get_event_loop()
# 将协程包装为 task
task = asyncio.Task(countdown(10), loop=loop)
# 也可以不封装直接运行协程对象
# loop.run_until_complete(countdown(10))
# 把协程或 task 作为“事件”注册到事件循环上
loop.run_until_complete(task)
# 指定了 loop 也可以使用无限循环
# loop.run_forever()

在这里涉及到 asyncio 的一些基本概念:

  • eventloop 事件循环 —— 一个无限循环,可以注册函数到循环上,当满足条件发生时,调用相应的函数(这些可以注册的函数,概念上我们称为事件,在 Python 的实现上就是协程对象)
  • coroutine 协程 —— 通过 @asyncio.coroutine 装饰的函数,返回的是一个协程对象
  • future 期物 —— 表示终将发生的事情,这个东西和 Twisted 的 Deferred、Tornado 的 Funture 以及 JS 中的 Promise 对象非常相似,可以获取到状态、指定回调函数、获取结果等的操作,比协程对象实用。asyncio 提供方法可以将协程对象封装为期物
  • task 任务 —— 是 future 的子类,在注册事件循环的时候,其实是 run_until_complete 方法将协程包装成为了一个任务(task)对象。

async 和 await

Python 3.5 里面引入的 async 和 await 就不难理解了,他们是 asyncio.coroutine 以及 yield from 的替代,表面上独立于生成器存在,实际将细节都隐藏在 asyncio 模块下。

这里有一段自 《流畅的 Python》的总结:
事件驱动型框架(如 Tornado 和 asyncio)的运作方式,都是在单个线程中使用一个主循环驱动协程执行并发活动。使用协程做面向事件编程时,协程会不断把控制权让步给主循环,激活并向前运行其他协程,从而执行各个并发活动。这是一种协作式多任务:协程 显式自主地把控制权让步给中央调度程序。而多线程实现的是 抢占式多任务。调度程序可以在任何时刻暂停线程(即使在执行一个语句的过程中) ,把控制权让给其他线程。

从这里开始要进入到 Python 并发编程中的 异步编程 的概念了,这一部分将会在下一节讲述,同时会简述诸如 TwistedTornadogevent 等的第三方库。