Python的并发方案讨论

Python
184
0
0
2024-05-17

Python提供了三种并发方案:multiprocessingthreadingasyncio。从名字来看就是多进程,多线程和异步io。但你知道他们都适合什么场景使用,各有什么优缺点吗?

一 多进程multiprocessiog

multiprocessing是一个使用类似于该threading模块的 API 支持生成进程的包。该multiprocessing包提供本地和远程并发,通过使用子进程而不是线程有效地回避全局解释器锁。因此,它能充分利用给定机器上的多个处理器。

1.1 用法:

代码运行次数:0

复制

Cloud Studio 代码运行

from multiprocessing import Process
import os

def info(title):
    print(title)  
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 打印父进程id  
    print('process id:', os.getpid())  # 打印当前进程id 

def f(name):
    info('function f')  # 打印当前进程信息
    print('hello', name)

if __name__ == '__main__':
    info('main line')  # 打印主进程信息 
    p = Process(target=f, args=('bob',))  # 生成子进程对象
    p.start()  # 执行子进程
    p.join()  # 等待子进程结束

这里列举了一个multiprocessing的另一种写法。一种通过map的处理多个输入写法。

这里的Pool设置并法度为5,之后并行执行f(1), f(2), f(3)

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

因为多进程能充分利用多CPU核,这里我介绍几个实战例子。

有两个文件logcolor.py和tailf.py

logcolor.py这里过滤log的error等级,对WARNING、ERROR和CRITICAL的日志分别以对应颜色打印到屏幕上。

代码运行次数:0

复制

Cloud Studio 代码运行

#!/usr/bin/env python

ESC = chr(0x1B)

RESET = ESC + '[0m'

BLACK     = ESC + '[30m'
RED       = ESC + '[31m'
GREEN     = ESC + '[32m'
YELLOW    = ESC + '[33m'
BLUE      = ESC + '[34m'
MAGENTA   = ESC + '[35m'
CYAN      = ESC + '[36m'
WHITE     = ESC + '[37m'
BGBLACK     = ESC + '[40m'
BGRED       = ESC + '[41m'
BGGREEN     = ESC + '[42m'
BGYELLOW    = ESC + '[43m'
BGBLUE      = ESC + '[44m'
BGMAGENTA   = ESC + '[45m'
BGCYAN      = ESC + '[46m'
BGWHITE     = ESC + '[47m'

def process(line):
    if line.startswith('WARNING'):  # 如果日志记录以WARNING开头
        return YELLOW + line + RESET  # 用黄色打印 

    if line.startswith('ERROR'): # 如果日志记录以ERROR开头
        return BGRED + BLACK + line + RESET  # 用红色背景黑字打印 

    if line.startswith('CRITICAL'): # 如果日志记录以CRITICLE开头
        return BGCYAN + BLACK + line + RESET # 用绿色背景黑字打印 

    return line

def logcolor():
    try:
        while 1:
            line = raw_input()  # 读取输入fd
            print process(line)  # 执行颜色打印输出
    except KeyboardInterrupt:
        return
    except EOFError:
        return

if __name__ == '__main__':
    logcolor()

tailf.py则对a.log的日志输出做监控。具体来说,启动了p1进程去读取a.log的内容。同时启动了p2进程对p1读到的内容进行颜色打印。

代码运行次数:0

复制

Cloud Studio 代码运行

def tail():
    cmd = ['tail', '-100f', 'a.log']

    pipe_stdout = subprocess.PIPE if os.isatty(1) else None
    p1 = subprocess.Popen(  # p1 执行tail 读取a.log的输出内容
        cmd, stdout=pipe_stdout)  # 将p1的输出stdout重定向到pipe_stdout,即p2的输入
    if pipe_stdout:
        p2 = subprocess.Popen([  # p2执行颜色处理a.log的内容
            'python3', './logcolor.py'],
            stdin=p1.stdout)   # 将p2的输入stdin重定向到p1的输出
        p1.stdout.close()
        p2.wait()
    p1.wait()

if __name__ == '__main__':
    tail()

效果如下:

二、多threading

多threading的用法比较简单。threading.Thread生成实例,然后start(),最后join

代码运行次数:1

复制

Cloud Studio 代码运行

job_handler = JobHandler()  # JobHandler定义了__call__方法
job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
job_worker_thread.daemon = True
job_worker_thread.start()
job_worker_thread.join()

看一个实际的例子,用来并发处理Job。

代码运行次数:0

复制

Cloud Studio 代码运行

from queue import Queue
import threading
    
class Job(object):
    def __init__(self):
        self.next_job = None

    def set_next_job(self, next_job):
        self.next_job = next_job

    def handle(self, job_handler):
        r, out = self.do()
        if out:
            out = (self.__class__.__name__, out)
        else:
            out = tuple()
        if r is not None:
            r = '>'.join([self.__class__.__name__, r])
        else:
            if self.next_job is not None:
                self.next_job.trigger(job_handler)

        return r, out

    def trigger(self, job_handler):
        job_handler.add_next_job(self)

    def do(self):  # 执行Job
        raise NotImplementedError


class JobWrapper(Job):
    def __init__(self, prefix, current_job, next_job):
        super(JobWrapper, self).__init__()
        self.prefix = prefix
        self.current_job = current_job
        self.set_next_job(next_job)

    def handle(self, job_handler):
        self.current_job.set_next_job(self.next_job)
        job_res = self.current_job.handle(job_handler)
        if job_res is not None:
            r, out = job_res

            if out:
                out = (">".join([self.prefix, out[0]]), out[1])
            else:
                out = tuple()

            if r is not None:
                r = '>'.join([self.prefix, r])

            return r, out
        else:
            return None


class ChainedJobs(Job):
    def __init__(self, *jobs):
        super(ChainedJobs, self).__init__()
        self.jobs = jobs

    def handle(self, job_handler):
        next_job = self.next_job
        for current_job in self.jobs[::-1]:
            next_job = JobWrapper(self.__class__.__name__, current_job, next_job)
        job_handler.add_next_job(next_job)  #串联一系列job 


class ParallelJobs(Job):
    def __init__(self, *jobs):
        super(ParallelJobs, self).__init__()
        self.jobs = jobs

    def add_job(self, job):
        self.jobs.append(job)

    class _NotifyJob(Job):  # _NotifyJob继承了Job,改写了trigger方法
        def __init__(self, remain, next_job):
            super(ParallelJobs._NotifyJob, self).__init__()
            self.set_next_job(next_job)
            self.remain = remain
            self.lock = threading.Lock()

        def trigger(self, job_handler):
            continue_next_job = False
            with self.lock:
                self.remain -= 1  # 使用remain计数,每次trigger都进行减1,也就是执行权提升1,直到trigger结束
                if self.remain == 0:
                    continue_next_job = True

            if continue_next_job and self.next_job:
                self.next_job.trigger(job_handler)

    def handle(self, job_handler):
        notify_job = ParallelJobs._NotifyJob(len(self.jobs), self.next_job)
        for job in self.jobs:
            job.set_next_job(notify_job)
            job.trigger(job_handler)
        return None


class JobHandler(object):
    def __init__(self, job_mgr, handler_id):
        self.q = job_mgr.job_queue
        self.outs, self.errors = [], []

    def add_next_job(self, next_job):
        self.q.put_nowait(next_job)

    def __call__(self, *args, **kwargs):
        while True:
            j = self.q.get()
            try:
                jres = j.handle(self)
                if jres is not None:
                    r, out = jres
                    if r is not None:
                        self.errors.append(r)
                    if out:
                        self.outs.append(out)
            except Exception as e:
                imolog.error("Unknown exception thrown in handler:" + str(e))
                traceback.print_exc()
            finally:
                self.q.task_done()

    def get_errors(self):
        return self.errors

    def get_outs(self):
        return self.outs


class JobManager(object):
    def __init__(self):
        self.job_queue = Queue()
        self.job_handlers = []

    def start(self):  #启动了33个线程
        for i in range(1, 33):
            job_handler = JobHandler(self, i)
            job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
            job_worker_thread.daemon = True
            job_worker_thread.start()
            self.job_handlers.append(job_handler) # all are non-deamon threads

    def wait(self):
        self.job_queue.join()

    def collect_errors(self):
        es = []
        for job_handler in self.job_handlers:
            es.extend(job_handler.get_errors())
        return es

    def collect_outs(self):
        outs = []
        for job_handler in self.job_handlers:
            outs.extend(job_handler.get_outs())
        return outs

    def add_job(self, job):
        self.job_queue.put_nowait(job)
        
if __name__ == '__main__':
    job_manager = JobManager()
    job_manager.start()

三、asyino方案

Python使用await 来等待coroutine的执行完成。协程使用async描述。

代码运行次数:0

复制

Cloud Studio 代码运行

import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(  # 协程用async修饰
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.

    If the executor is None, use the default ThreadPoolExecutor.
    """
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        partial(func, *args, **kwargs),
    )

# Example usage for running `print` in a thread.
async def main():
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())

四、三种方案比较

  • Procession强调的是cpu运算,比如说你可以把shell命令放在subprocess.Popen里面执行。比如说你有命令要派发到很多机器上执行。如果你有8核cpu,你可以把cpu计算任务派发到8份子任务,最后汇总。这样任务得到了8倍加速。多进程会更擅长处理cpu-bound受限的任务。
  • 在多threading,不需要多cpu。比如说你这个应用程序用来发送很多HTTP请求,然后等待网络响应到达后继续处理。在cpu等待的时间片内就可以用来处理别的线程任务。多threading擅长I/O受限的任务。
  • asyncio是处理并发任务的高效方式。它指的是一种任务的并行处理方式 。asynio是你来通过代码决定哪里什么时候进行上下文切换,而不是像多threading是由cpu决定何时切换协程。
  • CPython(主要的Python实现方案) 仍然有全局解释锁。所以多线程应用不是优先选择。这也是multiprocessing threading更推荐的原因。但是有些问题不需要分解成那么多份,特别是需要跨进程通信的场景。这也是multiprocessing 没有比 threading 更推荐的原因.

这是一段伪代码来决定你的场景需要用什么样的并发方案。

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")

所以你应该这样考虑:

  • CPU Bound => Multi Processing
  • I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
  • I/O Bound, Slow I/O, Many connections => Asyncio

I/O bound指的是应用需要花很多时间和慢的IO设备交谈, 比如说网络连接, 硬盘, 打印设备, 或者需要一定睡眠时间的event loop. 因此在blocking模式, 可以选择threading或者asyncio。如果IO很慢, 协程式多任务 (asyncio) 是更优的选择 (i.e. 避免资源过度争用, 死锁和竞争条件)

五、性能比较:

以下列举了3种并发方案执行test函数的运行时间对比:

concurrency

execute time

none

60.39s

multiprocessiing

53.61s

threading

48.23s

asyncio

47.92s

代码运行次数:0

复制

Cloud Studio 代码运行

# "process_test.py"

from multiprocessing import Process
from threading import Thread
import time
import asyncio
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)
    
async def async_test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)
    
async def call_tests():
    tasks = []

    for _ in range(0, 2): # 2 asyncio tasks
        tasks.append(async_test())

    await asyncio.gather(*tasks)

if __name__ == "__main__": # This is needed to run processes on Windows
    process_list = []

    test()
    test()

    print("non-concurrency:", round((time.time() - start_time), 2), "seconds") # non-concurrency
    
    start_time = time.time()


    process_list = []

    for _ in range(0, 2): # 2 processes
        process = Process(target=test)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()
        
    print("multi-processing:", round((time.time() - start_time), 2), "seconds") # multi-processing

    start_time = time.time()

    thread_list = []

    for _ in range(0, 2): # 2 threads
        thread = Thread(target=test)
        thread_list.append(thread)
    
    for thread in thread_list:
        thread.start()

    for thread in thread_list:
        thread.join()

    print("threading:", round((time.time() - start_time), 2), "seconds") # threading
    
    start_time = time.time()

    asyncio.run(call_tests())

    print("asyncio:", round((time.time() - start_time), 2), "seconds") # asyncio
    

输出: