Інтро
Асинхронне програмування останніми роками стає дедалі популярнішим у спільноті Python. Стає зрозумілим, чому використання бібліотек на зразок aiohttp
збільшується в рази. Вони обробляють багато конкурентних підключень, при цьому зберігають читабельність та простоту коду.
Не так давно Django випустив підтримку асинхронності в наступних версіях. Тож майбутнє асинхронного Python достатньо світле. Однак для багатьох розробників, які мають досвід роботи лише зі стандартною блокувальною моделлю, механізм роботи нових інструментів може здатися заплутаним.
В цьому матеріалі ми спробуємо зрозуміти, як все працює «під капотом», і створимо невеликого двійника aiohttp
з нуля. Почнемо з базового прикладу з офіційної документації та додамо увесь необхідний функціонал.
Імовірно, ви вже трохи працювали з asyncio і розумієте, про що йде мова. Якщо ж ні, вам допоможуть ці матеріали:
Для нетерплячих — остаточний код доступний на hzlmn/sketch
.
Низькорівневий API Asyncio, передача даних та протоколи
Asyncio пройшов довгих шлях, щоб стати таким, яким ми його знаєм. Спочатку бібліотека створювалась як низькорівневий інструмент під назвою tulip
. Тож написання високорівневих застосунків було не таким приємним, як сьогодні.
Зараз asyncio
є здебільшого високорівневим API, натомість для управління циклами подій існує набір низькорівневих хелперів, а також реалізація мережевих/ipc-протоколів.
З коробки бібліотека підтримує лише TCP
, UDP
, SSL
та підпроцеси. Інші бібліотеки самостійно реалізовують протоколи вищого рівня (HTTP
, FTP
тощо), які базуються на транспортних протоколах та доступному API.
Усі комунікації відбуваються через з'єднання передачі та протоколів. Простіше кажучи, передача описує, як ми можемо обмінятися даними, а протокол відповідальний за те, які саме дані обрати.
Asyncio
містить достатньо непогану офіційну документацію, з якою детальніше можна ознайомитись за посиланням.
Щоб отримати певне уявлення про те, як все працює, напишемо простий TCP
-сервер, що виводитиме повідомлення.
server.py
import asyncio
class Server(asyncio.Protocol):
def connection_made(self, transport):
self._transport = transport
def data_received(self, data):
message = data.decode()
self._transport.write(data)
self._transport.close()
loop = asyncio.get_event_loop()
coro = loop.create_server(Server, '127.0.0.1', 8080)
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
$ curl http://127.0.0.1:8080
GET / HTTP/1.1
Host: 127.0.0.1:8080
User-Agent: curl/7.54.0
Accept: */*
Як бачимо, код досить простий, однак він ще не дуже масштабований для створення високорівневих застосунків.
Оскільки HTTP
працює поверх транспортного протоколу TCP
, ми вже можемо надсилати HTTP
-запити до нашого сервера, однак отримуємо їх у raw-форматі. Тож додамо кращий механізм обробки HTTP
-запитів.
Створюємо серверний протокол
Додамо обробку запиту, щоб отримати корисну інформацію про його заголовки, тіло та шлях, аби не працювати з необробленим текстом. Парсинг — досить складна тема, яка трохи виходить за межі матеріалу, тому ми використовуємо httptools.
З іншого боку, aiohttp
має власний парсер на основі Python, а також прив'язку до Node http-parser
.
Напишемо власний клас для парсингу, який буде використовуватись як міксин для нашого основного класу Server
.
http_parser.py
class HttpParserMixin:
def on_body(self, data):
self._body = data
def on_url(self, url):
self._url = url
def on_message_complete(self):
print(f"Received request to {self._url.decode(self._encoding)}")
def on_header(self, header, value):
header = header.decode(self._encoding)
self._headers[header] = value.decode(self._encoding)
Тепер, коли ми маємо робочий HttpParserMixin
, модифікуємо трохи наш Server
та застосуємо міксин.
server.py
import asyncio
from httptools import HttpRequestParser
from .http_parser import HttpParserMixin
class Server(asyncio.Protocol, HttpParserMixin):
def __init__(self, loop):
self._loop = loop
self._encoding = "utf-8"
self._url = None
self._headers = {}
self._body = None
self._transport = None
self._request_parser = HttpRequestParser(self)
def connection_made(self, transport):
self._transport = transport
def connection_lost(self, *args):
self._transport = None
def data_received(self, data):
# Pass data to our parser
self._request_parser.feed_data(data)
Зараз у нас є сервер, який розуміє вхідні HTTP
-запити та отримує деяку важливу інформацію з них. Спробуймо додати простий ранер:
server.py
if __name__ == "__main__":
loop = asyncio.get_event_loop()
serv = Server(loop)
server = loop.run_until_complete(loop.create_server(lambda: serv, port=8080))
try:
print("Started server on ::8080")
loop.run_until_complete(server.serve_forever())
except KeyboardInterrupt:
server.close()
loop.run_until_complete(server.wait_closed())
loop.stop()
> python server.py
Started server on ::8080
> curl http://127.0.0.1:8080/hello
Об'єкти Request/Response
Тепер ми маємо робочий сервер, який може обробляти HTTP
-запити, проте такої абстракції недостатньо для наших застосунків.
Створимо базовий клас Request
, який згрупує разом всю інформацію з вхідних HTTP
-запитів. Використаємо бібліотеку yarl
для роботи з url (переконайтеся, що встановили її з pip).
request.py
import json
from yarl import URL
class Request:
_encoding = "utf_8"
def __init__(self, method, url, headers, version=None, body=None, app=None):
self._version = version
self._method = method.decode(self._encoding)
self._url = URL(url.decode(self._encoding))
self._headers = headers
self._body = body
@property
def method(self):
return self._method
@property
def url(self):
return self._url
@property
def headers(self):
return self._headers
def text(self):
if self._body is not None:
return self._body.decode(self._encoding)
def json(self):
text = self.text()
if text is not None:
return json.loads(text)
def __repr__(self):
return f"<Request at 0x{id(self)}>"
Далі нам знадобиться структура, яка допоможе описати HTTP
-відповідь у зручному для розробників вигляді та конвертувати її в сирий HTTP
, який може бути оброблений asyncio.Transport
.
response.py
import http.server
web_responses = http.server.BaseHTTPRequestHandler.responses
class Response:
_encoding = "utf-8"
def __init__(
self,
body=None,
status=200,
content_type="text/plain",
headers=None,
version="1.1",
):
self._version = version
self._status = status
self._body = body
self._content_type = content_type
if headers is None:
headers = {}
self._headers = headers
@property
def body(self):
return self._body
@property
def status(self):
return self._status
@property
def content_type(self):
return self._content_type
@property
def headers(self):
return self._headers
def add_body(self, data):
self._body = data
def add_header(self, key, value):
self._headers[key] = value
def __str__(self):
"""Використаємо це в наших обробниках. Фактично, генеруємо сиру HTTP-відповідь,
який передається нашому TCP-протоколу
"""
status_msg, _ = web_responses.get(self._status)
messages = [
f"HTTP/{self._version} {self._status} {status_msg}",
f"Content-Type: {self._content_type}",
f"Content-Length: {len(self._body)}",
]
if self.headers:
for header, value in self.headers.items():
messages.append(f"{header}: {value}")
if self._body is not None:
messages.append("\
\
" + self._body)
return "\
\
".join(messages)
def __repr__(self):
return f"<Response at 0x{id(self)}>"
Код досить зрозумілий: ми інкапсулюємо усі дані та передбачаємо потрібні гетери. Також у нас є декілька хелперів для того, щоб отримати text
та json
, які будуть використані пізніше. Щоб створити об'єкт Request
з повідомлення, необхідно оновити наш Server
.
Потрібний об'єкт повинен створюватись, коли весь запит оброблено, тож реалізуємо цей процес в обробнику on_message_complete
нашого парсера.
http_parser.py
class HttpParserMixin:
...
def on_message_complete(self):
self._request = self._request_class(
version=self._request_parser.get_http_version(),
method=self._request_parser.get_method(),
url=self._url,
headers=self._headers,
body=self._body,
)
...
Серверу також потрібні невеликі зміни, аби створити об'єкт Response
та передати закодовані значення в asyncio.Transport
.
server.py
from .response import Response
...
class Server(asyncio.Protocol, HttpParserMixin):
...
def __init__(self, loop):
...
self._request = None
self._request_class = Request
...
def data_received(self, data):
self._request_parser.feed_data(data)
resp = Response(body=f"Received request on {self._request.url}")
self._transport.write(str(resp).encode(self._encoding))
self._transport.close()
Тепер, запустивши server.py
, ми зможемо побачити в респонзі received request on /path
у відповідь на curl http://localhost:8080/path
.
Застосунок та UrlDispatcher
На цьому етапі у нас є простий сервер, який може обробляти HTTP
-запити та об'єкти Request/Response
для роботи з циклами запитів. Однак створені нами інструменти не дотримуються декількох важливих концепцій.
Зараз у нас є лише один основний обробник запитів, а у великих застосунках для кожного маршруту їх набагато більше. Тож потрібен механізм, що дозволяв би зареєструвати декілька обробників.
Спробуймо створити найпростіший варіант UrlDispatcher
— об'єкт зі словником. Як ключ він зберігатиме метод та кортеж шляху, а фактичний обробник — як значення. Нам також потрібен обробник для тих ситуацій, коли користувач хоче отримати доступ до нерозпізнаного маршруту.
router.py
from .response import Response
class UrlDispatcher:
def __init__(self):
self._routes = {}
async def _not_found(self, request):
return Response(f"Not found {request.url} on this server", status=404)
def add_route(self, method, path, handler):
self._routes[(method, path)] = handler
def resolve(self, request):
key = (request.method, request.url.path)
if key not in self._routes:
return self._not_found
return self._routes[key]
Ми пропустили такі моменти, як параметризовані маршрути, однак повернемося до них пізніше. Поки що залишимо якомога простіший варіант.
Далі нам потрібен контейнер Application
, який об'єднає усю інформацію стосовно застосунку, тому що кожен раз звертатися до Server
буде незручно.
import asyncio
from .router import UrlDispatcher
from .server import Server
from .response import Response
class Application:
def __init__(self, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._router = UrlDispatcher()
@property
def loop(self):
return self._loop
@property
def router(self):
return self._router
def _make_server(self):
return Server(loop=self._loop, handler=self._handler, app=self)
async def _handler(self, request, response_writer):
"""Обробляє вхідний запит"""
handler = self._router.resolve(request)
resp = await handler(request)
if not isinstance(resp, Response):
raise RuntimeError(f"expect Response instance but got {type(resp)}")
response_writer(resp)
Нам необхідно модифікувати трохи наш Server
та додати метод response_writer
, який буде відповідальним за передачу даних транспортному протоколу. Треба також додати до ініціалізації властивості handler
та app
, які будуть використовуватись для виклику відповідних обробників.
server.py
class Server(asyncio.Protocol, HttpParserMixin):
...
def __init__(self, loop, handler, app):
self._loop = loop
self._url = None
self._headers = {}
self._body = None
self._transport = None
self._request_parser = HttpRequestParser(self)
self._request = None
self._request_class = Request
self._request_handler = handler
self._request_handler_task = None
def response_writer(self, response):
self._transport.write(str(response).encode(self._encoding))
self._transport.close()
...
http_parser.py
class HttpParserMixin:
def on_body(self, data):
self._body = data
def on_url(self, url):
self._url = url
def on_message_complete(self):
self._request = self._request_class(
version=self._request_parser.get_http_version(),
method=self._request_parser.get_method(),
url=self._url,
headers=self._headers,
body=self._body,
)
self._request_handler_task = self._loop.create_task(
self._request_handler(self._request, self.response_writer)
)
def on_header(self, header, value):
header = header.decode(self._encoding)
self._headers[header] = value.decode(self._encoding)
Нарешті, з готовим основним функціоналом та можливістю зареєструвати нові маршрути та обробники, додамо простий хелпер для фактичного запуску нашого застосунку (подібно до web.run_app
в aiohttp
).
application.py
def run_app(app, host="127.0.0.1", port=8080, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
serv = app._make_server()
server = loop.run_until_complete(
loop.create_server(lambda: serv, host=host, port=port)
)
try:
print(f"Started server on {host}:{port}")
loop.run_until_complete(server.serve_forever())
except KeyboardInterrupt:
server.close()
loop.run_until_complete(server.wait_closed())
loop.stop()
Не забуваємо додати до застосунку щойно створені інструменти:
app.py
import asyncio
from .response import Response
from .application import Application, run_app
app = Application()
async def handler(request):
return Response(f"Hello at {request.url}")
app.router.add_route("GET", "/", handler)
if __name__ == "__main__":
run_app(app)
Якщо ви запустите застосунок та зробите GET
-запит на шлях /
, то побачите повідомлення Hello at /
та 404
помилку для всіх інших маршрутів. Чудова робота! Однак ще є над чим працювати.
$ curl 127.0.0.1:8080/
Hello at /
$ curl 127.0.0.1:8080/invalid
Not found /invalid on this server
Що далі
Ми маємо увесь базовий функціонал, але досі є речі, які потребують змін у нашому «фреймворку». Найперше необхідно додати параметризовані маршрути, оскільки це «маст-хев» усіх сучасних бібліотек. Далі необхідно додати підтримку посередників (middlewares), оскільки це дуже поширена та потужна концепція.
Ми також можемо спробувати реалізувати хуки життєвого циклу (on_startup
, on_shutdown
, on_cleanup
тощо), подібні до aiohttp
.
Параметри маршрутів
На разі наш UrlDispatcher
досить бідний на функціонал та працює із зареєстрованими url-шляхами як з рядками. Найперше нам необхідно додати підтримку таких шаблонів, як /user/{username}
в нашому методі resolve
. Нам також потрібен хелпер _format_pattern
, який буде відповідальним за генерацію фактичних регулярних виразів з параметризованого рядка. Як ви могли помітити, у нас є інший хелпер _method_not_allowed
та методи для простішого визначення GET
, POST
-маршрутів.
router.py
import re
from functools import partialmethod
from .response import Response
class UrlDispatcher:
_param_regex = r"{(?P<param>\\w+)}"
def __init__(self):
self._routes = {}
async def _not_found(self, request):
return Response(f"Could not find {request.url.raw_path}")
async def _method_not_allowed(self, request):
return Response(f"{request.method} not allowed for {request.url.raw_path}")
def resolve(self, request):
for (method, pattern), handler in self._routes.items():
match = re.match(pattern, request.url.raw_path)
if match is None:
return None, self._not_found
if method != request.method:
return None, self._method_not_allowed
return match.groupdict(), handler
def _format_pattern(self, path):
if not re.search(self._param_regex, path):
return path
regex = r""
last_pos = 0
for match in re.finditer(self._param_regex, path):
regex += path[last_pos: match.start()]
param = match.group("param")
regex += r"(?P<%s>\\w+)" % param
last_pos = match.end()
return regex
def add_route(self, method, path, handler):
pattern = self._format_pattern(path)
self._routes[(method, pattern)] = handler
add_get = partialmethod(add_route, "GET")
add_post = partialmethod(add_route, "POST")
add_put = partialmethod(add_route, "PUT")
add_head = partialmethod(add_route, "HEAD")
add_options = partialmethod(add_route, "OPTIONS")
Потрібно також змінити контейнер нашого застосунку. Зараз метод resolve
з UrlDispatcher
повертає match_info
та handler
. Тож в Application._handler
змінимо такі рядки:
application.py
class Application:
...
async def _handler(self, request, response_writer):
"""Process incoming request"""
match_info, handler = self._router.resolve(request)
request.match_info = match_info
...
Посередники (Middlewares)
Посередники потрібні для модифікації вхідних запитів або відповідей обробника. Вони запускаються перед кожним запитом до сервера. Для наших потреб реалізувати все буде досить просто. Передусім додамо перелік зареєстрованих посередників до нашого об'єкта Application
та трохи змінимо Application._handler
. Кожен посередник повинен працювати з результатом попереднього в ланцюжку.
application.py
from functools import partial
...
class Application:
def __init__(self, loop=None, middlewares=None):
...
if middlewares is None:
self._middlewares = []
...
async def _handler(self, request, response_writer):
"""Обробка вхідного запиту"""
match_info, handler = self._router.resolve(request)
request.match_info = match_info
if self._middlewares:
for md in self._middlewares:
handler = partial(md, handler=handler)
resp = await handler(request)
...
Спробуймо додати посередник для логування:
app.py
import asyncio
from .response import Response
from .application import Application, run_app
async def log_middleware(request, handler):
print(f"Received request to {request.url.raw_path}")
return await handler(request)
app = Application(middlewares=[log_middleware])
async def handler(request):
return Response(f"Hello at {request.url}")
app.router.add_route("GET", "/", handler)
if __name__ == "__main__":
run_app(app)
Хуки життєвого циклу застосунку
Реалізуємо запуск певних дій у відповідь на запуск та зупинку сервера. В aiohttp
існує багато сигналів на зразок on_startup
, on_shutdown
, on_response_prepared
, але для наших потреб залишимо все якомога простішим. Реалізуємо лише startup
та shutdown
.
Всередині Application
нам необхідно додати перелік фактичних обробників для кожної події з правильною інкапсуляцією та гетерами. Потім проводимо фактичний startup
та shutdown
та відповідні виклики хелпера run_app
.
application.py
class Application:
def __init__(self, loop=None, middlewares=None):
...
self._on_startup = []
self._on_shutdown = []
...
@property
def on_startup(self):
return self._on_startup
@property
def on_shutdown(self):
return self._on_shutdown
async def startup(self):
coros = [func(self) for func in self._on_startup]
await asyncio.gather(*coros, loop=self._loop)
async def shutdown(self):
coros = [func(self) for func in self._on_shutdown]
await asyncio.gather(*coros, loop=self._loop)
...
def run_app(app, host="127.0.0.1", port=8080, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
serv = app._make_server()
loop.run_until_complete(app.startup())
server = loop.run_until_complete(
loop.create_server(lambda: serv, host=host, port=port)
)
try:
print(f"Started server on {host}:{port}")
loop.run_until_complete(server.serve_forever())
except KeyboardInterrupt:
loop.run_until_complete(app.shutdown())
server.close()
loop.run_until_complete(server.wait_closed())
loop.stop()
Обробка виключень
На цьому етапі ми вже додали більшість ключових фіч, однак нам досі не вистачає обробки виключень. В aiohttp
можна досить просто обробляти веб-виключення як python-виключення. Тут на допомогу приходять класи Exception
та Response
.
Спершу створимо власний базовий клас HTTPException
та декілька хелперів, залежно від типу помилок, які ми хочемо опрацьовувати (наприклад, HTTPNotFound
— для нерозпізнаних шляхів, HTTPBadRequest
— для проблем з боку користувача та HTTPFound
— для редіректу).
from .response import Response
class HTTPException(Response, Exception):
status_code = None
def __init__(self, reason=None, content_type=None):
self._reason = reason
self._content_type = content_type
Response.__init__(
self,
body=self._reason,
status=self.status_code,
content_type=self._content_type or "text/plain",
)
Exception.__init__(self, self._reason)
class HTTPNotFound(HTTPException):
status_code = 404
class HTTPBadRequest(HTTPException):
status_code = 400
class HTTPFound(HTTPException):
status_code = 302
def __init__(self, location, reason=None, content_type=None):
super().__init__(reason=reason, content_type=content_type)
self.add_header("Location", location)
Тепер нам треба трохи модифікувати наш Application._handler
, щоб відловлювати веб-виключення.
application.py
class Application:
...
async def _handler(self, request, response_writer):
"""Process incoming request"""
try:
match_info, handler = self._router.resolve(request)
request.match_info = match_info
if self._middlewares:
for md in self._middlewares:
handler = partial(md, handler=handler)
resp = await handler(request)
except HTTPException as exc:
resp = exc
...
Ми також можемо видалити хелпери _not_found
та _method_not_allowed
з нашого UrlDispatcher
та замість цього просто повернути певне виключення.
router.py
class UrlDispatcher:
...
def resolve(self, request):
for (method, pattern), handler in self._routes.items():
match = re.match(pattern, request.url.raw_path)
if match is None:
raise HTTPNotFound(reason=f"Could not find {request.url.raw_path}")
if method != request.method:
raise HTTPBadRequest(reason=f"{request.method} not allowed for {request.url.raw_path}")
return match.groupdict(), handler
...
Непогано буде також додати відформатовану відповідь для internal server error
, оскільки ми не хочемо, щоб наш застосунок ламався при виникненні помилок. Додамо простий html-шаблон, а також невеликий хелпер для форматування виключень.
helpers.py
import traceback
from .response import Response
server_exception_templ = """
<div>
<h1>500 Internal server error</h1>
<span>Server got itself in trouble : <b>{exc}</b><span>
<p>{traceback}</p>
</div>
"""
def format_exception(exc):
resp = Response(status=500, content_type="text/html")
trace = traceback.format_exc().replace("\
", "</br>")
msg = server_exception_templ.format(exc=str(exc), traceback=trace)
resp.add_body(msg)
return resp
Тепер ми просто відловлюємо всі Exception
всередині нашого Application._handler
та генеруємо фактичну html-відповідь з нашим хелпером.
application.py
class Application:
...
async def _handler(self, request, response_writer):
"""Обробка вхідного запиту"""
try:
match_info, handler = self._router.resolve(request)
request.match_info = match_info
if self._middlewares:
for md in self._middlewares:
handler = partial(md, handler=handler)
resp = await handler(request)
except HTTPException as exc:
resp = exc
except Exception as exc:
resp = format_exception(exc)
...
Налаштовуємо процес завершення
Наостанок нам потрібно додати обробку правильного процесу завершення роботи сервера. Для цього замінимо run_app
такими рядками:
application.py
...
def run_app(app, host="127.0.0.1", port=8080, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
serv = app._make_server()
loop.run_until_complete(app.startup())
server = loop.run_until_complete(
loop.create_server(lambda: serv, host=host, port=port)
)
loop.add_signal_handler(
signal.SIGTERM, lambda: asyncio.ensure_future(app.shutdown())
)
...
Приклад застосунку
Тепер, коли всі наші інструменти готові, спробуймо завершити нашу попередню роботу над хуками життєвого циклу та використаємо обробку виключень.
app.py
rom .application import Application, run_app
async def on_startup(app):
# тут ви можете просто звертатися до фактичної БД, але для нашого прикладу залишимо звичайний сет
app.db = {"john_doe",}
async def log_middleware(request, handler):
print(f"Received request to {request.url.raw_path}")
return await handler(request)
async def handler(request):
username = request.match_info["username"]
if username not in request.app.db:
raise HTTPNotFound(reason=f"No such user with as {username} :(")
return Response(f"Welcome, {username}!")
app = Application(middlewares=[log_middleware])
app.on_startup.append(on_startup)
app.router.add_get("/{username}", handler)
if __name__ == "__main__":
run_app(app)
Якщо ви все правильно зробили, то маєте побачите повідомлення логів, вітання у відповідь на реєстрацію користувача та HTTPNotFound
для незареєстрованих користувачів та нерозпізнаних шляхів.
Висновок
Щойно ми подолали приблизно 500 рядків на шляху створення простенького фреймворку, надихнувшись aiohttp
та sanic
. Звичайно ж, наша робота не готова до виходу в продакшен, оскільки ми пропускали багато корисних та важливих фіч: на зразок більшої надійності, кращої підтримки специфікації HTTP, веб-сокетів тощо. Натомість ми на практиці розглянули, як подібні інструменти працюють зсередини.
Ще немає коментарів