Concurrency is a fundamental concept in computing. It is the process where a system can manage multiple tasks in an overlapping manner. This does not mean the tasks are all running at the exact same instant. Instead, it means a task can be started, it can run for a while, and then it can be paused. While it is paused, another task can be started and allowed to run. The system effectively juggles these tasks, switching between them to make progress on all of them. Their execution periods overlap, giving the illusion of simultaneous execution.
This is different from parallelism. Parallelism is when multiple tasks are running at the exact same physical time. This requires hardware with multiple processing units, such as a multi-core CPU. If you have four CPU cores, you can run four tasks in true parallel. Concurrency, on the other hand, can be achieved even on a single-core CPU. It is a structural property of a program, while parallelism is a runtime behavior dependent on hardware. The source article correctly notes that concurrency is a broader term than parallelism.
The Problem: I/O Bound vs. CPU Bound
To understand why we need async IO, we must first understand the two main types of tasks a program performs. The first type is “CPU-bound.” These are tasks that involve heavy computation. Think of resizing a large image, encrypting a file, or calculating a complex mathematical model. The task is limited by the speed of the CPU. To make these tasks faster, you generally need a faster CPU or multiple CPUs (parallelism).
The second type is “I/O-bound.” I/O stands for “Input/Output.” These are tasks that spend most of their time waiting for something else. This “something else” could be a response from a web server, a query to a database, or reading a large file from a slow hard drive. While the program is waiting for the network or the disk, the CPU is sitting idle, doing nothing. This is the specific problem that asynchronous programming is designed to solve. It aims to put the CPU to work while it would otherwise be waiting.
Traditional Solutions: Threads and Multiprocessing
Before async IO became popular, Python offered two primary tools for concurrency. The first is threading. A thread is a separate flow of execution. Your operating system can manage multiple threads, switching between them. If one thread is blocked waiting for a network request, the operating system can switch to another thread to keep the CPU busy. However, due to Python’s Global Interpreter Lock (GIL), only one thread can execute Python bytecode at a time, so it does not provide true parallelism for CPU-bound tasks.
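To make the contrast concrete, here is a minimal sketch (not from the source article) of thread-based concurrency, using time.sleep as a stand-in for a blocking network wait. Even with the GIL, the three waits overlap, because the GIL is released while a thread is blocked on I/O (or, here, sleeping):

```python
import threading
import time

def fake_download(name):
    # time.sleep stands in for a blocking network wait;
    # the GIL is released while the thread sleeps
    time.sleep(1)
    print(f"{name} finished")

start = time.time()
threads = [threading.Thread(target=fake_download, args=(f"job-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every thread to finish
elapsed = time.time() - start
# All three 1-second waits overlap, so this prints roughly 1 second, not 3
print(f"Elapsed: {elapsed:.2f}s")
```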
The second tool is multiprocessing. This module bypasses the GIL by creating entirely new processes. Each process gets its own Python interpreter and memory space. This allows for true parallelism and is excellent for CPU-bound tasks. However, creating and managing processes is “heavy.” It uses more memory, and communicating between processes is more complex than communicating between threads. Both of these solutions are powerful, but they also introduce complexity and resource overhead.
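As a sketch of the multiprocessing approach (the function name cpu_heavy and the inputs are illustrative, not from the source), a Pool of worker processes can run a CPU-bound function in true parallel:

```python
import multiprocessing

def cpu_heavy(n):
    # a CPU-bound task: sum of squares below n
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # the __main__ guard is required so child processes can
    # safely re-import this module
    with multiprocessing.Pool(processes=4) as pool:
        # each input is computed in a separate process, bypassing the GIL
        results = pool.map(cpu_heavy, [100_000] * 4)
    print(results)
```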
What is Async IO in Python?
Async IO, provided by the asyncio library, is Python’s third approach to concurrency. It is a framework for writing single-threaded, concurrent code using a concept called “cooperative multitasking.” Instead of relying on the operating system to switch between threads, or creating new processes, asyncio manages everything within a single thread. It does this by using an “event loop” and special functions called “coroutines.” This approach is particularly well-suited for high-performance I/O-bound applications.
As the source article states, asyncio is a library package used to write concurrent code using the async and await syntax. It can be imported simply with import asyncio. This model allows a program to manage tens of thousands of concurrent I/O operations (like network connections) far more efficiently than using a separate thread for each one. It achieves concurrency without the overhead of threads or the complexity of multiprocessing.
Setting up Your Environment
The first step to using asyncio is ensuring you have the correct Python version. As the source material notes, you will need Python 3.7 or a newer version. These versions have greatly simplified the asyncio API, particularly with the introduction of asyncio.run(), which makes it much easier to get started.
It is a universal best practice in Python development to use a virtual environment for every project. A virtual environment is an isolated directory that contains a specific version of Python and its own set of installed libraries. This prevents conflicts between the dependencies of different projects. As shown in the source, you can create one using the venv module that comes built-in with Python.
Creating and Activating a Virtual Environment
To create a virtual environment, open your terminal or command prompt, navigate to your project’s folder, and run the command: python -m venv myenv. This command creates a new directory named “myenv” which will hold your environment’s files. This only needs to be done once per project.
To “activate” or start using the environment, you must run a special script. On Windows, as the source article mentions, the command is myenv\Scripts\activate. On Unix-based systems like macOS or Linux, the command is source myenv/bin/activate. Once activated, your terminal prompt will usually change to show the environment’s name. Now, any pip commands you run will install packages into this isolated environment, leaving your global Python installation clean.
Installing Asynchronous Libraries
With your virtual environment activated, you can now install the libraries you need. The asyncio library itself is part of Python’s standard library, so it does not need to be installed. However, to do anything useful with it, you need third-party libraries that support asyncio. A normal library, like the popular requests library for HTTP, will not work. Calling a blocking function from a synchronous library will freeze your entire asyncio application.
You must use “async-native” libraries. The source article suggests two popular ones. aiohttp is an “asynchronous HTTP client/server” framework. It is the asyncio equivalent of requests. aioredis is an asynchronous library for “Redis,” a popular in-memory database. To install them, you use pip, as shown in the source: pip install aiohttp aioredis. These libraries are built from the ground up to be non-blocking and to integrate perfectly with asyncio.
Async IO “Hello, World”
Let us look at the simple example from the source article. This is the “Hello, World” of asyncio.
Python
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(hello())
This short program demonstrates all the core concepts. The import asyncio line brings in the library. The async def syntax defines a “coroutine” function named hello. Inside this function, print("Hello") is a normal, synchronous function call.
The most important line is await asyncio.sleep(1). This is the asynchronous, non-blocking part. asyncio.sleep() is an async function that tells the program to “pause here for one second.” The await keyword gives control back to the asyncio event loop. The loop can then go and do other work during that one second instead of just freezing. After the second is up, the loop resumes the hello function, which then runs print("World"). Finally, asyncio.run(hello()) is the main entry point that starts the event loop and runs the hello coroutine.
Understanding Coroutines
As the source article highlights, coroutines are a critical part of asyncio. But what are they? A coroutine is, in essence, a function that can be paused and resumed. When you define a function using async def, you are not creating a normal function. When you call it, like hello(), it does not immediately run the code inside. Instead, it returns a “coroutine object.”
This object is like a blueprint for the work that needs to be done. It contains the code, the current state, and its context. It does not do anything until you hand it over to the asyncio event loop. The loop is responsible for actually executing, pausing, and resuming the coroutine. This ability to pause is what enables cooperative multitasking. A coroutine can run until it hits an await on a slow I/O operation, and it “cooperatively” pauses itself, yielding control back to the loop so another coroutine can run.
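A quick experiment (illustrative, using only the standard library) makes this concrete: calling an async def function returns a coroutine object, and nothing inside it runs until the event loop is told to run it.

```python
import asyncio

async def hello():
    return "Hello"

# Calling the coroutine function does NOT run its body;
# it returns a coroutine object
coro = hello()
print(type(coro).__name__)        # coroutine
print(asyncio.iscoroutine(coro))  # True

# The body only executes once the event loop runs the object
result = asyncio.run(coro)
print(result)  # Hello
```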
Introducing the async and await Keywords
The async and await keywords are the special syntax that makes all of this work. The async keyword is used in two places. First, as we have seen, async def is used to declare a coroutine function. This flags it to Python as a special function that must be managed by an event loop.
Second, async for and async with are used to create asynchronous loops and context managers, which are more advanced features. The await keyword, as its name suggests, is used to “await” the result of another asynchronous operation. You can only use the await keyword inside an async def function. When you await something, you are telling the asyncio loop, “I am pausing my execution here until this other thing is complete. You can go run other tasks in the meantime.” This is the explicit “pause” point.
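One way to see this rule (a small illustration, not from the source article) is that Python rejects await outside an async def at compile time:

```python
import asyncio

# 'await' outside an 'async def' is a SyntaxError, which we can
# demonstrate by compiling the offending statement in 'exec' mode
try:
    compile("await asyncio.sleep(1)", "<demo>", "exec")
except SyntaxError as e:
    print("SyntaxError:", e.msg)

# Inside an async def, the same expression is perfectly legal
async def ok():
    await asyncio.sleep(0)

asyncio.run(ok())
print("await inside async def works")
```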
A Deeper Dive into Coroutines
In the first part, we introduced coroutines as special functions defined with async def. Now, let us explore them more deeply. The concept of a coroutine actually predates asyncio. It comes from the idea of “generators” in Python. Generators, which use the yield keyword, are functions that can be paused. When a generator “yields” a value, its state is frozen, and it returns the value. It can then be resumed from that exact point later.
Coroutines are a supercharged version of this idea. A coroutine defined with async def is a function that can pause its execution when it encounters an await expression. It pauses, allowing other code to run, and then resumes when the awaited operation is complete. This “pause” is the cooperative part of “cooperative multitasking.” The coroutine explicitly and voluntarily gives up control, rather than being forcibly interrupted by an operating system scheduler like a thread is.
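A tiny generator example (illustrative, not from the source) shows the pause-and-resume behavior that coroutines build on:

```python
def countdown(n):
    # local state (n) is frozen between yields
    while n > 0:
        yield n
        n -= 1

gen = countdown(3)
print(next(gen))   # 3 -- runs until the first yield, then pauses
print(next(gen))   # 2 -- resumes from exactly where it paused
print(list(gen))   # [1] -- the remaining values
```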
What is an “Awaitable”?
The await keyword is used to pause a coroutine, but you cannot await just anything. You can only await an object that is “awaitable.” In the world of asyncio, there are three main types of awaitable objects: coroutines, Tasks, and Futures.
A coroutine object is what you get when you call an async def function. As we saw in await asyncio.sleep(1), you can await another coroutine. This is the most common case. You await an I/O operation from a library (like aiohttp) that is itself an async def function. When you await another coroutine, asyncio transparently schedules it to run, waits for it to finish, and then gives you the result.
Tasks: The Schedulers of Work
A coroutine object on its own does not run. You need to tell asyncio to run it. When you pass a coroutine to asyncio.run(), it gets wrapped in a Task and scheduled. A Task is the primary way asyncio manages the execution of a coroutine. A Task is an object that “wraps” a coroutine and handles its lifecycle. It schedules the coroutine on the event loop, runs it to its first await, and then resumes it when its awaited operation is complete.
You can create tasks manually using asyncio.create_task(). This is how you tell asyncio, “I want this coroutine to start running in the background, concurrently with my current coroutine.” The Task object is also an awaitable, so you can await it later to get its result or to just make sure it has finished. Tasks are what make concurrent execution possible.
Futures: The Low-Level Placeholders
A Future is a low-level awaitable object that represents the eventual result of an asynchronous operation. When you create a Future, it is like an empty box or a placeholder. You can await this empty box, and your coroutine will pause. At some point, some other part of the system (usually a low-level I/O component) will put a value into the box (or an error) and mark it as “done.” When that happens, any coroutine that was awaiting the Future will be woken up and resumed.
You will rarely create Future objects manually in modern asyncio code. Task is actually a subclass of Future. When you create a Task, you are creating a special kind of Future that knows how to run a coroutine to get its own result. Understanding Futures is helpful for understanding how asyncio works under the hood and for interfacing with older, callback-based code.
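For the curious, here is a minimal sketch of a Future being filled in by “some other part of the system,” simulated here with loop.call_later:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    # an empty "box": no result yet
    fut = loop.create_future()
    # schedule another piece of code to fill the box in 0.1 seconds
    loop.call_later(0.1, fut.set_result, "filled!")
    # awaiting the Future pauses this coroutine until set_result is called
    result = await fut
    print(result)  # filled!
    return result

asyncio.run(main())
```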
The Central Role of the Event Loop
The “event loop” is the heart of every asyncio application. It is the central scheduler that manages all the tasks, I/O operations, and events. You can think of it as the juggler from our earlier analogy. The event loop has a queue of tasks that are ready to run. It picks a task from the queue and runs it. That task’s coroutine executes its code until it hits an await keyword.
When a coroutine awaits an operation (like asyncio.sleep(1) or a network request), it is essentially telling the event loop, “I am going to be busy waiting for 1 second. Please pause me and go do something else.” The event loop pauses that task, places it in a “waiting” state, and then moves on to the next task in the “ready” queue. This is a single, continuous loop: run a task, pause it at an await, run another task, check if any waiting tasks are done, move them to the ready queue, and repeat.
What asyncio.run() Really Does
In modern Python, you will most often interact with the event loop through a single, high-level function: asyncio.run(). As the source example asyncio.run(hello()) shows, this is the main entry point for an asyncio program. This simple function performs several steps for you.
First, it creates a brand new event loop. Second, it takes the coroutine object you passed it (e.g., hello()) and wraps it in a Task to schedule it. Third, it starts the event loop, which begins running that task. The loop will continue to run until that initial task, and any other tasks it creates, are all complete. Finally, once the main task is finished, asyncio.run() will shut down the event loop, clean up any resources, and then return the result from the main coroutine.
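Those steps can be sketched in simplified form with the lower-level loop API (the real asyncio.run() also cancels leftover tasks and shuts down async generators before closing the loop):

```python
import asyncio

async def hello():
    await asyncio.sleep(0.1)
    return "Hello"

# A simplified sketch of what asyncio.run(hello()) does under the hood
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(hello())  # run until the coroutine finishes
finally:
    loop.close()  # always clean up the loop
print(result)  # Hello
```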
Running Tasks Concurrently: asyncio.gather()
The hello() example from Part 1 is not concurrent. It does one thing, sleeps, and then does another. The real power of asyncio is shown when you run multiple tasks at once. The source article provides a perfect example of this using asyncio.gather(). Let us analyze that example in detail.
Python
import asyncio

async def task1():
    print("Event 1 started")
    await asyncio.sleep(2)
    print("Event 1 completed")

async def task2():
    print("Event 2 started")
    await asyncio.sleep(1)
    print("Event 2 completed")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())
Here, we have a main coroutine that we use as our entry point. Inside main, we call asyncio.gather(). This function takes one or more awaitables (in this case, the two coroutine objects task1() and task2()) and schedules them to run concurrently.
Walking Through the gather() Example
When asyncio.run(main()) starts, the main coroutine begins. It immediately hits await asyncio.gather(task1(), task2()). The gather function tells the event loop to start running both task1 and task2. The loop starts task1 first. task1 prints “Event 1 started” and then immediately hits await asyncio.sleep(2). It pauses and tells the loop, “Wake me up in 2 seconds.”
The event loop, now free, starts task2. task2 prints “Event 2 started” and then hits await asyncio.sleep(1). It also pauses, telling the loop, “Wake me up in 1 second.” Now the event loop is idle, with two tasks “sleeping.” After one second passes, the loop wakes up task2, which prints “Event 2 completed” and finishes. After another second (two seconds total), the loop wakes up task1, which prints “Event 1 completed” and finishes. The entire operation takes only 2 seconds, not 3 (2 + 1), because the tasks ran concurrently.
The asyncio.gather() Output
The output from the source’s example is key:
Event 1 started
Event 2 started
Event 2 completed
Event 1 completed
This output perfectly demonstrates the concurrent, overlapping execution. The program starts task1, then immediately starts task2 before task1 is finished. task2 finishes first because its sleep duration was shorter. Then, task1 finishes. This is the non-blocking, concurrent behavior in action. The await asyncio.gather(…) line will not complete until all the tasks passed to it have finished.
asyncio.create_task() for True Concurrency
While asyncio.gather() is great for “run these things and wait for them all,” sometimes you want to start a task in the background and continue doing other work. This is done with asyncio.create_task(). This function takes a coroutine and schedules it to run on the loop immediately. It returns a Task object, which you can await later.
Here is an alternative way to write the main function:
Python
async def main():
    task_1 = asyncio.create_task(task1())
    task_2 = asyncio.create_task(task2())
    # We can do other work here while task_1 and task_2 run
    print("Tasks have been created and are running")
    # Now, wait for them to finish
    await task_1
    await task_2
This approach gives you more flexibility. You can create tasks, let them run, do other processing, and then await them at the very end to ensure they are complete before the main function exits. This is a more common and powerful pattern for building complex applications.
Why We Need Async Libraries
We have established that asyncio is a framework for managing concurrent I/O-bound tasks. However, asyncio itself does not know how to perform I/O. It does not come with a built-in HTTP client or a database driver. It provides the event loop and the async/await syntax, but it relies on a rich ecosystem of third-party libraries to perform the actual I/O operations.
A critical, non-negotiable rule of asyncio is that you must never call a blocking, synchronous function. If you use the standard requests.get() library in an async function, you will freeze the entire event loop. The requests library is not designed for asyncio; it blocks the thread while waiting for the network. To perform I/O, you must use libraries specifically designed with async def functions, like aiohttp, which was mentioned in the source article.
The Synchronous Problem: A requests Example
Let’s first look at a synchronous program that fetches data from three different URLs. We will use the popular requests library.
Python
import requests
import time

def fetch_url(url):
    print(f"Fetching {url}...")
    requests.get(url)  # This is a blocking call
    print(f"Done fetching {url}")

start = time.time()
fetch_url("https://example.com")  # example.com is a placeholder URL
fetch_url("https://example.com")
fetch_url("https://example.com")
end = time.time()
print(f"Finished in {end - start:.2f} seconds")
If each request takes 1 second, this program will run in 3 seconds. First, it fetches URL 1 (1 second). Then, it fetches URL 2 (1 second). Then, it fetches URL 3 (1 second). The execution is sequential. The CPU is idle for most of those 3 seconds, just waiting for the network.
The Asynchronous Solution: An aiohttp Example
Now, let’s convert that synchronous code into a high-performance asynchronous version using aiohttp. First, you must install it: pip install aiohttp.
Python
import aiohttp
import asyncio
import time

async def fetch_url(session, url):
    print(f"Fetching {url}...")
    async with session.get(url) as response:
        await response.text()  # We must await the I/O
    print(f"Done fetching {url}")

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_url(session, "https://example.com"),
            fetch_url(session, "https://example.com"),
            fetch_url(session, "https://example.com"),
        ]
        await asyncio.gather(*tasks)

start = time.time()
asyncio.run(main())
end = time.time()
print(f"Finished in {end - start:.2f} seconds")
If each request takes 1 second, this entire program will run in just over 1 second. It starts all three requests concurrently. It awaits all of them at once. All three “waiting” periods overlap, and the program finishes when the slowest of the three requests is complete. This is a massive performance gain for I/O-bound workloads.
Breaking Down the aiohttp Example
Let us analyze the new asynchronous code. We define an async def fetch_url. It now takes a session object. aiohttp recommends using a single ClientSession for making multiple requests, as it efficiently manages the underlying connection pool.
Inside fetch_url, the line async with session.get(url) as response: is the asynchronous, non-blocking way to make a GET request. We use async with because setting up and tearing down the request are themselves async operations. We then await response.text() to read the content. This is also an I/O operation, so it must be awaited. In the main function, we create the ClientSession and then build a list of coroutine objects. We use asyncio.gather(*tasks) to run them all concurrently.
Working with Asynchronous Files
Network requests are not the only form of I/O. What about reading and writing files from the disk? By default, Python’s standard open(), read(), and write() functions are synchronous and blocking. If you try to read a very large 1GB file, your entire asyncio event loop will freeze until the read is complete. This is another trap that can kill your application’s performance.
The solution is to use a library designed for asynchronous file operations. The most popular one is aiofiles. You can install it with pip install aiofiles. This library provides an async version of the standard file API, allowing you to await file operations and keep the event loop unblocked.
A Synchronous File Operation
First, let us look at the blocking problem.
Python
import asyncio
import time

def write_file_sync():
    print("Writing file...")
    with open("test.txt", "w") as f:
        f.write("This is a large file.")  # Blocks here
        time.sleep(2)  # Simulating a slow disk write
    print("File write complete")

async def do_something_else():
    print("Doing something else...")
    await asyncio.sleep(1)
    print("Done with something else")
If you tried to run write_file_sync() and do_something_else() concurrently, the blocking open() and write() would freeze the loop, and do_something_else would be unable to run until the 2-second simulated write was finished.
The aiofiles Solution
Now, let us fix this using aiofiles.
Python
import aiofiles
import asyncio

async def write_file_async():
    print("Writing file async...")
    async with aiofiles.open("test.txt", "w") as f:
        await f.write("This is a large file.")  # Non-blocking
        await asyncio.sleep(2)  # Simulating slow I/O
    print("File write complete async")

async def do_something_else():
    print("Doing something else...")
    await asyncio.sleep(1)
    print("Done with something else")

async def main():
    await asyncio.gather(
        write_file_async(),
        do_something_else(),
    )

asyncio.run(main())
In this version, the write_file_async coroutine uses async with aiofiles.open(). When it hits await f.write(), it yields control. The event loop is now free and can immediately start the do_something_else() task, which prints its message and starts its own 1-second sleep. The output will show the tasks running concurrently, and the total time will be around 2 seconds (the duration of the longest task), not 3.
Working with Asynchronous Databases
Databases are the most common source of I/O blocking in web applications. Just like requests, standard database drivers like psycopg2 (for PostgreSQL) or mysql-connector are synchronous. Calling cursor.execute() will block the event loop. To build a high-performance async application, you must use an async database driver.
The source article mentioned aioredis, which is an excellent driver for the Redis in-memory database. For traditional SQL databases, you have options like asyncpg for PostgreSQL (known for its incredible speed) or aiomysql for MySQL. These libraries provide async def functions for connecting, querying, and fetching results.
An aioredis Example
Let’s expand on the source’s suggestion and see a simple aioredis example. First, install it: pip install aioredis. You also need a Redis server running.
Python
import aioredis
import asyncio

async def main():
    # Connect to the Redis server (in aioredis 2.x, from_url itself is
    # synchronous; the actual connection is made lazily on the first command)
    redis = aioredis.from_url("redis://localhost")
    # Set a value; this is a non-blocking I/O operation
    await redis.set("my_key", "hello_async")
    print("Set 'my_key' in Redis")
    # Get the value back, also non-blocking
    value = await redis.get("my_key")
    print(f"Got value: {value.decode('utf-8')}")
    # Close the connection
    await redis.close()

asyncio.run(main())
Every interaction with the Redis server, such as await redis.set(…) and await redis.get(…), is an await point. This means that while your program is waiting for the Redis server to respond over the network, the event loop is free to run other tasks. This allows a single server to handle thousands of concurrent database operations efficiently.
Why This Ecosystem Matters
These examples—aiohttp, aiofiles, and aioredis—illustrate the core pattern of asyncio development. The asyncio library itself is the engine, and the aio* libraries are the car’s components (the wheels, the transmission, the steering). You cannot build a useful application with just asyncio. You must build it with the rich ecosystem of libraries that have been designed to be non-blocking and to speak the async/await language.
When choosing a new database, message queue, or API client for your asyncio project, the very first question you must ask is, “Does it have an asyncio-native library?” If the answer is no, you will either need to find an alternative or use advanced techniques (like asyncio.to_thread(), which we will cover later) to avoid blocking the event loop.
Beyond asyncio.gather()
In Part 2, we introduced asyncio.gather() as the primary way to run a list of tasks and wait for them all to complete. This is a fantastic tool for many common “fire and forget” scenarios, where you just want to run a batch of jobs. However, more complex applications require more fine-grained control over their concurrent tasks.
What if you want to set a timeout on a task? What if you want to stop a task that is running for too long? What if you do not want to wait for all tasks, but just the first one to finish? asyncio provides a powerful set of functions for these advanced management scenarios, including asyncio.wait_for(), asyncio.wait(), and the Task object’s cancel() method.
Handling Timeouts with asyncio.wait_for()
A very common problem in I/O operations is that a remote server might be slow or completely unresponsive. Your program could await a network request and get stuck waiting forever, freezing that part of your application. To prevent this, you should almost always wrap your I/O calls in a timeout. The asyncio.wait_for() function is the tool for this.
asyncio.wait_for() takes an awaitable (like a coroutine) and a timeout value in seconds. It runs the coroutine, but if the timeout is exceeded before the coroutine finishes, it raises an asyncio.TimeoutError. You can catch this error to handle the timeout gracefully.
Python
import asyncio

async def slow_operation():
    print("Starting slow operation...")
    await asyncio.sleep(5)  # Simulating a 5-second task
    print("Slow operation complete")
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(f"Operation succeeded: {result}")
    except asyncio.TimeoutError:
        print("The operation timed out!")

asyncio.run(main())
In this example, wait_for is given a 3-second timeout for a 5-second operation. After 3 seconds, wait_for will raise TimeoutError, which we catch. The slow_operation task is also automatically cancelled.
Understanding Task Cancellation
When asyncio.wait_for() times out, it cancels the underlying task. Cancellation is an explicit action in asyncio. You can also do it manually. When you have a Task object (from asyncio.create_task()), you can call its cancel() method. This will cause an asyncio.CancelledError to be injected into the coroutine at its next await point.
This is a critical concept. A task does not just stop immediately. It continues running until it next awaits something. At that point, instead of pausing, it will raise CancelledError. Your coroutine can, and should, use a try…finally block to catch this error and clean up any resources (like closing a file or a network connection).
Python
import asyncio

async def cancellable_task():
    try:
        print("Task started...")
        await asyncio.sleep(10)  # Pauses here
        print("Task finished")
    except asyncio.CancelledError:
        print("Task was cancelled!")
        # Perform cleanup here
        raise

async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(1)  # Let the task start
    task.cancel()
    try:
        await task  # We must await the task to see the error
    except asyncio.CancelledError:
        print("Main caught the cancellation.")

asyncio.run(main())
The output will show “Task started…” and then “Task was cancelled!” This shows the try…except block inside the coroutine successfully caught the error.
Fine-Grained Control with asyncio.wait()
asyncio.gather() is an all-or-nothing function. asyncio.wait() is its more complex, low-level, and powerful alternative. asyncio.wait() takes a set of tasks and allows you to specify when it should return. You can control this with the return_when parameter.
The options for return_when are asyncio.ALL_COMPLETED (the default, which makes it similar to gather), asyncio.FIRST_COMPLETED, and asyncio.FIRST_EXCEPTION. The FIRST_COMPLETED option is particularly useful. It tells asyncio.wait() to return as soon as any one of the tasks in the set has finished.
Using asyncio.wait() for the First Result
Imagine you are querying three redundant API servers for the same piece of data. You do not care which one answers; you just want the fastest response. This is a perfect use case for asyncio.wait().
Python
import asyncio

async def query_server(name, delay):
    await asyncio.sleep(delay)
    return f"Response from {name}"

async def main():
    tasks = {
        asyncio.create_task(query_server("Server A", 2)),
        asyncio.create_task(query_server("Server B", 1)),
        asyncio.create_task(query_server("Server C", 3)),
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # 'done' is a set of the tasks that finished
    first_result = done.pop().result()
    print(f"Got first result: {first_result}")
    # 'pending' is a set of tasks still running
    for task in pending:
        task.cancel()  # Clean up the other tasks
    await asyncio.gather(*pending, return_exceptions=True)  # Wait for cleanup

asyncio.run(main())
This program will print “Got first result: Response from Server B” after just one second. asyncio.wait() returns two sets: done and pending. We get the result from the done set and then immediately cancel all the pending tasks, saving network resources.
asyncio.TaskGroup (Python 3.11+)
The patterns with create_task and gather are so common that Python 3.11 introduced a new, cleaner, and safer way to manage concurrent tasks: asyncio.TaskGroup. A TaskGroup is a context manager that waits for all tasks spawned within it to complete. It is like asyncio.gather(), but the syntax is often clearer.
A TaskGroup also provides stronger guarantees. If any task within the group fails with an exception, all other tasks in the group are automatically cancelled. This concept is called “structured concurrency” and it makes programs much more robust and easier to reason about.
Python
# This code requires Python 3.11 or newer
import asyncio

async def task1():
    await asyncio.sleep(1)
    print("Task 1 done")

async def task2():
    await asyncio.sleep(2)
    print("Task 2 done")
    raise ValueError("Task 2 failed!")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task1())
            tg.create_task(task2())
        # The 'async with' block will not exit until both tasks are done
    except* ValueError as e:
        print(f"TaskGroup caught exception(s): {e.exceptions}")

asyncio.run(main())
This example shows how the TaskGroup automatically manages the tasks. When task2 fails, task1 will be cancelled, and the ExceptionGroup (note the except*) is raised after the block exits.
Chaining Coroutines
In Part 1, the source article showed a simple demo function that awaited another.
Python
import asyncio

async def demo1():
    await asyncio.sleep(1)
    return "Result from demo1"

async def demo():
    # Execution will be paused here and return to demo() when demo1() is ready
    asy = await demo1()
    return asy
This is called “chaining” coroutines, and it is the fundamental building block of an async program. The demo coroutine is dependent on the result of demo1. When demo awaits demo1, demo itself is paused. The event loop then runs demo1. demo1 awaits asyncio.sleep(1), so it is also paused.
The event loop is now free to run other tasks. After 1 second, the loop wakes up demo1, which returns its string. This unblocks the demo coroutine, which receives the return value into the asy variable and then returns it. This shows how await propagates the “pausing” state up the call stack, allowing the event loop to take control.
Running Blocking Code in an Async World
What happens if you must use a synchronous, blocking library (like requests)? You cannot call it directly, or it will block the loop. asyncio provides a critical escape hatch for this: asyncio.to_thread(). This function takes a blocking synchronous function (and its arguments) and runs it in a separate thread pool.
This frees the main event loop to continue running other coroutines. From the perspective of your coroutine, you just await asyncio.to_thread() as if it were a true async function.
Python
import asyncio
import requests  # The blocking library

def blocking_http_call():
    print("Starting blocking call in thread...")
    response = requests.get("https://example.com")  # Blocks this thread
    print("Blocking call finished")
    return response.status_code

async def main():
    print("Starting main coroutine")
    status = await asyncio.to_thread(blocking_http_call)
    print(f"Got status: {status}")
    print("Main coroutine finished")
asyncio.run(main())
This pattern is the correct and safe way to mix synchronous, blocking I/O code with an asyncio application. It bridges the gap between the old synchronous world and the new asynchronous one.
Shielding a Task from Cancellation
Sometimes, you may have a critical operation that you do not want to be cancelled, even if the parent task is cancelled. For example, you might be writing a final “I’m done” status to a database. If the parent task is cancelled, you still want this write operation to complete.
For this, asyncio provides asyncio.shield(). This function “shields” an awaitable from cancellation. If the main coroutine containing the shield is cancelled, the shielded task will not be cancelled. It will continue running in the background. The main coroutine will still raise CancelledError, but the protected operation will run to completion.
Python
import asyncio

async def critical_operation():
    print("Critical write started...")
    await asyncio.sleep(3)  # Protected from outer cancellation
    print("Critical write complete!")

async def main():
    try:
        await asyncio.wait_for(
            asyncio.shield(critical_operation()),
            timeout=1
        )
    except asyncio.TimeoutError:
        print("Main task timed out (and was cancelled).")
    # The critical operation is still running!
    await asyncio.sleep(3)  # Wait for it to finish
    print("Main exiting")
# asyncio.run(main())
The output of this program will show the timeout error, but 3 seconds later, it will also print “Critical write complete!” This demonstrates the shield was successful.
The Problem of Shared State
When you run multiple tasks concurrently, you will eventually face a new problem: what if two tasks need to access or modify the same piece of data at the same time? This is known as a “race condition.” For example, if you have a variable counter = 0 and two tasks try to increment it (counter += 1) at the same time, the final result might be 1 instead of 2. This is because both tasks might read the value 0 before either one has a chance to write the new value 1.
In asyncio, this is slightly less of a problem than in threading because a coroutine can only be “interrupted” at an await point. However, if you have an await between reading a value and writing it back, you have a race condition. To solve this, asyncio provides synchronization primitives that are very similar to those in the threading module.
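To make this concrete, here is a minimal sketch of an asyncio race condition (the unsafe_increment name and the zero-second sleep are illustrative, not from the original): one hundred tasks each read the shared counter, yield at an await, and then write back a stale value.

```python
import asyncio

counter = 0

async def unsafe_increment():
    global counter
    temp = counter          # Read the shared value
    await asyncio.sleep(0)  # Yield point: other tasks run and read the same value
    counter = temp + 1      # Write back a now-stale result

async def main():
    await asyncio.gather(*(unsafe_increment() for _ in range(100)))
    print(f"Final counter: {counter}")  # Far less than 100

asyncio.run(main())
```

Because every task reads the counter before any task writes it back, almost all of the increments are lost. Making the read-modify-write sequence atomic with asyncio.Lock, as shown in the next section, fixes exactly this pattern.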
asyncio.Lock: Protecting Critical Sections
The most fundamental synchronization tool is the asyncio.Lock. A Lock is an object that can be “acquired” by only one task at a time. It is like a bathroom key in an office. Only one person can have the key (acquire the lock) at a time. Anyone else who wants to use the bathroom (access the shared resource) must wait until the first person is done and returns the key (releases the lock).
In asyncio, a coroutine awaits the lock.acquire() method. If the lock is available, the coroutine acquires it and continues. If it is held by another task, the coroutine pauses at the await until the lock is released. The best way to use a lock is with the async with statement.
Python
import asyncio

counter = 0
lock = asyncio.Lock()

async def increment_counter():
    global counter
    async with lock:
        # This is the "critical section"
        # Only one task can be in here at a time
        temp = counter
        await asyncio.sleep(0.01)  # Simulate I/O
        counter = temp + 1

async def main():
    tasks = [increment_counter() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Final counter: {counter}")  # Will be 100
asyncio.run(main())
Without the lock, the await between reading temp and writing counter would create a race condition, and the final counter would be far below 100. With the lock, the final counter is correctly 100.
asyncio.Queue: Producer-Consumer Problems
A Queue is a data structure designed for safely passing data between concurrent tasks. It is the core of the “producer-consumer” pattern. You have one or more “producer” tasks that generate data and put it into the queue. You also have one or more “consumer” tasks that get data from the queue and process it.
An asyncio.Queue is “async-aware.” If a producer tries to put an item into a full queue, it will pause until a slot is free. If a consumer tries to get an item from an empty queue, it will pause until an item becomes available. This waiting is non-blocking and allows the event loop to run other tasks.
Python
import asyncio
import random

async def producer(queue):
    for i in range(5):
        data = f"data_item_{i}"
        await asyncio.sleep(random.random())  # Simulate work
        await queue.put(data)
        print(f"Produced: {data}")

async def consumer(queue):
    while True:
        data = await queue.get()  # Pauses if queue is empty
        print(f"Consumed: {data}")
        # Process the data...
        queue.task_done()  # Signal that this item is done

async def main():
    queue = asyncio.Queue(maxsize=2)
    pro = asyncio.create_task(producer(queue))
    con = asyncio.create_task(consumer(queue))
    await pro  # Wait for producer to finish
    await queue.join()  # Wait for all items to be processed
    con.cancel()  # Stop the consumer
asyncio.run(main())
The queue.join() method is important. It blocks until task_done() has been called for every item that was put into the queue, ensuring all work is finished.
asyncio.Semaphore: Limiting Concurrency
Sometimes you do not want to stop concurrent access, but you want to limit it. A common example is interacting with a third-party API that has a rate limit. The API might only allow you to make 10 concurrent connections. If your program spawns 1000 tasks to hit this API, you will be blocked or banned.
An asyncio.Semaphore is the tool for this. A semaphore is initialized with a number, for example, 10. It is like a club that admits only 10 people at a time. It will allow 10 tasks to “acquire” it and enter the club (run the code). The 11th task that tries to acquire it will have to wait outside (pause) until one of the first 10 tasks “releases” the semaphore and leaves.
Python
import asyncio

async def fetch_api(session, url, semaphore):
    async with semaphore:
        # Only 5 tasks can be in this block at a time
        print(f"Fetching {url}...")
        # response = await session.get(url)
        await asyncio.sleep(1)  # Simulate request
        print(f"Done with {url}")
        return url

async def main():
    semaphore = asyncio.Semaphore(5)  # Limit to 5 concurrent tasks
    urls = [f"url_{i}" for i in range(20)]
    # We create 20 tasks, but the semaphore
    # will ensure only 5 run at any given time.
    tasks = [fetch_api(None, url, semaphore) for url in urls]
    await asyncio.gather(*tasks)
asyncio.run(main())
This program will run in batches of 5, taking about 4 seconds total (20 tasks / 5 concurrent = 4 batches of 1 second each).
asyncio.Event: Coordinating Tasks
An asyncio.Event is a simple but powerful tool for communication between coroutines. It is a boolean flag that tasks can wait() for. An Event starts in the “cleared” state (False). Any task that awaits event.wait() will pause. Another task can then call event.set(), which sets the flag to “set” (True). When this happens, all tasks that were waiting for the event are immediately woken up and allowed to proceed.
This is perfect for “startup” or “shutdown” logic. You might have a main task that needs to set up a database connection, and several worker tasks that must not start until that connection is ready.
Python
import asyncio

db_ready = asyncio.Event()

async def setup_database():
    print("Connecting to database...")
    await asyncio.sleep(2)  # Simulate connection setup
    print("Database is ready!")
    db_ready.set()  # Set the event, waking up workers

async def worker(name):
    print(f"Worker {name} is waiting for database...")
    await db_ready.wait()  # Pauses here until event is set
    print(f"Worker {name} is starting work.")
    # ... do work ...

async def main():
    await asyncio.gather(setup_database(), worker("A"), worker("B"))
asyncio.run(main())
The output shows both workers waiting, and as soon as the database is ready, both start their work simultaneously.
asyncio.Barrier: Waiting for Each Other
A Barrier (added in Python 3.11) is a more complex primitive. It is like a “checkpoint.” You can create a barrier for a specific number of tasks (e.g., 3). All 3 tasks will run until they await barrier.wait(). The first two tasks to arrive will pause. When the third and final task hits the barrier, all three tasks are unblocked at the same time and allowed to proceed.
This is useful in multi-stage computations where you need to ensure all tasks have completed “Stage 1” before any of them are allowed to begin “Stage 2.”
Python
import asyncio

async def worker(barrier, delay):
    print(f"Task started, working for {delay}s (Stage 1)")
    await asyncio.sleep(delay)
    print("Task at the barrier, waiting...")
    await barrier.wait()  # Pauses here
    print("Barrier passed! (Stage 2)")

async def main():
    # Create a barrier for 3 tasks
    barrier = asyncio.Barrier(3)
    tasks = [
        asyncio.create_task(worker(barrier, 1)),
        asyncio.create_task(worker(barrier, 2)),
        asyncio.create_task(worker(barrier, 3)),
    ]
    await asyncio.gather(*tasks)
asyncio.run(main())
The output will show all three tasks arriving at the barrier at different times. After 3 seconds, when the last task arrives, all three will print “Barrier passed!” at the same time.
asyncio.Condition: Advanced Lock and Event
A Condition combines a Lock with an Event. It is a more advanced primitive for complex state management. Tasks can acquire the condition (which is a lock), and then await condition.wait(). This releases the lock and pauses the task until another task acquires the lock and calls condition.notify().
This is useful for complex scenarios where a consumer needs to wait for a specific condition to be true. A producer task can acquire the lock, modify the shared state, and then call condition.notify() to wake up one or more waiting consumers, who will then re-check the state. This is more flexible than a Queue as it allows tasks to wait on arbitrary logical conditions, not just “is the queue non-empty?”
The Challenges of Asynchronous Code
While asyncio provides incredible performance benefits, it is not free. Asynchronous code introduces new and unique challenges in debugging and testing. Because the execution of a function can be paused and interleaved with other functions, the “stack trace” of an error is often not a simple, linear path. Bugs related to timing (race conditions) or resource management (a task that is never awaited) can be subtle and difficult to find.
Traditional debugging techniques, like setting a simple breakpoint, may not work as expected because the program’s state is constantly shifting between different tasks. This final part of the series will cover the tools and techniques for debugging, testing, and safely integrating asyncio code into your projects.
Asyncio Debug Mode
asyncio comes with a built-in “debug mode” that can be a huge help. It provides more verbose logging and can detect common problems. The two most important things it does are:
- Log “slow” coroutines: It will log a warning if a coroutine takes too long to execute between await points, which is a sign that you have a blocking, synchronous call in your code.
- Detect “never-awaited” coroutines: If you call an async def function but forget to await it or create_task(), the coroutine will never run. Debug mode will detect this and log a warning.
You can enable debug mode in two ways. The simplest is to set an environment variable before running your script: PYTHONASYNCIODEBUG=1. Alternatively, you can pass debug=True to the asyncio.run() function: asyncio.run(main(), debug=True). Running in debug mode during development is a highly recommended best practice.
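As an illustration (not from the source article), the following sketch triggers both checks. The blocking time.sleep() makes debug mode log a “slow callback” warning, and the discarded coroutine is reported through Python's warnings machinery; the exact warning text can vary between Python versions.

```python
import asyncio
import time
import warnings

async def forgotten():
    print("This never runs")

async def main():
    time.sleep(0.15)  # Blocking call: debug mode logs that this step was slow
    forgotten()       # Coroutine object created but never awaited

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    asyncio.run(main(), debug=True)

# On CPython, the discarded coroutine is reported as a RuntimeWarning
found = any("never awaited" in str(w.message) for w in caught)
print(found)
```

Capturing the warning like this is only for demonstration; in a real project you would simply watch the log output while developing with debug mode on.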
Common Pitfall: Calling a Coroutine Without Awaiting
The single most common mistake for beginners is calling an async def function and forgetting to await it.
Python
import asyncio

async def my_coro():
    print("This will never print!")

async def main():
    # WRONG: creates a coroutine object that is never run
    my_coro()
    # RIGHT
    await my_coro()
    # ALSO RIGHT
    asyncio.create_task(my_coro())
In the “WRONG” example, my_coro() is called. This does not run the code inside. It just creates a coroutine object, which is then immediately discarded. The program will not print anything and will not raise an error (unless debug mode is on). This is a silent failure that can be very confusing. You must always await a coroutine or schedule it as a Task.
Common Pitfall: Calling Synchronous Blocking Code
The second most common mistake is calling a blocking I/O function in a coroutine.
Python
import asyncio
import time

async def my_bad_coro():
    print("Blocking the event loop...")
    # WRONG: This freezes the entire program
    time.sleep(5)
    print("...unblocked")

async def other_coro():
    print("I want to run!")

async def main():
    await asyncio.gather(
        my_bad_coro(),
        other_coro()
    )

asyncio.run(main())
In this example, my_bad_coro will call time.sleep(5). This is a synchronous, blocking call. It freezes the one and only thread. The event loop is stuck and cannot switch to other_coro. The output will be “Blocking…”, then a 5-second freeze, then “…unblocked”, and only then “I want to run!”. You must use await asyncio.sleep(5) or await asyncio.to_thread(time.sleep, 5).
Testing Asynchronous Code with pytest-asyncio
Testing async def functions is not as simple as testing normal functions, because you need a running event loop to await them. The standard Python unittest module has some support, but the community standard is a plugin called pytest-asyncio.
After installing pytest and pytest-asyncio, you can write your tests as async def functions and mark them with @pytest.mark.asyncio. The plugin will automatically manage the event loop for you.
Python
# In a file named test_my_app.py
import asyncio
import pytest

# The function we want to test
async def add_async(a, b):
    await asyncio.sleep(0)  # Simulate async work
    return a + b

@pytest.mark.asyncio
async def test_add_async():
    result = await add_async(2, 3)
    assert result == 5
You can then run this test from your terminal with the simple pytest command. This makes testing async code just as easy as testing synchronous code.
Interfacing with Synchronous Code
We have already seen asyncio.to_thread(), which allows an async function to safely call a sync function. But what about the other way around? What if you have a large, existing synchronous application (like a Flask web app) and you want to call an async function from it?
You cannot just await from synchronous code. The solution is to get access to an event loop. If you are in the main thread and no loop is running, you can simply use asyncio.run(). This will create a new loop, run your function, and shut the loop down.
Python
import asyncio

async def my_async_utility():
    await asyncio.sleep(1)
    return "Async result"

def my_sync_function():
    print("In sync code, calling async...")
    # Create a new loop just for this one call
    result = asyncio.run(my_async_utility())
    print(f"Got result: {result}")
my_sync_function()
If a loop is already running in another thread, asyncio.run() cannot be used. In that advanced case, you must use asyncio.run_coroutine_threadsafe() to safely submit the coroutine to the other thread’s loop.
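A minimal sketch of that advanced case (the background-thread setup here is illustrative; in a real application the running loop typically belongs to a framework rather than a thread you started yourself):

```python
import asyncio
import threading

async def my_async_utility():
    await asyncio.sleep(0.1)
    return "Async result"

# A loop running in a background thread (stands in for a framework-owned loop)
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# From synchronous code, submit the coroutine to that loop
future = asyncio.run_coroutine_threadsafe(my_async_utility(), loop)
result = future.result(timeout=5)  # Blocks this thread until the coroutine finishes
print(f"Got result: {result}")

loop.call_soon_threadsafe(loop.stop)  # Shut the background loop down
```

run_coroutine_threadsafe() returns a concurrent.futures.Future, so the synchronous caller can block on .result() with a timeout while the event loop keeps running in its own thread.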
The Broader Async Ecosystem: Web Frameworks
The biggest beneficiary of asyncio has been the world of Python web frameworks. asyncio is perfectly suited for web servers, which must handle thousands of concurrent network connections (HTTP requests) that are all I/O-bound.
This has led to a new generation of high-performance web frameworks. The most popular is FastAPI, which is built from the ground up on asyncio. It allows you to define your API endpoints as async def functions, giving you massive performance with very little code. Other popular frameworks in this space include Starlette (which FastAPI is based on), Sanic, and Quart (an async version of Flask).
Conclusion:
asyncio is more than just a library; it is a new paradigm for writing concurrent programs in Python. It provides a powerful and efficient solution to the problem of I/O-bound workloads, which are dominant in modern web-connected applications. By using cooperative multitasking with coroutines and an event loop, it avoids the overhead and complexity of traditional threading.
The journey begins with async and await, but it expands into a rich ecosystem of asynchronous libraries and advanced primitives for managing tasks, timeouts, and shared state. While the learning curve can be steep, mastering asyncio unlocks the ability to write truly high-performance, scalable, and modern Python applications.