Python多进程
multiprocessing
是一个支持使用与 threading
模块类似的 API
来产生进程的包。
multiprocessing
包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了
全局解释器锁。
因此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix
和
Windows
上均可运行。 multiprocessing
模块还引入了 Pool
对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。
Process
通过创建一个 Process
对象然后调用它的
start()
方法来生成进程。
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Queue 队列
进程之间的通信通道之一
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
Pipe 管道
进程之间的通信通道之一,返回一个由管道连接的连接对象,默认情况下是双工(双向)。
返回的两个连接对象表示管道的两端。每个连接对象都有 send()
和 recv()
方法(相互之间的)。 { note waring no-icon}
请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。
{ endnote}
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
进程间同步
使用锁来确保一次只有一个进程打印到标准输出
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
进程间共享状态
共享内存
可以使用 Value
或 Array
将数据存储在共享内存映射中。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
# 3.1415927
print(arr[:])
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建 num
和 arr
时使用的 'd'
和 'i'
参数是 array
模块使用的类型的
typecode
: 'd'
表示双精度浮点数,
'i'
表示有符号整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,可以使用
multiprocessing.sharedctypes
模块,该模块支持创建从共享内存分配的任意ctypes
对象。
服务进程
由 Manager()
返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。
Manager()
返回的管理器支持类型: list
、
dict
、 Namespace
、 Lock
、
RLock
、 Semaphore
、
BoundedSemaphore
、 Condition
、
Event
、 Barrier
、 Queue
、
Value
和 Array
。例如
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
# {0.25: None, 1: '1', '2': 2}
print(l)
# [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。
使用工作进程 Pool()
Pool
类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# 开 4 个工作进程
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# 以任意顺序打印相同的数字
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# 异步启动多个 evaluations *可能*使用更多进程
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# 使一个进程 sleep 10 秒
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("我们得到一个 multiprocessing.TimeoutError")
print("目前,pool 仍然可供更多工作")
# exiting the 'with'-block has stopped the pool
print("现在 pool 已关闭,不再可用")
{ note info } 参数 processes
默认大小是CPU的核数,
map(func, iterator) 获取结果,参数 func 是进程执行的函数,iterator
是可迭代对象
特点:保持阻塞直到获得结果,对于很长的迭代对象,可能消耗很多内存,可以考虑使用
imap() 或 imap_unordered() 并且显示指定 chunksize 以提升效率。
apply_async(func[, args]) 获取结果,参数 func 是进程执行的函数,args
是可迭代对象 { endnote }