Python offers three concurrency options: multiprocessing, threading, and asyncio. As the names suggest, they provide multiple processes, multiple threads, and asynchronous I/O. But do you know which scenarios each one suits, and what their strengths and weaknesses are?
1. Multiprocessing
multiprocessing is a package that supports spawning processes using an API similar to the threading module. It offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Because of this, it can take full advantage of the multiple processors on a given machine.
1.1 Usage:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # print the parent process id
    print('process id:', os.getpid())       # print the current process id

def f(name):
    info('function f')  # print info about the current process
    print('hello', name)

if __name__ == '__main__':
    info('main line')  # print info about the main process
    p = Process(target=f, args=('bob',))  # create the child-process object
    p.start()  # start the child process
    p.join()   # wait for the child process to finish
Here is another way to use multiprocessing: feeding multiple inputs through map. The Pool below sets the degree of parallelism to 5, then runs f(1), f(2), and f(3) in parallel.
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
Because multiple processes can make full use of multiple CPU cores, here are a couple of practical examples.
There are two files, logcolor.py and tailf.py.
logcolor.py filters log records by severity level: WARNING, ERROR, and CRITICAL lines are printed to the screen in their corresponding colors.
#!/usr/bin/env python3
ESC = chr(0x1B)
RESET = ESC + '[0m'
BLACK = ESC + '[30m'
RED = ESC + '[31m'
GREEN = ESC + '[32m'
YELLOW = ESC + '[33m'
BLUE = ESC + '[34m'
MAGENTA = ESC + '[35m'
CYAN = ESC + '[36m'
WHITE = ESC + '[37m'
BGBLACK = ESC + '[40m'
BGRED = ESC + '[41m'
BGGREEN = ESC + '[42m'
BGYELLOW = ESC + '[43m'
BGBLUE = ESC + '[44m'
BGMAGENTA = ESC + '[45m'
BGCYAN = ESC + '[46m'
BGWHITE = ESC + '[47m'

def process(line):
    if line.startswith('WARNING'):   # record starts with WARNING
        return YELLOW + line + RESET             # print in yellow
    if line.startswith('ERROR'):     # record starts with ERROR
        return BGRED + BLACK + line + RESET      # black text on a red background
    if line.startswith('CRITICAL'):  # record starts with CRITICAL
        return BGCYAN + BLACK + line + RESET     # black text on a cyan background
    return line

def logcolor():
    try:
        while 1:
            line = input()        # read a line from stdin
            print(process(line))  # print it with color applied
    except KeyboardInterrupt:
        return
    except EOFError:
        return

if __name__ == '__main__':
    logcolor()
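You can also try logcolor.py on its own with a shell pipeline such as tail -f a.log | python3 logcolor.py, which is essentially what tailf.py below wires up in code.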
tailf.py monitors the log output of a.log. Specifically, it starts process p1 to read the contents of a.log, and at the same time starts process p2 to colorize what p1 reads.
import os
import subprocess

def tail():
    cmd = ['tail', '-100f', 'a.log']
    pipe_stdout = subprocess.PIPE if os.isatty(1) else None
    p1 = subprocess.Popen(        # p1 runs tail to read a.log's output
        cmd, stdout=pipe_stdout)  # p1's stdout goes to the pipe, which becomes p2's input
    if pipe_stdout:
        p2 = subprocess.Popen([   # p2 colorizes a.log's content
            'python3', './logcolor.py'],
            stdin=p1.stdout)      # p2's stdin is redirected to p1's output
        p1.stdout.close()
        p2.wait()
    p1.wait()

if __name__ == '__main__':
    tail()
The effect: WARNING, ERROR, and CRITICAL lines show up in the terminal in their corresponding colors.
2. Threading
Using threading is fairly simple: create a threading.Thread instance, call start(), and finally join().
job_handler = JobHandler()  # JobHandler defines __call__, so the instance is callable
job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
job_worker_thread.daemon = True
job_worker_thread.start()
job_worker_thread.join()
Let's look at a real example that processes Jobs concurrently.
from queue import Queue
import logging  # logging stands in for the original's internal imolog logger
import threading
import traceback

class Job(object):
    def __init__(self):
        self.next_job = None

    def set_next_job(self, next_job):
        self.next_job = next_job

    def handle(self, job_handler):
        r, out = self.do()
        if out:
            out = (self.__class__.__name__, out)
        else:
            out = tuple()
        if r is not None:
            r = '>'.join([self.__class__.__name__, r])
        else:
            if self.next_job is not None:
                self.next_job.trigger(job_handler)
        return r, out

    def trigger(self, job_handler):
        job_handler.add_next_job(self)

    def do(self):  # run the Job; subclasses must implement this
        raise NotImplementedError

class JobWrapper(Job):
    def __init__(self, prefix, current_job, next_job):
        super(JobWrapper, self).__init__()
        self.prefix = prefix
        self.current_job = current_job
        self.set_next_job(next_job)

    def handle(self, job_handler):
        self.current_job.set_next_job(self.next_job)
        job_res = self.current_job.handle(job_handler)
        if job_res is not None:
            r, out = job_res
            if out:
                out = (">".join([self.prefix, out[0]]), out[1])
            else:
                out = tuple()
            if r is not None:
                r = '>'.join([self.prefix, r])
            return r, out
        else:
            return None

class ChainedJobs(Job):
    def __init__(self, *jobs):
        super(ChainedJobs, self).__init__()
        self.jobs = jobs

    def handle(self, job_handler):
        next_job = self.next_job
        for current_job in self.jobs[::-1]:
            next_job = JobWrapper(self.__class__.__name__, current_job, next_job)
        job_handler.add_next_job(next_job)  # chain a series of jobs together

class ParallelJobs(Job):
    def __init__(self, *jobs):
        super(ParallelJobs, self).__init__()
        self.jobs = list(jobs)  # a list, so add_job can append

    def add_job(self, job):
        self.jobs.append(job)

    class _NotifyJob(Job):  # _NotifyJob subclasses Job and overrides trigger
        def __init__(self, remain, next_job):
            super(ParallelJobs._NotifyJob, self).__init__()
            self.set_next_job(next_job)
            self.remain = remain
            self.lock = threading.Lock()

        def trigger(self, job_handler):
            continue_next_job = False
            with self.lock:
                self.remain -= 1      # remain counts outstanding jobs; each trigger decrements it
                if self.remain == 0:  # once it hits 0, every parallel job has finished
                    continue_next_job = True
            if continue_next_job and self.next_job:
                self.next_job.trigger(job_handler)

    def handle(self, job_handler):
        notify_job = ParallelJobs._NotifyJob(len(self.jobs), self.next_job)
        for job in self.jobs:
            job.set_next_job(notify_job)
            job.trigger(job_handler)
        return None

class JobHandler(object):
    def __init__(self, job_mgr, handler_id):
        self.q = job_mgr.job_queue
        self.outs, self.errors = [], []

    def add_next_job(self, next_job):
        self.q.put_nowait(next_job)

    def __call__(self, *args, **kwargs):
        while True:
            j = self.q.get()
            try:
                jres = j.handle(self)
                if jres is not None:
                    r, out = jres
                    if r is not None:
                        self.errors.append(r)
                    if out:
                        self.outs.append(out)
            except Exception as e:
                logging.error("Unknown exception thrown in handler: " + str(e))
                traceback.print_exc()
            finally:
                self.q.task_done()

    def get_errors(self):
        return self.errors

    def get_outs(self):
        return self.outs

class JobManager(object):
    def __init__(self):
        self.job_queue = Queue()
        self.job_handlers = []

    def start(self):  # start 32 worker threads
        for i in range(1, 33):
            job_handler = JobHandler(self, i)
            job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
            job_worker_thread.daemon = True  # daemon threads exit with the main thread
            job_worker_thread.start()
            self.job_handlers.append(job_handler)

    def wait(self):
        self.job_queue.join()

    def collect_errors(self):
        es = []
        for job_handler in self.job_handlers:
            es.extend(job_handler.get_errors())
        return es

    def collect_outs(self):
        outs = []
        for job_handler in self.job_handlers:
            outs.extend(job_handler.get_outs())
        return outs

    def add_job(self, job):
        self.job_queue.put_nowait(job)

if __name__ == '__main__':
    job_manager = JobManager()
    job_manager.start()
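As a usage sketch (EchoJob is a hypothetical subclass invented for this illustration, not part of the original code): a concrete Job implements do() and returns an (error, output) pair, and jobs are combined with ChainedJobs or ParallelJobs and submitted through add_job.

class EchoJob(Job):  # hypothetical Job subclass for illustration
    def __init__(self, msg):
        super(EchoJob, self).__init__()
        self.msg = msg

    def do(self):
        return None, self.msg  # (error, output); a None error means success

job_manager = JobManager()
job_manager.start()
job_manager.add_job(ParallelJobs(EchoJob("a"), EchoJob("b"), EchoJob("c")))
job_manager.wait()  # blocks until the queue drains
print(job_manager.collect_outs())  # the three ('EchoJob', ...) tuples, in some order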
3. Asyncio
Python uses await to wait for a coroutine to finish; a coroutine is declared with async.
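A minimal sketch first (the coroutine name say_after is made up for this illustration):

import asyncio

async def say_after(delay, msg):  # a coroutine, declared with async
    await asyncio.sleep(delay)    # await suspends here until the sleep completes
    print(msg)

asyncio.run(say_after(1, "hello"))

The example below goes a step further: it wraps a blocking function in an executor so the result can be awaited from a coroutine.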
import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(  # a coroutine, declared with async
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.

    If the executor is None, use the default ThreadPoolExecutor.
    """
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        partial(func, *args, **kwargs),
    )

# Example usage for running `print` in a thread.
async def main():
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())
4. Comparing the three options
- multiprocessing emphasizes CPU computation. For instance, you can run shell commands inside subprocess.Popen, or dispatch commands to many machines. If you have an 8-core CPU, you can split a CPU-bound task into 8 subtasks and merge the results at the end, for roughly an 8x speedup. Multiprocessing is better at CPU-bound tasks.
- Threading does not need multiple CPUs. Say your application sends many HTTP requests and continues processing once the responses arrive: the time one thread spends waiting on the network can be used to run other threads (see the sketch after this list). Threading is good at I/O-bound tasks.
- asyncio is an efficient way to handle concurrent tasks cooperatively. With asyncio, your code decides where and when a context switch happens, whereas with threading the operating system decides when to switch threads.
- CPython (the main Python implementation) still has the Global Interpreter Lock, so a multithreaded application is often not the first choice; this is one reason multiprocessing is recommended over threading. But not every problem splits into that many independent pieces, especially when the pieces need to communicate across processes; in those cases multiprocessing is not necessarily better than threading.
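As a sketch of that I/O-bound case (the URL list and the fetch helper are made up for this illustration): threads overlap the time spent waiting on the network.

from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ["https://example.com"] * 8  # hypothetical targets

def fetch(url):
    # blocks on the network; while one thread waits, the others run
    with urllib.request.urlopen(url, timeout=10) as resp:
        return len(resp.read())

with ThreadPoolExecutor(max_workers=8) as pool:
    print(list(pool.map(fetch, URLS)))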
Here is a piece of pseudocode for deciding which concurrency option your scenario needs.
if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")
So you should think about it this way:
- CPU Bound => Multi Processing
- I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
- I/O Bound, Slow I/O, Many connections => Asyncio
I/O bound means the application spends most of its time talking to slow I/O devices: a network connection, a disk, a printer, or an event loop that has to sleep. In blocking mode, you can choose threading or asyncio. If the I/O is very slow, cooperative multitasking (asyncio) is the better choice (i.e., it avoids excessive resource contention, deadlocks, and race conditions).
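A minimal sketch of the slow-I/O, many-connections case (asyncio.sleep stands in for a slow network call; the names are made up for this illustration):

import asyncio

async def slow_io(i):
    await asyncio.sleep(1)  # simulates one slow network round trip
    return i

async def main():
    # 1000 concurrent "connections" finish in roughly 1 second of wall time
    results = await asyncio.gather(*(slow_io(i) for i in range(1000)))
    print(len(results))

asyncio.run(main())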
5. Performance comparison
The table below compares the run time of the test function under each concurrency option:
concurrency     | execute time
----------------|-------------
none            | 60.39s
multiprocessing | 53.61s
threading       | 48.23s
asyncio         | 47.92s
# "process_test.py" | |
from multiprocessing import Process | |
from threading import Thread | |
import time | |
import asyncio | |
start_time = time.time() | |
def test(): | |
num = 100000 | |
primes = 0 | |
for i in range(2, num + 1): | |
for j in range(2, i): | |
if i % j == 0: | |
break | |
else: | |
primes += 1 | |
print(primes) | |
async def async_test(): | |
num = 100000 | |
primes = 0 | |
for i in range(2, num + 1): | |
for j in range(2, i): | |
if i % j == 0: | |
break | |
else: | |
primes += 1 | |
print(primes) | |
async def call_tests(): | |
tasks = [] | |
for _ in range(0, 2): # 2 asyncio tasks | |
tasks.append(async_test()) | |
await asyncio.gather(*tasks) | |
if __name__ == "__main__": # This is needed to run processes on Windows | |
process_list = [] | |
test() | |
test() | |
print("non-concurrency:", round((time.time() - start_time), 2), "seconds") # non-concurrency | |
start_time = time.time() | |
process_list = [] | |
for _ in range(0, 2): # 2 processes | |
process = Process(target=test) | |
process_list.append(process) | |
for process in process_list: | |
process.start() | |
for process in process_list: | |
process.join() | |
print("multi-processing:", round((time.time() - start_time), 2), "seconds") # multi-processing | |
start_time = time.time() | |
thread_list = [] | |
for _ in range(0, 2): # 2 threads | |
thread = Thread(target=test) | |
thread_list.append(thread) | |
for thread in thread_list: | |
thread.start() | |
for thread in thread_list: | |
thread.join() | |
print("threading:", round((time.time() - start_time), 2), "seconds") # threading | |
start_time = time.time() | |
asyncio.run(call_tests()) | |
print("asyncio:", round((time.time() - start_time), 2), "seconds") # asyncio | |
The output corresponds to the timing table above.