Async Functions

Trace async workflows and awaited tool calls.

Overview

The @trace decorator supports async functions. When applied to an async def function, it returns an async wrapper that awaits the original function and records the same trace lifecycle as the sync path, running on the caller's event loop.

How It Works

Detection is automatic: inspect.iscoroutinefunction(func) returns True for async def functions, and the decorator generates an async def async_wrapper instead of a sync wrapper. The trace lifecycle is identical in both paths:

Sync Path

result = func(*args, **kwargs)

Async Path

result = await func(*args, **kwargs)
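
To make the dispatch concrete, here is a minimal sketch of a decorator that picks a wrapper based on inspect.iscoroutinefunction. It is an illustration under stated assumptions, not tracellm's actual implementation: the names trace_sketch and RECORDED, the record layout, and the "success" status value are all hypothetical.

```python
import asyncio
import functools
import inspect

# Hypothetical in-memory sink; the real library persists traces elsewhere.
RECORDED = []

def trace_sketch(prompt):
    """Illustrative stand-in for @trace (not the library's real code)."""
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                result = await func(*args, **kwargs)  # async path
                RECORDED.append({"prompt": prompt, "status": "success"})
                return result
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            result = func(*args, **kwargs)  # sync path
            RECORDED.append({"prompt": prompt, "status": "success"})
            return result
        return sync_wrapper
    return decorator

@trace_sketch("add")
def add(a, b):
    return a + b

@trace_sketch("add_async")
async def add_async(a, b):
    await asyncio.sleep(0)  # yield to the event loop once
    return a + b

sync_result = add(1, 2)
async_result = asyncio.run(add_async(3, 4))
```

Because the async wrapper is itself declared with async def, the decorated function is still reported as a coroutine function, so frameworks that inspect handlers continue to treat it as async.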

The context variable (contextvars.ContextVar) used for step collection is natively async-safe. Each asyncio task runs in its own copy of the current context, so steps recorded by workflows running in parallel tasks (for example, via asyncio.gather) are not interleaved.
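
The isolation property can be checked with the standard library alone. The sketch below uses a hypothetical steps_var in place of the library's internal variable. Two workers run concurrently and overlap in time, yet each one sees only its own steps.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the library's step-collection variable.
steps_var = contextvars.ContextVar("steps")

async def worker(name, delay):
    steps_var.set([])                        # value is set only in this task's context
    steps_var.get().append(f"{name}:start")
    await asyncio.sleep(delay)               # yield so the other worker runs in between
    steps_var.get().append(f"{name}:end")
    return steps_var.get()

async def main():
    # gather() wraps each coroutine in a Task, and each Task
    # runs in a copy of the context that was current at creation.
    return await asyncio.gather(worker("a", 0.02), worker("b", 0.01))

steps_a, steps_b = asyncio.run(main())
```

Note that the isolation comes from task creation: a coroutine awaited directly (without gather or create_task) runs in the caller's context rather than a copy.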

Example: Async API Handler

async_api.py
python
import asyncio

from tracellm import trace

@trace(
    prompt="generate_embedding",
    model_name="text-embedding-3-large",
    project="search-service",
    environment="production",
)
async def generate_embedding(text: str) -> list[float]:
    # Simulate an async embedding API call
    await asyncio.sleep(0.3)
    return [0.0123] * 1536

@trace(
    prompt="search_documents",
    model_name="gpt-4o-mini",
    project="search-service",
    environment="production",
)
async def search_documents(query: str) -> list[dict]:
    # vector_search and rerank are application helpers defined elsewhere
    embedding = await generate_embedding(query)
    results = await vector_search(embedding)
    return rerank(results)

# In your async application
async def handler(request):
    results = await search_documents(request.query)
    return {"results": results}

Async Error Handling

Async error handling follows the same pattern as sync: the exception is captured, the trace is persisted with status: "failed", and the exception is re-raised.

async_error.py
python
import logging

import httpx
from tracellm import trace

log = logging.getLogger(__name__)

@trace(prompt="fetch_external_data", model_name="gpt-4o")
async def fetch_data(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10)
        response.raise_for_status()
        return response.json()

# If the HTTP call fails, the trace captures the exception
# and re-raises so your application can handle it
async def main() -> None:
    try:
        data = await fetch_data("https://api.example.com/data")
    except httpx.HTTPStatusError:
        # Trace was already persisted with failure details
        log.error("Failed to fetch data")
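
The capture, persist, and re-raise sequence can be sketched with a small wrapper. This is an assumption-laden illustration rather than the library's code: trace_errors_sketch, RECORDED, the "running" and "success" status values, and the record fields are hypothetical. Only the "failed" status comes from the description above.

```python
import asyncio
import functools

# Hypothetical in-memory sink standing in for trace persistence.
RECORDED = []

def trace_errors_sketch(prompt):
    def decorator(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            record = {"prompt": prompt, "status": "running"}
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                record["status"] = "failed"
                record["error"] = repr(exc)
                raise  # hand the original exception back to the caller
            else:
                record["status"] = "success"
                return result
            finally:
                RECORDED.append(record)  # persisted on both outcomes
        return async_wrapper
    return decorator

@trace_errors_sketch("always_fails")
async def always_fails():
    await asyncio.sleep(0)
    raise ValueError("boom")

async def main():
    try:
        await always_fails()
    except ValueError as exc:
        return str(exc)  # the caller still sees the original exception
    return None

caught = asyncio.run(main())
```

The bare raise keeps the original exception type and traceback, so existing except clauses in the application continue to match.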

Running Async Traces

Async traced functions must be called with await from within an async context, or executed via asyncio.run():

async_execution.py
python
# From an async context
response = await my_traced_function("input")

# From a sync entry point
import asyncio
result = asyncio.run(my_traced_function("input"))

Warning

Calling an async-traced function without await returns a coroutine object instead of executing the function. The trace is not created until the coroutine is awaited.
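
The behaviour in this warning is plain Python semantics and can be reproduced without the library. In the sketch below, traced_stand_in is a hypothetical undecorated coroutine function that records when its body actually runs.

```python
import asyncio
import inspect

calls = []

async def traced_stand_in(value):
    # Hypothetical stand-in for an async-traced function.
    calls.append(value)
    return value.upper()

pending = traced_stand_in("input")        # no await: the body has not run yet
is_coroutine = inspect.iscoroutine(pending)
calls_before = list(calls)

result = asyncio.run(pending)             # the body runs only now
calls_after = list(calls)
```

If such a coroutine object is discarded without ever being awaited, Python emits a RuntimeWarning stating that the coroutine was never awaited, which is usually the first visible sign of this mistake.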

Production Patterns

async_production.py
python
from tracellm import trace
import asyncio

# Parallel async tracing with isolated contexts
# (enrich and classify are application coroutines defined elsewhere)
@trace(prompt="process_item", model_name="gpt-4o-mini")
async def process_item(item: dict) -> dict:
    enriched = await enrich(item)
    classified = await classify(enriched)
    return classified

@trace(prompt="batch_process", project="data-pipeline")
async def batch_process(items: list[dict]) -> list[dict]:
    tasks = [process_item(item) for item in items]
    return await asyncio.gather(*tasks)

# FastAPI integration
from fastapi import FastAPI

app = FastAPI()

@app.post("/classify")
async def classify_endpoint(text: str):
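    # classify_document is a traced async function defined elsewhere; its result
    # is assumed to expose trace_id and category attributes
    result = await classify_document(text)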
    return {"trace_id": result.trace_id, "category": result.category}
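
One property the batch pattern relies on is that asyncio.gather returns results in the order the awaitables were passed, regardless of completion order. A minimal sketch with hypothetical stubs (process_item_stub and batch_process_stub stand in for the traced functions):

```python
import asyncio

async def process_item_stub(item):
    # Hypothetical stand-in for process_item; later items finish sooner.
    await asyncio.sleep(item["delay"])
    return {"id": item["id"], "done": True}

async def batch_process_stub(items):
    tasks = [process_item_stub(item) for item in items]
    # Results line up with the input order, not the completion order.
    return await asyncio.gather(*tasks)

items = [
    {"id": 1, "delay": 0.03},
    {"id": 2, "delay": 0.02},
    {"id": 3, "delay": 0.01},
]
results = asyncio.run(batch_process_stub(items))
result_ids = [r["id"] for r in results]
```

This means the returned list can be zipped back against the input items without tracking indices separately.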