前言:

python 中协程概念是从 3.4 版本增加的,但 3.4 版本采用是生成器实现,为了将协程和生成器的使用场景进行区分,使语义更加明确,在 python 3.5 中增加了 async 和 await 关键字,用于定义原生协程。

1.asyncio 异步 I/O 库

python 中的 asyncio 库提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语,即 async 和 await

该模块的主要内容:

  • 事件循环:event_loop,管理所有的事件,是一个无限循环方法,在循环过程中追踪事件发生的顺序将它们放在队列中,空闲时则调用相应的事件处理者来处理这些事件;
  • 协程:coroutine,子程序的泛化概念,协程可以在执行期间暂停,等待外部的处理(I/O 操作)完成之后,再从暂停的地方继续运行,函数定义式使用 async关键字,这样这个函数就不会立即执行,而是返回一个协程对象;
  • FutureTaskFuture对象表示尚未完成的计算,Task是 Future的子类,包含了任务的各个状态,作用是在运行某个任务的同时可以并发的运行多个任务。

异步函数的定义

异步函数本质上依旧是函数,只是在执行过程中会将执行权交给其它协程,与普通函数定义的区别是在 def关键字前增加 async

# 异步函数
import asyncio
# 异步函数
async def func(x):
    print("异步函数")
    return x ** 2
ret = func(2)
print(ret)

运行代码输入如下内容:

sys:1: RuntimeWarning: coroutine 'func' was never awaited
<coroutine object func at 0x0000000002C8C248>

函数返回一个协程对象,如果想要函数得到执行,需要将其放到事件循环 event_loop中。

事件循环 event_loop

event_loop是 asyncio模块的核心,它将异步函数注册到事件循环上。 过程实现方式为:由 loop在适当的时候调用协程,这里使用的方式名为 asyncio.get_event_loop(),然后由 run_until_complete(协程对象) 将协程注册到事件循环中,并启动事件循环。

import asyncio
# 异步函数
async def func(x):
    print("异步函数")
    return x ** 2
# 协程对象,该对象不能直接运行
coroutine1 = func(2)
# 事件循环对象
loop = asyncio.get_event_loop()
# 将协程对象加入到事件循环中,并执行
ret = loop.run_until_complete(coroutine1)
print(ret)

首先在 python 3.7 之前的版本中使用异步函数是安装上述流程:

  • 先通过 asyncio.get_event_loop()获取事件循环loop对象;
  • 然后通过不同的策略调用 loop.run_until_complete()或者loop.run_forever()执行异步函数。

在 python 3.7 之后的版本,直接使用 asyncio.run() 即可,该函数总是会创建一个新的事件循环并在结束时进行关闭。

最新的官方文档 都采用的是run方法。 官方案例

import asyncio
async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')
asyncio.run(main())

接下来在查看一个完整的案例,并且结合await关键字。

import asyncio
import time
# 异步函数1
async def task1(x):
    print("任务1")
    await asyncio.sleep(2)
    print("恢复任务1")
    return x
# 异步函数2
async def task2(x):
    print("任务2")
    await asyncio.sleep(1)
    print("恢复任务2")
    return x
async def main():
    start_time = time.perf_counter()
    ret_1 = await task1(1)
    ret_2 = await task2(2)
    print("任务1 返回的值是", ret_1)
    print("任务2 返回的值是", ret_2)
    print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
	# 创建一个事件循环
    loop = asyncio.get_event_loop()
    # 将协程对象加入到事件循环中,并执行
    loop.run_until_complete(main())

代码输出如下所示:

任务1
恢复任务1
任务2
恢复任务2
任务1 返回的值是 1
任务2 返回的值是 2
运行时间 2.99929154

上述代码创建了 3 个协程,其中 task1和 task2都放在了协程函数 main中,I/O 操作通过 asyncio.sleep(1)进行模拟,整个函数运行时间为 2.9999 秒,接近 3 秒,依旧是串行进行,如果希望修改为并发执行,将代码按照下述进行修改。

import asyncio
import time
# 异步函数1
async def task1(x):
    print("任务1")
    await asyncio.sleep(2)
    print("恢复任务1")
    return x
# 异步函数2
async def task2(x):
    print("任务2")
    await asyncio.sleep(1)
    print("恢复任务2")
    return x
async def main():
    start_time = time.perf_counter()
    ret_1,ret_2 = await asyncio.gather(task1(1),task2(2))
    print("任务1 返回的值是", ret_1)
    print("任务2 返回的值是", ret_2)
    print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

上述代码最大的变化是将task1task2放到了asyncio.gather()中运行,此时代码输出时间明显变短。

任务1
任务2
恢复任务2 # 任务2 由于等待时间短,先返回。
恢复任务1
任务1 返回的值是 1
任务2 返回的值是 2
运行时间 2.0005669480000003

asyncio.gather()可以更换为asyncio.wait()修改代码如下所示:

import asyncio
import time
# 异步函数1
async def task1(x):
    print("任务1")
    await asyncio.sleep(2)
    print("恢复任务1")
    return x
# 异步函数2
async def task2(x):
    print("任务2")
    await asyncio.sleep(1)
    print("恢复任务2")
    return x
async def main():
    start_time = time.perf_counter()
    done, pending = await asyncio.wait([task1(1), task2(2)])
    print(done)
    print(pending)
    print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

asyncio.wait()返回一个元组,其中包含一个已经完成的任务集合,一个未完成任务的集合。

gather 和 wait 的区别:

  • gather:需要所有任务都执行结束,如果任意一个协程函数崩溃了,都会抛异常,不会返回结果;
  • wait:可以定义函数返回的时机,可以设置为 FIRST_COMPLETED(第一个结束的), FIRST_EXCEPTION(第一个出现异常的), ALL_COMPLETED(全部执行完,默认的)。
done,pending = await asyncio.wait([task1(1),task2(2)],return_when=asyncio.tasks.FIRST_EXCEPTION)

创建 task

由于协程对象不能直接运行,在注册到事件循环时,是run_until_complete方法将其包装成一个 task对象。该对象是对coroutine对象的进一步封装,它比coroutine对象多了运行状态,例如 pendingrunningfinished,可以利用这些状态获取协程对象的执行情况。

下面显示的将coroutine对象封装成task对象,在上述代码基础上进行修改。

import asyncio
import time
# 异步函数1
async def task1(x):
    print("任务1")
    await asyncio.sleep(2)
    print("恢复任务1")
    return x
# 异步函数2
async def task2(x):
    print("任务2")
    await asyncio.sleep(1)
    print("恢复任务2")
    return x
async def main():
    start_time = time.perf_counter()
    # 封装 task 对象
    coroutine1 = task1(1)
    task_1 = loop.create_task(coroutine1)
    coroutine2 = task2(2)
    task_2 = loop.create_task(coroutine2)
    ret_1, ret_2 = await asyncio.gather(task_1, task_2)
    print("任务1 返回的值是", ret_1)
    print("任务2 返回的值是", ret_2)
    print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

由于task对象是future对象的子类对象,所以上述代码也可以按照下述内容修改:

# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)

下面将task对象的各个状态进行打印输出。

import asyncio
import time
# 异步函数1
async def task1(x):
    print("任务1")
    await asyncio.sleep(2)
    print("恢复任务1")
    return x
# 异步函数2
async def task2(x):
    print("任务2")
    await asyncio.sleep(1)
    print("恢复任务2")
    return x
async def main():
    start_time = time.perf_counter()
    # 封装 task 对象
    coroutine1 = task1(1)
    task_1 = loop.create_task(coroutine1)
    coroutine2 = task2(2)
    # task_2 = loop.create_task(coroutine2)
    task_2 = asyncio.ensure_future(coroutine2)
    # 进入 pending 状态
    print(task_1)
    print(task_2)
    # 获取任务的完成状态
    print(task_1.done(), task_2.done())
    # 执行任务
    await task_1
    await task_2
    # 再次获取完成状态
    print(task_1.done(), task_2.done())
    # 获取返回结果
    print(task_1.result())
    print(task_2.result())
    print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

await task_1表示的是执行该协程,执行结束之后,task.done()返回 Truetask.result()获取返回值。

回调返回值

当协程执行完毕,需要获取其返回值,刚才已经演示了一种办法,使用 task.result()方法获取,但是该方法仅当协程运行完毕时,才能获取结果,如果协程没有运行完毕,result()方法会返回 asyncio.InvalidStateError(无效状态错误)。

一般编码都采用第二种方案,通过add_done_callback()方法绑定回调。

import asyncio
import requests
async def request_html():
    url = 'https://www.csdn.net'
    res = requests.get(url)
    return res.status_code
def callback(task):
    print('回调:', task.result())
loop = asyncio.get_event_loop()
coroutine = request_html()
task = loop.create_task(coroutine)
# 绑定回调
task.add_done_callback(callback)
print(task)
print("*"*100)
loop.run_until_complete(task)
print(task)

上述代码当coroutine执行完毕时,会调用callback函数。

如果回调函数需要多个参数,请使用functools模块中的偏函数(partial)方法

循环事件关闭

建议每次编码结束之后,都调用循环事件对象close()方法,彻底清理loop对象。

2.本节爬虫项目

本节课要采集的站点由于全部都是 coser 图片,所以地址在代码中查看即可。

完整代码如下所示:

import threading
import asyncio
import time
import requests
import lxml
from bs4 import BeautifulSoup
async def get(url):
    return requests.get(url)
async def get_html(url):
    print("准备抓取:", url)
    res = await get(url)
    return res.text
async def save_img(img_url):
    # thumbMid_5ae3e05fd3945 将小图替换为大图
    img_url = img_url.replace('thumb','thumbMid')
    img_url = "http://mycoser.com/"   img_url
    print("图片下载中:", img_url)
    res = await get(img_url)
    if res is not None:
        with open(f'./imgs/{time.time()}.jpg', 'wb') as f:
            f.write(res.content)
            return img_url,"ok"
async def main(url_list):
    # 创建 5 个任务
    tasks = [asyncio.ensure_future(get_html(url_list[_])) for _ in range(len(url_list))]
    dones, pending = await asyncio.wait(tasks)
    for task in dones:
        html = task.result()
        soup = BeautifulSoup(html, 'lxml')
        divimg_tags = soup.find_all(attrs={'class': 'workimage'})
        for div in divimg_tags:
            ret = await save_img(div.a.img["data-original"])
            print(ret)
if __name__ == '__main__':
    urls = [f"http://mycoser.com/picture/lists/p/{page}" for page in range(1, 17)]
    totle_page = len(urls) // 5 if len(urls) % 5 == 0 else len(urls) // 5   1
    # 对 urls 列表进行切片,方便采集
    for page in range(0, totle_page):
        start_page = 0 if page == 0 else page * 5
        end_page = (page   1) * 5
        # 循环事件对象
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(urls[start_page:end_page]))

代码说明:上述代码中第一个要注意的是await关键字后面只能跟如下内容:

  • 原生的协程对象;
  • 一个包含await方法的对象返回的一个迭代器。

所以上述代码get_html函数中嵌套了一个协程 get。主函数 main里面为了运算方便,直接对 urls 进行了切片,然后通过循环进行运行。

当然上述代码的最后两行,可以直接修改为:

 # 循环事件对象
 # loop = asyncio.get_event_loop()
 #
 # loop.run_until_complete(main(urls[start_page:end_page]))
 asyncio.run(main(urls[start_page:end_page]))

轻松获取一堆高清图片:

到此这篇关于python协程与 asyncio 库详情的文章就介绍到这了,更多相关python 协程内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

python协程与 asyncio 库详情的更多相关文章

  1. XCode 3.2 Ruby和Python模板

    在xcode3.2下,我的ObjectiveCPython/Ruby项目仍然可以打开更新和编译,但是你无法创建新项目.鉴于xcode3.2中缺少ruby和python的所有痕迹(即创建项目并添加新的ruby/python文件),是否有一种简单的方法可以再次安装模板?我发现了一些关于将它们复制到某个文件夹的信息,但我似乎无法让它工作,我怀疑文件夹的位置已经改变为3.2.解决方法3.2中的应用程序模板

  2. Swift基本使用-函数和闭包(三)

    声明函数和其他脚本语言有相似的地方,比较明显的地方是声明函数的关键字swift也出现了Python中的组元,可以通过一个组元返回多个值。传递可变参数,函数以数组的形式获取参数swift中函数可以嵌套,被嵌套的函数可以访问外部函数的变量。可以通过函数的潜逃来重构过长或者太复杂的函数。

  3. 10 个Python中Pip的使用技巧分享

    众所周知,pip 可以安装、更新、卸载 Python 的第三方库,非常方便。本文小编为大家总结了Python中Pip的使用技巧,需要的可以参考一下

  4. Swift、Go、Julia与R能否挑战 Python 的王者地位

    本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至dio@foxmail.com举报,一经查实,本站将立刻删除。

  5. 红薯因 Swift 重写开源中国失败,貌似欲改用 Python

    本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至dio@foxmail.com举报,一经查实,本站将立刻删除。

  6. 你没看错:Swift可以直接调用Python函数库

    上周Perfect又推出了新一轮服务器端Swift增强函数库:Perfect-Python。对,你没看错,在服务器端Swift其实可以轻松从其他语种的函数库中直接拿来调用,不需要修改任何内容。以如下python脚本为例:Perfect-Python可以用下列方法封装并调用以上函数,您所需要注意的仅仅是其函数名称以及参数。

  7. Swift中的列表解析

    在Swift中完成这个的最简单的方法是什么?我在寻找类似的东西:从Swift2.x开始,有一些与你的Python样式列表解析相当的东西。(在这个意义上,它更像是Python的xrange。如果你想保持集合懒惰一路通过,只是这样说:与Python中的列表解析语法不同,Swift中的这些操作遵循与其他操作相同的语法。

  8. swift抛出终端的python错误

    每当我尝试启动与python相关的swift时,我都会收到错误.我该如何解决?

  9. 在Android上用Java嵌入Python

    解决方法看看this,它适用于J2SE,你可以尝试在Android上运行.

  10. 在android studio中使用python代码构建android应用程序

    我有一些python代码和它的机器人,我正在寻找一种方法来使用android项目中的那些python代码.有没有办法做到这一点!?解决方法有两种主要工具可供使用,它们彼此不同:>QPython>Kivy使用Kivy,大致相同的代码也可以部署到IOS.

随机推荐

  1. 10 个Python中Pip的使用技巧分享

    众所周知,pip 可以安装、更新、卸载 Python 的第三方库,非常方便。本文小编为大家总结了Python中Pip的使用技巧,需要的可以参考一下

  2. python数学建模之三大模型与十大常用算法详情

    这篇文章主要介绍了python数学建模之三大模型与十大常用算法详情,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感想取得小伙伴可以参考一下

  3. Python爬取奶茶店数据分析哪家最好喝以及性价比

    这篇文章主要介绍了用Python告诉你奶茶哪家最好喝性价比最高,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

  4. 使用pyinstaller打包.exe文件的详细教程

    PyInstaller是一个跨平台的Python应用打包工具,能够把 Python 脚本及其所在的 Python 解释器打包成可执行文件,下面这篇文章主要给大家介绍了关于使用pyinstaller打包.exe文件的相关资料,需要的朋友可以参考下

  5. 基于Python实现射击小游戏的制作

    这篇文章主要介绍了如何利用Python制作一个自己专属的第一人称射击小游戏,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起动手试一试

  6. Python list append方法之给列表追加元素

    这篇文章主要介绍了Python list append方法如何给列表追加元素,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  7. Pytest+Request+Allure+Jenkins实现接口自动化

    这篇文章介绍了Pytest+Request+Allure+Jenkins实现接口自动化的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  8. 利用python实现简单的情感分析实例教程

    商品评论挖掘、电影推荐、股市预测……情感分析大有用武之地,下面这篇文章主要给大家介绍了关于利用python实现简单的情感分析的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下

  9. 利用Python上传日志并监控告警的方法详解

    这篇文章将详细为大家介绍如何通过阿里云日志服务搭建一套通过Python上传日志、配置日志告警的监控服务,感兴趣的小伙伴可以了解一下

  10. Pycharm中运行程序在Python console中执行,不是直接Run问题

    这篇文章主要介绍了Pycharm中运行程序在Python console中执行,不是直接Run问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

返回
顶部