От начального до продвинутого уровня
Асинхронное программирование кажется сложным и пугающим для новичков.
Но это один из самых мощных инструментов, которые вы можете добавить в свой арсенал Python.
Представьте себе, что вы пишете код, который никогда не простаивает в ожидании ответа — ваши программы становятся быстрее, отзывчивее и способны обрабатывать несколько задач одновременно.
В этой статье я проведу вас шаг за шагом через 9 уровней от основ до продвинутых техник параллелизма. Независимо от того, новичок ли вы в асинхронном программировании или хотите совершенствовать свои навыки, это руководство предоставит вам практические знания и примеры для освоения асинхронного программирования на Python.
Уровень 0: Понимание необходимости асинхронного программирования
Рассмотрим скрипт, который извлекает данные с нескольких веб-сайтов. При синхронном программировании каждый запрос блокирует программу до его завершения:
import requests
import time
# The urls list could be much longer
urls = ["http://example.com",
"http://example.org",
"http://example.net/",]
start_time = time.time()
for url in urls:
response = requests.get(url)
print(response.status_code)
print(f"Sync code cost {time.time() - start_time:.2f} seconds")
# Sync code cost 0.64 seconds
Приведенный выше код просмотрел 3 URL адреса один за другим синхронно т.е обрабатал каждый URL-адрес по порядку, переходя к следующему, пока не завершится текущий.
В общей сложности это заняло 0,64 секунды.
Выглядит приемлемо?
Представьте, что вы ждете по 3 секунды для каждого из 10 URL адресов, в итоге вы получите 30-секундный процесс. Не говоря уже о 100 URL-адресах, 100000 URL-адресах и так далее. Выполнение программы займет много времени.
Следующий сценарий, так называемый сценарий с привязкой ввода-вывода, демонстрирует асинхронный подход для решения рассматриваемой задачи.
import aiohttp
import asyncio
import time
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(f"Status: {response.status}")
async def main():
urls = ["http://example.com",
"http://example.org",
"http://example.net/",]
start_time = time.time()
await asyncio.gather(*(fetch_url(url) for url in urls))
print(f"Async code cost {time.time() - start_time:.2f} seconds")
asyncio.run(main())
# Async code cost 0.22 seconds
Приведенный выше код является асинхронной версией для реализации той же задачи. На это ушло всего 0,22 секунды!
Почему и как это возможно?
Потому что, используя асинхронные методы кодирования, можно выполнять несколько задач одновременно (в данном случае выполнять все запросы одновременно), что значительно сокращает время ожидания.
Если вы не можете полностью понять приведенный выше асинхронный код, не беспокойтесь об этом, давайте сейчас рассмотрим асинхронные методы Python.
Уровень 1: Понимание цикла обработки событий
Сердцем асинхронного программирования на Python является цикл обработки событий.
Мы можем рассматривать его как главный планировщик, который координирует выполнение задач, не приостанавливая работу всей программы, поэтому магия асинхронности работает.
По сути, цикл обработки событий делает возможным неблокирующее выполнение, а это означает, что пока одна задача ожидает выполнения (например, для выполнения трудоемкой операции ввода-вывода), другие задачи могут продолжать выполняться.
Неблокируемость — основное преимущество асинхронного программирования. Это отличие от блокирования операций в синхронном программировании, где вся программа должна ожидать завершения одной задачи, прежде чем переходить к следующей. И в этом корень ненужных временных затрат.
Модуль asyncio в Python предоставляет простой способ реализации цикла обработки событий. Цикл обработки событий управляет выполнением и возобновлением каждой сопрограммы (сопрограмма — это специальная функция, которая может быть приостановлена и возобновлена, что позволяет выполнять неблокирующие операции), обеспечивая неблокирующие операции.
Разговоры стоят дешево, давайте посмотрим на какой-нибудь код.:
import asyncio
async def task_1():
print("Starting task 1")
await asyncio.sleep(2) # simulate a slow I/O operation
print("Task 1 done")
async def task_2():
print("Starting task 2")
await asyncio.sleep(1) # simulate another slow I/O operation
print("Task 2 done")
async def main():
await asyncio.gather(task_1(), task_2())
asyncio.run(main())
# Starting task 1
# Starting task 2
# Task 2 done
# Task 1 done
В приведенном выше коде asyncio.run(main()) запускает цикл обработки событий, который управляет сопрограммами task_1 и task_2. Цикл обработки событий запускает их одновременно, позволяя task_2 запускаться и завершаться до завершения task_1.
Вот почему асинхронные программы работают намного быстрее. Цикл обработки событий никогда никого не ждет. Если происходит медленная операция, он просто начинает выполнять другие действия и возвращается к обработке медленной операции позже. Он всегда продолжает усердно работать для вас:)
Но как цикл обработки событий узнает, когда нужно остановить текущую сопрограмму и перейти к другой?
Вот ключевые слова async и await.
Уровень 2: Умелое использование async и await
В Python есть два ключевых слова, которые используются повсеместно в мире асинхронности — async и await.
- async используется для определения сопрограммы, которая представляет собой функцию, которую можно приостанавливать и возобновлять, не блокируя другие операции. Когда вы определяете функцию с помощью async, вы указываете, что она может использовать await для передачи управления обратно в цикл обработки событий.
- функция await используется в асинхронной функции для приостановки ее выполнения до завершения ожидаемой сопрограммы. Когда используется функция await, она сообщает циклу обработки событий, что текущая сопрограмма ожидает некоторого результата, позволяя циклу обработки событий тем временем запускать другие сопрограммы.
Таким образом, благодаря ключевым словам async и await цикл обработки событий может знать, как правильно обрабатывать сопрограммы.
Цикл событий координирует вызовы await, приостанавливая текущую корутину и продолжая выполнение других задач. Это обеспечивает эффективное управление множеством корутин, делая асинхронное программирование идеальным для обработки операций, связанных с вводом-выводом, и операций с высокой задержкой. По сути, async определяет, где может происходить асинхронное поведение, в то время как await указывает, когда контроль следует вернуть циклу событий, что позволяет эффективно выполнять несколько задач одновременно.
Уровень 3: Управление корутинами как гуру с использованием asyncio
Вы уже видели предыдущие примеры с использованием модуля asyncio. По сути, это основной модуль в Python для реализации асинхронных программ.
Давайте вернемся к одному из предыдущих примеров для более глубокого изучения:
import asyncio
async def task_1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
return "Result 1"
async def task_2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
return "Result 2"
async def main():
await asyncio.gather(task_1(), task_2())
asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished
Метод asyncio.gather() часто используется для одновременного выполнения множества задач (корутин).
Он прост и сжат, но если вам необходим больший контроль над каждой задачей, можно явно управлять ими с помощью asyncio.create_task():
import asyncio
async def task_1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
return "Result 1"
async def task_2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
return "Result 2"
async def main():
t1 = asyncio.create_task(task_1())
t2 = asyncio.create_task(task_2())
# Wait for both tasks to finish
await t1
await t2
asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished
Результаты одинаковы при обоих подходах. Однако при выполнении корутин вторым способом можно применить больше индивидуальных контролей, например, отменить задачу 1 до её завершения.

Уровень 4: Отмена слишком долгих асинхронных задач
Ожидание завершения каждой корутины не всегда является наилучшим решением. В некоторых случаях вы можете захотеть напрямую отменить слишком долгую задачу. Это можно сделать интуитивно, вызвав метод cancel():
import asyncio
async def task_1():
print("Task 1 started")
try:
await asyncio.sleep(2)
except asyncio.CancelledError:
print("Task 1 was cancelled")
raise
print("Task 1 finished")
return "Result 1"
async def task_2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
return "Result 2"
async def main():
t1 = asyncio.create_task(task_1())
t2 = asyncio.create_task(task_2())
# Wait for Task 2 to finish
await t2
# Cancel Task 1 before it finishes
t1.cancel()
# Wait for Task 1 to handle the cancellation
try:
await t1
except asyncio.CancelledError:
print("Handled cancellation of Task 1")
asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 was cancelled
# Handled cancellation of Task 1
Как было показано выше, задание 1 было отменено до того, как оно было завершено.
Примечание: Задача 1 была запущена сразу после выполнения метода asyncio.create_task(task_1()). Время ожидания t1 предназначено для завершения задачи 1, поэтому подходящее время для вызова t1.cancel() находится между этими двумя командами.
Уровень 5: Обработка задач с таймаутом с помощью asyncio.wait_for()
Возможно, отмена корутины в некоторых ситуациях является слишком грубым подходом, и нам следует предоставить каждой корутине разумное время для обработки, прежде чем она станет слишком долгой.
Да, нам нужно ограничение по времени, и метод asyncio.wait_for() позволяет установить максимальный срок для завершения корутины.
import asyncio
async def slow_task():
await asyncio.sleep(5)
return "Task finished"
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("Task timed out!")
asyncio.run(main())
# Task timed out!
Как показывает приведённый выше пример, мы применили метод asyncio.wait_for() и установили максимальное время ожидания в 2 секунды. Медленная задача завершилась с тайм-аутом, так как она заняла 5 секунд.
Уровень 6: Ограничение параллелизма с помощью asyncio.Semaphore для предотвращения перегрузки ресурсов
Асинхронные программы не всегда идеальны. Иногда они могут быть даже опасными.
Если вы запускаете слишком много корутин одновременно, ресурсы будут чрезмерно использованы, и ваш сервер окажется в блокировке.
Поэтому вам нужно знать о объекте asyncio.Semaphore. Он помогает ограничить количество конкурентных задач, которые могут выполняться одновременно, что особенно важно при обращении к общим ресурсам или внешним сервисам, которые не могут обрабатывать неограниченное количество одновременных запросов.
Например, следующая программа использует asyncio.Semaphore(), чтобы ограничить число корутин, запускаемых одновременно, до 5:
import asyncio
semaphore = asyncio.Semaphore(5)
async def limited_task(n):
async with semaphore:
print(f'Task {n} started')
await asyncio.sleep(1)
print(f'Task {n} finished')
async def main():
tasks = [limited_task(i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
# Task 0 started
# Task 1 started
# Task 2 started
# Task 3 started
# Task 4 started
# Task 0 finished
# Task 1 finished
# Task 2 finished
# Task 3 finished
# Task 4 finished
# Task 5 started
# Task 6 started
# Task 7 started
# Task 8 started
# Task 9 started
# Task 5 finished
# Task 6 finished
# Task 7 finished
# Task 8 finished
# Task 9 finished
На основе результатов мы можем четко увидеть, что первые 5 задач были выполнены одновременно, а затем следующие 5.
Этот контроль за использованием ресурсов кажется тонким, но он убережет вас от неожиданных проблем в крупномасштабной производственной среде.
Уровень 7: Обработка ошибок асинхронного кода Python
Корректная обработка ошибок является ключевым моментом для надежного кода.
В большинстве случаев асинхронные программы используют те же подходы к обработке ошибок, что и синхронный код.
Однако для определенных исключений необходимо знать и использовать специфические объекты ошибок для асинхронного кода, такие как asyncio.CancelledError. (Целый список ошибок асинхронного Python можно найти здесь.)
Кроме специфичных ошибок, есть проблема, подверженная ошибкам, которую стоит упомянуть:
При использовании asyncio.create_task() для одновременного выполнения задач нужно быть осторожным в отношении стратегии обработки ошибок. Поскольку задача выполняется независимо, исключения должны обрабатываться как внутри задачи, так и путем ожидания задачи и последующего перехвата исключения.
В производственной среде мы можем не знать, есть ли у задачи блок try-catch или нет. Поэтому лучшей практикой всегда будет помещать await task внутрь структуры try-catch следующим образом:
import asyncio
async def task_1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
return "Result 1"
async def main():
t1 = asyncio.create_task(task_1())
t1.cancel()
try:
await t1
except asyncio.CancelledError:
print("Handled cancellation of Task 1")
asyncio.run(main())
# Handled cancellation of Task 1
Уровень 8: Асинхронная очередь: более быстрый паттерн производитель-потребитель
Очередь часто используется в паттерне производитель-потребитель. Она выступает в роли буфера между производителем(ями) и потребителем(ами), помогая разделить их операции и управлять коммуникацией в аккуратной структуре данных.
В Python существует встроенная реализация очереди — queue.Queue, а также асинхронная версия — asyncio.Queue.
Обе версии применяют концепцию очередей. Однако асинхронная версия делает процессы производства и потребления асинхронными, что может значительно улучшить производительность вашего кода.
Например, следующий фрагмент использует асинхронную очередь и применяет её для обработки производства и потребления:
import asyncio
async def producer(queue):
for i in range(3):
print(f"Producing {i}")
await queue.put(i)
await asyncio.sleep(2) # simulate a delay for a time-consuming process
async def consumer(queue):
while True:
item = await queue.get()
print(f"Consuming {item}")
queue.task_done()
async def main():
queue = asyncio.Queue()
prod = asyncio.create_task(producer(queue))
cons = asyncio.create_task(consumer(queue))
await asyncio.gather(prod)
await queue.join()
cons.cancel()
asyncio.run(main())
# Producing 0
# Consuming 0
# Producing 1
# Consuming 1
# Producing 2
# Consuming 2
Это иллюстрирует типичный сценарий производителя-потребителя, где одна корутина (производитель) производит элементы, а другая корутина (потребитель) их потребляет, обе работают одновременно.
Теперь давайте углубимся в ключевые моменты нашего использования асинхронной очереди:
- Очередь создается с помощью queue = asyncio.Queue().
- Задачи производителя и потребителя создаются и запускаются одновременно с помощью asyncio.create_task().
- await asyncio.gather(prod) гарантирует, что основная программа ждет завершения работы производителя, который должен произвести все элементы.
- await queue.join() блокирует основную функцию до тех пор, пока все элементы в очереди не будут обработаны. Это гарантирует, что программа ждет, пока потребитель не обработает все, что было произведено, перед выходом.
- Каждый раз, когда потребитель обрабатывает элемент, он сигнализирует очереди с помощью queue.task_done(), что указывает на то, что этот элемент был полностью обработан. Это позволяет queue.join() в конечном итоге разблокироваться.
- После того как очередь пустая и все задачи выполнены, cons.cancel() отменяет задачу потребителя, чтобы остановить его бесконечный цикл.
Заключение
Поздравляю с тем, что вы дошли до конца этого подробного руководства, не каждый может потратить более 10 минут, чтобы полностью прочитать и освоить технику.
Пройдя через эти девять уровней асинхронного программирования в Python, вы глубоко поняли, как реализовать неблокирующие, параллельные задачи в различных сценариях — от базовых асинхронных функций до более сложных паттернов производителя-потребителя.
Собираете данные, строите масштабируемые веб-серверы, управляете доступом к ресурсам или выполняете любые задачи, связанные с I/O, асинхронное программирование может стать мощным инструментом в вашем арсенале Python. Теперь возьмите эти принципы и сделайте ваши приложения более эффективными!
Спасибо за чтение и удачного программирования! ❤️
Обсуждение закрыто.