Python multiprocessing多进程模块的用法(附带实例)
Python 提供了 multiprocessing 模块管理多进程,可以轻松实现多进程的程序设计。multiprocessing 模块支持子进程、通信和数据共享,提供了多种形式的同步机制及 Process、Queue、Pipe、Lock 等组件。
multiprocessing 模块常用的组件及功能如下:
1) 进程的创建与管理组件:
2) 子进程同步组件:
Process 组件还提供了多个实例方法和属性:
1) 实例方法:
2) 属性:
使用 Process 组件创建多个子进程,代码为:
还可以将进程定义为类,代码为:
例如,有两个进程同时写一个文件 share.txt,在代码中就可以通过 Lock 的使用避免写入文件时发生混乱,即:
使用 RLock 则不会出现这种情况,RLock 支持给同一资源上多把锁,上多少把锁,就释放多少次。
通俗来讲就是,该资源有多个门,每个门对应一把锁。一个进程访问了该资源,锁了门,还有其他门可以使用。如果所有的门都被锁了,那么新的进程就必须等待现有进程退出并释放锁后才可以访问。测试程序为:
Pipe 顾名思义,就是管道。Queue 是队列。
Queue 是多进程安全的队列,可以实现多进程之间的数据传递。put 用于插入数据到队列,还有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,则 put 会阻塞 timeout 指定的时间,直到队列有剩余的空间。如果超时,则会抛出 Queue.Full 异常。如果 blocked 虽为 False,但 Queue 已满,则会立即抛出 Queue.Full 异常。
get 可以从队列中读取并删除一个元素,有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,在等待时间内没有读取到任何元素,则会抛出 Queue.Empty 异常。如果 blocked 为 False,则有两种情况存在:如果 Queue 有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出 Queue.Empty 异常
以 Queue 为例创建两个子进程:一个子进程往 Queue 中写数据;另外一个子进程从 Queue 中读数据,代码为:
multiprocessing 模块常用的组件及功能如下:
1) 进程的创建与管理组件:
- Process:用于创建子进程,可以实现多进程的创建、启动、关闭等操作。
- Pool:用于创建管理进程池,当子进程非常多且需要控制子进程数量时使用。
- Manager:通常与Pool一起使用,用于资源共享。
- P ipe:用于进程间的管道通信。
- Queue:用于进程通信。
- Value,Array:用于进程通信,资源共享。
2) 子进程同步组件:
- Condition:条件变量。
- Event:用来实现进程间的同步通信。
- Lock:锁。
- RLock:多重锁。
- Semaphore:用来控制对共享资源的访问数量。
Python multiprocessing模块
multiprocessing 模块提供了 Process 组件用于创建子进程,方法为:Process([group [,target [,name [,args [,kwargs]]]]])其中,group 表示线程组,目前还没有实现;target 代表要执行的方法;name 为进程名;args/kwargs 为要传入的参数。
Process 组件还提供了多个实例方法和属性:
1) 实例方法:
- is_alive():返回进程是否在运行。
- join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的 timeout(可选参数)。
- start():进程准备就绪,等待 CPU 调度。
- run():start() 调用 run方 法,如果实例进程时未指定传入 target,则 star 默认执行 run()。
- terminate():不管任务是否完成,立即停止进程。
2) 属性:
- daemon:与线程 setDeamon 的功能一样(将父进程设置为守护进程,当父进程结束时,子进程也结束);
- exitcode:进程在运行时为 None,如果为 -N,则表示被信号 N 结束;
- name:进程名字;
- pid:进程号。
使用 Process 组件创建多个子进程,代码为:
import multiprocessing import time def process1(interval): while True: print('process1 is running') time.sleep(interval) def process2(interval): while True: print('process2 is running') time.sleep(interval) if __name__ == '__main__': p1 = multiprocessing.Process(target = process1, args = (2,)) p2 = multiprocessing.Process(target = process2, args = (2,)) p1.start() p2.start() while True: for p in multiprocessing.active_children(): print('child Process:' + p.name + '\t,id:' + str(p.pid) + ' is alive') print('main process is running') time.sleep(2)在程序代码中,首先导入 multiprocessing 模块,然后定义两个函数 process1() 和 process2(),调用 Process 组件将函数名赋予 target,通过 start() 启动两个子进程后,周期性地检测进程状态。程序的运行结果为:
child Process:Process-1,id:4525 is alive child Process:Process-2,id:4526 is alive main process is running process1 is running process2 is running child Process:Process-1,id:4525 is alive process1 is running process2 is running child Process:Process-2,id:4526 is alive main process is running
还可以将进程定义为类,代码为:
import multiprocessing import time class ChildProcess(multiprocessing.Process): def __init__(self, interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): while True: print('ChildProcess is running') time.sleep(self.interval) if __name__ == '__main__': p = ChildProcess(2) p.start() while True: print('MainProcess is running') time.sleep(2)执行程序代码,当 p 调用 start() 方法时,会自动调用 run() 方法启动子进程,运行结果为:
MainProcess is running
ChildProcess is running
MainProcess is running
ChildProcess is running
进程同步
当多个进程需要访问共享资源时,为了避免冲突,multiprocessing 模块提供了多种机制实现进程间的同步。1) Lock锁机制
multiprocessing 提供了Lock(锁)机制,通过对共享资源上锁的方式可避免多个进程的访问冲突。例如,有两个进程同时写一个文件 share.txt,在代码中就可以通过 Lock 的使用避免写入文件时发生混乱,即:
import multiprocessing import sys def process1(lock, f): with lock: fs = open(f, 'a+') times = 10 while times > 0: fs.write('process1 write\n') times -= 1 fs.close() def process2(lock, f): lock.acquire() try: fs = open(f, 'a+') times = 10 while times > 0: fs.write('process2 write\n') times -= 1 fs.close() finally: lock.release() if __name__ == '__main__': lock = multiprocessing.Lock() f = 'share.txt' p1 = multiprocessing.Process(target = process1, args=(lock, f)) p2 = multiprocessing.Process(target = process2, args=(lock, f)) p1.start() p2.start() p1.join() p2.join()在程序代码中,进程 process2 通过 lock.acquire() 上锁,当完成对文件 share.txt 的写操作时,通过 lock.release() 解锁,只有在解锁的情况下,进程 process1 才有权利对 share.txt 执行写操作。
2) RLock
RLock 是 Lock 的递归版。lock.acquire() 是请求锁。当前的锁为锁定状态时,lock.acquire() 会阻塞等待锁释放。如果写了两个 lock.acquire() 会产生死锁,则第二个 lock.acquire() 会永远等待。使用 RLock 则不会出现这种情况,RLock 支持给同一资源上多把锁,上多少把锁,就释放多少次。
3) Semaphore
Semaphore 有信号量的意思,与 Lock 有些类似,可以指定允许访问资源的进程数量。通俗来讲就是,该资源有多个门,每个门对应一把锁。一个进程访问了该资源,锁了门,还有其他门可以使用。如果所有的门都被锁了,那么新的进程就必须等待现有进程退出并释放锁后才可以访问。测试程序为:
import multiprocessing import time def process1(): s.acquire() print('process1 acquire and it will sleep 5 s') time.sleep(5) print('process1 release') s.release() def process2(): s.acquire() print('process2 acquire and it will sleep 5 s') time.sleep(5) print('process2 release') s.release() def process3(): print('process3 try to start') s.acquire() print('process3 acquire and it will sleep 5 s') time.sleep(5) print('process3 release') s.release() if __name__ == '__main__': s = multiprocessing.Semaphore(2) p1 = multiprocessing.Process(target = process1) p2 = multiprocessing.Process(target = process2) p3 = multiprocessing.Process(target = process3) p1.start() time.sleep(1) p2.start() time.sleep(1) p3.start() time.sleep(1)运行结果为:
process1 acquire and it will sleep 5 s
process2 acquire and it will sleep 5 s
process3 try to start
process1 release
process3 acquire and it will sleep 5 s
process2 release
process3 release
进程间通信
进程之间有时是需要互相通信的。Python 的 multiprocessing 模块提供了 Pipe、Queue 等多种方式实现进程间通信。Pipe 顾名思义,就是管道。Queue 是队列。
Queue 是多进程安全的队列,可以实现多进程之间的数据传递。put 用于插入数据到队列,还有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,则 put 会阻塞 timeout 指定的时间,直到队列有剩余的空间。如果超时,则会抛出 Queue.Full 异常。如果 blocked 虽为 False,但 Queue 已满,则会立即抛出 Queue.Full 异常。
get 可以从队列中读取并删除一个元素,有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,在等待时间内没有读取到任何元素,则会抛出 Queue.Empty 异常。如果 blocked 为 False,则有两种情况存在:如果 Queue 有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出 Queue.Empty 异常
以 Queue 为例创建两个子进程:一个子进程往 Queue 中写数据;另外一个子进程从 Queue 中读数据,代码为:
import multiprocessing import time def writer(q): for value in ['1', '2', '3']: print('Process-writer put %s in queue' % value) q.put(value) time.sleep(1) def reader(q): while True: value = q.get(True) print('Process-reader get %s from queue' % value) if __name__ == '__main__': q = multiprocessing.Queue() pw = multiprocessing.Process(target=writer, args=(q,)) pr = multiprocessing.Process(target=reader, args=(q,)) pw.start() pr.start() pw.join() pr.terminate()运行结果为:
Process-writer put 1 in queue
Process-reader get 1 from queue
Process-writer put 2 in queue
Process-reader get 2 from queue
Process-writer put 3 in queue
Process-reader get 3 from queue