Patrick's Software Blog

Introduction to asyncio in Python


Introduction

This tutorial provides an introduction to asynchronous programming in Python using asyncio. It introduces the key concepts of asyncio, leading to a final application that downloads user data concurrently from an external API.

The source code for this tutorial can be found on GitLab: https://gitlab.com/patkennedy79/asyncio_example.

Concurrency

Concurrency means being able to perform multiple tasks at once, or having the perception that multiple tasks are being performed at once. With computers, the tasks may not execute at exactly the same time; instead, the computer may switch between tasks quickly enough that it feels like multiple tasks are being performed simultaneously.

There are three main approaches for concurrent programming in Python:

  1. Processes
  2. Threads
  3. asyncio

An excellent introduction to these three concurrency options in Python is Raymond Hettinger's keynote on concurrency from PyBay 2017.

Asynchronous Programming

Asynchronous programming is supported in Python via the asyncio module. Asynchronous programming allows your program to perform multiple tasks at the same time (i.e. concurrently).

asyncio is built using coroutines, which are special functions that can suspend and resume their execution. Typically, the coroutines will all run in a single thread with an event loop controlling the execution of the coroutines.

Asynchronous programming is beneficial when working with I/O-bound tasks, such as networking, database access, or file access; asynchronous programming is not helpful for CPU-bound tasks (processes are a better option).

Introduction to asyncio

asyncio utilizes the async and await keywords to distinguish a coroutine from a function:

async def get_user_data(url, index):
    """Coroutine to get user data from an external API."""
    result = await get(url + str(index))  # `get` is assumed to be an awaitable HTTP GET (e.g. from an async HTTP library)
    return result.status_code

def add(x, y):
    """Function to add two numbers."""
    return x + y

Let's jump into an example of a Python program for running a single coroutine:

import asyncio


async def wait_in_seconds(wait_time: int):  # <4>
    """Coroutine that sleeps for the specified seconds."""
    print(f"Starting wait_in_seconds({wait_time})...")
    await asyncio.sleep(wait_time)  # <5>, <6>
    print(f"... finished wait_in_seconds({wait_time})!") # <7>


async def main():  # <2>
    """Main coroutine."""
    print("Starting main()...")
    await wait_in_seconds(2)  # <3>
    print("...finished main()!")  # <8>


if __name__ == '__main__':
    asyncio.run(main())  # <1>, <9>

Here are the steps that occur in this program:

  1. asyncio.run(main()) causes the asyncio event loop to be created and passes the main() coroutine to the asyncio event loop
  2. The asyncio event loop runs the main() coroutine (using the event loop's run_until_complete() method)
  3. The main() coroutine runs until await wait_in_seconds(2), which suspends the execution of the current coroutine (main())
  4. The asyncio event loop runs the wait_in_seconds() coroutine
  5. The wait_in_seconds() coroutine runs until await asyncio.sleep(wait_time), which suspends the execution of the current coroutine (wait_in_seconds())
  6. The asyncio event loop runs the asyncio.sleep() coroutine.
  7. When the asyncio.sleep() coroutine completes, the asyncio event loop resumes the wait_in_seconds() coroutine.
  8. When the wait_in_seconds() coroutine completes, the asyncio event loop resumes the main() coroutine.
  9. When the main() coroutine completes, the asyncio.run() function closes the event loop.

Here are the key takeaways from this example:

  • The async keyword is used to define coroutines.
  • The await keyword is used to suspend the execution of the current coroutine until the called coroutine completes.
  • The asyncio event loop controls the execution of coroutines.

This program uses a typical asyncio pattern: create a main() coroutine and use asyncio.run(main()) to start the asyncio event loop and execute the top-level coroutine:

async def main():
    ...  # Call coroutine(s)

asyncio.run(main())

One thing you might notice about this program is that it is more complex than a similar program using functions. The benefit of coroutines will become apparent when multiple coroutines are run concurrently.
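For comparison, here is what a plain synchronous version of this program might look like (a sketch using time.sleep() in place of asyncio.sleep()):

```python
import time


def wait_in_seconds(wait_time: int):
    """Function that sleeps for the specified seconds."""
    print(f"Starting wait_in_seconds({wait_time})...")
    time.sleep(wait_time)  # Blocks the entire program while sleeping
    print(f"... finished wait_in_seconds({wait_time})!")


def main():
    """Main function."""
    print("Starting main()...")
    wait_in_seconds(2)
    print("...finished main()!")


if __name__ == '__main__':
    main()
```

The synchronous version is shorter, but time.sleep() blocks the whole program; asyncio.sleep() only suspends the current coroutine, which is what makes concurrency possible.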

Running Multiple Coroutines

How long do you think this script will take to execute?

import asyncio
import time


async def wait_in_seconds(wait_time: int):
    """Coroutine that sleeps for the specified seconds."""
    print(f"Starting wait_in_seconds({wait_time})...")
    await asyncio.sleep(wait_time)
    print(f"... finished wait_in_seconds({wait_time})!")


async def main():
    """Main coroutine."""
    print("Starting main()...")
    await wait_in_seconds(3)
    await wait_in_seconds(4)
    print("...finished main()!")


if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")

It might seem like it should complete in 4 seconds (the larger sleep time), but await suspends the calling coroutine until the awaited coroutine finishes executing. The two calls to wait_in_seconds() therefore execute sequentially, so the script takes about 7 seconds.

Here's the output:

$ python3 asyncio_example2.py 
Starting main()...
Starting wait_in_seconds(3)...
... finished wait_in_seconds(3)!  # 1st call
Starting wait_in_seconds(4)...
... finished wait_in_seconds(4)!  # 2nd call (sequential!)
...finished main()!
Completed operation in 7.006342 seconds!

This example emphasizes that the await keyword in a coroutine is used to suspend the current coroutine while it waits for the called coroutine to complete.

Keyboard Interrupt

A nice improvement to the previous program is to add a graceful way to handle keyboard interrupts (i.e. Ctrl-C). The if __name__ == '__main__': block should be updated to:

if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")

Running this program and then doing Ctrl-C to stop the application:

$ python3 asyncio_example3.py
Starting main()...
Starting wait_in_seconds(3)...
^C...stopping due to keyboard interrupt.

Concurrently Running Coroutines

Let's take a look at why coroutines are so powerful... (hint: concurrency!).

gather()

The gather() function from the asyncio module can be used to run multiple coroutines concurrently:

import asyncio
import time


async def wait_in_seconds(wait_time: int):
    """Coroutine that sleeps for the specified seconds."""
    print(f"Starting wait_in_seconds({wait_time})...")
    await asyncio.sleep(wait_time)
    print(f"... finished wait_in_seconds({wait_time})!")


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    print("Starting main()...")
    await asyncio.gather(wait_in_seconds(3), wait_in_seconds(4), wait_in_seconds(5))
    print("...finished main()!")


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")

The gather() function from the asyncio module accepts multiple coroutines that are then scheduled to run by the asyncio event loop:

await asyncio.gather(wait_in_seconds(3), wait_in_seconds(4), wait_in_seconds(5))

The program takes about 5 seconds to run, which is the largest wait time of the three coroutines passed in to gather():

$ python3 asyncio_example4.py
Starting main()...
Starting wait_in_seconds(3)...
Starting wait_in_seconds(4)...
Starting wait_in_seconds(5)...
... finished wait_in_seconds(3)!
... finished wait_in_seconds(4)!
... finished wait_in_seconds(5)!
...finished main()!
Completed operation in 5.004393 seconds!

Changing the example to have a coroutine that simulates an external API call and returns a status message:

import asyncio
import time


async def simulated_api_call(index: int) -> str:
    """Coroutine that simulates an external API call."""
    await asyncio.sleep(index)
    return f"Successfully completed simulated_api_call({index})!"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    print("Starting main()...")
    results = await asyncio.gather(simulated_api_call(3), simulated_api_call(4), simulated_api_call(5))
    for item in results:
        print(item)
    print("...finished main()!")


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")

The wait_in_seconds() coroutine has been replaced with the simulated_api_call() coroutine:

async def simulated_api_call(index: int) -> str:
    """Coroutine that simulates an external API call."""
    await asyncio.sleep(index)
    return f"Successfully completed simulated_api_call({index})!"

The difference with this coroutine is that it returns a string indicating success. When the script runs, the strings returned from the three coroutines are printed only after all three coroutines complete:

$ python3 asyncio_example5.py
Starting main()...
Successfully completed simulated_api_call(3)!
Successfully completed simulated_api_call(4)!
Successfully completed simulated_api_call(5)!
...finished main()!
Completed operation in 5.003844 seconds!

gather() is great for running coroutines concurrently, but the results are only returned after all the coroutines complete.
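One gather() detail worth knowing (not used in the rest of this tutorial, so treat this as a side note): passing return_exceptions=True makes gather() return raised exceptions as results instead of propagating the first failure:

```python
import asyncio


async def might_fail(index: int) -> str:
    """Coroutine that fails for even indexes (to simulate a flaky API)."""
    await asyncio.sleep(0.1)
    if index % 2 == 0:
        raise ValueError(f"simulated failure for index {index}")
    return f"Success for index {index}"


async def main():
    # With return_exceptions=True, exceptions appear in the results list
    # (in input order) instead of being raised out of gather()
    results = await asyncio.gather(*(might_fail(i) for i in range(1, 4)),
                                   return_exceptions=True)
    for item in results:
        print(item)


if __name__ == '__main__':
    asyncio.run(main())
```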

Coroutines vs. Tasks

Before moving on, let's talk about Tasks in asyncio.

Tasks in asyncio are wrappers around coroutines. Tasks can be created using:

task = asyncio.create_task(simulated_api_call(3))

The create_task() function wraps the coroutine (simulated_api_call(3)) in a Task and schedules this Task to run in the asyncio event loop. In fact, the asyncio.gather() function wraps each coroutine in a Task so that they are executed by the asyncio event loop.
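To make this concrete, here's a minimal sketch (reusing the simulated_api_call() coroutine from above): both Tasks start running as soon as they are created, so awaiting them one after the other still only takes about as long as the slowest Task:

```python
import asyncio
import time


async def simulated_api_call(index: int) -> str:
    """Coroutine that simulates an external API call."""
    await asyncio.sleep(index)
    return f"Successfully completed simulated_api_call({index})!"


async def main():
    # Both Tasks are scheduled on the event loop immediately...
    task1 = asyncio.create_task(simulated_api_call(1))
    task2 = asyncio.create_task(simulated_api_call(2))
    # ...so awaiting them sequentially takes about 2 seconds, not 3
    print(await task1)
    print(await task2)


if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
```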

There is also a new TaskGroup in Python 3.11 that can be used with an asynchronous context manager to run multiple coroutines concurrently:

async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(simulated_api_call(3))
    task2 = tg.create_task(simulated_api_call(4))

From my experience, I've run into problems working with Tasks in asyncio. I think they are important to be aware of, but I would recommend working with coroutines.

as_completed()

asyncio provides the as_completed() function for running coroutines concurrently while providing their results as they become available:

import asyncio
import time


async def simulated_api_call(index: int) -> str:
    """Coroutine that simulates an external API call."""
    await asyncio.sleep(index)
    return f"Successfully completed simulated_api_call({index})!"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    coroutines = [simulated_api_call(i) for i in range(1, 6)]
    for completed_coroutine in asyncio.as_completed(coroutines):
        # Use `await` to get the result from the coroutine
        result = await completed_coroutine
        print(result)


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")

The use of asyncio.as_completed() requires a refactor of the main() coroutine:

async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    coroutines = [simulated_api_call(i) for i in range(1, 6)]
    for completed_coroutine in asyncio.as_completed(coroutines):
        # Use `await` to get the result from the coroutine
        result = await completed_coroutine
        print(result)

First, a list of coroutines is created using a list comprehension:

coroutines = [simulated_api_call(i) for i in range(1, 6)]

Second, these coroutines are scheduled to run in the asyncio event loop and the result from each coroutine is processed as each coroutine finishes:

for completed_coroutine in asyncio.as_completed(coroutines):

Finally, the result of each coroutine can be retrieved by awaiting the coroutine and printed to the console:

# Use `await` to get the result from the coroutine
result = await completed_coroutine
print(result)

The use of asyncio.as_completed() is my preferred approach for running coroutines concurrently.

asyncio Example for Retrieving User Data

Now that we're at the point of being able to write coroutines and run multiple coroutines concurrently, let's write a script to read data from a service called ReqRes (Request/Response). ReqRes provides an API for testing applications that make API calls. We'll be using the "Single User" API endpoint to retrieve individual (fake) user data.

When a call is made to https://reqres.in/api/users/2, ReqRes will respond with a JSON packet:

{
    "data": {
        "id": 2,
        "email": "janet.weaver@reqres.in",
        "first_name": "Janet",
        "last_name": "Weaver",
        "avatar": "https://reqres.in/img/faces/2-image.jpg"
    },
    "support": {
        "url": "https://contentcaddy.io?utm_source=reqres&utm_medium=json&utm_campaign=referral",
        "text": "Tired of writing endless social media content? Let Content Caddy generate it for you."
    }
}

In our script, we can parse this data to print out the relevant information.

The httpx module will be used for making the HTTP requests to the ReqRes API. The httpx module provides an asynchronous client for sending out HTTP requests when working with asyncio:

import httpx

async with httpx.AsyncClient() as client:
    response = await client.get('https://reqres.in/api/users/4')

To avoid any issues with remembering to open and close the asynchronous client, an asynchronous context manager (async with) is the recommended approach for working with the asynchronous client from httpx. This client can then be used to call the applicable coroutine based on the desired HTTP method (get, post, delete, etc.).

Here's the full script for reading the data for 10 users using asyncio.as_completed() and httpx:

import asyncio
import time

import httpx

from utilities import parse_user_data


async def get_user_data(client: httpx.AsyncClient, index: int) -> str:
    """Coroutine that returns the data for the specified user."""
    url = 'https://reqres.in/api/users/' + str(index)
    response = await client.get(url, timeout=2.0, follow_redirects=True)
    response.raise_for_status()
    first_name, last_name, email = parse_user_data(response.json())
    return f"User {index}: {first_name} {last_name} - {email}"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    async with httpx.AsyncClient() as client:
        coroutines = [get_user_data(client, i) for i in range(1,11)]
        for completed_coroutine in asyncio.as_completed(coroutines):
            # Use `await` to get the result from the coroutine
            result = await completed_coroutine
            print(result)


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")

The main() coroutine has a major change to use an asynchronous context manager to create an asynchronous HTTP client from the httpx module:

async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    async with httpx.AsyncClient() as client:
        coroutines = [get_user_data(client, i) for i in range(1,11)]
        for completed_coroutine in asyncio.as_completed(coroutines):
            # Use `await` to get the result from the coroutine
            result = await completed_coroutine
            print(result)

This client needs to be passed to the get_user_data() coroutine for making the HTTP GET call.

NOTE: The httpx documentation strongly recommends only creating a single asynchronous client, so the asynchronous client is created in the main() coroutine and passed to each coroutine that is created.

The get_user_data() coroutine retrieves the user data for the specified index using the client.get() coroutine:

async def get_user_data(client: httpx.AsyncClient, index: int) -> str:
    """Coroutine that returns the data for the specified user."""
    url = 'https://reqres.in/api/users/' + str(index)
    response = await client.get(url, timeout=2.0, follow_redirects=True)
    response.raise_for_status()
    first_name, last_name, email = parse_user_data(response.json())
    return f"User {index}: {first_name} {last_name} - {email}"

After creating the URL string, the httpx asynchronous client is used to make an HTTP GET call to retrieve the JSON data from the ReqRes API. Since await is used, this coroutine (get_user_data()) will be suspended until a response comes back from the client.get() call. The suspending of the coroutine (get_user_data()) allows the asyncio event loop to run other coroutines.

Once the JSON response is available, the raise_for_status() method is called to raise an exception if any unexpected (i.e. not 200 OK) status codes are returned. If the response is valid, the relevant data is parsed and a string is returned.

Here's the parse_user_data() function for reference:

def parse_user_data(json_data: dict) -> tuple[str, str, str]:
    """Parse the JSON data from ReqRes.

    The input JSON data is expected to be of the format:
        {
            "data": {
                "id": 2,
                "email": "janet.weaver@reqres.in",
                "first_name": "Janet",
                "last_name": "Weaver",
                "avatar": "https://reqres.in/img/faces/2-image.jpg"
            },
            "support": { ... }
        }

    The following fields are returned as a tuple of strings:
        - first_name (str)
        - last_name (str)
        - email (str)
    """
    if 'data' not in json_data:
        print(f'ERROR! JSON data: {json_data}')
        return '', '', ''

    first_name = json_data['data']['first_name']
    last_name = json_data['data']['last_name']
    email = json_data['data']['email']
    return first_name, last_name, email

If you run this script, you'll see that the user data is printed as soon as it is retrieved thanks to asyncio.as_completed():

$ python3 user_data_asyncio_v1.py
User 5: Charles Morris - charles.morris@reqres.in
User 6: Tracey Ramos - tracey.ramos@reqres.in
User 3: Emma Wong - emma.wong@reqres.in
User 8: Lindsay Ferguson - lindsay.ferguson@reqres.in
User 10: Byron Fields - byron.fields@reqres.in
User 9: Tobias Funke - tobias.funke@reqres.in
User 4: Eve Holt - eve.holt@reqres.in
User 2: Janet Weaver - janet.weaver@reqres.in
User 1: George Bluth - george.bluth@reqres.in
User 7: Michael Lawson - michael.lawson@reqres.in
Completed operation in 0.242237 seconds!

If you were to run a synchronous version of this script, you get the data back in sequential order (user 1 through user 10), but the execution time is much longer:

$ python3 user_data_sync_version.py 
User 1: George Bluth - george.bluth@reqres.in
User 2: Janet Weaver - janet.weaver@reqres.in
User 3: Emma Wong - emma.wong@reqres.in
User 4: Eve Holt - eve.holt@reqres.in
User 5: Charles Morris - charles.morris@reqres.in
User 6: Tracey Ramos - tracey.ramos@reqres.in
User 7: Michael Lawson - michael.lawson@reqres.in
User 8: Lindsay Ferguson - lindsay.ferguson@reqres.in
User 9: Tobias Funke - tobias.funke@reqres.in
User 10: Byron Fields - byron.fields@reqres.in
Completed operation in 1.796225 seconds!

Semaphore

The previous asyncio script creates 10 coroutines that execute concurrently, which means 10 HTTP GET calls are being made to the ReqRes API at approximately the same time. Since many APIs have rate limits, we need a way to limit the number of coroutines that execute concurrently.

In asyncio, a semaphore (asyncio.Semaphore) is used to limit the number of coroutines/tasks that execute concurrently:

semaphore = asyncio.Semaphore(5)  # Allow 5 coroutines/tasks to execute concurrently

A coroutine needs to take the semaphore as an argument:

async def get_user_data(semaphore: asyncio.Semaphore):
    await semaphore.acquire()
    try:
        result = await another_coroutine()
    finally:
        semaphore.release()
    return result

The semaphore is essentially a counter that starts at the specified value (5 in this case) and the value is decremented when a coroutine calls acquire(). After the coroutine completes, it needs to call release() to cause the value to increment. If a coroutine calls acquire() and the semaphore has a count of 0, then the coroutine must wait until the counter is incremented (which is how the number of concurrent coroutines is limited!).
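A small self-contained sketch (using a hypothetical limited_sleep() coroutine) makes this counting behavior visible: with asyncio.Semaphore(2), six half-second sleeps finish in roughly 1.5 seconds because only two run at a time:

```python
import asyncio
import time


async def limited_sleep(semaphore: asyncio.Semaphore, index: int):
    """Sleep for half a second, but only while holding the semaphore."""
    await semaphore.acquire()  # Decrements the counter (waits if it is 0)
    try:
        await asyncio.sleep(0.5)
    finally:
        semaphore.release()  # Increments the counter, waking a waiting coroutine


async def main():
    semaphore = asyncio.Semaphore(2)  # At most 2 coroutines sleep at once
    await asyncio.gather(*(limited_sleep(semaphore, i) for i in range(6)))


if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
```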

The use of semaphores may seem complex, but an asynchronous context manager can be used to avoid needing to call semaphore.acquire() and semaphore.release():

async def get_user_data(semaphore: asyncio.Semaphore):
    async with semaphore:
        result = await another_coroutine()
    return result

Here is the updated script for accessing user data, but limiting the number of coroutines that execute concurrently to 5:

import asyncio
import time

import httpx

from utilities import parse_user_data


async def get_user_data(client: httpx.AsyncClient, index: int, semaphore: asyncio.Semaphore) -> str:
    """Coroutine that returns the data for the specified user."""
    async with semaphore:  # <3>
        url = 'https://reqres.in/api/users/' + str(index)
        response = await client.get(url, timeout=2.0, follow_redirects=True)
        response.raise_for_status()
        first_name, last_name, email = parse_user_data(response.json())
        return f"User {index}: {first_name} {last_name} - {email}"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    semaphore = asyncio.Semaphore(5)  # <1>

    async with httpx.AsyncClient() as client:
        coroutines = [get_user_data(client, i, semaphore) for i in range(1,11)]  # <2>
        for completed_coroutine in asyncio.as_completed(coroutines):
            # Use `await` to get the result from the coroutine
            result = await completed_coroutine
            print(result)


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")

Within the main() coroutine, a semaphore is created to limit the number of coroutines executing concurrently to 5 (<1>). The get_user_data() coroutine needs to be passed the semaphore (<2>), so that the get_user_data() coroutine can use the semaphore in an asynchronous context manager.

Running this script shows that the user data is retrieved in approximately the same amount of time as before:

$ python3 user_data_asyncio_v2.py
User 7: Michael Lawson - michael.lawson@reqres.in
User 4: Eve Holt - eve.holt@reqres.in
User 6: Tracey Ramos - tracey.ramos@reqres.in
User 9: Tobias Funke - tobias.funke@reqres.in
User 1: George Bluth - george.bluth@reqres.in
User 8: Lindsay Ferguson - lindsay.ferguson@reqres.in
User 10: Byron Fields - byron.fields@reqres.in
User 3: Emma Wong - emma.wong@reqres.in
User 5: Charles Morris - charles.morris@reqres.in
User 2: Janet Weaver - janet.weaver@reqres.in
Completed operation in 0.235260 seconds!

However, only 5 coroutines are being run concurrently at any single time.

If you want to really see this in action, add an await asyncio.sleep(2) call in the get_user_data() coroutine.

Conclusion

asyncio is a great way to speed up Python applications that perform I/O-bound tasks. There is a learning curve with asyncio, but I think it's a powerful module that allows for safer concurrent code than threads.

The key concepts to remember with asyncio are:

  • Coroutines are special functions that can be suspended and resumed
  • The async keyword is used to define coroutines
  • The await keyword is used to suspend the execution of the current coroutine
  • The asyncio event loop controls the execution of coroutines
  • asyncio.gather() can be used to run coroutines concurrently, but the results are available after all the coroutines complete
  • asyncio.as_completed() can be used to run coroutines concurrently and handle the results as the coroutines complete
  • asyncio.Semaphore can be used to limit the number of coroutines running concurrently

I hope this tutorial inspires you to give asyncio a try!