首页 > 编程笔记 > Python笔记 阅读:18

Python multiprocessing多进程模块的用法(附带实例)

Python 提供了 multiprocessing 模块管理多进程,可以轻松实现多进程的程序设计。multiprocessing 模块支持子进程、通信和数据共享,提供了多种形式的同步机制及 Process、Queue、Pipe、Lock 等组件。

multiprocessing 模块常用的组件及功能如下:
1) 进程的创建与管理组件:
2) 子进程同步组件:

Python multiprocessing模块

multiprocessing 模块提供了 Process 组件用于创建子进程,方法为:
Process([group [,target [,name [,args [,kwargs]]]]])
其中,group 表示线程组,目前还没有实现;target 代表要执行的方法;name 为进程名;args/kwargs 为要传入的参数。

Process 组件还提供了多个实例方法和属性:
1) 实例方法:
2) 属性:
使用 Process 组件创建多个子进程,代码为:
import multiprocessing
import time

def process1(interval):
    while True:
        print('process1 is running')
        time.sleep(interval)

def process2(interval):
    while True:
        print('process2 is running')
        time.sleep(interval)

if __name__ == '__main__':
    p1 = multiprocessing.Process(target = process1, args = (2,))
    p2 = multiprocessing.Process(target = process2, args = (2,))

    p1.start()
    p2.start()

    while True:
        for p in multiprocessing.active_children():
            print('child Process:' + p.name + '\t,id:' + str(p.pid) + ' is alive')
        print('main process is running')
        time.sleep(2)
在程序代码中,首先导入 multiprocessing 模块,然后定义两个函数 process1() 和 process2(),调用 Process 组件将函数名赋予 target,通过 start() 启动两个子进程后,周期性地检测进程状态。程序的运行结果为:
child Process:Process-1,id:4525 is alive
child Process:Process-2,id:4526 is alive
main process is running
process1 is running
process2 is running
child Process:Process-1,id:4525 is alive
process1 is running
process2 is running
child Process:Process-2,id:4526 is alive
main process is running

还可以将进程定义为类,代码为:
import multiprocessing
import time

class ChildProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        while True:
            print('ChildProcess is running')
            time.sleep(self.interval)

if __name__ == '__main__':
    p = ChildProcess(2)
    p.start()
    while True:
        print('MainProcess is running')
        time.sleep(2)
执行程序代码,当 p 调用 start() 方法时,会自动调用 run() 方法启动子进程,运行结果为:

MainProcess is running
ChildProcess is running
MainProcess is running
ChildProcess is running

进程同步

当多个进程需要访问共享资源时,为了避免冲突,multiprocessing 模块提供了多种机制实现进程间的同步。

1) Lock锁机制

multiprocessing 提供了Lock(锁)机制,通过对共享资源上锁的方式可避免多个进程的访问冲突。

例如,有两个进程同时写一个文件 share.txt,在代码中就可以通过 Lock 的使用避免写入文件时发生混乱,即:
import multiprocessing
import sys
def process1(lock, f):
    with lock:
        fs = open(f, 'a+')
        times = 10
        while times > 0:
            fs.write('process1 write\n')
            times -= 1
        fs.close()

def process2(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        times = 10
        while times > 0:
            fs.write('process2 write\n')
            times -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    f = 'share.txt'
    p1 = multiprocessing.Process(target = process1, args=(lock, f))
    p2 = multiprocessing.Process(target = process2, args=(lock, f))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
在程序代码中,进程 process2 通过 lock.acquire() 上锁,当完成对文件 share.txt 的写操作时,通过 lock.release() 解锁,只有在解锁的情况下,进程 process1 才有权利对 share.txt 执行写操作。

2) RLock

RLock 是 Lock 的递归版。lock.acquire() 是请求锁。当前的锁为锁定状态时,lock.acquire() 会阻塞等待锁释放。如果写了两个 lock.acquire() 会产生死锁,则第二个 lock.acquire() 会永远等待。

使用 RLock 则不会出现这种情况,RLock 支持给同一资源上多把锁,上多少把锁,就释放多少次。

3) Semaphore

Semaphore 有信号量的意思,与 Lock 有些类似,可以指定允许访问资源的进程数量。

通俗来讲就是,该资源有多个门,每个门对应一把锁。一个进程访问了该资源,锁了门,还有其他门可以使用。如果所有的门都被锁了,那么新的进程就必须等待现有进程退出并释放锁后才可以访问。测试程序为:
import multiprocessing
import time

def process1():
    s.acquire()
    print('process1 acquire and it will sleep 5 s')
    time.sleep(5)
    print('process1 release')
    s.release()

def process2():
    s.acquire()
    print('process2 acquire and it will sleep 5 s')
    time.sleep(5)
    print('process2 release')
    s.release()

def process3():
    print('process3 try to start')
    s.acquire()
    print('process3 acquire and it will sleep 5 s')
    time.sleep(5)
    print('process3 release')
    s.release()

if __name__ == '__main__':
    s = multiprocessing.Semaphore(2)
    p1 = multiprocessing.Process(target = process1)
    p2 = multiprocessing.Process(target = process2)
    p3 = multiprocessing.Process(target = process3)
    p1.start()
    time.sleep(1)
    p2.start()
    time.sleep(1)
    p3.start()
    time.sleep(1)
运行结果为:

process1 acquire and it will sleep 5 s
process2 acquire and it will sleep 5 s
process3 try to start
process1 release
process3 acquire and it will sleep 5 s
process2 release
process3 release

在测试程序中,通过 Semaphore(2) 限制为最多两个进程同时访问,随后依次启动了 3 个进程,当前两个进程未退出时,进程 3 尝试访问失败,当进程 1 退出后,进程 3 才获得权限。

进程间通信

进程之间有时是需要互相通信的。Python 的 multiprocessing 模块提供了 Pipe、Queue 等多种方式实现进程间通信。

Pipe 顾名思义,就是管道。Queue 是队列。

Queue 是多进程安全的队列,可以实现多进程之间的数据传递。put 用于插入数据到队列,还有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,则 put 会阻塞 timeout 指定的时间,直到队列有剩余的空间。如果超时,则会抛出 Queue.Full 异常。如果 blocked 虽为 False,但 Queue 已满,则会立即抛出 Queue.Full 异常。

get 可以从队列中读取并删除一个元素,有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,在等待时间内没有读取到任何元素,则会抛出 Queue.Empty 异常。如果 blocked 为 False,则有两种情况存在:如果 Queue 有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出 Queue.Empty 异常

以 Queue 为例创建两个子进程:一个子进程往 Queue 中写数据;另外一个子进程从 Queue 中读数据,代码为:
import multiprocessing
import time

def writer(q):
    for value in ['1', '2', '3']:
        print('Process-writer put %s in queue' % value)
        q.put(value)
        time.sleep(1)

def reader(q):
    while True:
        value = q.get(True)
        print('Process-reader get %s from queue' % value)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    pw = multiprocessing.Process(target=writer, args=(q,))
    pr = multiprocessing.Process(target=reader, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()
运行结果为:

Process-writer put 1 in queue
Process-reader get 1 from queue
Process-writer put 2 in queue
Process-reader get 2 from queue
Process-writer put 3 in queue
Process-reader get 3 from queue

负责写的进程 writer 分别向 queue 中写入 1、2、3,负责读的进程 reader 依次将内容从 queue 中读出。

相关文章