Python 协程库 Asyncio

Python 在3.5版本的时候就加入了asyncio,但是用了一下貌似不太好用,在版本3.7,asyncio有了极大的改善,使用了一下还算不错,asyncio引入了async await语法,有点类似于nodejs。

最简单的例子:

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

比如PHP的 swoole 扩展,它也是基于协程的,但是使用起来就没有 python 那么方便了,swoole 没有提供 async await 语法。asyncio 提供了 create_task 来创建协程任务,它可以类比为 swoole 的 go 函数:

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio还提供了许多辅助函数,比如asyncio.gather用来同时运行多个协程任务,相当于之前使用swoole实现过的GroupWait。相应的swoole的channel也有asyncio的对应版本asyncio.Queue。而且还提供了多种queue,比如PriorityQueue和LifoQueue,因此asyncio也是完全适用于CSP编程模型。

使用asyncio创建一个TCP服务:

import asyncio

async def handle_echo(reader, writer):
    while True:
        # 判断EOF结束符关闭链接
        if reader.at_eof():
            break
        data = await reader.readline()
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()

    print("Close the connection")
    writer.close()

async def tcp_server_task():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

async def main():
    task = asyncio.create_task(tcp_server_task())
    await task

asyncio.run(main())

可以看到代码并不是很多,创建一个简单的TCP服务并不能显示asyncio的强大,现在我们假设需要建立一个集中式的TCP日志收集服务,将TCP端口收到的日志保存在postgresql或者mysql中,日志的请求量会比较大,所以需要有较好的性能,使用如下代码简单实现:

import asyncio
from psycopg2.pool import SimpleConnectionPool
from datetime import datetime

class AsyncTask:

    def __init__(self):
        self.task_queue = asyncio.Queue()
        self.db_pool = SimpleConnectionPool(
            2, 20,
            dbname='echoes',
            host='192.168.2.10',
            user='twn39',
            password='tangweinan'
        )

    async def handle_echo(self, reader, writer):
        while True:
            # 判断EOF结束符关闭链接
            if reader.at_eof():
                break
            data = await reader.readline()
            message = data.decode()
            self.task_queue.put_nowait(message.strip())
            writer.write("OK\r\n".encode())
            await writer.drain()

        print("Close the connection")
        writer.close()

    async def tcp_server_task(self):
        server = await asyncio.start_server(self.handle_echo, '127.0.0.1', 8888)

        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')

        async with server:
            await server.serve_forever()

    async def consume_task(self, name: str):
        while True:
            data = await self.task_queue.get()
            conn = self.db_pool.getconn()
            print(conn)
            cur = conn.cursor()
            cur.execute("insert into logs (level, message, created_at) values (%s, %s, %s)", (
                200, 'log test', datetime.now()
            ))
            conn.commit()
            self.db_pool.putconn(conn)
            print(f'work: {name} consume data: {data}')
            self.task_queue.task_done()

if __name__ == '__main__':
    async def main():
        async_task = AsyncTask()
        task = asyncio.create_task(async_task.tcp_server_task())
        task1 = asyncio.create_task(async_task.consume_task('worker-1'))
        task2 = asyncio.create_task(async_task.consume_task('worker-2'))
        await asyncio.gather(task, task1, task2, return_exceptions=True)

    asyncio.run(main())

在程序中引入了Queue,这样可以异步写入数据库,在请求高峰时提高稳定性,我们创建了三个主要的任务,一个是监听TCP端口,获取日志数据,将日志放入queue中,另外两个是消费queue,当queue中有数据时写入数据库,没有时等待数据。代码中有两个无限循环(其实是三个),在程序运行的时候,相当于是并行的,但是实际上它是单线程的,协程可以中断,当多个协程运行时,实际上是函数之间相互切换运行,但是切换的时间很短,所以体现出来是并行的。多线程编程它的线程切换是由操作系统来实现的,所以相互切换的成本比较高,而协程是由用户来决定什么时候切换的,也称为用户端线程。