python協(xié)程與asyncio庫(kù)詳情
python 中協(xié)程概念是從 3.4 版本增加的,但 3.4 版本采用是生成器實(shí)現(xiàn),為了將協(xié)程和生成器的使用場(chǎng)景進(jìn)行區(qū)分,使語(yǔ)義更加明確,在 python 3.5 中增加了?async?和?await?關(guān)鍵字,用于定義原生協(xié)程。...
前言:
python 中協(xié)程概念是從 3.4 版本增加的,但 3.4 版本采用是生成器實(shí)現(xiàn),為了將協(xié)程和生成器的使用場(chǎng)景進(jìn)行區(qū)分,使語(yǔ)義更加明確,在 python 3.5 中增加了 async
和 await
關(guān)鍵字,用于定義原生協(xié)程。
1.asyncio 異步 i/o 庫(kù)
python 中的 asyncio 庫(kù)提供了管理事件、協(xié)程、任務(wù)和線程的方法,以及編寫(xiě)并發(fā)代碼的原語(yǔ),即 async
和 await
。
該模塊的主要內(nèi)容:
- 事件循環(huán):event_loop,管理所有的事件,是一個(gè)無(wú)限循環(huán)方法,在循環(huán)過(guò)程中追蹤事件發(fā)生的順序?qū)⑺鼈兎旁陉?duì)列中,空閑時(shí)則調(diào)用相應(yīng)的事件處理者來(lái)處理這些事件;
- 協(xié)程:
coroutine
,子程序的泛化概念,協(xié)程可以在執(zhí)行期間暫停,等待外部的處理(i/o 操作)完成之后,再?gòu)臅和5牡胤嚼^續(xù)運(yùn)行,函數(shù)定義式使用async
關(guān)鍵字,這樣這個(gè)函數(shù)就不會(huì)立即執(zhí)行,而是返回一個(gè)協(xié)程對(duì)象; future
和task
:future
對(duì)象表示尚未完成的計(jì)算,task
是future
的子類(lèi),包含了任務(wù)的各個(gè)狀態(tài),作用是在運(yùn)行某個(gè)任務(wù)的同時(shí)可以并發(fā)的運(yùn)行多個(gè)任務(wù)。
異步函數(shù)的定義
異步函數(shù)本質(zhì)上依舊是函數(shù),只是在執(zhí)行過(guò)程中會(huì)將執(zhí)行權(quán)交給其它協(xié)程,與普通函數(shù)定義的區(qū)別是在 def
關(guān)鍵字前增加 async
。
# 異步函數(shù)
import asyncio
# 異步函數(shù)
async def func(x):
????print("異步函數(shù)")
????return x ** 2
ret = func(2)
print(ret)
運(yùn)行代碼輸入如下內(nèi)容:
sys:1: runtimewarning: coroutine 'func' was never awaited
<coroutine object func at 0x0000000002c8c248>
函數(shù)返回一個(gè)協(xié)程對(duì)象,如果想要函數(shù)得到執(zhí)行,需要將其放到事件循環(huán) event_loop
中。
事件循環(huán) event_loop
event_loop
是 asyncio
模塊的核心,它將異步函數(shù)注冊(cè)到事件循環(huán)上。 過(guò)程實(shí)現(xiàn)方式為:由 loop
在適當(dāng)?shù)臅r(shí)候調(diào)用協(xié)程,這里使用的方式名為 asyncio.get_event_loop()
,然后由 run_until_complete(協(xié)程對(duì)象)
將協(xié)程注冊(cè)到事件循環(huán)中,并啟動(dòng)事件循環(huán)。
import asyncio
# 異步函數(shù)
async def func(x):
????print("異步函數(shù)")
????return x ** 2
# 協(xié)程對(duì)象,該對(duì)象不能直接運(yùn)行
coroutine1 = func(2)
# 事件循環(huán)對(duì)象
loop = asyncio.get_event_loop()
# 將協(xié)程對(duì)象加入到事件循環(huán)中,并執(zhí)行
ret = loop.run_until_complete(coroutine1)
print(ret)
首先在 python 3.7 之前的版本中使用異步函數(shù)是安裝上述流程:
- 先通過(guò)
asyncio.get_event_loop()
獲取事件循環(huán)loop
對(duì)象; - 然后通過(guò)不同的策略調(diào)用
loop.run_until_complete()
或者loop.run_forever()
執(zhí)行異步函數(shù)。
在 python 3.7 之后的版本,直接使用 asyncio.run()
即可,該函數(shù)總是會(huì)創(chuàng)建一個(gè)新的事件循環(huán)并在結(jié)束時(shí)進(jìn)行關(guān)閉。
最新的官方文檔 都采用的是run
方法。 官方案例
import asyncio
async def main():
????print('hello')
????await asyncio.sleep(1)
????print('world')
asyncio.run(main())
接下來(lái)在查看一個(gè)完整的案例,并且結(jié)合await
關(guān)鍵字。
import asyncio
import time
# 異步函數(shù)1
async def task1(x):
????print("任務(wù)1")
????await asyncio.sleep(2)
????print("恢復(fù)任務(wù)1")
????return x
# 異步函數(shù)2
async def task2(x):
????print("任務(wù)2")
????await asyncio.sleep(1)
????print("恢復(fù)任務(wù)2")
????return x
async def main():
????start_time = time.perf_counter()
????ret_1 = await task1(1)
????ret_2 = await task2(2)
????print("任務(wù)1 返回的值是", ret_1)
????print("任務(wù)2 返回的值是", ret_2)
????print("運(yùn)行時(shí)間", time.perf_counter() - start_time)
if __name__ == '__main__':
????# 創(chuàng)建一個(gè)事件循環(huán)
????loop = asyncio.get_event_loop()
????# 將協(xié)程對(duì)象加入到事件循環(huán)中,并執(zhí)行
????loop.run_until_complete(main())
代碼輸出如下所示:
任務(wù)1
恢復(fù)任務(wù)1
任務(wù)2
恢復(fù)任務(wù)2
任務(wù)1 返回的值是 1
任務(wù)2 返回的值是 2
運(yùn)行時(shí)間 2.99929154
上述代碼創(chuàng)建了 3 個(gè)協(xié)程,其中 task1
和 task2
都放在了協(xié)程函數(shù) main
中,i/o 操作通過(guò) asyncio.sleep(1)
進(jìn)行模擬,整個(gè)函數(shù)運(yùn)行時(shí)間為 2.9999 秒,接近 3 秒,依舊是串行進(jìn)行,如果希望修改為并發(fā)執(zhí)行,將代碼按照下述進(jìn)行修改。
import asyncio
import time
# 異步函數(shù)1
async def task1(x):
????print("任務(wù)1")
????await asyncio.sleep(2)
????print("恢復(fù)任務(wù)1")
????return x
# 異步函數(shù)2
async def task2(x):
????print("任務(wù)2")
????await asyncio.sleep(1)
????print("恢復(fù)任務(wù)2")
????return x
async def main():
????start_time = time.perf_counter()
????ret_1,ret_2 = await asyncio.gather(task1(1),task2(2))
????print("任務(wù)1 返回的值是", ret_1)
????print("任務(wù)2 返回的值是", ret_2)
????print("運(yùn)行時(shí)間", time.perf_counter() - start_time)
if __name__ == '__main__':
????loop = asyncio.get_event_loop()
????loop.run_until_complete(main())
上述代碼最大的變化是將task1
和task2
放到了asyncio.gather()
中運(yùn)行,此時(shí)代碼輸出時(shí)間明顯變短。
任務(wù)1
任務(wù)2
恢復(fù)任務(wù)2 # 任務(wù)2 由于等待時(shí)間短,先返回。
恢復(fù)任務(wù)1
任務(wù)1 返回的值是 1
任務(wù)2 返回的值是 2
運(yùn)行時(shí)間 2.0005669480000003
asyncio.gather()
可以更換為asyncio.wait()
,修改代碼如下所示:
import asyncio
import time
# 異步函數(shù)1
async def task1(x):
????print("任務(wù)1")
????await asyncio.sleep(2)
????print("恢復(fù)任務(wù)1")
????return x
# 異步函數(shù)2
async def task2(x):
????print("任務(wù)2")
????await asyncio.sleep(1)
????print("恢復(fù)任務(wù)2")
????return x
async def main():
????start_time = time.perf_counter()
????done, pending = await asyncio.wait([task1(1), task2(2)])
????print(done)
????print(pending)
????print("運(yùn)行時(shí)間", time.perf_counter() - start_time)
if __name__ == '__main__':
????loop = asyncio.get_event_loop()
????loop.run_until_complete(main())
asyncio.wait()
返回一個(gè)元組,其中包含一個(gè)已經(jīng)完成的任務(wù)集合,一個(gè)未完成任務(wù)的集合。
gather 和 wait 的區(qū)別:
gather
:需要所有任務(wù)都執(zhí)行結(jié)束,如果任意一個(gè)協(xié)程函數(shù)崩潰了,都會(huì)拋異常,不會(huì)返回結(jié)果;wait
:可以定義函數(shù)返回的時(shí)機(jī),可以設(shè)置為first_completed
(第一個(gè)結(jié)束的),first_exception
(第一個(gè)出現(xiàn)異常的),all_completed
(全部執(zhí)行完,默認(rèn)的)。
done,pending = await asyncio.wait([task1(1),task2(2)],return_when=asyncio.tasks.first_exception)
創(chuàng)建 task
由于協(xié)程對(duì)象不能直接運(yùn)行,在注冊(cè)到事件循環(huán)時(shí),是run_until_complete
方法將其包裝成一個(gè) task
對(duì)象。該對(duì)象是對(duì)coroutine
對(duì)象的進(jìn)一步封裝,它比coroutine
對(duì)象多了運(yùn)行狀態(tài),例如 pending
,running
,finished
,可以利用這些狀態(tài)獲取協(xié)程對(duì)象的執(zhí)行情況。
下面顯示的將coroutine
對(duì)象封裝成task
對(duì)象,在上述代碼基礎(chǔ)上進(jìn)行修改。
import asyncio
import time
# 異步函數(shù)1
async def task1(x):
????print("任務(wù)1")
????await asyncio.sleep(2)
????print("恢復(fù)任務(wù)1")
????return x
# 異步函數(shù)2
async def task2(x):
????print("任務(wù)2")
????await asyncio.sleep(1)
????print("恢復(fù)任務(wù)2")
????return x
async def main():
????start_time = time.perf_counter()
????# 封裝 task 對(duì)象
????coroutine1 = task1(1)
????task_1 = loop.create_task(coroutine1)
????coroutine2 = task2(2)
????task_2 = loop.create_task(coroutine2)
????ret_1, ret_2 = await asyncio.gather(task_1, task_2)
????print("任務(wù)1 返回的值是", ret_1)
????print("任務(wù)2 返回的值是", ret_2)
????print("運(yùn)行時(shí)間", time.perf_counter() - start_time)
if __name__ == '__main__':
????loop = asyncio.get_event_loop()
????loop.run_until_complete(main())
由于task
對(duì)象是future
對(duì)象的子類(lèi)對(duì)象,所以上述代碼也可以按照下述內(nèi)容修改:
# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)
下面將task
對(duì)象的各個(gè)狀態(tài)進(jìn)行打印輸出。
import asyncio
import time
# 異步函數(shù)1
async def task1(x):
????print("任務(wù)1")
????await asyncio.sleep(2)
????print("恢復(fù)任務(wù)1")
????return x
# 異步函數(shù)2
async def task2(x):
????print("任務(wù)2")
????await asyncio.sleep(1)
????print("恢復(fù)任務(wù)2")
????return x
async def main():
????start_time = time.perf_counter()
????# 封裝 task 對(duì)象
????coroutine1 = task1(1)
????task_1 = loop.create_task(coroutine1)
????coroutine2 = task2(2)
????# task_2 = loop.create_task(coroutine2)
????task_2 = asyncio.ensure_future(coroutine2)
????# 進(jìn)入 pending 狀態(tài)
????print(task_1)
????print(task_2)
????# 獲取任務(wù)的完成狀態(tài)
????print(task_1.done(), task_2.done())
????# 執(zhí)行任務(wù)
????await task_1
????await task_2
????# 再次獲取完成狀態(tài)
????print(task_1.done(), task_2.done())
????# 獲取返回結(jié)果
????print(task_1.result())
????print(task_2.result())
????print("運(yùn)行時(shí)間", time.perf_counter() - start_time)
if __name__ == '__main__':
????loop = asyncio.get_event_loop()
????loop.run_until_complete(main())
await task_1
表示的是執(zhí)行該協(xié)程,執(zhí)行結(jié)束之后,task.done()
返回 true
,task.result()
獲取返回值。
回調(diào)返回值
當(dāng)協(xié)程執(zhí)行完畢,需要獲取其返回值,剛才已經(jīng)演示了一種辦法,使用 task.result()
方法獲取,但是該方法僅當(dāng)協(xié)程運(yùn)行完畢時(shí),才能獲取結(jié)果,如果協(xié)程沒(méi)有運(yùn)行完畢,result()
方法會(huì)返回 asyncio.invalidstateerror
(無(wú)效狀態(tài)錯(cuò)誤)。
一般編碼都采用第二種方案,通過(guò)add_done_callback()
方法綁定回調(diào)。
import asyncio
import requests
async def request_html():
????url = 'https://www.csdn.net'
????res = requests.get(url)
????return res.status_code
def callback(task):
????print('回調(diào):', task.result())
loop = asyncio.get_event_loop()
coroutine = request_html()
task = loop.create_task(coroutine)
# 綁定回調(diào)
task.add_done_callback(callback)
print(task)
print("*"*100)
loop.run_until_complete(task)
print(task)
上述代碼當(dāng)coroutine
執(zhí)行完畢時(shí),會(huì)調(diào)用callback
函數(shù)。
如果回調(diào)函數(shù)需要多個(gè)參數(shù),請(qǐng)使用functools
模塊中的偏函數(shù)(partial
)方法
循環(huán)事件關(guān)閉
建議每次編碼結(jié)束之后,都調(diào)用循環(huán)事件對(duì)象close()
方法,徹底清理loop
對(duì)象。
2.本節(jié)爬蟲(chóng)項(xiàng)目
本節(jié)課要采集的站點(diǎn)由于全部都是 coser 圖片,所以地址在代碼中查看即可。
完整代碼如下所示:
import threading
import asyncio
import time
import requests
import lxml
from bs4 import beautifulsoup
async def get(url):
????return requests.get(url)
async def get_html(url):
????print("準(zhǔn)備抓取:", url)
????res = await get(url)
????return res.text
async def save_img(img_url):
????# thumbmid_5ae3e05fd3945 將小圖替換為大圖
????img_url = img_url.replace('thumb','thumbmid')
????img_url = "http://mycoser.com/" + img_url
????print("圖片下載中:", img_url)
????res = await get(img_url)
????if res is not none:
????????with open(f'./imgs/{time.time()}.jpg', 'wb') as f:
????????????f.write(res.content)
????????????return img_url,"ok"
async def main(url_list):
????# 創(chuàng)建 5 個(gè)任務(wù)
????tasks = [asyncio.ensure_future(get_html(url_list[_])) for _ in range(len(url_list))]
????dones, pending = await asyncio.wait(tasks)
????for task in dones:
????????html = task.result()
????????soup = beautifulsoup(html, 'lxml')
????????divimg_tags = soup.find_all(attrs={'class': 'workimage'})
????????for div in divimg_tags:
????????????ret = await save_img(div.a.img["data-original"])
????????????print(ret)
if __name__ == '__main__':
????urls = [f"http://mycoser.com/picture/lists/p/{page}" for page in range(1, 17)]
????totle_page = len(urls) // 5 if len(urls) % 5 == 0 else len(urls) // 5 + 1
????# 對(duì) urls 列表進(jìn)行切片,方便采集
????for page in range(0, totle_page):
????????start_page = 0 if page == 0 else page * 5
????????end_page = (page + 1) * 5
????????# 循環(huán)事件對(duì)象
????????loop = asyncio.get_event_loop()
????????loop.run_until_complete(main(urls[start_page:end_page]))
代碼說(shuō)明:上述代碼中第一個(gè)要注意的是await
關(guān)鍵字后面只能跟如下內(nèi)容:
- 原生的協(xié)程對(duì)象;
- 一個(gè)包含
await
方法的對(duì)象返回的一個(gè)迭代器。
所以上述代碼get_html
函數(shù)中嵌套了一個(gè)協(xié)程 get
。主函數(shù) main
里面為了運(yùn)算方便,直接對(duì) urls 進(jìn)行了切片,然后通過(guò)循環(huán)進(jìn)行運(yùn)行。
當(dāng)然上述代碼的最后兩行,可以直接修改為:
# 循環(huán)事件對(duì)象
# loop = asyncio.get_event_loop()
#
# loop.run_until_complete(main(urls[start_page:end_page]))
asyncio.run(main(urls[start_page:end_page]))
輕松獲取一堆高清圖片:
到此這篇關(guān)于python協(xié)程與 asyncio 庫(kù)詳情的文章就介紹到這了,更多相關(guān)python 協(xié)程內(nèi)容請(qǐng)搜索以前的文章或繼續(xù)瀏覽下面的相關(guān)文章
Python之父再發(fā)聲:我們能為中國(guó)的“996”程序員做什么?
日前,Python之父再度為“中國(guó)程序員996工作制”發(fā)聲,他在Python上發(fā)帖表示,一周前一些中國(guó)程序員創(chuàng)建了996.icu抱怨惡劣的工作條件,現(xiàn)在該網(wǎng)站已被各種中國(guó)瀏覽器禁止...