Introduction
This tutorial introduces the key concepts of asynchronous programming with Python's asyncio module, building up to a final application that downloads user data concurrently from an external API.
The source code for this tutorial can be found on GitLab: https://gitlab.com/patkennedy79/asyncio_example.
Concurrency
Concurrency means being able to perform multiple tasks at once, or at least giving the perception that multiple tasks are being performed at once. With computers, the tasks may not be executing at exactly the same time; instead, the computer switches between tasks quickly enough that it feels like multiple tasks are being performed at the same time.
There are three main approaches for concurrent programming in Python:
- Processes
- Threads
- asyncio
An excellent introduction to these three concurrency options in Python is Raymond Hettinger's keynote on concurrency from PyBay 2017.
Asynchronous Programming
Asynchronous programming is supported in Python via the asyncio module. Asynchronous programming allows your program to perform multiple tasks at the same time (i.e. concurrently).
asyncio is built using coroutines, which are special functions that can suspend and resume their execution. Typically, the coroutines will all run in a single thread with an event loop controlling the execution of the coroutines.
Asynchronous programming is beneficial when working with I/O-bound tasks, such as networking, database access, or file access; asynchronous programming is not helpful for CPU-bound tasks (processes are a better option).
Introduction to asyncio
asyncio utilizes the `async` and `await` keywords to distinguish a coroutine from a function:

```python
async def get_user_data(url, index):
    """Coroutine to get user data from an external API."""
    result = await get(url + str(index))
    return result.status_code


def add(x, y):
    """Function to add two numbers."""
    return x + y
```
Let's jump into an example of a Python program for running a single coroutine:
```python
import asyncio


async def wait_in_seconds(wait_time: int):  # <4>
    """Coroutine that sleeps for the specified seconds."""
    print(f"Starting wait_in_seconds({wait_time})...")
    await asyncio.sleep(wait_time)  # <5>, <6>
    print(f"... finished wait_in_seconds({wait_time})!")  # <7>


async def main():  # <2>
    """Main coroutine."""
    print("Starting main()...")
    await wait_in_seconds(2)  # <3>
    print("...finished main()!")  # <8>


if __name__ == '__main__':
    asyncio.run(main())  # <1>, <9>
```
Here are the steps that occur in this program:

1. `asyncio.run(main())` causes the asyncio event loop to be created and passes the `main()` coroutine to the asyncio event loop
2. The asyncio event loop runs the `main()` coroutine (using the event loop's `run_until_complete()` method)
3. The `main()` coroutine runs until `await wait_in_seconds(2)`, which suspends the execution of the current coroutine (`main()`)
4. The asyncio event loop runs the `wait_in_seconds()` coroutine
5. The `wait_in_seconds()` coroutine runs until `await asyncio.sleep(wait_time)`, which suspends the execution of the current coroutine (`wait_in_seconds()`)
6. The asyncio event loop runs the `asyncio.sleep()` coroutine
7. When the `asyncio.sleep()` coroutine completes, the asyncio event loop resumes the `wait_in_seconds()` coroutine
8. When the `wait_in_seconds()` coroutine completes, the asyncio event loop resumes the `main()` coroutine
9. When the `main()` coroutine completes, the `asyncio.run()` function closes the event loop
Here are the key takeaways from this example:

- The `async` keyword is used to define coroutines.
- The `await` keyword is used to suspend the execution of the current coroutine until the called coroutine completes.
- The asyncio event loop controls the execution of coroutines.
This program uses a typical pattern found with asyncio... create a `main()` coroutine and use `asyncio.run(main())` to start the asyncio event loop and execute the top-level coroutine:

```python
async def main():
    ...  # Call coroutine(s)


asyncio.run(main())
```
One thing you might notice about this program is that it is more complex than a similar program using functions. The benefit of coroutines will become apparent when multiple coroutines are run concurrently.
Running Multiple Coroutines
How long do you think this script will take to execute?
```python
import asyncio
import time


async def wait_in_seconds(wait_time: int):
    """Coroutine that sleeps for the specified seconds."""
    print(f"Starting wait_in_seconds({wait_time})...")
    await asyncio.sleep(wait_time)
    print(f"... finished wait_in_seconds({wait_time})!")


async def main():
    """Main coroutine."""
    print("Starting main()...")
    await wait_in_seconds(3)
    await wait_in_seconds(4)
    print("...finished main()!")


if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
```
It might seem like it should complete in 4 seconds (the larger sleep time), but coroutines work by suspending their execution and waiting (`await`) for the called coroutine to finish executing. The two calls to `wait_in_seconds()` will execute sequentially, so the script takes about 7 seconds.
Here's the output:
```sh
$ python3 asyncio_example2.py
Starting main()...
Starting wait_in_seconds(3)...
... finished wait_in_seconds(3)!   # 1st call
Starting wait_in_seconds(4)...
... finished wait_in_seconds(4)!   # 2nd call (sequential!)
...finished main()!
Completed operation in 7.006342 seconds!
```
This example emphasizes that the `await` keyword in a coroutine is used to suspend the current coroutine while it waits for the called coroutine to complete.
Keyboard Interrupt
A nice improvement to the previous program is to add a graceful way to handle keyboard interrupts (i.e. Ctrl-C). The `if __name__ == '__main__':` section should be updated to:
```python
if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")
```
Running this program and then doing Ctrl-C to stop the application:
```sh
$ python3 asyncio_example3.py
Starting main()...
Starting wait_in_seconds(3)...
^C...stopping due to keyboard interrupt.
```
Concurrently Running Coroutines
Let's take a look at why coroutines are so powerful... (hint: concurrency!).
gather()
The `gather()` function from the `asyncio` module can be used to run multiple coroutines concurrently:
```python
import asyncio
import time


async def wait_in_seconds(wait_time: int):
    """Coroutine that sleeps for the specified seconds."""
    print(f"Starting wait_in_seconds({wait_time})...")
    await asyncio.sleep(wait_time)
    print(f"... finished wait_in_seconds({wait_time})!")


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    print("Starting main()...")
    await asyncio.gather(wait_in_seconds(3), wait_in_seconds(4), wait_in_seconds(5))
    print("...finished main()!")


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")
```
The `gather()` function from the `asyncio` module accepts multiple coroutines, which are then scheduled to run by the asyncio event loop:

```python
await asyncio.gather(wait_in_seconds(3), wait_in_seconds(4), wait_in_seconds(5))
```
The program takes about 5 seconds to run, which is the largest wait time of the three coroutines passed in to `gather()`:
```sh
$ python3 asyncio_example4.py
Starting main()...
Starting wait_in_seconds(3)...
Starting wait_in_seconds(4)...
Starting wait_in_seconds(5)...
... finished wait_in_seconds(3)!
... finished wait_in_seconds(4)!
... finished wait_in_seconds(5)!
...finished main()!
Completed operation in 5.004393 seconds!
```
Let's change the example to use a coroutine that simulates an external API call and returns a status message:
```python
import asyncio
import time


async def simulated_api_call(index: int) -> str:
    """Coroutine that simulates an external API call."""
    await asyncio.sleep(index)
    return f"Successfully completed simulated_api_call({index})!"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    print("Starting main()...")
    results = await asyncio.gather(simulated_api_call(3), simulated_api_call(4), simulated_api_call(5))
    for item in results:
        print(item)
    print("...finished main()!")


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")
```
The `wait_in_seconds()` coroutine has been replaced with the `simulated_api_call()` coroutine:

```python
async def simulated_api_call(index: int) -> str:
    """Coroutine that simulates an external API call."""
    await asyncio.sleep(index)
    return f"Successfully completed simulated_api_call({index})!"
```
The difference with this coroutine is that it returns a string indicating success. When the script runs, the returned strings from the three coroutines are printed after all three coroutines complete:
```sh
$ python3 asyncio_example5.py
Starting main()...
Successfully completed simulated_api_call(3)!
Successfully completed simulated_api_call(4)!
Successfully completed simulated_api_call(5)!
...finished main()!
Completed operation in 5.003844 seconds!
```
`gather()` is great for running coroutines concurrently, but the results are only returned after all the coroutines complete.
Coroutines vs. Tasks
Before moving on, let's talk about Tasks in asyncio.
Tasks in asyncio are wrappers around coroutines. Tasks can be created using:
```python
task = asyncio.create_task(simulated_api_call(3))
```
The `create_task()` function will wrap the coroutine (`simulated_api_call(3)`) in a Task and schedule this Task to run in the asyncio event loop. In fact, the `asyncio.gather()` function wraps each coroutine in a Task so that they are executed by the asyncio event loop.
There is also a new `TaskGroup` in Python 3.11 that can be used with an asynchronous context manager to run multiple coroutines concurrently:

```python
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(simulated_api_call(3))
    task2 = tg.create_task(simulated_api_call(4))
```
From my experience, I've run into problems working with Tasks in asyncio. I think they are important to be aware of, but I would recommend working with coroutines directly.
as_completed()
asyncio provides the `as_completed()` function for running coroutines concurrently while providing their results as they become available:
```python
import asyncio
import time


async def simulated_api_call(index: int) -> str:
    """Coroutine that simulates an external API call."""
    await asyncio.sleep(index)
    return f"Successfully completed simulated_api_call({index})!"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    coroutines = [simulated_api_call(i) for i in range(1, 6)]
    for completed_coroutine in asyncio.as_completed(coroutines):
        # Use `await` to get the result from the coroutine
        result = await completed_coroutine
        print(result)


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")
```
The use of `asyncio.as_completed()` results in a refactored `main()` coroutine:

```python
async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    coroutines = [simulated_api_call(i) for i in range(1, 6)]
    for completed_coroutine in asyncio.as_completed(coroutines):
        # Use `await` to get the result from the coroutine
        result = await completed_coroutine
        print(result)
```
First, a list of coroutines is created using a list comprehension:

```python
coroutines = [simulated_api_call(i) for i in range(1, 6)]
```
Second, these coroutines are scheduled to run in the asyncio event loop, and the result from each coroutine is processed as each coroutine finishes:

```python
for completed_coroutine in asyncio.as_completed(coroutines):
```
Finally, the result of each coroutine can be retrieved by `await`ing the coroutine and printed to the console:

```python
# Use `await` to get the result from the coroutine
result = await completed_coroutine
print(result)
```
The use of `asyncio.as_completed()` is my preferred approach for running coroutines concurrently.
asyncio Example for Retrieving User Data
Now that we're at the point of being able to write coroutines and run multiple coroutines concurrently, let's write a script to read data from a service called ReqRes (Request/Response). ReqRes provides an API for testing applications that make API calls. We'll be using the "Single User" API endpoint to retrieve individual (fake) user data.
When a call is made to https://reqres.in/api/users/2, ReqRes will respond with a JSON packet:

```json
{
  "data": {
    "id": 2,
    "email": "janet.weaver@reqres.in",
    "first_name": "Janet",
    "last_name": "Weaver",
    "avatar": "https://reqres.in/img/faces/2-image.jpg"
  },
  "support": {
    "url": "https://contentcaddy.io?utm_source=reqres&utm_medium=json&utm_campaign=referral",
    "text": "Tired of writing endless social media content? Let Content Caddy generate it for you."
  }
}
```
In our script, we can parse this data to print out the relevant information.
The `httpx` module will be used for making the HTTP requests to the ReqRes API. The `httpx` module provides an asynchronous client for sending HTTP requests when working with asyncio:

```python
import httpx

async with httpx.AsyncClient() as client:
    response = await client.get('https://reqres.in/api/users/4')
```
To avoid any issues with remembering to open and close the asynchronous client, an asynchronous context manager (`async with`) is the recommended approach for working with the asynchronous client from `httpx`. This client can then be used to call the applicable coroutine based on the desired HTTP method (`get`, `post`, `delete`, etc.).
Here's the full script for reading the data for 10 users using `asyncio.as_completed()` and `httpx`:
```python
import asyncio
import time

import httpx

from utilities import parse_user_data


async def get_user_data(client: httpx.AsyncClient, index: int) -> str:
    """Coroutine that returns the data for the specified user."""
    url = 'https://reqres.in/api/users/' + str(index)
    response = await client.get(url, timeout=2.0, follow_redirects=True)
    response.raise_for_status()
    first_name, last_name, email = parse_user_data(response.json())
    return f"User {index}: {first_name} {last_name} - {email}"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    async with httpx.AsyncClient() as client:
        coroutines = [get_user_data(client, i) for i in range(1, 11)]
        for completed_coroutine in asyncio.as_completed(coroutines):
            # Use `await` to get the result from the coroutine
            result = await completed_coroutine
            print(result)


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")
```
The `main()` coroutine has a major change: it uses an asynchronous context manager to create an asynchronous HTTP client from the `httpx` module:
```python
async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    async with httpx.AsyncClient() as client:
        coroutines = [get_user_data(client, i) for i in range(1, 11)]
        for completed_coroutine in asyncio.as_completed(coroutines):
            # Use `await` to get the result from the coroutine
            result = await completed_coroutine
            print(result)
```
This client needs to be passed to the `get_user_data()` coroutine for making the HTTP GET call.
> NOTE: The `httpx` documentation strongly recommends creating only a single asynchronous client, so the asynchronous client is created in the `main()` coroutine and passed to each coroutine that is created.
The `get_user_data()` coroutine retrieves the user data for the specified index using the `client.get()` coroutine:
```python
async def get_user_data(client: httpx.AsyncClient, index: int) -> str:
    """Coroutine that returns the data for the specified user."""
    url = 'https://reqres.in/api/users/' + str(index)
    response = await client.get(url, timeout=2.0, follow_redirects=True)
    response.raise_for_status()
    first_name, last_name, email = parse_user_data(response.json())
    return f"User {index}: {first_name} {last_name} - {email}"
```
After creating the URL string, the httpx asynchronous client is used to make an HTTP GET call to retrieve the JSON data from the ReqRes API. Since `await` is used, this coroutine (`get_user_data()`) will be suspended until a response comes back from the `client.get()` call. Suspending the coroutine allows the asyncio event loop to run other coroutines.
Once the JSON response is available, the `raise_for_status()` method is called to raise an exception if any unexpected (i.e. not 200 OK) status code is returned. If the response is valid, the relevant data is parsed and a string is returned.
Here's the `parse_user_data()` function for reference:
```python
def parse_user_data(json_data: dict) -> tuple[str, str, str]:
    """Parse the JSON data from ReqRes.

    The input JSON data is expected to be of the format:

        {
          "data": {
            "id": 2,
            "email": "janet.weaver@reqres.in",
            "first_name": "Janet",
            "last_name": "Weaver",
            "avatar": "https://reqres.in/img/faces/2-image.jpg"
          },
          "support": { ... }
        }

    The following fields are returned as a tuple of strings:
      - first_name (str)
      - last_name (str)
      - email (str)
    """
    if 'data' not in json_data:
        print(f'ERROR! JSON data: {json_data}')
        return '', '', ''

    first_name = json_data['data']['first_name']
    last_name = json_data['data']['last_name']
    email = json_data['data']['email']
    return first_name, last_name, email
```
If you run this script, you'll see that the user data is printed as soon as it is retrieved, thanks to `asyncio.as_completed()`:
```sh
$ python3 user_data_asyncio_v1.py
User 5: Charles Morris - charles.morris@reqres.in
User 6: Tracey Ramos - tracey.ramos@reqres.in
User 3: Emma Wong - emma.wong@reqres.in
User 8: Lindsay Ferguson - lindsay.ferguson@reqres.in
User 10: Byron Fields - byron.fields@reqres.in
User 9: Tobias Funke - tobias.funke@reqres.in
User 4: Eve Holt - eve.holt@reqres.in
User 2: Janet Weaver - janet.weaver@reqres.in
User 1: George Bluth - george.bluth@reqres.in
User 7: Michael Lawson - michael.lawson@reqres.in
Completed operation in 0.242237 seconds!
```
If you were to run a synchronous version of this script, you would get the data back in sequential order, but the execution time is much longer:
```sh
$ python3 user_data_sync_version.py
User 1: George Bluth - george.bluth@reqres.in
User 2: Janet Weaver - janet.weaver@reqres.in
User 3: Emma Wong - emma.wong@reqres.in
User 4: Eve Holt - eve.holt@reqres.in
User 5: Charles Morris - charles.morris@reqres.in
User 6: Tracey Ramos - tracey.ramos@reqres.in
User 7: Michael Lawson - michael.lawson@reqres.in
User 8: Lindsay Ferguson - lindsay.ferguson@reqres.in
User 9: Tobias Funke - tobias.funke@reqres.in
User 10: Byron Fields - byron.fields@reqres.in
Completed operation in 1.796225 seconds!
```
Semaphore
The previous asyncio script creates 10 coroutines that execute concurrently, which means 10 HTTP GET calls are being made to the ReqRes API at approximately the same time. Since a lot of APIs have rate limits, we need to add a way to limit the number of coroutines that are executing concurrently.
In asyncio, a semaphore (`asyncio.Semaphore`) is used to limit the number of coroutines/tasks that execute concurrently:

```python
semaphore = asyncio.Semaphore(5)  # Allow 5 coroutines/tasks to execute concurrently
```
A coroutine needs to take the semaphore as an argument:

```python
async def get_user_data(semaphore: asyncio.Semaphore):
    await semaphore.acquire()
    try:
        result = await another_coroutine()
    finally:
        semaphore.release()
```
The semaphore is essentially a counter that starts at the specified value (5 in this case); the value is decremented when a coroutine calls `acquire()`. After the coroutine completes, it needs to call `release()` to increment the value. If a coroutine calls `acquire()` when the semaphore has a count of 0, then that coroutine must wait until the counter is incremented (which is how the number of concurrent coroutines is limited!).
The use of semaphores may seem complex, but an asynchronous context manager can be used to avoid needing to call `semaphore.acquire()` and `semaphore.release()`:

```python
async def get_user_data(semaphore: asyncio.Semaphore):
    async with semaphore:
        result = await another_coroutine()
```
Here is the updated script for accessing user data, but limiting the number of coroutines that execute concurrently to 5:
```python
import asyncio
import time

import httpx

from utilities import parse_user_data


async def get_user_data(client: httpx.AsyncClient, index: int, semaphore: asyncio.Semaphore) -> str:
    """Coroutine that returns the data for the specified user."""
    async with semaphore:  # <3>
        url = 'https://reqres.in/api/users/' + str(index)
        response = await client.get(url, timeout=2.0, follow_redirects=True)
        response.raise_for_status()
        first_name, last_name, email = parse_user_data(response.json())
        return f"User {index}: {first_name} {last_name} - {email}"


async def main():
    """Main coroutine for running multiple coroutines concurrently."""
    semaphore = asyncio.Semaphore(5)  # <1>
    async with httpx.AsyncClient() as client:
        coroutines = [get_user_data(client, i, semaphore) for i in range(1, 11)]  # <2>
        for completed_coroutine in asyncio.as_completed(coroutines):
            # Use `await` to get the result from the coroutine
            result = await completed_coroutine
            print(result)


if __name__ == '__main__':
    try:
        start_time = time.perf_counter()
        asyncio.run(main())
        print(f"Completed operation in {(time.perf_counter() - start_time):.6f} seconds!")
    except KeyboardInterrupt:
        print("...stopping due to keyboard interrupt.")
```
Within the `main()` coroutine, a semaphore is created to limit the number of coroutines executing concurrently to 5 (<1>). The `get_user_data()` coroutine needs to be passed the semaphore (<2>), so that it can use the semaphore in an asynchronous context manager (<3>).
Running this script shows that the user data is received in approximately the same time:
```sh
$ python3 user_data_asyncio_v2.py
User 7: Michael Lawson - michael.lawson@reqres.in
User 4: Eve Holt - eve.holt@reqres.in
User 6: Tracey Ramos - tracey.ramos@reqres.in
User 9: Tobias Funke - tobias.funke@reqres.in
User 1: George Bluth - george.bluth@reqres.in
User 8: Lindsay Ferguson - lindsay.ferguson@reqres.in
User 10: Byron Fields - byron.fields@reqres.in
User 3: Emma Wong - emma.wong@reqres.in
User 5: Charles Morris - charles.morris@reqres.in
User 2: Janet Weaver - janet.weaver@reqres.in
Completed operation in 0.235260 seconds!
```
However, only 5 coroutines are being run concurrently at any single time.
> If you want to really see this in action, add an `await asyncio.sleep(2)` call in the `get_user_data()` coroutine.
Conclusion
asyncio is a great way to speed up Python applications that perform I/O-bound tasks. There is a learning curve, but I think asyncio is a powerful module that allows for safer concurrent code than threads.
The key concepts to remember with asyncio are:

- Coroutines are special functions that can be suspended and resumed
- The `async` keyword is used to define coroutines
- The `await` keyword is used to suspend the execution of the current coroutine
- The asyncio event loop controls the execution of coroutines
- `asyncio.gather()` can be used to run coroutines concurrently, but the results are only available after all the coroutines complete
- `asyncio.as_completed()` can be used to run coroutines concurrently and handle the results as the coroutines complete
- `asyncio.Semaphore` can be used to limit the number of coroutines running concurrently
I hope this tutorial inspires you to give asyncio a try!