接上篇继续。
9 【案例】创建1个进程执⾏任务
python中的多线程其实并不是真正的多线程,如果想要充分地使⽤多核CPU的资源,在python中
⼤部分情况需要使⽤多进程,尤其是密集型计算任务
import multiprocessing | |
import time | |
def worker(interval): | |
print(f"The time is {time.ctime()}") | |
for i in range(interval): | |
# 做⼀些数值计算任务 | |
# ⾮常消耗cpu等 | |
print("---", sep='^') | |
if __name__ == "__main__": | |
p = multiprocessing.Process(target = worker, args = (1000,)) | |
p.start() | |
print("p.pid:", p.pid) | |
print("p.name:", p.name) | |
print("p.is_alive:", p.is_alive()) |
10 【案例】创建3个进程并⾏执⾏任务
将⼀个任务拆分为3个⼦任务,放到每个进程中并⾏执⾏
⼀个任务:计算从1到1500的整数的平⽅,并返回。
假定我们使⽤三个进程来做,可以将任务分为三部分: [1,500) , [500,1000) ,
[1000,1500) ,分别放到三个进程中:
import multiprocessing | |
import time | |
task = list(range(1, 1501)) | |
def subtask_1(): | |
print("subtask_1") | |
subtask = task[:500] | |
res = [] | |
for val in subtask: | |
res.append(val**2) | |
print("> end subtask_1") | |
def subtask_2(): | |
print("subtask_2") | |
subtask = task[500:1000] | |
res = [] | |
for val in subtask: | |
res.append(val**2) | |
print("> end subtask_2") | |
def subtask_3(): | |
print("subtask_3") | |
subtask = task[1000:1500] | |
res = [] | |
for val in subtask: | |
res.append(val**2) | |
print("> end subtask_3") | |
if __name__ == "__main__": | |
p1 = multiprocessing.Process(target = subtask_1) | |
p2 = multiprocessing.Process(target = subtask_2) | |
p3 = multiprocessing.Process(target = subtask_3) | |
p1.start() | |
p2.start() | |
p3.start() |
11 使⽤进程池⾼效管理多进程
当被操作对象数⽬不⼤时,可以直接利⽤multiprocessing中的Process动态成⽣多个进程,⼗⼏
个还好,但如果是上百个,上千个⽬标,⼿动的去限制进程数量却⼜太过繁琐,此时可以发挥进
程池的功效。
⽐如,在利⽤Python进⾏系统管理的时候,特别是同时操作多个⽂件⽬录,或者远程控制多台主
机,并⾏操作可以节约⼤量的时间。
Pool可以提供指定数量的进程,供⽤户调⽤,当有新的请求提交到pool中时,如果池还没有满,
那么就会创建⼀个新的进程⽤来执⾏该请求;
但如果池中的进程数已经达到规定最⼤值,那么该请求就会等待,直到池中有进程结束,才会创
建新的进程来执⾏它。
import multiprocessing | |
task = list(range(1, 150001)) | |
def worker(start, end, process_i): | |
print(f"subtask_{process_i}") | |
subtask = task[start:end] | |
res = [] | |
for val in subtask: | |
res.append(val ** 2) | |
# print(res) | |
print(f"> end subtask_{process_i}") | |
# 进程池 | |
if __name__ == "__main__": | |
pool = multiprocessing.Pool(processes=3) | |
n = 5 | |
start = 0 | |
end = step = 150000 // n | |
for i in range(n): | |
pool.apply_async(func=worker, args=(start, end, i + 1)) | |
start, end = start + step, end + step | |
pool.close() | |
pool.join() | |
print("programming done") |
12 使⽤进程池并关注获取每个进程返回结果
result.append(pool.apply_async(func, (msg, ))) | |
for res in result: | |
res.get() |
13 使⽤Queue实现多进程之间的数据传递
import multiprocessing | |
def writer_proc(q): | |
q.put("write data to mysql") | |
def reader_proc(q): | |
res = q.get() | |
res2 = res.replace("write", "read").replace("to", "from") | |
print(res2) | |
if __name__ == "__main__": | |
q = multiprocessing.Queue() | |
writer = multiprocessing.Process(target=writer_proc, args=(q,)) | |
writer.start() | |
reader = multiprocessing.Process(target=reader_proc, args=(q,)) | |
reader.start() | |
reader.join() | |
writer.join() |
14 什么是协程?它和线程有什么区别?
协程,是运⾏在单个线程中的”并发“
协程与多线程相⽐,有哪些优势?
第⼀,使⽤协程,单个线程中就能做到并发执⾏IO任务;
⽽使⽤线程模型实现IO任务的并发,必须要创建多个线程,⽽多个线程的创建和切换都耗费⽐使
⽤协程更多的时间和资源。
这个区别是明显的,协程相⽐于多线程执⾏效率更⾼。
第⼆,协程何时执⾏、何处中断都完全受开发者的控制,⽽多线程启动后完全受操作系统的控
制,线程的终⽌也完全受操作系统控制。
15 如何创建⼀个协程并运⾏?
从Python3.5后,Python在函数或⽅法前添加async,函数或⽅法就变为⼀个协程。
如下所示,print_hello就是最简单的⼀个协程:
async def print_hello(): | |
print('hello world') |
直接调⽤print_hello函数,并没有打印出结果,⽽是显示协程对象,如下所示:
<coroutine object print_hello at 0x7fbbc96596c0>
要想运⾏⼀个协程,必须扔协程到asyncio的run⽅法中,如下所示:
import asyncio | |
asyncio.run(print_hello()) |
执⾏后,才能正常打印结果:hello world
16 【案例】编程实现多协程并发执⾏任务
import asyncio | |
async def cook_food1(): | |
print("开始炒地三鲜") | |
await asyncio.sleep(3) | |
print("地三鲜出锅") | |
async def cook_food2(): | |
print("开始炒回锅⾁") | |
await asyncio.sleep(3) | |
print("回锅⾁出锅") | |
if __name__ == "__main__": | |
event_loop = asyncio.get_event_loop() | |
event_loop.run_until_complete(asyncio.gather(cook_food1(), cook_food2())) | |
event_loop.close() |
17 【案例】协程实现多任务异步爬⾍案例
使⽤异步web请求框架 aiohttp ,实现异步爬取多个⽹⻚。
如下所示,共使⽤以下模块:
import datetime | |
import asyncio | |
import aiohttp |
使⽤ async 创建协程 crawler_url ,它的第⼀个参数是客户端的session,使⽤ aiohttp
的 ClientSession 创建;第⼆个参数是带爬取的 url
await 实现发起异步请求 url ⽹⻚,同时分别打印 await 前的时间,以及打印获得响应后的
时间,如下所示:
async def crawler_url(session, url): | |
print(f"{datetime.datetime.now().strftime('%H:%M:%S')} 开始请求 {url}") | |
resp = await session.request(method="GET", url=url) | |
print(f"{datetime.datetime.now().strftime('%H:%M:%S')} GET {url} | |
async_crawler 协程实现多任务异步爬取,如下所示,分别创建爬取两个⽹址的对应任务
task1 、 task2 ,最后在这个线程中等待所有任务结束,程序才终⽌:
async def async_crawler(): | |
tasks = [] | |
async with aiohttp.ClientSession() as session: | |
task1 = crawler_url(session, 'https://docs.python.org/zh-cn/3/library | |
task2 = crawler_url(session, 'http://www.zglg.work/') | |
tasks.append(task1) | |
tasks.append(task2) | |
await asyncio.gather(*tasks) |
下⾯执⾏调⽤协程 async_crawler ,如下所示:
if __name__ == "__main__": | |
asyncio.run(async_crawler()) |
【小结】
文章基于Python语言详细介绍了多线程、协程和多进程并发编程。无论哪门编程语言,多线程和高并发都是技术进阶的必备知识之一。