Нам предстоит погрузиться в мир асинхронного программирования. Сейчас уже сложно представить Python без асинхронного подхода — он только набирает популярность среди веб-разработчиков.
Сегодня мы изучим следующие темы:
Разработчики используют итераторы в коде настолько часто, что даже не задумываются, в чём они помогают. Итераторы в Python — это списки, словари, множества, строки, файлы и другие коллекции. Везде, где вы пишете цикл for
, используется итератор. Итераторы помогают перемещаться по объектам любого контейнера в коде. При этом вам не нужно задумываться о том, как хранятся и обрабатываются эти элементы: итератор инкапсулирует их. Проще говоря, это способ вычитывать элементы из объекта по одному.
Перейдём сразу к практике. Представим, что на собеседовании вас попросили реализовать аналог функции range
.
class Range:
def __init__(self, stop_value: int):
self.current = -1
self.stop_value = stop_value - 1
def __iter__(self):
return RangeIterator(self)
class RangeIterator:
def __init__(self, container):
self.container = container
def __next__(self):
if self.container.current < self.container.stop_value:
self.container.current += 1
return self.container.current
raise StopIteration
Получили первую версию работающего кода. Запустим код и убедимся в этом.
_range = Range(5)
for i in _range:
print(i)
0 1 2 3 4
Начнём с класса Range
. У него внутри реализован магический метод __iter__
. Он обозначает, что объект этого класса итерабельный, то есть с ним можно работать в цикле for
. Ещё говорят, что __iter__
отдаёт итерируемый объект.
Ограничимся понятиями итератора и итерабельного объекта. Чтобы код действительно отдавал новые данные из range
, нужно реализовать соответствующую функцию. Она как раз и называется итератор. RangeIterator
— итератор для класса range
. Любой итератор должен реализовывать магическую функцию __next__
, в которой он должен отдавать новые значения для объектов класса Range
. Если вы дошли до конца множества значений, то появляется исключение StopIteration
.
Для тех, кто привык читать первоисточники, существует PEP-234 (на английском). Там подробно изложена работа итераторов и итерабельных объектов.
Но можно ли как-то упростить написанный выше код? Да, можно.
class Range2:
def __init__(self, stop_value: int):
self.current = -1
self.stop_value = stop_value - 1
def __iter__(self):
return self
def __next__(self):
if self.current < self.stop_value:
self.current += 1
return self.current
raise StopIteration
В Python есть возможность объявить объекты класса и итерабельными, и итераторами. Это удобно, но с точки зрения принципов проектирования приложения, у такого объекта есть две ответственности: он является итератором и при этом выполняет какую-то свою логику. В мире Python это допустимо, но в некоторых других языках вас могут понять неправильно. Будьте бдительны!
Ещё стоит рассмотреть, как работает цикл for
под капотом.
iterable = Range2(5)
iterator = iter(iterable)
while True:
try:
value = next(iterator)
print(value)
except StopIteration:
break
0 1 2 3 4
Работа генераторов построена на принципе запоминания контекста выполнения функции. Если не вдаваться в подробности работы Python-интерпретатора, то функция-генератор «запоминает», на каком месте она остановилась, и может продолжить своё выполнение после ключевого слова yield
.
Рассмотрим простой пример.
def simple_generator():
yield 1
yield 2
return 3
Посмотрим, что будет, если вызвать такой код:
gen = simple_generator()
print(next(gen))
print(next(gen))
print(next(gen))
1 2
--------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-3-2721680b9d7d> in <module> 2 print(next(gen)) 3 print(next(gen)) ----> 4 print(next(gen)) StopIteration: 3
То есть функция действительно запоминает, где она остановилась после каждого вызова функции next
.
Генераторы удобны и для создания генераторных выражений — generator expressions
. Особенно это полезно, если нужно сгенерировать много объектов, а память расходовать жалко. Код выглядит так:
gen_exp = (x for x in range(100000))
print(gen_exp)
<generator object <genexpr> at 0x7ff8de407b30>
Есть ещё небольшой синтаксический сахар, связанный с генераторами. Представим функцию, внутри которой есть цикл по элементам списка, и они выводятся один за другим.
numbers = [1, 2, 3]
def func():
for item in numbers:
yield item
for item in func():
print(item)
1 2 3
За счёт генератора конструкция сокращается до такой:
def func():
yield from numbers
Бывает очень полезно, но на практике используется довольно редко.
Затронем тему корутин — основных строительных блоков асинхронного программирования на Python. Они появились в ответ на невозможность использования полноценного распараллеливания программы с помощью тредов (потоков) из-за работы GIL
. Для тех, кому интересно узнать подробнее про Global Interpreter Lock
, есть хорошая обзорная статья.
Корутина — это генератор. Однако в PEP-342 предложили расширить возможности генераторов, добавив туда несколько конструкций, о которых сейчас пойдёт речь.
Представьте себе метод, который на вход получает какое-то значение, как-то его обрабатывает и отдаёт результат. Пусть это будет функция, рассчитывающая количество денег на вашем счете через $N$ лет при определённом проценте. На вход функция принимает процент по депозиту в годовых и сумму на счёте.
import math
def cash_return(deposit: int, percent: float, years: int) -> float:
value = math.pow(1 + percent / 100, years)
return round(deposit * value, 2)
Теперь узнаем, сколько вы получите денег через $5$ лет, если сумма на депозите — $1 000 000$ рублей, а ставка по депозиту — $5%%$ годовых.
cash_return(1_000_000, 5, 5)
1276281.56
Теперь вы хотите посмотреть на то, как будет меняться итоговая сумма в зависимости от депозита. Тут приходит на помощь корутина.
import math
def cash_return_coro(percent: float, years: int) -> float:
value = math.pow(1 + percent / 100, years)
while True:
try:
deposit = (yield)
yield round(deposit * value, 2)
except GeneratorExit:
print('Выход из корутины')
raise
Запустим корутину с теми же условиями — $5$ лет и $5%%$ годовых.
coro = cash_return_coro(5, 5)
next(coro)
values = [1000, 2000, 5000, 10000, 100000]
for item in values:
print(coro.send(item))
next(coro)
coro.close()
1276.28 2552.56 6381.41 12762.82 127628.16 Выход из корутины
Разберёмся, что произошло. В коде видно четыре новых конструкции: (yield)
, send(...)
, close()
и GeneratorExit
. Корутины могут не только отдавать значения и запоминать место, где остановился код, но и ждать новых значений. Для этого ввели конструкцию (yield)
, которая позволяет принимать набор параметров. Так как приём параметров в корутине происходит необычным способом, то и отправка параметров сделана с помощью специальной функции send(...)
, через которую можно передать в функцию необходимые параметры. В конце ещё можно вызвать метод close()
, который прекратит выполнение корутины. Когда вы вызываете метод close()
, выбрасывается исключение GeneratorExit
, которое можно перехватить и грамотно обработать.
Ещё одно преимущество — возможность запомнить контекст выполнения. В функции cash_return_coro
нет необходимости вычислять переменную value
каждый раз, когда вы хотите посчитать сумму. Недостатком такого подхода можно назвать большее количество кода, который надо написать, чтобы всё могло грамотно работать.
Асинхронное программирование в мире Python-разработки на пике популярности. Про него пишут статьи и делают доклады на конференциях. Концепция уже прижилась и во многих других популярных языках. Давайте восполним пробелы и погрузимся в работу с асинхронным кодом на Python.
Раньше разработчики не сильно заостряли своё внимание на типе выполняемых задач внутри приложения — это было не нужно для индустрии в целом. Все писали достаточно большие монолитные приложения, а проблемы с производительностью обычно решались на уровне горизонтального масштабирования: через потоки, процессы или даже через несколько приложений на разных виртуальных машинах.
Сейчас использование только процессов и потоков не даёт нужной производительности. Рассмотрим три основных типа задач, с которыми сталкивается большинство разработчиков:
Подробнее об этом можно прочитать в англоязычных статьях: - о значении терминов CPU bound и I/O bound, - о производительности.
Из-за массового перехода на микросервисы количество сетевого взаимодействия между системами многократно возросло, как и нагрузка на базы данных. Проблемы работы с сетью или с доступом к БД относятся к I/O bound-задачам. То есть их основная работа — ожидание обработки запроса к внешней системе. Такой класс задач в монолитных системах решался пулом потоков — thread pool. Однако, его стало не хватать из-за достаточно интенсивной нагрузки на сеть между множеством сервисов.
Классический метод решения I/O bound задач — добавление ресурсов к существующим системам. Однако, многие компании не могут позволить себе «заливать всё железом» — докупать новые железные серверы, вместо оптимизации кода. Например, Instagram может себе такое позволить, поэтому они до сих пор используют Django даже с учётом всей нагрузки.
Перейдём к практике. Представьте приложение, которое ходит на некий сайт-агрегатор, достаёт данные по фильмам и сохраняет в БД. Код будет выглядеть так (ссылка на сайт выдуманная):
import requests
def do_some_logic(data):
pass
def save_to_database(data):
pass
data = requests.get('https://data.aggregator.com/films')
processed_data = do_some_logic(data)
save_to_database(data)
Этот код достаточно линейный. Если вместо загрузки фильмов в БД такой код будет выполнять отдачу данных о фильмах с этого же сайта, то при достаточной нагрузке приложение начнёт сильно проседать по скорости ответа клиентам. При этом бо́льшую часть времени код будет просто ждать запроса от клиента, делать запрос к сайту https://data.aggregator.com/films и отдавать данные. То есть в эти моменты интерпретатор не будет делать никаких полезных действий, а клиенты будут ждать.
Схематично изобразить выполнение программы можно вот так:
Теперь определим тип задачи в каждой ячейке:
Интуитивно выполнение программы кажется примерно таким, как указано на картинке выше.
Однако, если привести картинку в соответствие с реальностью, получим следующий результат:
То есть бо́льшую часть времени программа ждёт ввода/вывода, а меньшая часть времени отводится на выполнение полезной работы.
Эту проблему можно решить, распараллелив код на процессы и потоки. Такой вариант поможет, но на короткое время — при таком подходе сильно увеличатся расходы ресурсов сервера. Плюс количество допустимых процессов и потоков ограничено. То есть либо закончится оперативная память под потоки, либо закончатся ядра под процессы. Вишенкой на торте становится GIL
, который даёт работать только одному потоку в единицу времени. Это не позволяет эффективно использовать массовый параллелизм на потоках и добавляет свои накладные расходы, хоть и не очень заметные.
Посмотрим, как применение потоков сказывается на выполнении программы:
Действительно, два потока почти в два раза лучше отрабатывают I/O bound задачи. Но к сожалению, при таком подходе очень просто ошибиться и столкнуться с проблемой «состояния гонок». Можно попробовать вариант с корутинами, но его сложнее реализовать. И при таком способе не получится создать много потоков, так как они потребляют гораздо больше ОЗУ, чем корутины. Также написание многопоточного кода требует от разработчика большей внимательности, чем при написании линейного.
Стоит внимательнее присмотреться к проблеме. Всё ещё бо́льшую часть времени интерпретатор не совершает активных действий, а только ждёт ответа от ОС, завершилась ли та или иная операция ввода/вывода. В целом процессы и потоки не сильно помогут, ведь интерпретатор будет по-прежнему простаивать на каждом из них. При этом появится много накладных расходов на переключение контекста между процессами или на потребление оперативной памяти потоками, что может только ухудшить положение.
Все эти проблемы необходимо решать эффективно. С этим может помочь использование асинхронного кода.
Итак, вы добрались до сердца асинхронных программ в Python — цикла событий. Чтобы понять, как он работает, обратимся к простой реализации, которую предложил Девид Бизли (David Beazley) в 2009 году. Она хороша тем, что не содержит сложных конструкций, которыми сейчас обросли популярные реализации цикла событий на Python. Эта часть будет построена на разборе кода и практик, которые применяются для разработки цикла событий на основе кода Бизли, и как учитывать эти знания при разработке асинхронных приложений на Python. Код уже приведён к современной версии Python.
Начнём с архитектуры цикла событий.
Расмотрим блоки:
Первым стоит рассмотреть работу планировщика. Его основные функции — приём и справедливая обработка списка задач.
import logging
from typing import Generator
from queue import Queue
class Scheduler:
def __init__(self):
self.ready = Queue()
self.task_map = {}
def add_task(self, coroutine: Generator) -> int:
new_task = Task(coroutine)
self.task_map[new_task.tid] = new_task
self.schedule(new_task)
return new_task.tid
def exit(self, task: Task):
del self.task_map[task.tid]
def schedule(self, task: Task):
self.ready.put(task)
def _run_once(self):
task = self.ready.get()
try:
result = task.run()
except StopIteration:
self.exit(task)
return
self.schedule(task)
def event_loop(self):
while self.task_map:
self._run_once()
Вся работа происходит в функции event_loop()
, которая просто достаёт задачи одну за другой. В функции _run_once()
идёт обработка итерации цикла событий, в которой поочерёдно берутся и запускаются задачи для обработки. Если задача не завершилась, то она ставится заново в очередь задач self.ready
. Выполненные задачи нужно убрать из планировщика функцией exit()
.
Для добавления задачи используйте функцию add_task()
. Она принимает корутину для выполнения и создаёт с ней задачу в планировщике. Чтобы поставить задачу напрямую в планировщик, необходимо вызвать функцию schedule()
.
Далее разберёмся с устройством задачи.
import types
from typing import Generator, Union
class Task:
task_id = 0
def __init__(self, target: Generator):
Task.task_id += 1
self.tid = Task.task_id # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
self.stack = [] # Call stack
# Run a task until it hits the next yield statement
def run(self):
while True:
try:
result = self.target.send(self.sendval)
if isinstance(result, types.GeneratorType):
self.stack.append(self.target)
self.sendval = None
self.target = result
else:
if not self.stack:
return
self.sendval = result
self.target = self.stack.pop()
except StopIteration:
if not self.stack:
raise
self.sendval = None
self.target = self.stack.pop()
Сама по себе задача — обёртка над корутиной. У каждой задачи есть свой id
, который учитывается в планировщике в словаре task_map
. На его заполненность смотрит планировщик при выполнении задач.
Другая особенность задач — возможность выполнения корутин методом run()
. Давайте посмотрим, как они выполняются в рамках задачи. Предположим, что есть корутина, которая вызывает другую корутину, а та — третью. Например, вот такой код:
def double(x):
yield x * x
def add(x, y):
yield from double(x + y)
def main():
result = yield add(1, 2)
print(result)
yield
Код является небольшой модификацией кода Бизли из его выступления. Теперь попробуем выполнить эту цепочку корутин в рамках Task
.
task = Task(main())
task.run()
9
Таким же образом будут выполняться и остальные корутины в рамках планировщика. Осталось только расширить планировщик для работы с I/O-операциями.
Теперь у нас достаточно знаний, чтобы без труда освоить основную встроенную библиотеку для асинхронного программирования — asyncio
.
С версии Python 3.5 в язык добавили специальный синтаксис — async/await. Он позволяет использовать «нативные» корутины, которые теперь являются частью языка. Они разделяют генераторы от асинхронного кода, что позволяет создавать асинхронные генераторы и ускорять работу асинхронного кода.
Посмотрим, как выглядит простая программа с использованием async/await.
import random
import asyncio
async def func():
r = random.random()
await asyncio.sleep(r)
return r
async def value():
result = await func()
print(result)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(value())
loop.close()
В целом изменений немного. Переменная loop
— это не что иное, как планировщик задач. Он работает по схожим принципам с тем, что рассматривался ранее. Теперь все функции переключаются с помощью await
.
Познакомимся с основными функциями asyncio
, которые часто встречаются на практике:
gather
— выполняет список корутин одновременно и дожидается результата выполнения всех корутин.sleep
— заставляет корутину уснуть на определённое количество секунд.wait
/wait_for
— удобные функции, чтобы дождаться выполнения уже запущенной корутины.Также стоит ознакомиться с основными функциями event_loop
:
get_event_loop
— получить новый объект event_loop
или тот, что уже существует. При этом одновременно может существовать только один объект event_loop
.run_until_complete
/run
— удобные функции для запуска и проверки асинхронных функций.shutdown_asyncgens
— одна из самых недооценённых функций цикла событий, которая позволяет правильно завершить выполнение цикла событий и всех корутин.call_soon
— позволяет запланировать выполнение корутины, но не ждать её выполнения. Таким образом можно вечно ставить на выполнение одну и ту же функцию.Теперь стоит поговорить про ключевые различия между asyncio и предложенной реализацией цикла событий. Asyncio работает на функциях обратного вызова или колбэках (callback). Этот механизм запускает задачи «честнее», чем текущий планировщик. Каждая корутина по-честному ставится в очередь и исполняется. В простом планировщике переключения не произойдёт, пока вся цепочка корутин не выполнится, что блокирует выполнение остальных задач. Однако, у колбэков есть и свой недостаток — callback hell. Это состояние, когда после вызова каждой функции нужно вызвать ещё одну функцию и ещё одну функцию... Получаются интересные фрагменты кода:
func1.add_callback(
func2.add_callback(
func3.add_callback(func4)
)
)
К счастью, этого удаётся избежать через синтаксис async/await.
await func4()
await func3()
await func2()
await func1()
Такое поведение возможно благодаря введению класса Future
. По сути, такие объекты спасают от колбэков и делают код более линейным. В современных версиях Python Future
-объекты для нативных корутин не нужны.
В прошлых уроках мы познакомились с областями применения асинхронного программирования в Python и его реализацией.
Теперь познакомимся с тремя фреймворками:
Twisted
— один из старейших. AIOHTTP
— самый нативный. FastAPI
— самый узкоспециализированный.Разумеется, их гораздо больше. Но пока начнём с малого.
Несмотря на то что Django 3.х гордо называет себя асинхронным фреймворком, большинство его кода, включая ORM, всё ещё работает в синхронном режиме. Создатели фреймворка медленно, но уверенно идут к поддержке асинхронности, хотя сама асинхронность в этом фреймворке — скорее дополнение, чем основной путь разработки. В Django работают оба режима — синхронный и асинхронный. У фреймворка есть возможность переключаться между ними в случае необходимости.
Во второй версии Django проблема с I/O-bound-операциями решалась при помощи Django-Channels. Эта библиотека расширила возможности Django, позволив разработчикам асинхронно обрабатывать HTTP-соединения и веб-сокеты. Для этого был добавлен абстрактный слой между работой с асинхронными соединениями и синхронным Django. На основе данных, полученных из асинхронных соединений, выполняются необходимые синхронные задачи.
Django-Channel
разделён на нескольких сущностей:
Канал (Channel) — это очередь событий, в которую поступают данные из соединения. Вы можете создать любое количество каналов, привязав их к URL в классическом для Django стиле. При возникновении нового события, например, установки соединения или получения данных, информацию получат все потребители.
Потребитель (Consumer) — привязан к одному каналу или целой группе. Он умеет прослушивать поступающие сообщения и реагировать на них нужным образом. Каждому потребителю известно, из какого канала пришло сообщение.
Пример создания простого приложения можно найти в документации.
Под капотом Channels используется ASGI-сервер Daphne
, написанный на Twisted
. Давайте узнаем о нём побольше.
Во времена древних богов, воителей и королей до популяризации асинхронных фреймворков и до появления в Python собственной реализации event-loop, простой народ искал защитника. Одним из первых асинхронных инструментов был Twisted.
На самом деле это было не так уж давно: этот проект относительно недавно достиг совершеннолетия!
Twisted — не просто веб-фреймворк для создания сайтов. Это набор модулей на все случаи жизни для написания любых клиентских или серверных веб-приложений. Также у него есть реализация GUI.
Twisted состоит из нескольких ключевых понятий:
1 Protocol.
Протоколы описывают, как приложение будет получать и отдавать данные. В Twisted реализовано много готовых протоколов для работы с сетью. Каждый объект протокола обязательно реализует метод, создающий соединение, и три метода-коллбека, которые вызываются:
2 Factory.
Класс, который отвечает за конфигурацию, управление и создание объектов протокола на каждое соединение. Здесь располагается код, который отвечает за поведение программы при неуспешной попытке соединения или его потере.
3 Reactor.
Это собственная реализация event-loop в Twisted — сердце любого Twisted-приложения. Объект реактора не создаётся явно, а просто импортируется из библиотеки. После вызова reactor.run()
он будет отвечать за весь цикл событий в потоке:
4 Deffered-объекты.
Отличительная и самая узнаваемая черта Twisted. Они нужны для формирования цепочки обратных вызовов. Рассмотрим их работу на примере:
def toInt(data): # Первый обработчик превращает данные в int
return int(data)
def increment_number(data): # Второй обработчик увеличивает значение на единицу
return data + 1
def print_result(data): # Третий обработчик выводит данные в консоль
print(data)
def handleFailure(f): # Обработчик ошибок
print("OOPS!")
def get_deffered():
# Объявляем Deffered-объект
d = defer.Deferred()
# Добавляем обработчики в нужном порядке
return d.addCallbacks(toInt,
handleFailure).addCallbacks(increment_number,
handleFailure).addCallback(print_result)
Deffered-объект будет ожидать поступления данных, а после их обработки передаст выполнение одному из коллбеков. Они бывают двух видов: обработчики данных — callback и обработчики ошибок — errback. Callback принимает входные данные, а errback — объект Failure, состоящий из возникшего Exception и traceback. Twisted выполнит всю цепочку, выбирая в каждой паре коллбеков подходящий.
Например, приложение, которое выводит заглавными буквами на главной странице случайную фразу, может выглядеть так:
import aiohttp
from aiohttp import web
async def get_phrase():
# Метод, который обращается к стороннему ресурсу и забирает данные
async with aiohttp.ClientSession() as session:
async with session.get('https://fish-text.ru/get', params={'type': 'title'}) as response:
result = await response.json(content_type='text/html; charset=utf-8')
return result.get('text')
async def index_handler(request):
return web.Response(text=await get_phrase()) # Формируем ответ для клиента
async def response_signal(request, response):
response.text = response.text.upper() # Увеличиваем все буквы в ответе
return response
async def make_app():
app = web.Application() # Объявляем приложение — ваш веб-сервер
app.on_response_prepare.append(response_signal) # Добавим сигнал
app.add_routes([web.get('/', index_handler)]) # Добавим необходимый URL
return app
web.run_app(make_app()) # Запускаем приложение
Вы могли заметить, что структура приложения гораздо проще, чем у Twisted. Сравните реализации протокола: в Twisted вы должны описать три отдельных метода на каждое событие, и держать в памяти, когда вызывается каждый из них, и какие Deffered-цепочки в них задействованы. В aiohttp код пишется линейно и в нём нет коллбэков. Благодаря простоте кода и отличной документации aiohttp быстро обрёл популярность.
aiohttp
есть много функций для комфортной разработки асинхронного веб-приложения. Например, он поддерживает jinja-шаблоны и раздачу статических файлов, а также у него есть middlewares, пользовательские сессии и debugtoolbar.Fast API досталась от Starlette классическая функциональность веб-фреймворка:
Pydantic отвечает за валидацию API при помощи встроенной в Python аннотации типов. Кроме встроенных типов, Pydantic обзавёлся своими, например, Color
для цветов в css, а ещё Json
, AnyUrl
или UUID5
.
В результате этого союза получился быстрый (как по скорости работы, так и по написанию кода) фреймворк, со встроенной автоматической валидацией и сериализацией данных на основе описанных моделей. Ещё он умеет авторизировать пользователя через JWT, api-key или OAuth2 и самостоятельно генерировать документацию!
Благодаря строгой типизации во фреймворке есть встроенная генерация OpenAPI-файла. После запуска приложения вы получите готовую документацию для вашего API и интерфейс для её просмотра. К сожалению, в обратную сторону это не работает: по OpenAPI-файлу нельзя получить готовый API, поэтому придётся писать его самостоятельно :(
Ещё одна отличительная способность FastAPI — поддержка внедрения зависимостей. Это делает код более гибким: вы можете переиспользовать нужные наборы параметров в разных частях приложения. А ещё ваши зависимости могут тоже иметь зависимости :)
В качестве примера рассмотрим реализацию API, которое складывает два числа.
from fastapi import FastAPI
from pydantic import BaseModel
from pydantic.fields import Optional, Field
# Объявляем приложение и задаём ему название, которое будет отображаться в документации
app = FastAPI(title="Простые математические операции")
# Объявляем модель, которая будет валидировать данные, поступающие от пользователя
# При несовпадении данных со схемой пользователю вернётся ошибка валидации
class Add(BaseModel):
first_number: int = Field(title='Первое слагаемое')
second_number: Optional[int] = Field(title='Второе слагаемое')
# Объявляем модель для формирования результата
# При несовпадении данных со схемой вы получите подробный traceback :)
class Result(BaseModel):
result: int = Field(title='Результат')
# Добавляем URL и привязываем к нему модели запроса и ответа
@app.post("/add", response_model=Result)
async def create_item(item: Add):
# Выполняем вычисления
return {
'result': item.first_number + item.second_number or 1
}
exception_handler
для оповещения пользователя об ошибках, магия автогенерации документации не сработает: Swagger не будет знать о новых вариантах ответа. Вам придётся дополнить OpenAPI вручную.Фреймворк заточен для написания API, прост в освоении и обладает широким набором возможностей.
Работая с Django, вы привыкли к структуре проекта, которую предоставляет фреймворк. Но у FastAPI, как и у любых других микрофреймворков, отсутствует заранее предусмотренная структура проекта. Чтобы проект не превратился в бесформенную лужу, у него должна быть структура. Поэтому создадим каркас для проекта так, как это обычно делают в компаниях.
Начнём проект с установки зависимостей. Вам понадобятся следующие библиотеки:
aioredis==1.3.1
elasticsearch[async]==7.9.1
fastapi==0.61.1
orjson==3.4.1
uvicorn==0.12.2
uvloop==0.14.0
# core/config.py
import os
from logging import config as logging_config
from core.logger import LOGGING
# Применяем настройки логирования
logging_config.dictConfig(LOGGING)
# Название проекта. Используется в Swagger-документации
PROJECT_NAME = os.getenv('PROJECT_NAME', 'movies')
# Настройки Redis
REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
# Настройки Elasticsearch
ELASTIC_HOST = os.getenv('ELASTIC_HOST', '127.0.0.1')
ELASTIC_PORT = int(os.getenv('ELASTIC_PORT', 9200))
# Корень проекта
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# core/logger.py
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOG_DEFAULT_HANDLERS = ['console', ]
# В логгере настраивается логгирование uvicorn-сервера.
# Про логирование в Python можно прочитать в документации
# https://docs.python.org/3/howto/logging.html
# https://docs.python.org/3/howto/logging-cookbook.html
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': LOG_FORMAT
},
'default': {
'()': 'uvicorn.logging.DefaultFormatter',
'fmt': '%(levelprefix)s %(message)s',
'use_colors': None,
},
'access': {
'()': 'uvicorn.logging.AccessFormatter',
'fmt': "%(levelprefix)s %(client_addr)s - '%(request_line)s' %(status_code)s",
},
},
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
'default': {
'formatter': 'default',
'class': 'logging.StreamHandler',
'stream': 'ext://sys.stdout',
},
'access': {
'formatter': 'access',
'class': 'logging.StreamHandler',
'stream': 'ext://sys.stdout',
},
},
'loggers': {
'': {
'handlers': LOG_DEFAULT_HANDLERS,
'level': 'INFO',
},
'uvicorn.error': {
'level': 'INFO',
},
'uvicorn.access': {
'handlers': ['access'],
'level': 'INFO',
'propagate': False,
},
},
'root': {
'level': 'INFO',
'formatter': 'verbose',
'handlers': LOG_DEFAULT_HANDLERS,
},
}
# main.py
import logging
import uvicorn
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from core import config
from core.logger import LOGGING
app = FastAPI(
# Конфигурируем название проекта. Оно будет отображаться в документации
title=config.PROJECT_NAME,
# Адрес документации в красивом интерфейсе
docs_url='/api/openapi',
# Адрес документации в формате OpenAPI
openapi_url='/api/openapi.json',
# Можно сразу сделать небольшую оптимизацию сервиса
# и заменить стандартный JSON-сереализатор на более шуструю версию, написанную на Rust
default_response_class=ORJSONResponse,
)
if __name__ == '__main__':
# Приложение может запускаться командой
# `uvicorn main:app --host 0.0.0.0 --port 8000`
# но чтобы не терять возможность использовать дебагер,
# запустим uvicorn сервер через python
uvicorn.run('main:app', host='0.0.0.0', port=8000)
Запустите сервис. После запуска должна открываться страница с документацией http://0.0.0.0:8000/api/openapi
Теперь объявим в модуле db
соединения с Elasticsearch и Redis. Создайте следующие файлы:
# db/elastic.py
from typing import Optional
from elasticsearch import AsyncElasticsearch
es: Optional[AsyncElasticsearch] = None
# Функция понадобится при внедрении зависимостей
async def get_elastic() -> AsyncElasticsearch:
return es
# db/redis.py
from typing import Optional
from aioredis import Redis
redis: Optional[Redis] = None
# Функция понадобится при внедрении зависимостей
async def get_redis() -> Redis:
return redis
Изменим main.py, чтобы подключить соединения к базам.
import logging
import aioredis
import uvicorn as uvicorn
from elasticsearch import AsyncElasticsearch
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from core import config
from core.logger import LOGGING
from db import elastic
from db import redis
app = FastAPI(title=config.PROJECT_NAME, docs_url='/api/openapi',
openapi_url='/api/openapi.json', default_response_class=ORJSONResponse)
@app.on_event('startup')
async def startup():
redis.redis = await aioredis.create_redis_pool(
(config.REDIS_HOST, config.REDIS_PORT),
minsize=10,
maxsize=20
)
elastic.es = AsyncElasticsearch(hosts=[f'{config.ELASTIC_HOST}:{config.ELASTIC_PORT}'])
@app.on_event('shutdown')
async def shutdown():
await redis.redis.close()
await elastic.es.close()
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000,
log_config=LOGGING, log_level=logging.DEBUG)
Создадим первый обработчик HTTP-запросов. Создадим модуль api
, а внутри него — модуль v1
. Внутри v1
создайте файл film.py
.
from fastapi import APIRouter
from pydantic import BaseModel
# Объект router, в котором регистрируем обработчики
router = APIRouter()
# FastAPI в качестве моделей использует библиотеку pydantic
# https://pydantic-docs.helpmanual.io
# У неё есть встроенные механизмы валидации, сериализации и десериализации
# Также она основана на дата-классах
# Модель ответа API
class Film(BaseModel):
id: str
title: str
# С помощью декоратора регистрируем обработчик film_details
# На обработку запросов по адресу <some_prefix>/some_id
# Позже подключим роутер к корневому роутеру
# И адрес запроса будет выглядеть так — /api/v1/film/some_id
# В сигнатуре функции указываем тип данных, получаемый из адреса запроса (film_id: str)
# И указываем тип возвращаемого объекта — Film
@router.get('/{film_id}', response_model=Film)
async def film_details(film_id: str) -> Film:
return Film(id='some_id', title='some_title')
Подключим роутер фильмов к приложению. Отредактируем main.py
.
import logging
import aioredis
import uvicorn
from elasticsearch import AsyncElasticsearch
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from api.v1 import film
from core import config
from core.logger import LOGGING
from db import elastic, redis
app = FastAPI(title=config.PROJECT_NAME, docs_url='/api/openapi',
openapi_url='/api/openapi.json', default_response_class=ORJSONResponse)
@app.on_event('startup')
async def startup():
redis.redis = await aioredis.create_redis_pool(
(config.REDIS_HOST, config.REDIS_PORT),
minsize=10,
maxsize=20
)
elastic.es = AsyncElasticsearch(hosts=[f'{config.ELASTIC_HOST}:{config.ELASTIC_PORT}'])
@app.on_event('shutdown')
async def shutdown():
await redis.redis.close()
await elastic.es.close()
app.include_router(film.router, prefix='/api/v1/film', tags=['film'])
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000)
Запустите приложение. В документации появится описание добавленного метода http://localhost:8000/api/openapi
.
Теперь настало время реализовать бизнес-логику. FastAPI пропагандирует использование подхода внедрения зависимостей — dependency injection.
DI позволяет развязать структуру кода. При таком подходе бизнес-логика перестаёт быть зависимой от работы с конкретной БД или фреймворком, что позволяет быстро менять их при необходимости. Второе преимущество — упрощение тестирования приложений, так как не придётся делать множество monkey patch в коде тестов. Это упростит читаемость тестов для других разработчиков и сделает код чище.
В FastAPI зависимости указывается с помощью функции Depends, в которую передаётся функция-провайдер. Она возвращает необходимую зависимость. Функция-провайдер также предоставляет свои зависимости.
Перейдём к практике, чтобы сразу ощутить магию.
В модуле services
создайте файл film.py
.
from functools import lru_cache
from aioredis import Redis
from elasticsearch import AsyncElasticsearch
from fastapi import Depends
from db.elastic import get_elastic
from db.redis import get_redis
# FilmService содержит бизнес-логику по работе с фильмами.
# Никакой магии тут нет. Обычный класс с обычными методами.
# Этот класс ничего не знает про DI — максимально сильный и независимый.
class FilmService:
def __init__(self, redis: Redis, elastic: AsyncElasticsearch):
self.redis = redis
self.elastic = elastic
# get_film_service — это провайдер FilmService.
# С помощью Depends он сообщает, что ему необходимы Redis и Elasticsearch
# Для их получения вы ранее создали функции-провайдеры в модуле db
# Используем lru_cache-декоратор, чтобы создать объект сервиса в едином экземпляре (синглтона)
@lru_cache()
def get_film_service(
redis: Redis = Depends(get_redis),
elastic: AsyncElasticsearch = Depends(get_elastic),
) -> FilmService:
return FilmService(redis, elastic)
Граф зависимостей будет выглядеть следующим образом.
Создадим модель фильма в models/film.py. Ранее вы уже создавали модель Film
в api/v1/film.py, но она используется исключительно для представления данных по HTTP. Внутренние модели, одну из которых вы создаёте сейчас, используется только в рамках бизнес-логики.
import orjson
# Используем pydantic для упрощения работы при перегонке данных из json в объекты
from pydantic import BaseModel
def orjson_dumps(v, *, default):
# orjson.dumps возвращает bytes, а pydantic требует unicode, поэтому декодируем
return orjson.dumps(v, default=default).decode()
class Film(BaseModel):
id: str
title: str
description: str
class Config:
# Заменяем стандартную работу с json на более быструю
json_loads = orjson.loads
json_dumps = orjson_dumps
Реализуем бизнес-логику для получения фильмов по id
.
class FilmService:
def __init__(self, redis: Redis, elastic: AsyncElasticsearch):
self.redis = redis
self.elastic = elastic
async def get_by_id(self, film_id: str) -> Optional[Film]:
film = await self._film_from_cache(film_id)
if not film:
film = await self._get_film_from_elastic(film_id)
if not film:
return None
await self._put_film_to_cache(film)
return film
async def _get_film_from_elastic(self, film_id: str) -> Optional[Film]:
doc = await self.elastic.get('movies', film_id)
return Film(**doc['_source'])
async def _film_from_cache(self, film_id: str) -> Optional[Film]:
data = await self.redis.get(film_id)
if not data:
return None
film = Film.parse_raw(data)
return film
async def _put_film_to_cache(self, film: Film):
await self.redis.set(film.id, film.json(), expire=FILM_CACHE_EXPIRE_IN_SECONDS)
Подключите сервис к API.
from http import HTTPStatus
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from services.film import FilmService, get_film_service
router = APIRouter()
class Film(BaseModel):
id: str
title: str
# Внедряем FilmService с помощью Depends(get_film_service)
@router.get('/{film_id}', response_model=Film)
async def film_details(film_id: str, film_service: FilmService = Depends(get_film_service)) -> Film:
film = await film_service.get_by_id(film_id)
if not film:
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail='film not found')
# Перекладываем данные из models.Film в Film
# Обратите внимание, что у модели бизнес-логики есть поле description
# Которое отсутствует в модели ответа API.
# Если бы использовалась общая модель для бизнес-логики и формирования ответов API
# вы бы предоставляли клиентам данные, которые им не нужны
# и, возможно, данные, которые опасно возвращать
return Film(id=film.id, title=film.title)
Вы познакомились с FastAPI и научились настраивать проект с нуля. Также вы познакомились с подходом внедрения зависимостей, научились работать с базовыми командами Redis get и set. А ещё поняли, зачем разделять модели бизнес-логики и ответов API.