How to Handle Coroutines with asyncio in Python

When a program becomes very long and complex, it is convenient to divide it into subroutines, each of which implements a specific task. However, subroutines cannot be executed independently, but only at the request of the main program, which is responsible for coordinating the use of subroutines.

In this post, we introduce a generalization of the concept of subroutines, known as coroutines: just like subroutines, coroutines compute a single computational step, but unlike subroutines, there is no main program to coordinate the results. The coroutines link themselves together to form a pipeline without any supervising function responsible for calling them in a particular order. 

This post is taken from the book Python Parallel Programming Cookbook (2nd Ed.) by Giancarlo Zaccone. In this book, you will implement effective programming techniques in Python to build scalable software that saves time and memory.

In a coroutine, the execution point can be suspended and resumed later, since the coroutine keeps track of the state of execution. Having a pool of coroutines, it is possible to interleave the computations: the first one runs until it yields control back, then the second runs and goes on down the line.

The interleaving is managed by the event loop. It keeps track of all the coroutines and schedules when they will be executed.
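To make the interleaving concrete, here is a minimal sketch of a round-robin scheduler built on plain Python generators. It is only a toy stand-in for an event loop, not how asyncio is implemented, and the names task and run_round_robin are hypothetical helpers introduced for this illustration.

```python
from collections import deque


def task(name, steps, log):
    """A generator-based coroutine: record one step, then yield control."""
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield  # pause here; the scheduler decides who runs next


def run_round_robin(tasks):
    """Toy scheduler: resume each task in turn until all of them finish."""
    queue = deque(tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)      # run the task up to its next yield
        except StopIteration:
            continue           # the task is finished, so drop it
        queue.append(current)  # not finished, so reschedule it at the back


log = []
run_round_robin([task("A", 2, log), task("B", 3, log)])
print("\n".join(log))
```

Task A runs until it yields, then task B runs, and so on until both are exhausted. A real event loop additionally waits for I/O and timers before resuming a coroutine.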

Other important aspects of coroutines are as follows:

  • Coroutines allow for multiple entry points that can yield multiple times.
  • Coroutines can transfer execution to any other coroutine.

The term yield is used here to describe a coroutine pausing and passing the control flow to another coroutine.

Getting ready to work with coroutines

We will use the following notation to work with coroutines:

import asyncio 

@asyncio.coroutine
def coroutine_function(function_arguments):
    ............
    DO_SOMETHING
    ............

Coroutines use the yield from syntax introduced in PEP 380 (read more at https://www.python.org/dev/peps/pep-0380/) to stop the current computation and suspend the coroutine’s internal state.

In particular, in the case of yield from future, the coroutine is suspended until future is done; the result of future is then propagated (or its exception is raised). In the case of yield from coroutine, the coroutine waits for another coroutine to produce a result, which is then propagated (or its exception is raised).
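The propagation of results and exceptions can be seen with plain generators, without asyncio at all. The following is a minimal sketch of the PEP 380 behavior; inner, outer, failing_inner, outer_with_failure, and drive are hypothetical names used only for this illustration.

```python
def inner():
    """Sub-generator: suspends once, then returns a result."""
    yield "inner suspended"
    return "inner result"


def outer():
    """Delegates to inner(); its return value becomes the value of yield from."""
    result = yield from inner()
    return "outer got: " + result


def failing_inner():
    """Sub-generator that suspends once and then raises."""
    yield "about to fail"
    raise ValueError("boom")


def outer_with_failure():
    """An exception raised in the sub-generator surfaces at the yield from."""
    try:
        yield from failing_inner()
    except ValueError as exc:
        return "outer caught: " + str(exc)


def drive(gen):
    """Run a generator to completion; collect yielded values and the return value."""
    yielded = []
    while True:
        try:
            yielded.append(next(gen))
        except StopIteration as stop:
            return yielded, stop.value


ok = drive(outer())
failed = drive(outer_with_failure())
print(ok)
print(failed)
```

In both cases the caller of outer sees the suspension points of the inner generator, while the return value or the exception travels back to the line containing yield from.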

The next example, in which coroutines simulate a finite state machine, uses the yield from coroutine notation.

More information on coroutines with asyncio is available at https://docs.python.org/3.5/library/asyncio-task.html.

Using coroutines to simulate a finite state machine

In this example, we see how to use coroutines to simulate a finite state machine with five states.

A finite state machine, or finite state automaton, is a mathematical model that is widely used in engineering disciplines, but also in sciences such as mathematics and computer science.

The automaton whose behavior we want to simulate with coroutines is as follows:

The states of the system are S0, S1, S2, S3, and S4, and the values 0 and 1 determine how the automaton passes from one state to the next (this operation is called a transition). So, for example, state S0 can pass to state S1, but only for the value 1, and S0 can pass to state S2, but only for the value 0.
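One compact way to write the automaton down is a transition table. The sketch below is an assumption in two respects: the table is inferred from the branches in the code listings that follow in this post, and the names TRANSITIONS and walk are hypothetical helpers, not part of the original example.

```python
# Transition table inferred from the coroutine listings in this post:
# TRANSITIONS[state][input_value] gives the next state.
TRANSITIONS = {
    "S0": {0: "S2", 1: "S1"},
    "S1": {0: "S3", 1: "S2"},
    "S2": {0: "S1", 1: "S3"},
    "S3": {0: "S1", 1: "S4"},
}


def walk(inputs, start="S0", end="S4"):
    """Follow the given input values from start; stop early on reaching end."""
    path = [start]
    state = start
    for value in inputs:
        if state == end:
            break
        state = TRANSITIONS[state][value]
        path.append(state)
    return path


path = walk([1, 1, 0, 0, 1])
print(" -> ".join(path))
```

The coroutine version that follows encodes the same table implicitly: each state is a function, and each branch of its if statement is one row entry.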

The following Python code simulates a transition of the automaton from state S0 (the start state), up to state S4 (the end state):

1) The first step is obviously to import the relevant libraries:

import asyncio
import time
from random import randint

2) Then, we define the coroutine for start_state. The input_value variable is chosen randomly; it can be 0 or 1. If it is 0, then control goes to the coroutine state2; otherwise, it goes to the coroutine state1:

@asyncio.coroutine
def start_state():
    print('Start State called\n')
    input_value = randint(0, 1)
    time.sleep(1)
    if input_value == 0:
        result = yield from state2(input_value)
    else:
        result = yield from state1(input_value)
    print('Resume of the Transition:\nStart State calling ' + result)

3) Here is the coroutine for state1. It receives the transition_value argument that caused the transition into this state. The input_value variable is chosen randomly; it can be 0 or 1. If it is 0, then control goes to state3; otherwise, it goes to state2:

@asyncio.coroutine
def state1(transition_value):
    output_value ='State 1 with transition value = %s\n'% \
                                             transition_value
    input_value = randint(0, 1)
    time.sleep(1)
    print('...evaluating...')
    if input_value == 0:
        result = yield from state3(input_value)
    else:
        result = yield from state2(input_value)
    return output_value + 'State 1 calling %s' % result

4) The coroutine for state2 has the transition_value argument that caused the transition into this state. Here too, input_value is chosen randomly. If it is 0, then the state transitions to state1; otherwise, control goes to state3:

@asyncio.coroutine
def state2(transition_value):
    output_value = 'State 2 with transition value = %s\n' %\
                                             transition_value
    input_value = randint(0, 1)
    time.sleep(1)
    print('...evaluating...')
    if input_value == 0:
        result = yield from state1(input_value)
    else:
        result = yield from state3(input_value)
    return output_value + 'State 2 calling %s' % result

5) The coroutine for state3 has the transition_value argument that caused the transition into this state. Again, input_value is chosen randomly. If it is 0, then the state transitions to state1; otherwise, control goes to end_state:

@asyncio.coroutine
def state3(transition_value):
    output_value = 'State 3 with transition value = %s\n' %\
                                                 transition_value
    input_value = randint(0, 1)
    time.sleep(1)
    print('...evaluating...')
    if input_value == 0:
        result = yield from state1(input_value)
    else:
        result = yield from end_state(input_value)
    return output_value + 'State 3 calling %s' % result

6) end_state returns the transition_value argument that caused the transition into this state, and then stops the computation:

@asyncio.coroutine
def end_state(transition_value):
    output_value = 'End State with transition value = %s\n'%\
                                                transition_value
    print('...stop computation...')
    return output_value

7) In the __main__ block, the event loop is acquired, and then we start the simulation of the finite state machine by calling the automaton’s start_state:

if __name__ == '__main__':
    print('Finite State Machine simulation with Asyncio Coroutine')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(start_state())

How coroutines simulate a finite state machine

Each state of the automaton has been defined by using the decorator:

@asyncio.coroutine

For example, state S0 is defined here:

@asyncio.coroutine
def start_state():
    print('Start State called\n')
    input_value = randint(0, 1)
    time.sleep(1)
    if input_value == 0:
        result = yield from state2(input_value)
    else:
        result = yield from state1(input_value)

The transition to the next state is determined by input_value, which is defined by the randint(0,1) function of Python’s random module. This function randomly provides a value of 0 or 1.

In this manner, randint randomly determines the state to which the finite state machine will pass:

input_value = randint(0,1)

After determining the value to pass, the coroutine calls the next coroutine using the yield from command:

if input_value == 0:
    result = yield from state2(input_value)
else:
    result = yield from state1(input_value)

The result variable is the value that each coroutine returns. It is a string, and, at the end of the computation, we can reconstruct the transition from the initial state of the automaton, start_state, up to end_state.

The main program starts the evaluation inside the event loop:

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(start_state())

Running the code, we have an output like this:

Finite State Machine simulation with Asyncio Coroutine
Start State called
...evaluating...
...evaluating...
...evaluating...
...evaluating...
...stop computation...
Resume of the Transition:
Start State calling State 1 with transition value = 1
State 1 calling State 2 with transition value = 1
State 2 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling End State with transition value = 1

Handling coroutines with asyncio in Python 3.5

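Before Python 3.5 was released, the asyncio module used generators to mimic asynchronous calls, with the @asyncio.coroutine decorator and yield from as in the listings above. That generator-based style was deprecated in Python 3.8 and removed in Python 3.11, so those listings need an older interpreter to run as written.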

Python 3.5 introduced the async and await keywords. Notice the lack of parentheses around the await func() call.

The following is an example of "Hello, world!", using asyncio with the new syntax introduced by Python 3.5+:

import asyncio
 
async def main():
    print(await func())
 
async def func():
    # Do time intensive stuff...
    return "Hello, world!"
 
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
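The same syntax can be applied to the finite state machine from earlier in this post. The following is a sketch of one possible port, not the book's code. It assumes Python 3.7 or later for asyncio.run, replaces time.sleep with asyncio.sleep so that the pause suspends the coroutine instead of blocking the event loop, and passes in a hypothetical pick callable so that the random choice can be scripted for a repeatable run.

```python
import asyncio
from random import randint

DELAY = 0.01  # short pause standing in for the one-second sleep in the listings


async def start_state(pick):
    value = pick()
    await asyncio.sleep(DELAY)  # suspends this coroutine without blocking the loop
    next_state = state2 if value == 0 else state1
    result = await next_state(value, pick)
    return "Start State calling " + result


async def state1(transition_value, pick):
    output = "State 1 with transition value = %s\n" % transition_value
    value = pick()
    await asyncio.sleep(DELAY)
    next_state = state3 if value == 0 else state2
    result = await next_state(value, pick)
    return output + "State 1 calling " + result


async def state2(transition_value, pick):
    output = "State 2 with transition value = %s\n" % transition_value
    value = pick()
    await asyncio.sleep(DELAY)
    next_state = state1 if value == 0 else state3
    result = await next_state(value, pick)
    return output + "State 2 calling " + result


async def state3(transition_value, pick):
    output = "State 3 with transition value = %s\n" % transition_value
    value = pick()
    await asyncio.sleep(DELAY)
    next_state = state1 if value == 0 else end_state
    result = await next_state(value, pick)
    return output + "State 3 calling " + result


async def end_state(transition_value, pick):
    return "End State with transition value = %s\n" % transition_value


def random_bit():
    """Random input, as in the original listings."""
    return randint(0, 1)


# Scripted inputs for a repeatable run; pass random_bit instead for a random walk.
bits = iter([1, 1, 0, 0, 1])
summary = asyncio.run(start_state(lambda: next(bits)))
print(summary)
```

With the scripted inputs 1, 1, 0, 0, 1, the summary follows the same path as the sample output shown earlier: start state, state 1, state 2, state 1, state 3, end state.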

In this post, we learned how to handle coroutines with asyncio. To learn more features of asynchronous programming in Python, you may go through the book Python Parallel Programming Cookbook (2nd Ed.) by Packt Publishing.
