Juntao
Published on 2023-10-10 / 340 Visits
0

Python concurrent.futures 标准库

concurrent.futures 是 Python 标准库中的一个模块,提供了高级接口,用于并发执行函数,通常使用线程池或进程池。这个库的主要作用是简化并发编程,并提供了一种简单的方式来并行执行任务。以下是 concurrent.futures 的主要作用和典型使用场景:

  1. 并发执行任务concurrent.futures 允许您在多个线程或进程中同时执行函数,从而提高了程序的性能。

  2. 线程池和进程池:它提供了 ThreadPoolExecutorProcessPoolExecutor 两种执行器,分别用于创建线程池和进程池。您可以根据任务的性质选择合适的执行器。

  3. 简化并发编程:库提供了高级的 API,使得编写并发代码更加简单和可读。您不必手动管理线程或进程,而是依靠库来处理。

  4. 异步结果处理:您可以提交函数执行任务并立即获得一个 Future 对象,然后在需要时等待获取任务的结果。这样可以使您的代码更具响应性。

  5. 错误处理concurrent.futures 允许您在任务执行期间捕获异常并处理它们,以确保程序的稳定性。

  6. 限制并发数量:您可以设置最大并发数,以控制同时执行的任务数量,从而避免资源竞争问题。

  7. 批量处理:通常用于对大量数据进行相同的处理,例如批量下载文件、批量处理图像等。

典型的使用场景包括:

  • I/O 密集型任务:当您的程序需要执行大量 I/O 操作,例如文件读写、网络请求等时,使用 concurrent.futures 可以提高效率,因为它允许并行执行这些阻塞任务。

  • CPU 密集型任务:如果您有大量的 CPU 密集型计算任务,使用 ProcessPoolExecutor 可以充分利用多核处理器,提高计算速度。

  • 并行化迭代:当您需要对一个可迭代对象的元素进行相同的操作时,使用 executor.map 可以方便地并行化处理。

  • 异步处理:您可以使用 Future 对象的 .result() 方法等待异步任务的完成,而不必在任务执行期间一直等待。

以下是 concurrent.futures 不同典型用法的代码示例,每个示例都包含了一个简单的使用情况:

  1. 并发执行 I/O 密集型任务(线程池)

    import concurrent.futures
    
    def fetch_url(url):
        # 模拟一个网络请求
        import time
        time.sleep(2)
        return f"Data from {url}"
    
    urls = ["https://example.com", "https://google.com", "https://github.com"]
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(fetch_url, urls))
    
    for result in results:
        print(result)
    
  2. 并发执行 CPU 密集型任务(进程池)

    import concurrent.futures
    
    def compute_square(number):
        return number * number
    
    numbers = [1, 2, 3, 4, 5]
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(compute_square, numbers))
    
    for result in results:
        print(result)
    
  3. 异步结果处理

    import concurrent.futures
    
    def slow_function():
        import time
        time.sleep(2)
        return "Slow function is done"
    
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(slow_function)
    
    # 在需要结果时等待任务完成
    result = future.result()
    print(result)
    
  4. 批量处理任务

    import concurrent.futures
    
    def process_image(image_path):
        # 模拟图像处理任务
        return f"Processed {image_path}"
    
    image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
    
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(process_image, image_paths))
    
    for result in results:
        print(result)