Python提供了三种并发方案:multiprocessing,threading和asyncio。从名字来看就是多进程,多线程和异步io。但你知道他们都适合什么场景使用,各有什么优缺点吗?
一、多进程multiprocessing
multiprocessing
是一个使用类似于该threading
模块的 API 支持生成进程的包。该multiprocessing
包提供本地和远程并发,通过使用子进程而不是线程有效地回避全局解释器锁。因此,它能充分利用给定机器上的多个处理器。
1.1 用法:
代码运行次数:0
复制
Cloud Studio 代码运行
from multiprocessing import Process
import os

def info(title):
    # Print identification info for the current process.
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # parent process id
    print('process id:', os.getpid())  # current process id

def f(name):
    # Target function executed inside the child process.
    info('function f')  # print the child process's info
    print('hello', name)

if __name__ == '__main__':
    info('main line')  # print the main process's info
    p = Process(target=f, args=('bob',))  # create the child-process object
    p.start()  # start the child process
    p.join()  # wait for the child process to finish
这里列举multiprocessing的另一种写法:通过Pool.map并行处理多个输入。
这里的Pool设置并发度为5,之后并行执行f(1)、f(2)、f(3)。
from multiprocessing import Pool

def f(x):
    """Return the square of x."""
    return x ** 2

if __name__ == '__main__':
    # A pool of 5 worker processes maps f over the inputs in parallel.
    with Pool(5) as pool:
        print(pool.map(f, [1, 2, 3]))
因为多进程能充分利用多CPU核,这里我介绍几个实战例子。
有两个文件logcolor.py和tailf.py
logcolor.py这里过滤log的error等级,对WARNING、ERROR和CRITICAL的日志分别以对应颜色打印到屏幕上。
代码运行次数:0
复制
Cloud Studio 代码运行
#!/usr/bin/env python
# ANSI escape sequences for terminal foreground/background colors.
ESC = chr(0x1B)
RESET = ESC + '[0m'
BLACK = ESC + '[30m'
RED = ESC + '[31m'
GREEN = ESC + '[32m'
YELLOW = ESC + '[33m'
BLUE = ESC + '[34m'
MAGENTA = ESC + '[35m'
CYAN = ESC + '[36m'
WHITE = ESC + '[37m'
BGBLACK = ESC + '[40m'
BGRED = ESC + '[41m'
BGGREEN = ESC + '[42m'
BGYELLOW = ESC + '[43m'
BGBLUE = ESC + '[44m'
BGMAGENTA = ESC + '[45m'
BGCYAN = ESC + '[46m'
BGWHITE = ESC + '[47m'

def process(line):
    """Return `line` wrapped in ANSI color codes according to its log level prefix."""
    if line.startswith('WARNING'):   # warnings: yellow text
        return YELLOW + line + RESET
    if line.startswith('ERROR'):     # errors: black text on red background
        return BGRED + BLACK + line + RESET
    if line.startswith('CRITICAL'):  # critical: black text on cyan background
        return BGCYAN + BLACK + line + RESET
    return line  # other levels pass through unchanged

def logcolor():
    """Read lines from stdin until EOF or Ctrl-C, echoing each with level coloring.

    Bug fix: the original used Python 2 syntax (`raw_input()` and the `print`
    statement) but is executed with `python3` by tailf.py, which would raise
    a NameError/SyntaxError. Converted to Python 3.
    """
    try:
        while True:
            line = input()         # read one line from stdin (the pipe)
            print(process(line))   # emit the colorized line
    except (KeyboardInterrupt, EOFError):
        return

if __name__ == '__main__':
    logcolor()
tailf.py则对a.log的日志输出做监控。具体来说,启动了p1进程去读取a.log的内容。同时启动了p2进程对p1读到的内容进行颜色打印。
代码运行次数:0
复制
Cloud Studio 代码运行
# Bug fix: the snippet used `subprocess` and `os` without importing them.
import os
import subprocess

def tail():
    """Follow a.log with `tail -100f`, piping through logcolor.py when stdout is a TTY.

    p1 runs `tail` to read a.log; if fd 1 is a terminal, p2 runs logcolor.py
    to colorize p1's output before it reaches the screen.
    """
    cmd = ['tail', '-100f', 'a.log']
    # Only insert the colorizer when stdout is a terminal (fd 1 is a tty);
    # otherwise tail writes straight to our (redirected) stdout.
    pipe_stdout = subprocess.PIPE if os.isatty(1) else None
    p1 = subprocess.Popen(           # p1 tails a.log
        cmd, stdout=pipe_stdout)     # p1's stdout becomes p2's stdin below
    if pipe_stdout:
        p2 = subprocess.Popen([      # p2 colorizes the log content
            'python3', './logcolor.py'],
            stdin=p1.stdout)         # wire p2's stdin to p1's stdout
        p1.stdout.close()            # close our copy so p1 gets SIGPIPE if p2 exits
        p2.wait()
    p1.wait()

if __name__ == '__main__':
    tail()
效果如下:
二、多threading
多threading的用法比较简单。threading.Thread生成实例,然后start(),最后join
代码运行次数:1
复制
Cloud Studio 代码运行
# Schematic fragment: JobHandler defines __call__, so the instance itself is
# a valid thread target.  NOTE(review): `i` comes from a surrounding loop not
# shown here, and the full JobHandler below takes (job_mgr, handler_id).
job_handler = JobHandler()  # JobHandler defines __call__, making it callable
job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
job_worker_thread.daemon = True  # daemon thread: does not block interpreter exit
job_worker_thread.start()
job_worker_thread.join()  # wait for the worker thread to finish
看一个实际的例子,用来并发处理Job。
代码运行次数:0
复制
Cloud Studio 代码运行
from queue import Queue
import threading
class Job(object):
    """Base unit of work.

    Jobs form a linked chain via `next_job`: when a job completes without an
    error, it triggers its successor through the job handler.
    """

    def __init__(self):
        # Job scheduled after this one completes successfully.
        self.next_job = None

    def set_next_job(self, next_job):
        self.next_job = next_job

    def handle(self, job_handler):
        """Run do(), tag its output/error with this class's name, and on
        success hand the follow-up job (if any) to the handler.

        Returns (error, output): error is a '>'-joined breadcrumb string or
        None; output is (class_name, payload) or the empty tuple.
        """
        error, output = self.do()
        cls_name = self.__class__.__name__
        output = (cls_name, output) if output else tuple()
        if error is None:
            # Success path: schedule the next job in the chain.
            if self.next_job is not None:
                self.next_job.trigger(job_handler)
        else:
            error = '>'.join([cls_name, error])
        return error, output

    def trigger(self, job_handler):
        # Default scheduling: enqueue this job on the handler's queue.
        job_handler.add_next_job(self)

    def do(self):
        """Perform the actual work; subclasses must implement."""
        raise NotImplementedError
class JobWrapper(Job):
    """Decorates a job so its results are reported under `prefix>...`.

    Used by ChainedJobs to label each link of a chain with the chain's name.
    """

    def __init__(self, prefix, current_job, next_job):
        super(JobWrapper, self).__init__()
        self.prefix = prefix
        self.current_job = current_job
        self.set_next_job(next_job)

    def handle(self, job_handler):
        # Splice the wrapped job into this wrapper's chain, then delegate.
        self.current_job.set_next_job(self.next_job)
        job_res = self.current_job.handle(job_handler)
        if job_res is None:
            return None
        r, out = job_res
        # Re-tag both the output label and the error breadcrumb with our prefix.
        out = (">".join([self.prefix, out[0]]), out[1]) if out else tuple()
        if r is not None:
            r = '>'.join([self.prefix, r])
        return r, out
class ChainedJobs(Job):
    """Runs its jobs sequentially: each job's next_job points at the following one."""

    def __init__(self, *jobs):
        super(ChainedJobs, self).__init__()
        self.jobs = jobs

    def handle(self, job_handler):
        # Build the chain back-to-front so every wrapper already knows its
        # successor, then enqueue only the head of the chain.
        chained = self.next_job
        for job in reversed(self.jobs):
            chained = JobWrapper(self.__class__.__name__, job, chained)
        job_handler.add_next_job(chained)  # schedule the first job of the chain
class ParallelJobs(Job):
    """Runs its jobs concurrently; next_job fires only after all of them finish."""

    def __init__(self, *jobs):
        super(ParallelJobs, self).__init__()
        # Bug fix: `*jobs` arrives as a tuple, but add_job() calls .append(),
        # which would raise AttributeError. Store a list instead.
        self.jobs = list(jobs)

    def add_job(self, job):
        self.jobs.append(job)

    class _NotifyJob(Job):
        """Countdown latch: overrides trigger() so next_job fires only after
        `remain` triggers have arrived (one per parallel job)."""

        def __init__(self, remain, next_job):
            super(ParallelJobs._NotifyJob, self).__init__()
            self.set_next_job(next_job)
            self.remain = remain  # number of parallel jobs still outstanding
            # trigger() may be called concurrently from several worker threads.
            self.lock = threading.Lock()

        def trigger(self, job_handler):
            continue_next_job = False
            with self.lock:
                self.remain -= 1  # one more parallel job has completed
                if self.remain == 0:
                    continue_next_job = True
            # Trigger outside the lock to avoid holding it while scheduling.
            if continue_next_job and self.next_job:
                self.next_job.trigger(job_handler)

    def handle(self, job_handler):
        # All parallel jobs share one _NotifyJob as their successor; the real
        # next_job runs once the last of them trips the latch.
        notify_job = ParallelJobs._NotifyJob(len(self.jobs), self.next_job)
        for job in self.jobs:
            job.set_next_job(notify_job)
            job.trigger(job_handler)
        return None
class JobHandler(object):
    """Worker callable: drains jobs from the shared queue and records results.

    An instance is passed as a Thread target (it defines __call__) and also
    acts as the scheduler handed to Job.handle()/trigger().
    """
    def __init__(self, job_mgr, handler_id):
        # handler_id is currently unused; it only appears in the thread name
        # chosen by JobManager.start().
        self.q = job_mgr.job_queue
        self.outs, self.errors = [], []
    def add_next_job(self, next_job):
        # Called by jobs (Job.trigger) to schedule follow-up work.
        self.q.put_nowait(next_job)
    def __call__(self, *args, **kwargs):
        # Thread body: loop forever, pulling and handling jobs.
        while True:
            j = self.q.get()
            try:
                jres = j.handle(self)
                if jres is not None:
                    r, out = jres
                    if r is not None:
                        self.errors.append(r)  # collect error breadcrumbs
                    if out:
                        self.outs.append(out)  # collect (name, payload) outputs
            except Exception as e:
                # NOTE(review): `imolog` and `traceback` are not imported in this
                # snippet — presumably a project logger; confirm at module top.
                imolog.error("Unknown exception thrown in handler:" + str(e))
                traceback.print_exc()
            finally:
                self.q.task_done()  # balance q.get() so Queue.join() can return
    def get_errors(self):
        return self.errors
    def get_outs(self):
        return self.outs
class JobManager(object):
    """Owns the job queue and a pool of daemon worker threads."""
    def __init__(self):
        self.job_queue = Queue()
        self.job_handlers = []
    def start(self):  # starts 32 worker threads (range(1, 33) yields 1..32)
        for i in range(1, 33):
            job_handler = JobHandler(self, i)
            job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
            job_worker_thread.daemon = True  # daemon threads: they die with the main thread
            job_worker_thread.start()
            self.job_handlers.append(job_handler)
    def wait(self):
        # Blocks until every queued job has been marked done via task_done().
        self.job_queue.join()
    def collect_errors(self):
        # Gather error breadcrumbs accumulated by all workers.
        es = []
        for job_handler in self.job_handlers:
            es.extend(job_handler.get_errors())
        return es
    def collect_outs(self):
        # Gather (name, payload) outputs accumulated by all workers.
        outs = []
        for job_handler in self.job_handlers:
            outs.extend(job_handler.get_outs())
        return outs
    def add_job(self, job):
        self.job_queue.put_nowait(job)
if __name__ == '__main__':
    # Create the manager and spin up its pool of daemon worker threads.
    job_manager = JobManager()
    job_manager.start()
三、asyncio方案
Python使用await 来等待coroutine的执行完成。协程使用async描述。
代码运行次数:0
复制
Cloud Studio 代码运行
import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")


async def run_in_executor(  # coroutines are declared with `async def`
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.
    If the executor is None, use the default ThreadPoolExecutor.
    """
    loop = asyncio.get_running_loop()
    # loop.run_in_executor only forwards positional args, so pre-bind
    # everything (including kwargs) into a zero-extra-arg callable.
    bound = partial(func, *args, **kwargs)
    return await loop.run_in_executor(executor, bound)
# Example usage for running `print` in a thread.
async def main():
    # Offload the (blocking) print call to the default ThreadPoolExecutor.
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())  # drive the coroutine to completion
四、三种方案比较
- multiprocessing强调的是cpu运算,比如说你可以把shell命令放在subprocess.Popen里面执行,或者把命令派发到很多机器上执行。如果你有8核cpu,你可以把cpu计算任务拆分成8份子任务并行执行,最后汇总,这样任务得到近8倍加速。多进程更擅长处理CPU受限(cpu-bound)的任务。
- 在多threading,不需要多cpu。比如说你这个应用程序用来发送很多HTTP请求,然后等待网络响应到达后继续处理。在cpu等待的时间片内就可以用来处理别的线程任务。多threading擅长I/O受限的任务。
- asyncio是处理并发任务的高效方式。它是一种协作式的任务并发处理方式:由你通过代码决定在哪里、什么时候进行上下文切换,而不是像threading那样由操作系统调度器决定何时切换线程。
- CPython(主要的Python实现方案)仍然有全局解释器锁(GIL),所以多线程应用无法实现真正的并行计算。这是multiprocessing比threading更推荐的原因。但并非所有问题都适合拆分成多个进程,特别是需要大量跨进程通信的场景——此时multiprocessing并不比threading更有优势。
这是一段伪代码来决定你的场景需要用什么样的并发方案。
if io_bound:
if io_very_slow:
print("Use Asyncio")
else:
print("Use Threads")
else:
print("Multi Processing")
所以你应该这样考虑:
- CPU Bound => Multi Processing
- I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
- I/O Bound, Slow I/O, Many connections => Asyncio
I/O bound指的是应用需要花很多时间和慢的IO设备交谈, 比如说网络连接, 硬盘, 打印设备, 或者需要一定睡眠时间的event loop. 因此在blocking模式, 可以选择threading或者asyncio。如果IO很慢, 协程式多任务 (asyncio) 是更优的选择 (i.e. 避免资源过度争用, 死锁和竞争条件)
五、性能比较:
以下列举了3种并发方案执行test函数的运行时间对比:
concurrency | execute time |
none | 60.39s |
multiprocessing | 53.61s |
threading | 48.23s |
asyncio | 47.92s |
代码运行次数:0
复制
Cloud Studio 代码运行
# "process_test.py"
from multiprocessing import Process
from threading import Thread
import time
import asyncio

start_time = time.time()  # wall-clock anchor for the first (non-concurrent) measurement
def test(num=100000):
    """Count the primes in [2, num] by naive trial division and print the count.

    This is a deliberately CPU-bound benchmark workload. Generalized: `num`
    defaults to the original hard-coded 100000 so existing calls behave
    identically, while smaller values make the function cheap to test.
    Also returns the count (the original returned None; callers that ignore
    the return value are unaffected).
    """
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:  # loop fell through with no divisor: i is prime
            primes += 1
    print(primes)
    return primes
async def async_test(num=100000):
    """Async twin of test(): same naive prime count, awaitable but never yielding.

    Because the body never awaits, asyncio cannot interleave two of these —
    which is exactly the point of the benchmark. Generalized with `num`
    (default preserves the original 100000) and now returns the count;
    callers that ignore the return value are unaffected.
    """
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:  # no divisor found: i is prime
            primes += 1
    print(primes)
    return primes
async def call_tests():
    """Kick off two async_test() coroutines and wait for both with gather()."""
    coros = [async_test() for _ in range(2)]  # 2 asyncio tasks
    await asyncio.gather(*coros)
if __name__ == "__main__":  # This is needed to run processes on Windows
    process_list = []
    # Baseline: run the workload twice back-to-back on one thread.
    test()
    test()
    print("non-concurrency:", round((time.time() - start_time), 2), "seconds")  # non-concurrency
    start_time = time.time()
    process_list = []
    for _ in range(0, 2):  # 2 processes
        process = Process(target=test)
        process_list.append(process)
    for process in process_list:
        process.start()
    for process in process_list:
        process.join()  # wait for both processes to finish before timing
    print("multi-processing:", round((time.time() - start_time), 2), "seconds")  # multi-processing
    start_time = time.time()
    thread_list = []
    for _ in range(0, 2):  # 2 threads
        thread = Thread(target=test)
        thread_list.append(thread)
    for thread in thread_list:
        thread.start()
    for thread in thread_list:
        thread.join()  # GIL serializes the CPU work, so little speedup is expected
    print("threading:", round((time.time() - start_time), 2), "seconds")  # threading
    start_time = time.time()
    # Single-threaded cooperative scheduling: no speedup for CPU-bound work.
    asyncio.run(call_tests())
    print("asyncio:", round((time.time() - start_time), 2), "seconds")  # asyncio
输出: