Асинхронное программирование в Python, часть вторая
Итак, давайте оставим, наконец, мысленные эксперименты и перейдём к практике. Рассматриваемые далее примеры тестировались под Python версии 3.6.1.
Пример 1: синхронное программирование
Рассмотрим некую "задачу", которая получает из очереди и выполняет некоторую "работу". В нашем примере "работой" будет последовательность целых чисел, а суть "задачи" будет состоять в том, чтобы последовательно досчитать от нуля до полученного из очереди числа, выводя в консоль значение числа на каждой итерации счёта, а также сумму всех чисел в конце. В данном примере все операции выполняются синхронно, однако закладывается фундамент для того, чтобы в будущем превратить программу в асинхронную.
import queue
def task(name, work_queue):
if work_queue.empty():
print(f'Очередь задачи {name} пуста')
else:
while not work_queue.empty():
count = work_queue.get()
total = 0
for x in range(count):
print(f'Задача {name} занимается вычислениями')
total += 1
print(f'Результат работы задачи {name}: {total}')
def main():
# Создаём очередь
work_queue = queue.Queue()
# Добавляем "работу" в очередь
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Создаём две задачи
tasks = [
(task, 'One', work_queue),
(task, 'Two', work_queue)
]
# Запускаем задачи на выполнение
for t, n, q in tasks:
t(n, q)
if __name__ == '__main__':
main()
"Задача" в этой программе -- это просто функция task
, принимающая два значения: строку, содержащую имя самой задачи, и очередь с "работой". Когда она запускается, то выполняется проверка, есть ли что-то в очереди для обработки, и, если есть, то запускается обработка, заключающаяся в подсчёте суммы чисел в диапазоне от нуля до полученного из очереди числа. Функция будет работать до тех пор, пока не будет исчерпана вся очередь с данными.
Если вы запустите эту программу на исполнение, то увидите, что задача с именем "One" выполнит всю работу в одиночку. После завершения первой задачи, запустится задача "Two", но не обнаружив ничего в очереди, она тут же закончит выполнение. В данном примере просто нет кода, который бы заставил задачи работать сообща, переключаясь друг с другом.
Пример 2: простая кооперативная конкуренция
В этом примере мы изменим код так, чтобы добавить возможность совместной работы двух задач, реализовав это при помощи генераторов. Используя оператор yield
, мы сделаем так, что после возврата значения функция task
не будет завершать свою работу, а лишь приостанавливать до момента следующего вызова next()
. Используемый в примере код реализует кооперативную конкуренцию: задачи выполняются поочерёдно, и каждая из них выполняет обработку какой-то части данных, хранящихся в очереди.
import queue
def task(name, queue):
while not queue.empty():
count = queue.get()
total = 0
for x in range(count):
print(f'Задача {name} занимается вычислениями')
total += 1
yield
print(f'Результат работы задачи {name}: {total}')
def main():
# Создаём очередь
work_queue = queue.Queue()
# Добавляем работу в очередь
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Создаём задачи
tasks = [
task('One', work_queue),
task('Two', work_queue)
]
# Запускаем задачи
done = False
while not done:
for t in tasks:
try:
next(t)
except StopIteration:
tasks.remove(t)
if len(tasks) == 0:
done = True
if __name__ == '__main__':
main()
Пример 3: кооперативная конкуренция с блокирующими вызовами
В нашем следующем примере оставим всё так же, как и в предыдущем, однако внутрь task()
добавим вызов time.sleep(1)
для того, чтобы симулировать медленную I/O-операцию. Также дополнительно сделаем подсчёт и вывод времени, которое занимает выполнение каждой задачи и всей программы в целом.
import time
import queue
def task(name, queue):
while not queue.empty():
count = queue.get()
total = 0
start_time = time.time()
for x in range(count):
print(f'Задача {name} занимается вычислениями')
time.sleep(1)
total += 1
yield
print(f'Результат работы задачи {name}: {total}')
print(f'Задача {name} заняла времени: {time.time() - start_time}')
def main():
# Создаём очередь
work_queue = queue.Queue()
# Добавляем работу в очередь
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Создаём задачи
tasks = [
task('One', work_queue),
task('Two', work_queue)
]
# Запускаем задачи
start_time = time.time()
done = False
while not done:
for t in tasks:
try:
next(t)
except StopIteration:
tasks.remove(t)
if len(tasks) == 0:
done = True
print(f'Всего прошло времени: {time.time() - start_time}')
if __name__ == '__main__':
main()
Запустив этот пример на выполнение, мы увидим, что всё работает так же, как и в предыдущем случае, за исключением того, что работа task()
теперь блокируется вызовами time.sleep(1)
. Более того, пока один из экземпляров task()
ожидает завершения вызова sleep()
, останавливается выполнение всей программы. Таким образом получается, что от нашей кооперативной конкуренции нет ровным счётом никакого толку. Это именно то, что в разных книжках и документации называется "блокирующий код". Также обратите внимание на общее время выполнения программы: оно равно сумме всех задержек, вызванных блокирующим кодом.