Python 08_Python并发编程之多进程

一、Python 并发编程简介

1.1 Python并发编程简介

并发编程允许程序同时执行多个任务,不同的任务可以在不同的处理器核心上运行,从而提高了整体的性能和效率。特别是在需要处理大量数据、高并发访问或实时性要求较高的应用中,使用并发编程技术是至关重要的。

Python 作为一门广泛使用的编程语言,提供了多种并发编程的工具和技术,使得开发人员能够轻松地在其应用程序中实现并发性,它提供了多种方式来实现并发编程:

  • 多进程(multiprocessing)编程:Python 中的 multiprocessing 模块使得在 Python 中创建和管理进程变得简单,每个进程都有自己的内存空间,可以实现真正的并行处理。
  • 多线程(threading)编程: Python 中的 threading 模块可以轻松创建和管理线程,允许程序同时执行多个线程,并在不同的任务之间切换执行。
  • 异步(协程coroutine)编程: Python 中的 asyncio 模块提供了协程(coroutine)的支持,允许程序在等待 I/O 操作的同时执行其它任务,提高了程序的响应性能。

Python 的易用性和灵活性使得它成为了许多项目中首选的编程语言之一。其丰富的并发编程工具和库使得开发者能够根据项目需求选择最合适的并发模型,以提高程序的效率和性能。因此,Python 在并发编程方面具有广泛的应用前景和实际价值。

二、Python 多进程编程概述

2.1 进程(processing)概述

进程(processing) 是程序执行的一个实例,它拥有独立的内存空间、地址空间和资源CPU、内存等)。在操作系统级别,进程可以被看作是独立的执行单元。程序运行时系统就会创建一个进程(processing),并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间(进程切换需要的资源很最大,效率低),程序开始真正运行。Python 中每次启动执行一个程序,就会产生一个独立的进程。

多进程编程 是指在同一程序中创建多个独立的进程来执行任务,每个进程都有自己独立的内存空间,相互之间不干扰。Python 中因为 GIL 锁的存在,对于 CPU 密集型任务(例如计算密集型操作),使用多进程可以提高程序的效率。

Tips: GIL(全局解释器锁)是CPython解释器中的一个机制,用于防止同一进程下的多个线程同时执行。具体来说:

  • GIL的工作原理:在CPython中,每个线程在执行前必须先获取GIL锁,这确保了任何时候只有一个线程可以执行Python字节码。这种机制防止了多个线程同时修改共享数据,从而避免了潜在的竞争条件和数据不一致问题。
  • GIL的影响:由于GIL的存在,Python的多线程在CPU密集型任务上的性能并不理想,因为即使有多个处理器核心可用,GIL也只允许一个线程在任何给定时间运行。这限制了Python多线程程序在充分利用多核处理器方面的能力。
  • 处理GIL的限制:为了绕过GIL的限制,可以采取多进程编程(每个进程有自己的GIL),使用C扩展(这些扩展不受GIL影响),或采用异步编程模型(如asyncio库),以减少对线程的依赖。
  • 总体来说,GIL是CPython设计中的一个权衡,旨在简化内存管理和避免复杂的线程安全问题,但同时也限制了多线程在CPU密集型任务中的性能。

多进程编程有以下优点:

  • 充分利用多核处理器:多进程可以同时在多个处理器核心上运行,从而充分利用硬件资源,加速程序执行。
  • 改善程序性能:对于CPU密集型任务,多进程可以将计算任务分发到多个进程,从而提高程序性能。
  • 提高程序健壮性:多进程模式可以提高程序的健壮性,一个进程的崩溃不会导致整个应用程序崩溃。
  • 并行执行:多进程允许并行执行多个任务,适用于需要同时处理多个任务的情况,如同时处理多个客户端请求。

Python提供了多个多进程编程的模块,其中两个主要模块是:

  • multiprocessing 模块:提供了多进程编程的基本功能,支持进程的创建、管理和通信。
  • concurrent.futures 模块:构建在multiprocessing之上,提供了更高级的接口,简化了并行编程的任务调度和结果获取。

三、multiprocessing 多进程(processing)编程

3.1 multiprocessing 模块简介

Python 中的 multiprocessing 模块用来支持多进程的创建和管理。它提供了创建进程的类和函数,使得在 Python 中使用多进程变得简单和方便。

multiprocessing 模块提供本地和远程并发,通过使用子进程而不是线程有效地绕过全局解释器锁。因此,多处理模块允许程序员充分利用给定机器上的多个处理器。

multiprocessing 模块实现了 ProcessPool 两个类,Process 类提供了进程创建和管理的基本方法,Pool 类提供了进程池的创建和管理的基本方法:

3.2 Process类

multiprocessing.Process 类用于创建新的子进程。通过实例化 Process 类并传入要执行的函数 及 参数(可选)等信息,可以创建一个新的子进程对象。然后调用 start() 方法启动进程,调用 join() 方法等待进程结束。每个 Process 实例都有自己独立的内存空间等资源。

Process 类定义如下:

1
2
3
4
5
6
7
class multiprocessing.Process(group=None,
                            target=None, 
                            name=None, 
                            args=(), 
                            kwargs={}, 
                            *, 
                            daemon=None)
  • 应始终使用关键字参数调用构造函数。
  • group 应该始终是 None ;它仅用于兼容 threading.Thread 。
  • target 是由 run() 方法调用的可调用对象。它默认为 None ,意味着什么都没有被调用。
  • name 是进程名称。
  • args 是目标调用的参数元组。
  • kwargs 是目标调用的关键字参数字典。
  • 如果提供,则键参数 daemon 将进程 daemon 标志设置为 True 或 False 。如果是 None (默认值),则该标志将从创建的进程继承。

在默认情况下,不会将任何参数传递给 target。 args 参数默认值为 (),可被用来指定要传递给 target 的参数列表或元组。

如果子类重写构造函数,它必须确保它在对进程执行任何其它操作之前调用基类构造函数( Process.__init__() )。

官方文档:https://docs.python.org/zh-cn/3.11/library/multiprocessing.html#multiprocessing.Process

Tips: Process 类是 multiprocessing 模块的核心类,它是所有进程的基类。创建 Process 类实例对象时,使用 target 参数指定进程要执行的函数(任务),args 参数传递给该函数(任务)的参数(参数以元组的形式传入,若不需要参数可以不指定。)。

以下是一个使用 multiprocessing 模块 Process 类 创建进程 和 使用的简单示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import os
import multiprocessing

def square_numbers():
    for i in range(2):
        print(f" {i} 的平方 {i * i}")
        print(f"PID: {os.getpid()}, PPID: {os.getppid()}")
 
if __name__ == '__main__':
    print(f"Main: PID: {os.getpid()}")
    # 创建进程
    process = multiprocessing.Process(target=square_numbers)
    print(type(process))
    # 启动进程
    process.start()
    # 等待进程完成
    process.join()
    print("进程结束")
# Output:
# Main: PID: 11598
# <class 'multiprocessing.context.Process'>
#  0 的平方 0
# PID: 11600, PPID: 11598
#  1 的平方 1
# PID: 11600, PPID: 11598
# 进程结束

在这个示例中,在主进程(pid:11598)中使用 square_numbers 函数(任务)来 实例化 multiprocessing.Process 类对象 process(子进程对象),然后调用 process.start() 启动子进程执行 和 process.join() 等待子进程执行完成。从执行输出结果,可以看出,在主进程(pid:11598)中创建了子进程(pid:11600),并在子进程中执行了 square_numbers 函数(任务)。

Python 中 使用 multiprocessing.Process 类 创建进程的方式有两种:直接实例化Process类对象使用Process派生业务类创建进程

  • 直接实例化Process类对象方式创建进程

以 方法包装方式创建进程的主要步骤为:在创建进程时以默认值参数的方式声明目标函数以及传入目标函数的参数(元组的方式)。

示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
import time
from multiprocessing import Process

def worker(name):
    """进程的创建方式: 1.方法包装"""
    print("当前进程ID:", os.getpid())
    print("父进程ID", os.getppid())
    print(f"Process:{name} start")
    time.sleep(3)
    print(f"Process:{name} end")

if __name__ == "__main__":
    print("当前main进程ID: ", os.getpid())
    # 创建进程
    p1 = Process(target=worker, args=("p1",))
    p2 = Process(target=worker, args=("p2",))
    print("type p1: ", type(p1))
    print("type p2: ", type(p2))

    print("启动 p1,p2 运行...")
    p1.start()
    p2.start()
    print("等待main进程结束")
    p1.join()
    p2.join()
    print("main进程结束")
# Output:
# 当前main进程ID:  18484
# type p1:  <class 'multiprocessing.context.Process'>
# type p2:  <class 'multiprocessing.context.Process'>
# 启动 p1,p2 运行...
# 等待main进程结束
# Process:p1 start
# 当前进程ID: 18486
# 父进程ID 18484
# Process:p2 start
# 当前进程ID: 18487
# 父进程ID 18484
# Process:p1 end
# Process:p2 end
# main进程结束

Tips: args参数元组中如果只有一个元素, 是需要加逗号的!!! 这是因为括号 () 既可以表示tuple,又可以表示数学公式中的小括号,所以如果没有加逗号,那里面放什么类型的数据那么类型就会是什么。

  • 使用Process派生新业务类方式创建进程

以 类包装方式创建进程的主要步骤为:

  • 必须重写 run 方法,这个 run 方法里面就是写业务逻辑,在进程 start() 后将会被调用
  • 根据需要编写 __init__() 方法,可以在 __init__() 方法中设置目标函数(任务)以及传入目标函数的参数。如果有构造函数,必须重载父类的 init 方法。
  • 重写 run 方法,在 run 方法中编写进程的执行代码。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import time
from multiprocessing import Process

class MyProcess(Process):
    """进程的创建方式: 2.类包装"""
    def __init__(self, name):
        # 这里一定要调用父类的初始化函数,否则无法创建进程
        Process.__init__(self)
        self.name = name

    def run(self):
        print(f"进程{self.name} 启动")

        print("当前进程ID:", os.getpid())
        print("父进程ID", os.getppid())
        time.sleep(3)
        print(f"进程{self.name} 结束")


if __name__ == "__main__":
    print("当前main进程ID: ", os.getpid())
    print("创建进程")
    p1 = MyProcess("p1")
    p2 = MyProcess("p2")

    print("type p1: ", type(p1))
    print("type p2: ", type(p2))

    print("启动 p1,p2 运行...")
    p1.start()
    p2.start()
    print("等待main进程结束")
    p1.join()
    p2.join()
    print("main进程结束")
# Output:
# 当前main进程ID:  20373
# 创建进程
# type p1:  <class '__main__.MyProcess'>
# type p2:  <class '__main__.MyProcess'>
# 启动 p1,p2 运行...
# 等待main进程结束
# 进程p1 启动
# 当前进程ID: 20375
# 父进程ID 20373
# 进程p2 启动
# 当前进程ID: 20376
# 父进程ID 20373
# 进程p1 结束
# 进程p2 结束
# main进程结束

3.2 Pool类

进程池是一种预先创建的一组工作进程,这些进程可以重复使用来执行多个任务。通过进程池,可以避免频繁创建和销毁进程的开销,从而提高效率。

如果有大量的任务需要多进程并行执行,如果使用 Process类 创建子进程 去执行这些任务 会导致 系统频繁的创建和销毁进程,造成系统资源的极度浪费。这种情况下可以使用 multiprocessing.Pool类 来创建进程池,它可以方便地管理多个进程。通过 Pool 类的 map()、apply()等方法,可以将任务分配给进程池中的多个进程并行执行。进程池会自动管理进程的创建、复用 和 销毁,极大的提高了并行处理的效率。

一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。

Pool 类的定义如下:

1
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • processes 是要使用的工作进程数目。如果 processes 为 None,则使用 os.cpu_count() 返回的值。
  • 如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)
  • maxtasksperchild 是一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。默认的 maxtasksperchild 是 None,意味着工作进程寿与池齐。
  • context 可被用于指定启动的工作进程的上下文。通常一个进程池是使用函数 multiprocessing.Pool() 或者一个上下文对象的 Pool() 方法创建的。在这两种情况下, context 都是适当设置的。

Pool 参考文档:https://docs.python.org/zh-cn/3.11/library/multiprocessing.html#multiprocessing.pool.Pool

下面是一个 使用 multiprocessing 模块的 Pool 类 创建进程池 和 使用的简单的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import os
import time
import multiprocessing

def worker(num):
    """该函数将在子进程中执行"""
    print(f"Worker {num}, PID: {os.getpid()}, PPID: {os.getppid()}")
    time.sleep(10 - )

if __name__ == '__main__':
    print(f"Main: PID: {os.getpid()}")
    # 创建进程池
    pool = multiprocessing.Pool(4)
    # 启动进程池中的进程
    pool.map(worker, range(10))
    # 关闭进程池
    pool.close()
    # 等待进程池中的进程结束
    pool.join()

    print("Main: 进程结束")
# Output:
# Main: PID: 24570
# Worker 0, PID: 24572, PPID: 24570
# Worker 1, PID: 24573, PPID: 24570
# Worker 2, PID: 24574, PPID: 24570
# Worker 3, PID: 24575, PPID: 24570
# Worker 4, PID: 24572, PPID: 24570
# Worker 5, PID: 24573, PPID: 24570
# Worker 6, PID: 24574, PPID: 24570
# Worker 7, PID: 24575, PPID: 24570
# Worker 8, PID: 24573, PPID: 24570
# Worker 9, PID: 24575, PPID: 24570
# Main: 进程结束

在上面的代码中,Pool类的构造函数中指定了进程池的大小为4,然后通过调用map方法来启动进程池中的进程。map方法会将worker函数和range(10)序列中的每个元素一一对应,然后将它们作为参数传递给进程池中的进程。最后,调用close方法关闭进程池,并调用join方法等待所有进程结束。

从运行结果输出可以看到 ,在主进程(pid:24570)中创建了4个子进程(pid:24572、24573、24574、24575),并在每子进程中执行了 多个 worker 函数(任务),实现了进程复用。

Tips: 在使用 Pool 类创建 和 使用 进程池时,需要注意:

  • 如果不调用 close() 方法,则进程池不会自动销毁,而是直到进程池中的进程执行完毕后才会销毁。
  • 如果不调用 join() 方法,主进程会等待子进程执行完毕后才会结束,但是如果子进程执行过程中出了问题,主进程将无法得知子进程的执行状态,也就无法结束。
  • 主进程中要先使用 close() 关闭进程池 或者 terminate() 立即停止工作进程(不必等待未完成的任务),防止更多的任务提交到该池,才可以执行 join() 阻塞主进程,等待所有子进程完成。

multiprocessing 模块中 Pool 类 常用方法:

  • apply 和 apply_async
    • apply(func, args): 同步执行,类似于普通函数调用,阻塞主进程直到任务完成。
    • apply_async(func, args): 异步执行,不阻塞主进程,通过回调函数获取结果。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool

def worker(num):
    print(f"Worker {num}, PID: {os.getpid()}, PPID: {os.getppid()}")
    return num * num

def print_result(result):
    print("Result:", result)

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # 同步等待结果
        result = pool.apply(worker, (10,))
        print("Synchronous Result:", result)

        # 异步等待结果
        pool.apply_async(worker, (20,), callback=print_result)

        pool.close()
        pool.join()
  • map 和 map_async
    • map(func, iterable): 同步映射,阻塞主进程直到所有任务完成,返回结果列表。
    • map_async(func, iterable): 异步映射,不阻塞主进程,通过回调函数获取结果。

    Tips: 使用 map 方法传参数时,将 iterable 的每个元素作为参数,相当于一次提交多个任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool

def worker(num):
    print(f"Worker {num}, PID: {os.getpid()}, PPID: {os.getppid()}")
    return num * num

def print_result(results):
    print("Results:", results)

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # 同步
        results = pool.map(worker, range(10))
        print("Synchronous Results:", results)

        # 异步
        pool.map_async(worker, range(10), callback=print_result)

        pool.close()
        pool.join()
  • starmap 和 starmap_async
    • starmap(func, iterable_of_tuples): 类似于 map,但可以传递多个参数。
    • starmap_async(func, iterable_of_tuples): 异步版本的 starmap。

3.3 Python多进程同步与协调(Semaphore, Lock, Event, Condition)

  • Semaphore(信号量) :用于限制可以同时访问某个资源的进程数。在进程间同步对共享资源的访问非常有用。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import multiprocessing

semaphore = multiprocessing.Semaphore(2)  # 允许两个进程同时访问资源

def worker(semaphore):
    semaphore.acquire()
    try:
        # 执行任务
        pass
    finally:
        semaphore.release()
  • Lock(互斥锁) :一旦一个进程或者线程拿到了锁,后续的任何其它进程或线程的其它请求都会被阻塞直到锁被释放。用于确保一次只有一个进程可以访问共享资源。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import multiprocessing

lock = multiprocessing.Lock()

def worker(lock):
    lock.acquire()
    try:
        # 执行任务
        pass
    finally:
        lock.release()
  • Event(事件) :用于在进程间同步操作,一个进程可以设置或等待事件。
1
2
3
4
5
6
7
8
9
import multiprocessing

event = multiprocessing.Event()

def setter(event):
    event.set()  # 设置事件

def waiter(event):
    event.wait()  # 等待事件被设置
  • Condition(条件变量) :用于在进程间同步操作。与Lock类似,但允许进程在某些条件下等待或通知其它进程, 一个进程可以设置或等待事件,也可以设置或等待多个事件。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import multiprocessing

condition = multiprocessing.Condition()

def worker_with_condition_wait(condition):
    with condition:
        condition.wait()  # 等待通知
        # 执行任务

def worker_with_condition_notify(condition):
    with condition:

        # 完成前置任务
        condition.notify()  # 通知
        # condition.notify_all()  # 通知所有
        

3.4 multiprocessing的一些注意事项

在使用 multiprocessing 模块进行多进程编程时,需要注意以下几点:

  • 全局变量的共享问题

每个子进程都有自己的内存空间,因此全局变量在多进程之间不能直接共享。如果需要共享数据,可以使用 multiprocessing.Value 或 multiprocessing.Array 来创建共享内存。

  • 进程间通信问题

多个进程之间需要相互通信的,可以使用 multiprocessing.Queue 或 multiprocessing.Pipe 来进行进程间通信。

  • 进程池的使用

如果需要同时启动多个进程,可以使用进程池来管理进程。进程池可以避免频繁地启动和关闭进程所带来的开销,提高程序的效率。

  • 子进程的异常处理

每个子进程都是一个独立的进程,当子进程出现异常时,主进程并不会收到通知。因此需要在子进程中进行异常处理,并将异常信息通过进程间通信的方式传递给主进程。

  • 进程的启动方式

可以使用 multiprocessing.Process 来创建进程,也可以使用 multiprocessing.Pool 来创建进程池。进程池可以方便地管理多个进程,避免手动启动和关闭进程所带来的麻烦。

四、multiprocessing模块进程通信

在多进程编程中,不同的进程之间需要进行通信。multiprocessing模块提供了多种进程间通信的方式,例如使用队列、管道、共享内存等。

4.1 队列(Queue)

队列是一种常用的进程间通信方式,多个进程可以通过共享的队列(使用了操作系统给开辟的一个队列空间)进行数据交换,实现进程间的通信。multiprocessing模块中提供了 Queue 类,用来创建队列,队列是线程/进程安全的,可以在多个进程之间安全地传递数据。下面是一个简单的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import multiprocessing

def producer(q):
    """该函数将在生产者进程中执行"""
    for i in range(10):
        q.put(i)

def consumer(q):
    """该函数将在消费者进程中执行"""
    while True:
        item = q.get()
        if item is None:
            break
        print(item)

if __name__ == '__main__':
    # 创建队列
    q = multiprocessing.Queue()
    # 创建生产者进程
    p1 = multiprocessing.Process(target=producer, args=(q,))
    # 创建消费者进程
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    # 启动进程
    p1.start()
    p2.start()
    # 等待进程结束
    p1.join()
    # 发送结束信号
    q.put(None)
    p2.join()

在上面的代码中,首先创建了一个Queue对象,然后创建了一个生产者进程和一个消费者进程。生产者进程通过调用put方法将0~9的数字放入队列中,消费者进程通过调用get方法从队列中获取数据,并将其打印出来。最后,调用put方法发送结束信号,然后等待两个进程结束。

任务调度:可以使用队列(如multiprocessing.Queue)来调度任务,其中生产者进程将任务放入队列,消费者进程从队列中取出任务并执行。

4.2 管道(Pipe)

管道(Pipe) 是另一种常用的进程间通信方式。multiprocessing模块中 Pipe 类提供了进程间通信的管道。管道包含两个连接,每个连接对应一个进程,可以双向传递数据。Pipe方法返回(conn1, conn2)代表一个管道的两个端,通过 Pipe 可以实现两个进程之间的通信。

Pipe 类实例化创建对象 duplex 参数:

  • 如果duplex参数为True(默认值),那么这个参数是全双工模式,也就是说conn1和conn2均可收发。
  • 若duplex为False,conn1只负责接收消息,conn2只负责发送消息. send和recv方法分别是发送和接受消息的方法。

例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息,如果没有消息可接收,recv方法会一直阻塞. 如果管道已经被关闭,那么recv方法会抛出EOFError。下面是一个简单的管道示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import multiprocessing

def producer(conn):
    """该函数将在生产者进程中执行"""
    for i in range(10):
        conn.send(i)
    conn.close()

def consumer(conn):
    """该函数将在消费者进程中执行"""
    while True:
        item = conn.recv()
        if item is None:
            break
        print(item)

if __name__ == '__main__':
    # 创建管道
    conn1, conn2 = multiprocessing.Pipe()
    # 创建生产者进程
    p1 = multiprocessing.Process(target=producer, args=(conn1,))
    # 创建消费者进程
    p2 = multiprocessing.Process(target=consumer, args=(conn2,))
    # 启动进程
    p1.start()
    p2.start()
    # 等待进程结束
    p1.join()
    # 发送结束信号
    conn1.send(None)
    p2.join()

在上面的代码中,首先创建了一个管道,然后创建了一个生产者进程和一个消费者进程。生产者进程通过调用send方法将0~9的数字发送到管道中,消费者进程通过调用recv方法从管道中获取数据,并将其打印出来。最后,调用send方法发送结束信号,然后等待两个进程结束。

4.3 共享内存(Shared Memory)

共享内存 是一种高效的进程间通信方式,它允许多个进程共享同一块内存区域。multiprocessing模块中提供了Value和Array类,可以用来创建共享内存。下面是一个简单的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import multiprocessing

def worker1(n):
    """该函数将在进程1中执行"""
    n.value += 1
    print('worker1:', n.value)

def worker2(n):
    """该函数将在进程2中执行"""
    n.value += 1
    print('worker2:', n.value)

if __name__ == '__main__':
    # 创建共享内存
    n = multiprocessing.Value('i', 0)
    # 创建进程1
    p1 = multiprocessing.Process(target=worker1, args=(n,))
    # 创建进程2
    p2 = multiprocessing.Process(target=worker2, args=(n,))
    # 启动进程
    p1.start()
    p2.start()
    # 等待进程结束
    p1.join()
    p2.join()

在上面的代码中,首先创建了一个Value对象,用于存储一个整数值。然后创建了两个进程,每个进程都会将共享内存中的值加1,并将其打印出来。最后,等待两个进程结束。

除了Value类之外,multiprocessing模块还提供了 Array类用于创建共享内存数组 和

Manager管理器 提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务,它能提供 python 所支持的任何数据结构。其它进程可以通过代理访问这些共享对象。

利用Manager创建字典,列表等对象,传入进程,在各进程所对应的方法中修改上面创建的对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Manager, Process


def func1(name,m_list,m_dict):
     m_dict['area'] = 'beijing'
     m_list.append('跳水')


def func2(name, m_list, m_dict):
    m_dict['work'] = '奥运会'
    m_list.append('体操')

if __name__ == "__main__":
    with Manager() as mgr:
        m_list = mgr.list()
        m_dict = mgr.dict()
        m_list.append("游泳")
        # 两个进程不能直接互相使用对象,需要互相传递
        p1 = Process(target=func1, args=('p1', m_list, m_dict))
        p1.start()
        p1.join()   # 等p1进程结束,主进程继续执行
        print(m_list)
        print(m_dict)
        p2 = Process(target=func2, args=('p1', m_list, m_dict))
        p2.start()
        p2.join()   # 等p2进程结束,主进程继续执行
        print(m_list)
        print(m_dict)

Tips: Pickle 序列化和反序列化 Python 对象

  • pickle模块用于序列化和反序列化 Python 对象,可以将对象转换为字节流进行传输。在进程间通信中,可以使用pickle将对象序列化后传输,再在另一端反序列化得到原始对象。

五、concurrent.futures 模块的多进程编程

5.1 ProcessPoolExecutor简介

在 python 中创建进程池有两种方式,可以使用内置模块 multiprocessing.Pool(),也可以使用 concurrent.futures 中的 ProcessPoolExecutor 类,它是 Executor 的子类,它使用进程池来异步地执行调用。 ProcessPoolExecutor 会使用 multiprocessing 模块,这允许它绕过 全局解释器锁 但也意味着只可以处理和返回可封存的对象。

Tips: main 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中。

Executor类定义了三种用于控制进程池的方法:submit()、map()和shutdown()。

  • submit():函数接受一个函数以及对应的参数(分派一个要执行的函数),并将异步执行,调用会立即返回Future对象。
  • map():函数用于将函数应用于可迭代对象(如列表)中的每个元素,该函数对应于元素的每个进程都将异步运行。
  • shutdown():关闭执行器。

Executor在创建类时启动,并且必须通过调用shutdown()显式关闭,这将释放Executtor持有的所有资源,当然也可以自动关闭。

ProcessPoolExecutor 类的定义如下:

1
2
3
4
5
class concurrent.futures.ProcessPoolExecutor(max_workers=None, 
                                            mp_context=None, 
                                            initializer=None, 
                                            initargs=(), 
                                            max_tasks_per_child=None)
  • 一个 Executor 子类使用最多由 max_workers 个进程组成的进程池异步执行调用。 如果 max_workers 是 None 或未给出,则默认为机器上的处理器数量。如果 max_workers 小于或等于 0 ,则会引发 ValueError 。在 Windows 系统中,max_workers 必须小于或等于 61 ,否则会引发 ValueError 。如果 max_workers 为 None ,则默认情况下最大为 61 ,即使有更多的处理器可用。
  • mp_context 可以是多进程上下文,也可以是 None。它将用于启动工作者。如果 mp_context 为None 或未给出,则使用默认的多进程上下文。
  • initializer 是一个可选的可调用对象,它会在每个工作进程启动时被调用;initargs 是传给 initializer 的参数元组。 如果 initializer 引发了异常,则所有当前在等待的任务以及任何向进程池提交更多任务的尝试都将引发 BrokenProcessPool。
  • max_tasks_per_child 是指定单个进程在其退出并替换为新工作进程之前可以执行的最大任务数量的可选参数。 在默认情况下 max_tasks_per_child 为 None 表示工作进程将存活与进程池一样长的时间。 当指定了最大数量时,则如果不存在 mp_context 形参则将默认使用 “spawn” 多进程启动方法。 此特性不能兼容 “fork” 启动方法。

5.2 ProcessPoolExecutor的工作流程(使用方法)

  1. 创建进程池

将进程池的容量(进程数量)设为 N,容量大小是根据cpu核数来确定的,最好不要超过电脑的核数。

1
2
# 创建进程池,并设置进程池容量
executor = ProcessPoolExecutor(6)
  1. 往进程池里添加任务

上一步创建的进程池有6个“空位置”,相当于我工厂里预留了6个生产线,但是这六个生产线还没接到任务,都在空闲状态。现在往进程池里添加任务。比如,现在有个函数task,这个函数执行时需要二个入参:

1
2
3
def my_task(a,b):
    sleep(2)
    return a+b

一次提交一个任务:可以通过调用executor的submit方法,传入函数 task 和其入参来提交任务。

1
2
# 计算88加66
future = executor.submit(my_task, 88,66)

也可以一次提交多个任务:可以通过调用executor的map方法,传入两个等长的列表或者迭代器。

1
2
# 计算11加1、22加2、33加3、44加4、55加5
futures = executor.map(my_task, [11,22,33,44,55],[1,2,3,4,5])
  1. 获取结果

单个任务的话可以直接调用future的result()方法,result()可以加timeout参数,超过timeout指定的时间就会抛出异常。

1
2
3
# 88加66的结果154,result = 154
result = future.result()
# result = future.result(timeout=5)

多个任务获取结果。

1
2
3
4
futures = executor.map(my_task, [11, 22, 33, 44, 55], [1, 2, 3, 4, 5])
# 打印所有进程的运行结果
for i in futures:
    print(i)
  1. 关闭进程池
1
executor.shutdown()

完整示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from time import sleep
from concurrent.futures import ProcessPoolExecutor

def my_task(a, b):
    sleep(2)
    return a + b

def main():
    executor = ProcessPoolExecutor(4)
    futures = executor.map(my_task, [11, 22, 33, 44, 55], [1, 2, 3, 4, 5])
    # 打印所有进程的运行结果
    for i in futures:
        print(i)

if __name__ == '__main__':
    main()

Tips: 使用 __name__=='__main__' 的方法执行 main()函数,不然会报 RuntimeError 错误。

ProcessPoolExecutor进阶方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
#提交多次任务的方法
futures = [executor.submit(task, i) for i in range(10)]

# 获取所有任务数量
num = len(executor._pending_work_items)

# 获取某个任务的状态,'RUNNING','PENDING','FINISHED','CANCELLED'
# 分别表示正在运行,排队,结束,被取消执行
state = future._state

# 取消进程,无法取消正在运行的进程
futures.cancel()

# 如果任务已完成或被取消,则返回 True。
futures.done()

从可调用对象中调用 Executor 或 Future 的方法提交给 ProcessPoolExecutor 会导致死锁。

Tips: 与 multiprocessing.Pool 相比,concurrent.futures.ProcessPoolExecutor 更加面向异步编程,更适合现代 Python 应用。

Licensed under CC BY-NC-SA 4.0