Асинхронное программирование в Python, часть вторая

Итак, давайте оставим, наконец, мысленные эксперименты и перейдём к практике. Рассматриваемые далее примеры тестировались под Python версии 3.6.1.

Пример 1: синхронное программирование

Рассмотрим некую "задачу", которая получает из очереди и выполняет некоторую "работу". В нашем примере "работой" будет последовательность целых чисел, а суть "задачи" будет состоять в том, чтобы последовательно досчитать от нуля до полученного из очереди числа, выводя в консоль значение числа на каждой итерации счёта, а также сумму всех чисел в конце. В данном примере все операции выполняются синхронно, однако закладывается фундамент для того, чтобы в будущем превратить программу в асинхронную.

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f'Очередь задачи {name} пуста')
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            for x in range(count):
                print(f'Задача {name} занимается вычислениями')
                total += 1
            print(f'Результат работы задачи {name}: {total}')


def main():
    # Создаём очередь
    work_queue = queue.Queue()

    # Добавляем "работу" в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Создаём две задачи
    tasks = [
        (task, 'One', work_queue),
        (task, 'Two', work_queue)
    ]

    # Запускаем задачи на выполнение
    for t, n, q in tasks:
        t(n, q)


if __name__ == '__main__':
    main()

"Задача" в этой программе -- это просто функция task, принимающая два значения: строку, содержащую имя самой задачи, и очередь с "работой". Когда она запускается, то выполняется проверка, есть ли что-то в очереди для обработки, и, если есть, то запускается обработка, заключающаяся в подсчёте суммы чисел в диапазоне от нуля до полученного из очереди числа. Функция будет работать до тех пор, пока не будет исчерпана вся очередь с данными.

Если вы запустите эту программу на исполнение, то увидите, что задача с именем "One" выполнит всю работу в одиночку. После завершения первой задачи, запустится задача "Two", но не обнаружив ничего в очереди, она тут же закончит выполнение. В данном примере просто нет кода, который бы заставил задачи работать сообща, переключаясь друг с другом.

Пример 2: простая кооперативная конкуренция

В этом примере мы изменим код так, чтобы добавить возможность совместной работы двух задач, реализовав это при помощи генераторов. Используя оператор yield, мы сделаем так, что после возврата значения функция task не будет завершать свою работу, а лишь приостанавливать до момента следующего вызова next(). Используемый в примере код реализует кооперативную конкуренцию: задачи выполняются поочерёдно, и каждая из них выполняет обработку какой-то части данных, хранящихся в очереди.

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        for x in range(count):
            print(f'Задача {name} занимается вычислениями')
            total += 1
            yield
        print(f'Результат работы задачи {name}: {total}')

def main():
    # Создаём очередь
    work_queue = queue.Queue()

    # Добавляем работу в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Создаём задачи
    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]

    # Запускаем задачи
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True


if __name__ == '__main__':
    main()

Пример 3: кооперативная конкуренция с блокирующими вызовами

В нашем следующем примере оставим всё так же, как и в предыдущем, однако внутрь task() добавим вызов time.sleep(1) для того, чтобы симулировать медленную I/O-операцию. Также дополнительно сделаем подсчёт и вывод времени, которое занимает выполнение каждой задачи и всей программы в целом.

import time
import queue


def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        start_time = time.time()
        for x in range(count):
            print(f'Задача {name} занимается вычислениями')
            time.sleep(1)
            total += 1
            yield
        print(f'Результат работы задачи {name}: {total}')
        print(f'Задача {name} заняла времени: {time.time() - start_time}')


def main():
    # Создаём очередь
    work_queue = queue.Queue()

    # Добавляем работу в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Создаём задачи
    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]

    # Запускаем задачи
    start_time = time.time()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print(f'Всего прошло времени: {time.time() - start_time}')


if __name__ == '__main__':
    main()

Запустив этот пример на выполнение, мы увидим, что всё работает так же, как и в предыдущем случае, за исключением того, что работа task() теперь блокируется вызовами time.sleep(1). Более того, пока один из экземпляров task() ожидает завершения вызова sleep(), останавливается выполнение всей программы. Таким образом получается, что от нашей кооперативной конкуренции нет ровным счётом никакого толку. Это именно то, что в разных книжках и документации называется "блокирующий код". Также обратите внимание на общее время выполнения программы: оно равно сумме всех задержек, вызванных блокирующим кодом.

Дивіться також

HTMLer Технології
HTMLer
9 липня 2019
DicMer Технології
DicMer
7 липня 2019