September 26, 2025
Python | Async | Concurrency | asyncio

Python asyncio Fundamentals: Coroutines, Tasks, and the Event Loop

You've learned about threading (which lets one process fake parallelism) and multiprocessing (which gives you real parallelism with separate processes). Now we're stepping into a third concurrency model: asyncio, Python's lightweight, single-threaded cooperative multitasking framework.

Here's the appeal: asyncio lets you run thousands of concurrent operations on a single thread without the overhead of thread management or the complexity of process spawning. It's the Goldilocks zone for I/O-bound work at scale: think web servers handling thousands of connections, API clients making hundreds of concurrent requests, or any scenario where you're waiting a lot.

The catch? It requires rethinking how you write code. No more requests.get() blocking the whole program. Instead, you'll use async/await syntax to write code that yields control when it waits. This article builds your mental model from the ground up: the event loop, coroutines, tasks, and how to orchestrate them all.

Understanding asyncio is particularly important in the AI/ML world because virtually every modern AI framework, from OpenAI's Python SDK to LangChain to FastAPI-based model servers, is built on async foundations. When you're streaming tokens from a language model, batching inference requests, or building a high-throughput API wrapper, asyncio is the engine doing the heavy lifting. Whether you're building the next chatbot backend or just want your data-fetching scripts to stop crawling, the patterns we cover here will apply directly to your daily work. By the end of this article, you'll understand not just the syntax but why asyncio was designed the way it was, and that deeper understanding is what separates developers who use async code from developers who truly master it.

Table of Contents
  1. The Event Loop: Your Program's Traffic Director
  2. async def: Defining Coroutines
  3. asyncio.run(): Your Entry Point
  4. await: Yielding Control
  5. await vs Just Calling
  6. asyncio.Task and asyncio.create_task(): Concurrent Execution
  7. The Mental Model of Task Concurrency
  8. Task vs Coroutine: Understanding the Difference
  9. asyncio.gather(): Coordinating Multiple Coroutines
  10. asyncio.gather() vs asyncio.wait()
  11. asyncio.sleep(): Non-Blocking Delays
  12. Timeouts and Cancellation
  13. asyncio.timeout() for Timeouts
  14. Task.cancel() for Manual Cancellation
  15. Bridging Sync and Async: run_in_executor()
  16. Event Loop Internals
  17. Putting It Together: A Real-World Example
  18. Common Asyncio Mistakes
  19. Common Pitfalls
  20. Pitfall 1: Forgetting to await
  21. Pitfall 2: Mixing async and sync
  22. Pitfall 3: Blocking in a coroutine
  23. The Event Loop Under the Hood
  24. Advanced: Understanding Task States and Transitions
  25. Race Conditions and Task Ordering
  26. Exception Handling in Async Code
  27. Async Context Managers: Cleaner Resource Management
  28. Async Context and asyncio.get_running_loop()
  29. Performance Considerations: When to Use Asyncio
  30. Debugging and Testing Async Code
  31. Using asyncio.create_task() for Better Tracebacks
  32. Testing Async Functions
  33. Mocking Async Functions
  34. Comparing the Three Concurrency Models
  35. Asyncio Patterns: Producer-Consumer
  36. Asyncio and Event Loop Policies
  37. Key Takeaways
  38. Coroutine Best Practices
  39. 1. Keep Coroutines Focused
  40. 2. Avoid Nested create_task()
  41. 3. Use Type Hints
  42. 4. Document Your Async API
  43. Real-World Scenario: Building an Async Web Crawler
  44. The Mental Shift: From Thinking Sequentially to Thinking Concurrently

The Event Loop: Your Program's Traffic Director

The event loop is the heart of asyncio. It's a single-threaded control center that runs your code and manages I/O.

Here's the mental model:

┌─────────────────────────────────────────┐
│         Event Loop (Single Thread)      │
│                                         │
│  ┌───────────────────────────────────┐  │
│  │ Queue of Pending Coroutines/Tasks │  │
│  │  [task_a, task_b, task_c, ...]    │  │
│  └───────────────────────────────────┘  │
│                                         │
│  ┌───────────────────────────────────┐  │
│  │  Pick next task → Run until       │  │
│  │  it hits 'await' → Switch to next │  │
│  └───────────────────────────────────┘  │
│                                         │
│  ┌───────────────────────────────────┐  │
│  │   Monitor I/O (Network, File,     │  │
│  │   Timers, etc.)                   │  │
│  │   When I/O completes →            │  │
│  │   Resume the waiting task         │  │
│  └───────────────────────────────────┘  │
└─────────────────────────────────────────┘

The event loop runs one piece of code at a time. When that code hits an await statement, it says: "I'm waiting for something. I'll give up control now." The loop then picks the next task and runs it. When the I/O completes (a network response arrives, a file loads, a timer expires), the loop resumes the waiting task.

This is cooperative multitasking: tasks voluntarily yield control at await points. No preemption means the loop never interrupts a task mid-statement, so code between two await points runs atomically from the loop's perspective, and you rarely need locks. (Shared state can still be corrupted if you mutate it across an await point, as we'll see later.) It's elegant.

Let's see it in action.

async def: Defining Coroutines

A coroutine function is any function defined with async def; calling it produces a coroutine object. Here's the simplest possible example:

python
async def greet():
    return "Hello, asyncio!"
 
# This doesn't run the coroutine, it creates a coroutine object
coro = greet()
print(coro)
# Output: <coroutine object greet at 0x...>

This is one of the first things that trips people up: calling an async def function doesn't execute its body immediately. Instead, Python hands you back a coroutine object, a suspended computation waiting to be scheduled. Think of it like building a recipe card versus actually cooking the meal. The coroutine object describes what will happen, but nothing runs until you hand it to the event loop.

To actually run it and get the result back, you need an event loop, which is exactly what asyncio.run() provides.

asyncio.run(): Your Entry Point

asyncio.run() creates an event loop, runs your top-level coroutine, and cleans up afterward.

python
import asyncio
 
async def greet():
    return "Hello, asyncio!"
 
result = asyncio.run(greet())
print(result)  # Output: Hello, asyncio!

Under the hood, asyncio.run() does several things: it creates a fresh event loop, sets it as the current running loop, executes your coroutine to completion, then cancels any remaining tasks and tears down the loop before returning. This careful lifecycle management is why you should always use asyncio.run() at your program's entry point rather than manually creating and managing loops: it handles the cleanup details that are easy to get wrong on your own.

What's happening:

  1. asyncio.run() creates a new event loop
  2. Runs your coroutine on that loop
  3. Returns the result
  4. Closes the loop

This is the entry point for asyncio programs. You'll typically have one asyncio.run() call in your main code.
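In a script, the conventional shape is a single entry point guarded by `__main__`. A minimal sketch (the function names here are illustrative, and the network call is simulated):

```python
import asyncio

async def fetch_config():
    # Stand-in for loading configuration over the network
    await asyncio.sleep(0.01)
    return {"retries": 3}

async def main():
    config = await fetch_config()
    print(f"Loaded config: {config}")
    return config

if __name__ == "__main__":
    # One asyncio.run() at the top level: it creates the loop,
    # runs main() to completion, and tears everything down.
    asyncio.run(main())
```

Everything async hangs off main(); the rest of the script stays ordinary synchronous Python.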

await: Yielding Control

await is where the magic happens. It tells the event loop: "I'm waiting for something. Run other tasks while I wait."

python
import asyncio
 
async def fetch_data(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Simulate network delay
    print(f"Done fetching {url}")
    return f"Data from {url}"
 
async def main():
    result = await fetch_data("https://api.example.com")
    print(result)
 
asyncio.run(main())

Expected output:

Fetching https://api.example.com...
Done fetching https://api.example.com
Data from https://api.example.com

Nothing surprising here: the code runs sequentially. But notice the power: await asyncio.sleep(2) doesn't block the thread. It yields control, letting other tasks run. If we had multiple concurrent tasks, we'd see dramatic speedups.

await vs Just Calling

Here's a critical distinction:

python
async def task_a():
    await asyncio.sleep(1)
    print("A done")
 
async def task_b():
    await asyncio.sleep(1)
    print("B done")
 
async def main():
    # Sequential: takes 2 seconds
    await task_a()
    await task_b()
 
asyncio.run(main())

Both tasks run sequentially. Task B waits for A to finish. Total time: ~2 seconds.

But what if we want them concurrent? That's where tasks come in.

asyncio.Task and asyncio.create_task(): Concurrent Execution

A Task wraps a coroutine and schedules it on the event loop. Multiple tasks run concurrently. The key insight here is the timing: asyncio.create_task() registers the coroutine with the event loop immediately, and it begins executing as soon as the current coroutine yields control at its next await. You're not queuing something for "later": you're starting it now, concurrently with everything else.

python
import asyncio
 
async def task_a():
    await asyncio.sleep(1)
    print("A done")
 
async def task_b():
    await asyncio.sleep(1)
    print("B done")
 
async def main():
    # Create tasks (scheduled immediately; they start at the next await)
    ta = asyncio.create_task(task_a())
    tb = asyncio.create_task(task_b())
 
    # Wait for both to complete
    await ta
    await tb
 
asyncio.run(main())

Expected output:

A done
B done

Total time: ~1 second, not 2! Both tasks run concurrently.

Here's what's happening:

  1. asyncio.create_task() schedules the coroutine on the event loop
  2. Control returns immediately (the task is running in the background)
  3. await ta waits for task A
  4. await tb waits for task B
  5. The event loop runs both concurrently

The Mental Model of Task Concurrency

Without Tasks (Sequential):
┌─────────┐         ┌─────────┐
│  Task A │ ──→     │  Task B │
│ (1 sec) │         │ (1 sec) │
└─────────┘         └─────────┘
  Total: 2 seconds

With Tasks (Concurrent):
┌─────────┐
│  Task A │ (1 sec)
├─────────┤  ← Both run at once!
│  Task B │ (1 sec)
└─────────┘
  Total: 1 second

Task vs Coroutine: Understanding the Difference

This distinction matters more than it might seem, and it's the source of a lot of subtle bugs in async code. A coroutine is a paused computation: a function defined with async def that hasn't started executing yet. A Task is a coroutine that has been handed to the event loop and is actively being managed. When you await a coroutine directly, you run it inline and the current coroutine pauses until it completes. When you wrap it in asyncio.create_task(), you hand it to the event loop independently, and both your current code and the new task can progress concurrently.

The practical difference: if you have three API calls to make and you await each one in sequence, you're waiting for one at a time, even in async code. If you create tasks for all three and then await them (or use gather()), all three network requests fly out simultaneously. This is the difference between "I wrote async code" and "I wrote concurrent async code", and it's the mistake that makes beginners wonder why their async code isn't any faster than sync code.

There's one more nuance worth knowing: a bare coroutine that you forget to either await or pass to create_task() simply never runs. Python will warn you about unawaited coroutines, but it won't stop you from creating them. A Task, by contrast, is self-driving: once created, it runs to completion (or cancellation) on its own. One caveat, though: the event loop holds only a weak reference to tasks, so keep your own reference (a variable, or a set of tasks) to guard against a task being garbage-collected mid-flight.
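To make the "sequential awaits vs tasks" difference concrete, here's a small timing sketch (call_api is a stand-in for a real network request):

```python
import asyncio
import time

async def call_api(n):
    await asyncio.sleep(0.1)  # simulated network round trip
    return n

async def sequential():
    # Awaiting coroutines one at a time: ~0.3s total
    return [await call_api(1), await call_api(2), await call_api(3)]

async def concurrent():
    # Wrapping them in tasks first: the sleeps overlap, ~0.1s total
    tasks = [asyncio.create_task(call_api(n)) for n in (1, 2, 3)]
    return [await t for t in tasks]

start = time.perf_counter()
asyncio.run(sequential())
seq_elapsed = time.perf_counter() - start

start = time.perf_counter()
asyncio.run(concurrent())
conc_elapsed = time.perf_counter() - start

print(f"sequential: {seq_elapsed:.2f}s, concurrent: {conc_elapsed:.2f}s")
```

Same coroutines, same results, roughly a 3x wall-clock difference, purely from when the tasks get started.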

asyncio.gather(): Coordinating Multiple Coroutines

asyncio.gather() is cleaner for running multiple coroutines concurrently and collecting their results.

python
import asyncio
 
async def fetch(url):
    await asyncio.sleep(1)
    return f"Data from {url}"
 
async def main():
    urls = ["api1.com", "api2.com", "api3.com"]
    results = await asyncio.gather(*[fetch(url) for url in urls])
    print(results)
 
asyncio.run(main())

Expected output:

['Data from api1.com', 'Data from api2.com', 'Data from api3.com']

Time taken: ~1 second (all concurrent).

gather() wraps each coroutine in a task, runs them all concurrently, and returns a list of results in the same order. Notice that even though the individual fetches might complete in arbitrary order internally, gather() preserves the original input order in the result list, which is a quality-of-life feature that makes downstream processing much cleaner.

asyncio.gather() vs asyncio.wait()

gather() is simple and returns results directly. wait() is more flexible, giving you fine-grained control over how tasks complete.

python
import asyncio
 
async def task(n):
    await asyncio.sleep(n)
    return f"Task {n} done"
 
async def main():
    # gather: simple, returns results directly
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)
 
    # wait: more control over task completion
    tasks = {asyncio.create_task(task(i)) for i in [1, 2, 3]}
    done, pending = await asyncio.wait(tasks)
    for t in done:
        print(t.result())
 
asyncio.run(main())

When to use each:

  • gather(): You want results in order, all at once. Cleaner syntax.
  • wait(): You want to handle tasks as they finish, set timeouts per-task, or use return_when to get partial results.
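For example, return_when=asyncio.FIRST_COMPLETED lets you react to whichever task finishes first, a common pattern for racing a primary server against a mirror (the names and delays here are illustrative):

```python
import asyncio

async def query(server, delay):
    await asyncio.sleep(delay)  # simulated request latency
    return f"response from {server}"

async def race():
    tasks = {
        asyncio.create_task(query("primary", 0.3)),
        asyncio.create_task(query("mirror", 0.1)),
    }
    # Returns as soon as ONE task finishes; the rest are still pending
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    winner = done.pop().result()
    for task in pending:       # don't leak the losers
        task.cancel()
    return winner

print(asyncio.run(race()))
```

gather() can't express this: it always waits for everything you pass it.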

asyncio.sleep(): Non-Blocking Delays

asyncio.sleep() yields control, unlike time.sleep() which blocks the entire thread.

python
import asyncio
import time
 
# WRONG: Blocks the thread, defeats the purpose of asyncio
async def bad_delay():
    time.sleep(2)  # Blocks! Other tasks can't run
    print("Done waiting")
 
# RIGHT: Yields control
async def good_delay():
    await asyncio.sleep(2)  # Yields! Other tasks can run
    print("Done waiting")

This is the core difference. time.sleep(2) makes the thread stop for 2 seconds. await asyncio.sleep(2) tells the event loop: "I'm waiting 2 seconds; run other tasks." One call freezes your entire program; the other hands control back gracefully. This applies to any blocking operation, not just sleep. File reads with the standard open(), synchronous database queries, CPU-heavy calculations, they all have the same problem: they hold the thread hostage while the event loop's other tasks sit idle waiting for their turn.
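You can measure the damage directly. In this sketch, a background task needs 0.2 seconds of "network" time; blocking with time.sleep() serializes the waits, while awaiting asyncio.sleep() lets them overlap:

```python
import asyncio
import time

async def worker():
    await asyncio.sleep(0.2)  # simulated I/O wait

async def blocking_main():
    task = asyncio.create_task(worker())
    time.sleep(0.2)           # freezes the loop: worker can't even start
    await task                # worker's 0.2s only begins now -> ~0.4s total

async def yielding_main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.2)  # yields: worker's sleep overlaps -> ~0.2s total
    await task

start = time.perf_counter()
asyncio.run(blocking_main())
blocked = time.perf_counter() - start

start = time.perf_counter()
asyncio.run(yielding_main())
yielded = time.perf_counter() - start

print(f"blocking: {blocked:.2f}s, yielding: {yielded:.2f}s")
```

Same total amount of "waiting", but the blocking version takes roughly twice as long because the waits can't overlap.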

Timeouts and Cancellation

asyncio.timeout() for Timeouts

Control how long a coroutine can run:

python
import asyncio
 
async def slow_task():
    await asyncio.sleep(10)
    return "Done"
 
async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)
    except asyncio.TimeoutError:
        print("Task took too long!")
 
asyncio.run(main())

Expected output:

Task took too long!

The task was cancelled because it exceeded 2 seconds.

Note: asyncio.wait_for() is the classic timeout API and works on any supported Python version. Python 3.11+ adds asyncio.timeout(), an async context manager with cleaner syntax:

python
async def main():
    try:
        async with asyncio.timeout(2):
            await slow_task()
    except asyncio.TimeoutError:
        print("Task took too long!")

The 3.11+ syntax is the one you should default to in new code: it reads more naturally and makes the scope of the timeout explicit at a glance. When working with external APIs, always wrap your calls in a timeout. Network calls that hang indefinitely will eventually exhaust your event loop's capacity and bring your entire service to a crawl.

Task.cancel() for Manual Cancellation

Cancel a task programmatically:

python
import asyncio
 
async def long_running():
    try:
        while True:
            print("Still running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled!")
        raise  # Re-raise to complete cancellation
 
async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Cleanup complete")
 
asyncio.run(main())

Expected output:

Still running...
Still running...
Task was cancelled!
Cleanup complete

When you call task.cancel(), it raises CancelledError in the task. The task can catch it for cleanup (closing files, releasing resources), then re-raise to complete the cancellation. Notice the raise inside the except asyncio.CancelledError block: this is not optional. If you swallow the CancelledError without re-raising, your task silently resumes running, the await task in your main coroutine will hang, and you'll have a very hard-to-debug situation on your hands. Always re-raise cancellation errors unless you have an extremely specific reason not to.
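A related tool worth knowing: asyncio.shield() protects an inner operation from cancellation arriving from the outside. Here's a sketch (the "critical save" is simulated) where the shielded work survives a timeout on its caller:

```python
import asyncio

async def critical_save():
    # Pretend this write must not be interrupted once started
    await asyncio.sleep(0.2)
    return "saved"

async def main():
    inner = asyncio.create_task(critical_save())
    try:
        # On timeout, wait_for cancels the *shield*, not the inner task
        await asyncio.wait_for(asyncio.shield(inner), timeout=0.05)
    except asyncio.TimeoutError:
        print("Caller gave up waiting...")
    # ...but the shielded work is still running and completes normally
    return await inner

print(asyncio.run(main()))
```

Use shield sparingly: work that can't be cancelled is also work that can delay a graceful shutdown.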

Bridging Sync and Async: run_in_executor()

Real life is messy. Sometimes you have blocking code you can't change (legacy libraries, CPU-intensive operations). run_in_executor() lets you run blocking code without freezing the event loop.

python
import asyncio
import requests  # Blocking HTTP library
 
async def main():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
 
    # Run blocking code in a thread pool
    response = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        requests.get,
        "https://httpbin.org/delay/2"
    )
 
    print(f"Status: {response.status_code}")
 
asyncio.run(main())

run_in_executor() runs requests.get() in a background thread pool, so it doesn't block the event loop. Other tasks keep running. The None executor argument tells asyncio to use its default ThreadPoolExecutor, you can also pass a custom ProcessPoolExecutor if you need true CPU parallelism for compute-heavy tasks. Keep in mind that run_in_executor() is a bridge, not a free pass: you're still using threads under the hood, so the usual thread safety concerns apply for any shared state.

Common use cases:

  • Blocking HTTP requests (use aiohttp for true async)
  • CPU-intensive operations (better to use multiprocessing, but executor works for small tasks)
  • Database queries (use async database drivers like asyncpg, but executor bridges old code)

Event Loop Internals

To understand asyncio deeply, you need to know what the event loop is actually doing between your await points. At its core, the loop is running a variation of the classic select/poll/epoll pattern that operating systems expose for monitoring multiple file descriptors simultaneously. On Linux, asyncio uses epoll; on macOS, kqueue; on Windows, IOCP. These are the OS-level primitives that make it possible to watch thousands of network sockets simultaneously on a single thread.

Each iteration of the event loop follows a predictable cycle: it first runs all callbacks that are already ready (tasks that have been woken up), then calls the OS's I/O selector to ask "what I/O events have completed since I last checked?" with a timeout equal to the time until the next scheduled callback. The OS returns a list of file descriptors that are ready for reading or writing, the loop wakes up the corresponding tasks, and the cycle repeats.

This architecture has a critical implication: any code that runs without hitting an await holds the entire event loop hostage. If you spend 500ms doing a CPU-intensive calculation inside an async function without yielding, every other task in your program is frozen for those 500ms. The event loop cannot preempt you the way a thread scheduler can. This is why the rule "async code should only do I/O, not heavy computation" is so important in practice, and why compute-heavy workloads belong in run_in_executor() or a separate process pool, not inline in your coroutines.

Understanding this also explains why await asyncio.sleep(0) is a useful trick: it yields control back to the event loop for exactly one iteration without actually waiting any time, letting other pending tasks get a turn. You'll see this pattern in CPU-bound loops that can't be moved to an executor but need to play nicely with the rest of the async system.
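Here's a sketch of that pattern: a CPU-ish loop that yields every chunk_size iterations so a concurrently scheduled task can get a turn (checksum and heartbeat are illustrative names):

```python
import asyncio

async def checksum(data, chunk_size=10_000):
    """Sum a sequence, yielding to the event loop between chunks."""
    total = 0
    for i, value in enumerate(data):
        total = (total + value) % 2**32
        if i % chunk_size == 0:
            await asyncio.sleep(0)  # one loop iteration for other tasks
    return total

async def heartbeat(log):
    # Only gets scheduled because checksum() yields periodically
    log.append("heartbeat ran")

async def main():
    log = []
    hb = asyncio.create_task(heartbeat(log))
    result = await checksum(range(100_000))
    await hb
    return result, log

result, log = asyncio.run(main())
print(result, log)
```

Without the sleep(0), the heartbeat task would sit in the queue until the whole loop finished.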

Putting It Together: A Real-World Example

Let's build a web scraper that fetches multiple URLs concurrently.

python
import asyncio
 
async def fetch_url(url, delay=1):
    """Simulate fetching a URL with a delay"""
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Simulate network request
    return f"Content from {url}"
 
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
    ]
 
    # Fetch all URLs concurrently
    results = await asyncio.gather(
        *[fetch_url(url) for url in urls]
    )
 
    for result in results:
        print(result)
 
asyncio.run(main())

Expected output:

Fetching https://example.com/page1...
Fetching https://example.com/page2...
Fetching https://example.com/page3...
Content from https://example.com/page1
Content from https://example.com/page2
Content from https://example.com/page3

Time taken: ~1 second (all concurrent).

If we did this sequentially with time.sleep(), it would take ~3 seconds. Asyncio does it in 1. Notice the output pattern: all three fetch messages appear immediately because gather() starts all tasks before any of them finish. The results appear afterward in input order, not completion order; that predictability is one of the things that makes gather() so pleasant to work with compared to raw task management.

Common Asyncio Mistakes

Even experienced developers hit the same async pitfalls repeatedly. Here are the ones that will cost you the most debugging time.

Mistake 1: Calling blocking I/O inside async functions. Using requests.get(), open() with standard file I/O, or any synchronous database driver inside a coroutine will freeze your entire event loop for the duration of that call. Every other task stops making progress. The fix is always the same: either switch to an async equivalent (aiohttp instead of requests, aiofiles for files, asyncpg for PostgreSQL) or wrap the call in run_in_executor(). This mistake is insidious because your code works correctly in isolation; it just silently destroys your concurrency.

Mistake 2: Forgetting that create_task() is required for true concurrency. A lot of beginners write async code where every call is await some_coroutine() in sequence, then wonder why they see no performance improvement. If you're not creating tasks or using gather(), you're writing sequential code with extra ceremony. Concurrency only happens when multiple tasks are running simultaneously, and that only happens when you explicitly create them.

Mistake 3: Fire-and-forget tasks that silently fail. When you call asyncio.create_task() and don't store a reference to the result, the task runs independently. If it raises an exception, that exception is silently swallowed; you'll get a warning log message if you're lucky, nothing if you're not. Always either await the task, pass it to gather(), or attach a .add_done_callback() that inspects the result. The pattern of creating tasks and storing them in a set (removing them in a done callback) is the standard idiom for managed fire-and-forget.
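That set idiom looks like this in practice (spawn is a hypothetical helper name):

```python
import asyncio

background_tasks = set()

def spawn(coro):
    """Fire-and-forget, but with a strong reference held until done."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drops our reference (removes the task from the set) on completion
    task.add_done_callback(background_tasks.discard)
    return task

async def side_effect(log):
    await asyncio.sleep(0.05)
    log.append("background work done")

async def main():
    log = []
    spawn(side_effect(log))
    assert len(background_tasks) == 1   # the set keeps the task alive
    await asyncio.sleep(0.1)            # give it time to finish
    assert len(background_tasks) == 0   # done-callback cleaned it up
    return log

print(asyncio.run(main()))
```

In production you'd extend the done callback to log any exception the task raised rather than discarding it silently.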

Mistake 4: Swallowing CancelledError. If you catch asyncio.CancelledError for cleanup purposes, you must re-raise it. Swallowing it causes task.cancel() to appear to work but the coroutine silently continues, leading to tasks that won't stop and await task calls that hang indefinitely.

Mistake 5: Running asyncio inside asyncio. If you're already inside a running event loop (common in Jupyter notebooks, some test frameworks, and certain web frameworks), calling asyncio.run() will raise a RuntimeError. The fix depends on context: in Jupyter you can use await directly or nest_asyncio; in production code, restructure so there's only one asyncio.run() at the top level.

Common Pitfalls

Pitfall 1: Forgetting to await

python
async def task():
    return "Done"
 
async def main():
    result = task()  # BUG: Creates coroutine, doesn't run it
    print(result)  # Prints: <coroutine object...>
 
asyncio.run(main())

Fix: Use await:

python
result = await task()

Pitfall 2: Mixing async and sync

python
import asyncio
import requests
 
async def main():
    response = requests.get("https://example.com")  # BLOCKING!
    # Event loop is frozen until response arrives
    # Other tasks can't run
    print(response.status_code)
 
asyncio.run(main())

Fix: Use async libraries or run_in_executor():

python
import aiohttp
 
async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            status = response.status

Pitfall 3: Blocking in a coroutine

python
import asyncio
import time
 
async def task():
    time.sleep(5)  # WRONG: Blocks the entire event loop
    print("Done")
 
asyncio.run(task())

Fix: Use await asyncio.sleep():

python
await asyncio.sleep(5)

The Event Loop Under the Hood

To deepen your mental model, here's what the event loop actually does on each iteration:

  1. Check for new tasks: Are there tasks scheduled?
  2. Run ready tasks: Execute until the next await
  3. Wait for I/O: Check if any awaited I/O is done (network response, timer expired, file loaded)
  4. Resume tasks: Re-queue tasks whose I/O is complete
  5. Repeat: Loop until all tasks are done

This is single-threaded, but because tasks yield at await points, it creates the illusion of concurrency.

python
import asyncio
 
async def demo():
    print("1. Start")
    await asyncio.sleep(0)  # Yield, let other tasks run
    print("2. Middle")
    await asyncio.sleep(0)  # Yield again
    print("3. End")
 
async def main():
    task1 = asyncio.create_task(demo())
    task2 = asyncio.create_task(demo())
 
    await asyncio.gather(task1, task2)
 
asyncio.run(main())

Expected output:

1. Start
1. Start
2. Middle
2. Middle
3. End
3. End

Both tasks start, yield, resume. The order might vary, but the pattern is clear: task1 runs until await, task2 runs until await, task1 resumes, task2 resumes. This interleaving behavior is the fundamental mechanism of cooperative concurrency, and once you can visualize it clearly, reading complex async code becomes much more intuitive. You're essentially tracing a path through a state machine where each await is a potential context switch point.

Advanced: Understanding Task States and Transitions

Every task has a lifecycle. Understanding task states helps you debug and design robust async systems.

A task can be in one of these states:

  • Pending: Created but not yet started
  • Running: Currently executing
  • Done: Completed (either with result or exception)
  • Cancelled: Manually cancelled or timed out

You can check a task's state using methods:

python
import asyncio
 
async def slow_task():
    await asyncio.sleep(5)
    return "Complete"
 
async def main():
    task = asyncio.create_task(slow_task())
 
    # Check states
    print(f"Task done? {task.done()}")  # False
    print(f"Task cancelled? {task.cancelled()}")  # False
 
    # Cancel it
    task.cancel()
    try:
        result = await task
    except asyncio.CancelledError:
        print(f"Task done? {task.done()}")  # True
        print(f"Task cancelled? {task.cancelled()}")  # True
 
asyncio.run(main())

This is crucial for monitoring long-running operations and handling edge cases. A "done" task might have been cancelled, succeeded, or failed; you need to check which. In production systems, monitoring task states is how you implement health checks, graceful shutdown sequences, and circuit breakers. A task that shows as "done" but holds an exception is a fundamentally different situation from one that completed successfully, and conflating them leads to silent failures that are painful to diagnose after the fact.
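A sketch of that distinction: after waiting on a batch of tasks, inspect each one to separate cancellations from failures from successes (flaky and steady are illustrative stand-ins):

```python
import asyncio

async def flaky():
    raise RuntimeError("boom")

async def steady():
    return 42

async def main():
    tasks = [asyncio.create_task(flaky()), asyncio.create_task(steady())]
    # wait() never raises here; it just parks until everything is done
    await asyncio.wait(tasks)
    report = []
    for task in tasks:
        if task.cancelled():
            report.append("cancelled")
        elif task.exception() is not None:      # done, but with an error
            report.append(f"failed: {task.exception()}")
        else:                                   # done, with a result
            report.append(f"ok: {task.result()}")
    return report

print(asyncio.run(main()))
```

Check cancelled() first: calling .exception() or .result() on a cancelled task raises CancelledError.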

Race Conditions and Task Ordering

One subtle point: tasks finish in whatever order their work completes, not the order you created them. This matters if you rely on ordering.

python
import asyncio
 
async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"
 
async def main():
    tasks = [
        asyncio.create_task(task("A", 0.5)),
        asyncio.create_task(task("B", 0.3)),
        asyncio.create_task(task("C", 0.7)),
    ]
 
    results = await asyncio.gather(*tasks)
    print(results)  # Always: ['A done', 'B done', 'C done'] (input order)
 
asyncio.run(main())

The results list preserves the order you passed in, not completion order. Task B finishes first, but it's still in position 1 of the results list.

If you need to process tasks as they complete, use asyncio.as_completed():

python
import asyncio
 
async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"
 
async def main():
    tasks = [
        asyncio.create_task(task("A", 0.5)),
        asyncio.create_task(task("B", 0.3)),
        asyncio.create_task(task("C", 0.7)),
    ]
 
    # Process as they complete, not in submission order
    for future in asyncio.as_completed(tasks):
        result = await future
        print(result)  # B, then A, then C
 
asyncio.run(main())

Now B prints first (finishes at 0.3s), then A (0.5s), then C (0.7s).

Exception Handling in Async Code

Exceptions in async code behave similarly to sync code, but with a twist: exceptions in concurrent tasks don't automatically bubble up. You need to capture them.

python
import asyncio
 
async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong!")
 
async def good_task():
    await asyncio.sleep(1)
    return "Success!"
 
async def main():
    tasks = [
        asyncio.create_task(failing_task()),
        asyncio.create_task(good_task()),
    ]
 
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(results)
    # Output: [ValueError('Something went wrong!'), 'Success!']
 
asyncio.run(main())

Without return_exceptions=True, the first exception propagates out of the await immediately; the other tasks are not cancelled and keep running, but you lose easy access to their results. With it, exceptions are returned as results, letting you handle them individually. Which behavior you want depends on your use case: if you're fetching 100 URLs and one fails, you probably want to keep the other 99 results. If you're running a sequence of dependent operations where any failure should abort the whole thing, letting the exception propagate naturally (without return_exceptions) makes more sense.
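The "keep the other 99" pattern is a simple type check over the results list (fetch here is simulated, with one URL deliberately failing):

```python
import asyncio

async def fetch(i):
    if i == 2:
        raise ValueError(f"bad url {i}")
    await asyncio.sleep(0.01)
    return f"page {i}"

async def main():
    results = await asyncio.gather(*(fetch(i) for i in range(4)),
                                   return_exceptions=True)
    # Exceptions sit inline in the results list; partition them out by type
    pages = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return pages, errors

pages, errors = asyncio.run(main())
print(pages, errors)
```

Checking against BaseException (not just Exception) also catches CancelledError, which doesn't subclass Exception in modern Python.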

Async Context Managers: Cleaner Resource Management

Coroutines can use async with to manage resources (file handles, network connections, database cursors). This ensures cleanup happens even if exceptions occur.

python
import asyncio
 
class AsyncFile:
    def __init__(self, name):
        self.name = name
 
    async def __aenter__(self):
        print(f"Opening {self.name}")
        await asyncio.sleep(0.1)  # Simulate async open
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Closing {self.name}")
        await asyncio.sleep(0.1)  # Simulate async close
        return False
 
async def main():
    async with AsyncFile("data.txt") as f:
        print(f"Working with {f.name}")
 
asyncio.run(main())

Expected output:

Opening data.txt
Working with data.txt
Closing data.txt

The async with ensures cleanup happens automatically, just like synchronous context managers. This pattern is used everywhere in the async ecosystem: aiohttp.ClientSession(), asyncpg connection pools, aiofiles.open(), and most database connection managers all use async context managers. Getting comfortable with async with is essential because it's how async libraries enforce proper resource lifecycle management: open, use, close, even when exceptions interrupt the middle step.
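When you don't want to write a full class, contextlib.asynccontextmanager builds an async context manager from a generator-style coroutine. A sketch equivalent to the AsyncFile class above (recording events in a list instead of printing):

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def managed_resource(name):
    events.append(f"open {name}")
    await asyncio.sleep(0.01)       # simulate async setup
    try:
        yield name                  # the body of the async-with runs here
    finally:
        # Runs even if the body raises, just like __aexit__
        events.append(f"close {name}")
        await asyncio.sleep(0.01)

async def main():
    async with managed_resource("data.txt") as name:
        events.append(f"use {name}")

asyncio.run(main())
print(events)
```

Everything before the yield plays the role of __aenter__; the finally block plays the role of __aexit__.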

Async Context and asyncio.get_running_loop()

Sometimes you need access to the current event loop inside a coroutine. Use asyncio.get_running_loop():

python
import asyncio
 
async def get_loop_info():
    loop = asyncio.get_running_loop()
    print(f"Running loop: {loop}")
    print(f"Loop type: {type(loop)}")
 
asyncio.run(get_loop_info())

This is useful when you need to schedule callbacks or check loop properties. Don't confuse it with asyncio.get_event_loop(), which may create a new loop if none exists (deprecated in recent Python versions).
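One concrete use: scheduling a plain (non-async) callback on the loop's clock. loop.call_later() runs a synchronous function after a delay without creating a task:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fired = []
    # Schedule a synchronous callback 0.05 seconds from now
    loop.call_later(0.05, fired.append, "timer fired")
    await asyncio.sleep(0.1)   # yield so the loop can run the callback
    return fired

print(asyncio.run(main()))
```

Callbacks scheduled this way must be fast and non-blocking; they run directly on the loop, with no await points of their own.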

Performance Considerations: When to Use Asyncio

Asyncio shines for I/O-bound concurrency at scale:

  • Web servers handling thousands of connections (frameworks like FastAPI, aiohttp)
  • API clients making hundreds of concurrent requests
  • Database pooling with async drivers
  • Real-time systems like WebSocket servers

Asyncio struggles with:

  • CPU-bound work: Use multiprocessing instead
  • Heavy computation: Async doesn't help; everything still runs cooperatively on a single thread
  • Simple scripts: Threading or sync code is clearer for modest concurrency
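If you're stuck with CPU-bound work inside an async application, the usual escape hatch is run_in_executor with a process pool, which keeps the loop responsive while real parallelism happens in separate processes. A sketch under that assumption (cpu_heavy is a stand-in for your actual computation):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    # Pure-Python CPU work: async alone can't speed this up
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Offload to a separate process so the event loop stays responsive
        result = await loop.run_in_executor(pool, cpu_heavy, 1_000_000)
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```

The __main__ guard matters here: process pools re-import the main module on platforms that spawn workers.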

Here's a benchmark showing asyncio's strength:

python
import asyncio
import time
 
# Simulating 100 I/O operations, each taking 0.1 seconds
async def io_task():
    await asyncio.sleep(0.1)
 
async def async_approach():
    tasks = [asyncio.create_task(io_task()) for _ in range(100)]
    await asyncio.gather(*tasks)
 
start = time.time()
asyncio.run(async_approach())
print(f"Asyncio: {time.time() - start:.2f}s")  # ~0.1s (all concurrent)
 
# Sequential approach would take ~10s
# Threading would take ~0.1s too, but with thread overhead
# Asyncio is lightweight and scales to thousands of tasks

Asyncio's magic: 100 tasks, 100 overlapping I/O waits, all on a single thread in ~0.1 seconds. The comparison to threading here is worth dwelling on: threading achieves similar wall-clock time for I/O-bound work, but each thread consumes memory (typically 8MB of stack space by default), and coordinating thousands of threads introduces significant scheduler overhead. Asyncio tasks are cheap enough that running 10,000 of them simultaneously is entirely practical; an equivalent thread-per-request server would collapse under the memory pressure alone.

Debugging and Testing Async Code

Async code introduces new debugging challenges. Here are practical strategies:

Using asyncio.create_task() for Better Tracebacks

When a task fails silently, you might miss the error. Use proper task handling:

python
import asyncio
 
async def failing():
    raise ValueError("Oops!")
 
async def main():
    # DON'T: Just create and forget
    asyncio.create_task(failing())
    await asyncio.sleep(1)  # Task fails silently!
 
    # DO: Track the task
    task = asyncio.create_task(failing())
    try:
        await task
    except ValueError as e:
        print(f"Caught error: {e}")
 
asyncio.run(main())

Tasks that fail silently are hard to debug. Always capture and await them, or use callbacks:

python
async def main():
    task = asyncio.create_task(failing())
    # Log failures instead of re-raising: t.exception() returns (not raises) them
    task.add_done_callback(
        lambda t: print(f"Task failed: {t.exception()}") if not t.cancelled() else None
    )
    await asyncio.sleep(0.1)  # let the task finish so the callback fires
Testing Async Functions

Use pytest with pytest-asyncio to test async code:

python
# test_async.py
import asyncio
import pytest
 
async def fetch(url):
    await asyncio.sleep(0.1)
    return f"Data from {url}"
 
@pytest.mark.asyncio
async def test_fetch():
    result = await fetch("api.example.com")
    assert result == "Data from api.example.com"
 
@pytest.mark.asyncio
async def test_concurrent_fetch():
    results = await asyncio.gather(
        fetch("api1.com"),
        fetch("api2.com"),
        fetch("api3.com"),
    )
    assert len(results) == 3

Async unit tests must use async test functions and runners. The pytest-asyncio plugin handles this automatically.

Mocking Async Functions

python
import pytest
from unittest.mock import AsyncMock
 
@pytest.mark.asyncio
async def test_with_mock():
    # AsyncMock produces an awaitable whose result is return_value
    mock_fetch = AsyncMock(return_value="Mocked data")
    result = await mock_fetch("test.com")
    assert result == "Mocked data"
    mock_fetch.assert_awaited_once_with("test.com")

Comparing the Three Concurrency Models

Now that you've learned threading, multiprocessing, and asyncio, here's when to use each:

Model            Best For                 Overhead                  Scale               Complexity
Threading        I/O-bound, small scale   Low, but GIL-limited      ~10s-100s of tasks  Medium
Multiprocessing  CPU-bound, high compute  High, separate processes  ~10s of processes   High
Asyncio          I/O-bound, high scale    Ultra-low, cooperative    1000s of tasks      Medium

Real-world decision tree:

  1. Are you doing CPU-bound work? → Use multiprocessing
  2. Are you doing I/O-bound work? → Ask: "Do I need 100s of concurrent operations?"
    • Yes? → Use asyncio
    • No? → Use threading (simpler)
  3. Unsure? → Start with asyncio. It scales better and forces you to think about concurrency properly.

Asyncio Patterns: Producer-Consumer

Here's a classic pattern for coordinating tasks:

python
import asyncio
 
async def producer(queue, n):
    """Generate items and put them in a queue"""
    for i in range(n):
        print(f"Producing {i}")
        await queue.put(i)
        await asyncio.sleep(0.1)
    await queue.put(None)  # Signal end
 
async def consumer(queue):
    """Consume items from the queue"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consuming {item}")
        await asyncio.sleep(0.2)
 
async def main():
    queue = asyncio.Queue()
 
    # Run producer and consumer concurrently
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue),
    )
 
asyncio.run(main())

Expected output:

Producing 0
Consuming 0
Producing 1
Producing 2
Consuming 1
...

Producer and consumer run concurrently, communicating through an async queue. This decouples them: the producer doesn't care when the consumer processes items, and vice versa.
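With multiple consumers, the None sentinel gets awkward (you'd need one per consumer); the usual alternative is asyncio.Queue's task_done()/join() pair. A sketch assuming workers that loop forever and are cancelled once the queue drains:

```python
import asyncio

async def worker(name, queue):
    # Consume forever; shutdown is coordinated via task_done()/join()
    while True:
        item = await queue.get()
        print(f"{name} handling {item}")
        await asyncio.sleep(0.1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    for i in range(6):
        queue.put_nowait(i)

    workers = [asyncio.create_task(worker(f"w{n}", queue)) for n in range(3)]
    await queue.join()  # blocks until task_done() has been called for every item
    for w in workers:
        w.cancel()      # workers loop forever, so cancel them explicitly
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())
```

Three workers share one queue, so the six items finish in roughly a third of the sequential time, and join() gives a clean "all work done" signal without sentinels.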

Asyncio and Event Loop Policies

Advanced note: Python lets you configure the event loop policy (which loop type to use). On Windows, the default is ProactorEventLoop (for proper async I/O support); on Unix, it's SelectorEventLoop. Most of the time, you don't need to care: asyncio.run() picks the right one.

But if you need a specific policy:

python
import asyncio
 
# Set Windows to use ProactorEventLoop
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
 
asyncio.run(main())

This is rarely needed (and the event loop policy system is deprecated as of Python 3.14), but it's good to know it exists.

Key Takeaways

  • Event loop: Single-threaded control center that schedules and runs coroutines
  • async def: Defines a coroutine (requires await to run)
  • await: Yields control, letting other tasks run while waiting
  • asyncio.run(): Creates the event loop and runs your top-level coroutine
  • asyncio.create_task(): Schedules a coroutine as a task (runs concurrently)
  • asyncio.gather(): Run multiple coroutines concurrently and collect results
  • asyncio.sleep(): Non-blocking delay (yields control)
  • asyncio.timeout() and task.cancel(): Control task duration and abort execution
  • run_in_executor(): Bridge to blocking code without freezing the loop

Coroutine Best Practices

Now that you understand the mechanics, here are practical guidelines for writing good async code:

1. Keep Coroutines Focused

A coroutine should do one thing well. Don't mix I/O with processing:

python
# GOOD: Separated concerns
async def fetch_user(user_id):
    """Just fetch the data"""
    await asyncio.sleep(0.1)  # Simulate API call
    return {"id": user_id, "name": f"User {user_id}"}
 
async def process_users(user_ids):
    """Fetch all, then process all"""
    users = await asyncio.gather(*[fetch_user(uid) for uid in user_ids])
    return [u["name"].upper() for u in users]
 
# BAD: Mixed concerns
async def fetch_and_process(user_id):
    """Mixing I/O and CPU work"""
    user = await fetch_user(user_id)
    # Now we're doing CPU work in an async function
    processed = user["name"].upper() * 1000000  # Expensive
    return processed

Mixing concerns makes code harder to test and reason about.

2. Avoid Nested create_task()

Deep nesting gets confusing. Flatten your task creation:

python
# GOOD: Flat structure
async def main():
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    results = await asyncio.gather(*tasks)
 
# OKAY: If nesting is minimal
async def main():
    task = asyncio.create_task(fetch_and_process())
    result = await task

3. Use Type Hints

Async functions benefit from explicit type annotations:

python
from typing import List, Optional
 
async def fetch(url: str) -> str:
    """Fetch a single URL, return content."""
    await asyncio.sleep(0.1)
    return f"Content from {url}"
 
async def fetch_all(urls: List[str]) -> List[str]:
    """Fetch multiple URLs concurrently."""
    return await asyncio.gather(*[fetch(url) for url in urls])
 
async def maybe_fetch(url: Optional[str]) -> Optional[str]:
    """Fetch only if URL is provided."""
    if url:
        return await fetch(url)
    return None

Type hints make async code clearer and enable better IDE support.

4. Document Your Async API

Be explicit about what can run concurrently:

python
async def fetch_and_cache(url: str) -> str:
    """
    Fetch a URL and cache the result.
 
    Safe to call concurrently: Multiple callers can await this
    simultaneously without issue.
 
    Args:
        url: The URL to fetch.
 
    Returns:
        The fetched content.
 
    Raises:
        asyncio.TimeoutError: If fetch exceeds timeout.
    """
    # Implementation...
    pass

Real-World Scenario: Building an Async Web Crawler

Let's build a practical web crawler that fetches multiple pages concurrently with error handling:

python
import asyncio
from typing import List, Dict
 
class SimpleCrawler:
    def __init__(self, max_concurrent: int = 5, timeout: float = 10):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.semaphore = asyncio.Semaphore(max_concurrent)
 
    async def fetch_page(self, url: str) -> Dict[str, str]:
        """Fetch a single page with rate limiting and a timeout."""
        async with self.semaphore:  # Limit concurrency
            try:
                print(f"Fetching {url}")
                # Simulate an async HTTP request, bounded by self.timeout
                await asyncio.wait_for(asyncio.sleep(1), timeout=self.timeout)
                return {
                    "url": url,
                    "status": "success",
                    "content": f"Content from {url}"
                }
            except asyncio.TimeoutError:
                return {"url": url, "status": "timeout"}
            except Exception as e:
                return {"url": url, "status": "error", "error": str(e)}
 
    async def crawl(self, urls: List[str]) -> List[Dict[str, str]]:
        """Crawl multiple URLs concurrently."""
        tasks = [self.fetch_page(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
 
async def main():
    crawler = SimpleCrawler(max_concurrent=3)
    urls = [f"https://example.com/page{i}" for i in range(10)]
    results = await crawler.crawl(urls)
 
    successes = [r for r in results if isinstance(r, dict) and r.get("status") == "success"]
    print(f"Successfully fetched {len(successes)} pages")
 
asyncio.run(main())

This example shows:

  • Semaphore to limit concurrent requests (avoiding overwhelming the target)
  • Error handling with return_exceptions=True
  • Structured concurrency with a class wrapper
  • Real-world scalability patterns

This crawler can handle thousands of URLs efficiently, thanks to asyncio's lightweight task model. The Semaphore pattern here is particularly worth studying: without it, fetching 1,000 URLs would simultaneously open 1,000 connections, potentially getting you rate-limited, blacklisted, or simply overwhelming both the target server and your own system's file descriptor limits. A semaphore lets you say "no more than 10 at a time" while still letting the other 990 tasks wait patiently in the queue, ready to proceed as slots open up.

The Mental Shift: From Thinking Sequentially to Thinking Concurrently

The hardest part of asyncio isn't the syntax; it's the mindset shift. You're no longer writing "do A, then do B, then do C." Instead, you're saying: "Start A, start B, start C, they all run concurrently, wait for all to finish."

This requires thinking differently about control flow. But once it clicks, you'll write code that's both simpler and more scalable than threading-based approaches.

Asyncio is powerful for I/O-bound concurrency at scale. Master these fundamentals, and you'll build responsive, scalable systems without the threading complexity or multiprocessing overhead. You now understand the mental models, the syntax, the pitfalls, and the patterns. More importantly, you understand the why: why the event loop is single-threaded, why you must re-raise CancelledError, why create_task() is different from await, why blocking calls are catastrophic. That conceptual foundation is what separates someone who writes async code from someone who writes correct async code. The ecosystem of async Python (FastAPI, aiohttp, asyncpg, LangChain's async chain execution) is built on exactly these primitives. Every async with, every gather(), every semaphore you encounter in a real codebase is just a more elaborate version of the patterns you practiced here.

Ready to go deeper? Next, we'll explore real-world asyncio patterns: HTTP clients with aiohttp, async generators, semaphores for rate limiting, and advanced coordination techniques that let you build production-grade async systems.
