Python multiprocessing多进程模块的用法(附带实例)
Python 提供了 multiprocessing 模块管理多进程,可以轻松实现多进程的程序设计。multiprocessing 模块支持子进程、通信和数据共享,提供了多种形式的同步机制及 Process、Queue、Pipe、Lock 等组件。
multiprocessing 模块常用的组件及功能如下:
1) 进程的创建与管理组件:
2) 子进程同步组件:
Process 组件还提供了多个实例方法和属性:
1) 实例方法:
2) 属性:
使用 Process 组件创建多个子进程,代码为:
还可以将进程定义为类,代码为:
例如,有两个进程同时写一个文件 share.txt,在代码中就可以通过 Lock 的使用避免写入文件时发生混乱,即:
使用 RLock 则不会出现这种情况,RLock 支持给同一资源上多把锁,上多少把锁,就释放多少次。
通俗来讲就是,该资源有多个门,每个门对应一把锁。一个进程访问了该资源,锁了门,还有其他门可以使用。如果所有的门都被锁了,那么新的进程就必须等待现有进程退出并释放锁后才可以访问。测试程序为:
Pipe 顾名思义,就是管道。Queue 是队列。
Queue 是多进程安全的队列,可以实现多进程之间的数据传递。put 用于插入数据到队列,还有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,则 put 会阻塞 timeout 指定的时间,直到队列有剩余的空间。如果超时,则会抛出 Queue.Full 异常。如果 blocked 虽为 False,但 Queue 已满,则会立即抛出 Queue.Full 异常。
get 可以从队列中读取并删除一个元素,有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,在等待时间内没有读取到任何元素,则会抛出 Queue.Empty 异常。如果 blocked 为 False,则有两种情况存在:如果 Queue 有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出 Queue.Empty 异常
以 Queue 为例创建两个子进程:一个子进程往 Queue 中写数据;另外一个子进程从 Queue 中读数据,代码为:
	
	
multiprocessing 模块常用的组件及功能如下:
1) 进程的创建与管理组件:
- Process:用于创建子进程,可以实现多进程的创建、启动、关闭等操作。
- Pool:用于创建管理进程池,当子进程非常多且需要控制子进程数量时使用。
- Manager:通常与Pool一起使用,用于资源共享。
- P ipe:用于进程间的管道通信。
- Queue:用于进程通信。
- Value,Array:用于进程通信,资源共享。
2) 子进程同步组件:
- Condition:条件变量。
- Event:用来实现进程间的同步通信。
- Lock:锁。
- RLock:多重锁。
- Semaphore:用来控制对共享资源的访问数量。
Python multiprocessing模块
multiprocessing 模块提供了 Process 组件用于创建子进程,方法为:Process([group [,target [,name [,args [,kwargs]]]]])其中,group 表示线程组,目前还没有实现;target 代表要执行的方法;name 为进程名;args/kwargs 为要传入的参数。
Process 组件还提供了多个实例方法和属性:
1) 实例方法:
- is_alive():返回进程是否在运行。
- join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的 timeout(可选参数)。
- start():进程准备就绪,等待 CPU 调度。
- run():start() 调用 run方 法,如果实例进程时未指定传入 target,则 star 默认执行 run()。
- terminate():不管任务是否完成,立即停止进程。
2) 属性:
- daemon:与线程 setDeamon 的功能一样(将父进程设置为守护进程,当父进程结束时,子进程也结束);
- exitcode:进程在运行时为 None,如果为 -N,则表示被信号 N 结束;
- name:进程名字;
- pid:进程号。
使用 Process 组件创建多个子进程,代码为:
import multiprocessing
import time
def process1(interval):
    while True:
        print('process1 is running')
        time.sleep(interval)
def process2(interval):
    while True:
        print('process2 is running')
        time.sleep(interval)
if __name__ == '__main__':
    p1 = multiprocessing.Process(target = process1, args = (2,))
    p2 = multiprocessing.Process(target = process2, args = (2,))
    p1.start()
    p2.start()
    while True:
        for p in multiprocessing.active_children():
            print('child Process:' + p.name + '\t,id:' + str(p.pid) + ' is alive')
        print('main process is running')
        time.sleep(2)
在程序代码中,首先导入 multiprocessing 模块,然后定义两个函数 process1() 和 process2(),调用 Process 组件将函数名赋予 target,通过 start() 启动两个子进程后,周期性地检测进程状态。程序的运行结果为:
child Process:Process-1,id:4525 is alive child Process:Process-2,id:4526 is alive main process is running process1 is running process2 is running child Process:Process-1,id:4525 is alive process1 is running process2 is running child Process:Process-2,id:4526 is alive main process is running
还可以将进程定义为类,代码为:
import multiprocessing
import time
class ChildProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    def run(self):
        while True:
            print('ChildProcess is running')
            time.sleep(self.interval)
if __name__ == '__main__':
    p = ChildProcess(2)
    p.start()
    while True:
        print('MainProcess is running')
        time.sleep(2)
执行程序代码,当 p 调用 start() 方法时,会自动调用 run() 方法启动子进程,运行结果为:
	MainProcess is running
	ChildProcess is running
	MainProcess is running
	ChildProcess is running
进程同步
当多个进程需要访问共享资源时,为了避免冲突,multiprocessing 模块提供了多种机制实现进程间的同步。1) Lock锁机制
multiprocessing 提供了Lock(锁)机制,通过对共享资源上锁的方式可避免多个进程的访问冲突。例如,有两个进程同时写一个文件 share.txt,在代码中就可以通过 Lock 的使用避免写入文件时发生混乱,即:
import multiprocessing
import sys
def process1(lock, f):
    with lock:
        fs = open(f, 'a+')
        times = 10
        while times > 0:
            fs.write('process1 write\n')
            times -= 1
        fs.close()
def process2(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        times = 10
        while times > 0:
            fs.write('process2 write\n')
            times -= 1
        fs.close()
    finally:
        lock.release()
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    f = 'share.txt'
    p1 = multiprocessing.Process(target = process1, args=(lock, f))
    p2 = multiprocessing.Process(target = process2, args=(lock, f))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
在程序代码中,进程 process2 通过 lock.acquire() 上锁,当完成对文件 share.txt 的写操作时,通过 lock.release() 解锁,只有在解锁的情况下,进程 process1 才有权利对 share.txt 执行写操作。2) RLock
RLock 是 Lock 的递归版。lock.acquire() 是请求锁。当前的锁为锁定状态时,lock.acquire() 会阻塞等待锁释放。如果写了两个 lock.acquire() 会产生死锁,则第二个 lock.acquire() 会永远等待。使用 RLock 则不会出现这种情况,RLock 支持给同一资源上多把锁,上多少把锁,就释放多少次。
3) Semaphore
Semaphore 有信号量的意思,与 Lock 有些类似,可以指定允许访问资源的进程数量。通俗来讲就是,该资源有多个门,每个门对应一把锁。一个进程访问了该资源,锁了门,还有其他门可以使用。如果所有的门都被锁了,那么新的进程就必须等待现有进程退出并释放锁后才可以访问。测试程序为:
import multiprocessing
import time
def process1():
    s.acquire()
    print('process1 acquire and it will sleep 5 s')
    time.sleep(5)
    print('process1 release')
    s.release()
def process2():
    s.acquire()
    print('process2 acquire and it will sleep 5 s')
    time.sleep(5)
    print('process2 release')
    s.release()
def process3():
    print('process3 try to start')
    s.acquire()
    print('process3 acquire and it will sleep 5 s')
    time.sleep(5)
    print('process3 release')
    s.release()
if __name__ == '__main__':
    s = multiprocessing.Semaphore(2)
    p1 = multiprocessing.Process(target = process1)
    p2 = multiprocessing.Process(target = process2)
    p3 = multiprocessing.Process(target = process3)
    p1.start()
    time.sleep(1)
    p2.start()
    time.sleep(1)
    p3.start()
    time.sleep(1)
运行结果为:
	process1 acquire and it will sleep 5 s
	process2 acquire and it will sleep 5 s
	process3 try to start
	process1 release
	process3 acquire and it will sleep 5 s
	process2 release
	process3 release
进程间通信
进程之间有时是需要互相通信的。Python 的 multiprocessing 模块提供了 Pipe、Queue 等多种方式实现进程间通信。Pipe 顾名思义,就是管道。Queue 是队列。
Queue 是多进程安全的队列,可以实现多进程之间的数据传递。put 用于插入数据到队列,还有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,则 put 会阻塞 timeout 指定的时间,直到队列有剩余的空间。如果超时,则会抛出 Queue.Full 异常。如果 blocked 虽为 False,但 Queue 已满,则会立即抛出 Queue.Full 异常。
get 可以从队列中读取并删除一个元素,有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,在等待时间内没有读取到任何元素,则会抛出 Queue.Empty 异常。如果 blocked 为 False,则有两种情况存在:如果 Queue 有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出 Queue.Empty 异常
以 Queue 为例创建两个子进程:一个子进程往 Queue 中写数据;另外一个子进程从 Queue 中读数据,代码为:
import multiprocessing
import time
def writer(q):
    for value in ['1', '2', '3']:
        print('Process-writer put %s in queue' % value)
        q.put(value)
        time.sleep(1)
def reader(q):
    while True:
        value = q.get(True)
        print('Process-reader get %s from queue' % value)
if __name__ == '__main__':
    q = multiprocessing.Queue()
    pw = multiprocessing.Process(target=writer, args=(q,))
    pr = multiprocessing.Process(target=reader, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()
运行结果为:
	Process-writer put 1 in queue
	Process-reader get 1 from queue
	Process-writer put 2 in queue
	Process-reader get 2 from queue
	Process-writer put 3 in queue
	Process-reader get 3 from queue
 
	 ICP备案:
 ICP备案: 公安联网备案:
 公安联网备案: