Python多进程
multiprocessing
是一个支持使用与 threading
模块类似的 API
来产生进程的包。
multiprocessing
包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了
全局解释器锁。
因此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix
和
Windows
上均可运行。 multiprocessing
模块还引入了 Pool
对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。
Process
通过创建一个 Process
对象然后调用它的
start()
方法来生成进程。
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
Queue 队列
进程之间的通信通道之一
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
Pipe 管道
进程之间的通信通道之一,返回一个由管道连接的连接对象,默认情况下是双工(双向)。
返回的两个连接对象表示管道的两端。每个连接对象都有 send()
和 recv()
方法(相互之间的)。 { note waring no-icon}
请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。
{ endnote}
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
进程间同步
使用锁来确保一次只有一个进程打印到标准输出
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
进程间共享状态
共享内存
可以使用 Value
或 Array
将数据存储在共享内存映射中。
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) # 3.1415927 print(arr[:]) # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建 num
和 arr
时使用的 'd'
和 'i'
参数是 array
模块使用的类型的
typecode
: 'd'
表示双精度浮点数,
'i'
表示有符号整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,可以使用
multiprocessing.sharedctypes
模块,该模块支持创建从共享内存分配的任意ctypes
对象。
服务进程
由 Manager()
返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。
Manager()
返回的管理器支持类型: list
、
dict
、 Namespace
、 Lock
、
RLock
、 Semaphore
、
BoundedSemaphore
、 Condition
、
Event
、 Barrier
、 Queue
、
Value
和 Array
。例如
使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) # {0.25: None, 1: '1', '2': 2} print(l) # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
使用工作进程 Pool()
Pool
类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。
import time import os def f(x): return x*x if __name__ == '__main__': # 开 4 个工作进程 with Pool(processes=4) as pool: # print "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # 以任意顺序打印相同的数字 for i in pool.imap_unordered(f, range(10)): print(i) # evaluate "f(20)" asynchronously res = pool.apply_async(f, (20,)) # runs in *only* one process print(res.get(timeout=1)) # prints "400" # evaluate "os.getpid()" asynchronously res = pool.apply_async(os.getpid, ()) # runs in *only* one process print(res.get(timeout=1)) # prints the PID of that process # 异步启动多个 evaluations *可能*使用更多进程 multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # 使一个进程 sleep 10 秒 res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("我们得到一个 multiprocessing.TimeoutError") print("目前,pool 仍然可供更多工作") # exiting the 'with'-block has stopped the pool print("现在 pool 已关闭,不再可用")
{ note info } 参数 processes
默认大小是CPU的核数,
map(func, iterator) 获取结果,参数 func 是进程执行的函数,iterator
是可迭代对象
特点:保持阻塞直到获得结果,对于很长的迭代对象,可能消耗很多内存,可以考虑使用
imap() 或 imap_unordered() 并且显示指定 chunksize 以提升效率。
apply_async(func[, args]) 获取结果,参数 func 是进程执行的函数,args
是可迭代对象 { endnote }