当前位置:首页 >> 脚本专栏

Python使用asyncio包处理并发详解

阻塞型I/O和GIL

CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。

然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程。

asyncio

这个包使用事件循环驱动的协程实现并发。 asyncio 大量使用 yield from 表达式,因此与Python 旧版不兼容。

asyncio 包使用的“协程”是较严格的定义。适合asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield。此外,适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用;

示例1

import threading
import asyncio

@asyncio.coroutine
def hello():
  print('Start Hello', threading.currentThread())
  yield from asyncio.sleep(5)
  print('End Hello', threading.currentThread())

@asyncio.coroutine
def world():
  print('Start World', threading.currentThread())
  yield from asyncio.sleep(3)
  print('End World', threading.currentThread())

# 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 执行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把生成器函数标记为协程类型。
asyncio.sleep(3) 创建一个3秒后完成的协程。
loop.run_until_complete(future),运行直到future完成;如果参数是 coroutine object,则需要使用 ensure_future()函数包装。
loop.close() 关闭事件循环

示例2

import asyncio

@asyncio.coroutine
def worker(text):
  """
  协程运行的函数
  :param text:
  :return:
  """
  i = 0
  while True:
    print(text, i)

    try:
      yield from asyncio.sleep(.1)
    except asyncio.CancelledError:
      break

    i += 1


@asyncio.coroutine
def client(text, io_used):
  worker_fu = asyncio.ensure_future(worker(text))

  # 假装等待I/O一段时间
  yield from asyncio.sleep(io_used)

  # 结束运行协程
  worker_fu.cancel()
  return 'done'


loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

解释:

1. asyncio.ensure_future(coro_or_future, *, loop=None):计划安排一个 coroutine object的执行,返回一个 asyncio.Task object。
2. worker_fu.cancel(): 取消一个协程的执行,抛出CancelledError异常。
3. asyncio.wait():协程的参数是一个由期物或协程构成的可迭代对象; wait 会分别把各个协程包装进一个 Task 对象。

asyncio.Task 对象与threading.Thread对象的比较

asyncio.Task 对象差不多与 threading.Thread 对象等效。
Task 对象用于驱动协程, Thread 对象用于调用可调用的对象。
Task 对象不由自己动手实例化,而是通过把协程传给 asyncio.ensure_future(…) 函数或loop.create_task(…) 方法获取。
获取的 Task 对象已经排定了运行时间;Thread 实例则必须调用 start 方法,明确告知让它运行。
如果想终止任务,可以使用 Task.cancel() 实例方法,在协程内部抛出CancelledError 异常。

线程与协程的安全比较

如果使用线程做过重要的编程,因为调度程序任何时候都能中断线程。必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态。

协程默认会做好全方位保护,以防止中断。我们必须显式产出才能让程序的余下部分运行。对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。想交出控制权时,可以使用 yield 或 yield from 把控制权交还调度程序。这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield处取消,因此可以处理 CancelledError 异常,执行清理操作。

Future(期物)

通常情况下自己不应该创建期物,而只能由并发框架(concurrent.futures 或 asyncio)实例化。原因很简单:期物表示终将发生的事情,而确定某件事会发生的唯一方式是执行的时间已经排定。

asyncio.Future

在 asyncio 包中, BaseEventLoop.create_task(…) 方法接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例——也是 asyncio.Future 类的实例,因为 Task 是Future 的子类,用于包装协程。

asyncio.ensure_future(coro_or_future, *, loop=None)

这个函数统一了协程和期物:第一个参数可以是二者中的任何一个。如果是 Future 或 Task 对象,那就原封不动地返回。如果是协程,那么 async 函数会调用loop.create_task(…) 方法创建 Task 对象。 loop= 关键字参数是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获取循环对象。

BaseEventLoop.create_task(coro)

这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。

asyncio 包中有多个函数会自动把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete(…) 方法。

asyncio.as_completed

为了集成进度条,我们可以使用的是 as_completed 生成器函数;幸好, asyncio 包提供了这个生成器函数的相应版本。

使用asyncio和aiohttp包

从 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他协议,那么要借助第三方包 aiohttp 。

cc_list = ['China', 'USA']

@asyncio.coroutine
def get_flag(cc):
  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
  resp = yield from aiohttp.request('GET', url)
  image = yield from resp.read()
  return image

@asyncio.coroutine
def download_one(name): 
  image = yield from get_flag(name) 
  save_flag(image, name.lower() + '.gif')
  return name

loop = asyncio.get_event_loop() 
wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) 
res, _ = loop.run_until_complete(wait_coro) 
loop.close()

使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数。

避免阻塞型调用

有两种方法能避免阻塞型调用中止整个应用程序的进程:
1. 在单独的线程中运行各个阻塞型操作
2. 把每个阻塞型操作转换成非阻塞的异步调用使用

多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存达兆字节(具体的量取决于操作系统种类)。如果要处理几千个连接,而每个连接都使用一个线程的话,我们负担不起。

把生成器当作协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。

上面的脚本为什么会很快

在上面的脚本中,调用 loop.run_until_complete 方法时,事件循环驱动各个download_one 协程,运行到第一个 yield from 表达式处时,那个表达式驱动各个get_flag 协程,然后在get_flag协程里面运行到第一个 yield from 表达式处时,调用 aiohttp.request(…)函数。这些调用都不会阻塞,因此在零点几秒内所有请求全部开始。

asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程。得到响应后, get_flag 向前执行到下一个 yield from 表达式处,调用resp.read() 方法,然后把控制权还给主循环。其他响应会陆续返回。所有 get_ flag 协程都获得结果后,委派生成器 download_one 恢复,保存图像文件。

async和await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await

例如:

@asyncio.coroutine
def hello():
  print("Hello world!")
  r = yield from asyncio.sleep(1)
  print("Hello again!")

等同于

async def hello():
  print("Hello world!")
  r = await asyncio.sleep(1)
  print("Hello again!")

网站请求实例

import asyncio
import aiohttp

urls = [
  'http://www.163.com/',
  'http://www.sina.com.cn/',
  'https://www.hupu.com/',
  'http://www.csdn.net/'
]


async def get_url_data(u):
  """
  读取url的数据
  :param u:
  :return:
  """
  print('running ', u)
  async with aiohttp.ClientSession() as session:
    async with session.get(u) as resp:
      print(u, resp.status, type(resp.text()))
      # print(await resp.text())

  return resp.headers


async def request_url(u):
  """
  主调度函数
  :param u:
  :return:
  """
  res = await get_url_data(u)
  return res


loop = asyncio.get_event_loop()
task_lists = asyncio.wait([request_url(u) for u in urls])
all_res, _ = loop.run_until_complete(task_lists)
loop.close()

print(all_res)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。