Pattern: Callback-based AAEP Integration

Use this pattern when your agent framework fires named lifecycle callbacks.

This is the natural integration for frameworks built around the observer pattern. LangChain's BaseCallbackHandler is the canonical example. AutoGen event handlers, Hugging Face transformers callbacks, and many custom frameworks follow the same shape.


When callbacks are the right choice

Pick callbacks when:

Pick something else when:


The callback emission model

Each framework callback maps to one or more AAEP events:

Framework callback AAEP event to emit Notes
on_chain_start / on_agent_start agent.session.started The top-level callback only; nested chains don't re-emit
on_chain_end / on_agent_end agent.session.completed Only on the top-level
on_chain_error / on_agent_error agent.session.errored Map exception class to error_category
on_llm_start agent.state.changed (to thinking)
on_llm_end agent.state.changed (from thinking)
on_llm_new_token agent.output.streaming Coalesce at sentence boundaries before emitting
on_tool_start agent.tool.invoked See gap-handling section below
on_tool_end agent.tool.completed (status=success)
on_tool_error agent.tool.completed (status=error)
on_text agent.output.streaming (depending on context)
on_retry informative log only Not normatively required

Complete LangChain implementation

from typing import Any
from uuid import UUID

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult, AgentAction, AgentFinish

from aaep_helpers import AAEPEmitter, StreamCoalescer, make_id


class AAEPCallbackHandler(BaseCallbackHandler):
    """
    LangChain callback handler that emits AAEP events for each lifecycle event.
    Combine with @aaep_tool decorators for pre-execution confirmation flow.
    """

    def __init__(self, emitter: AAEPEmitter):
        self.emitter = emitter
        self.session_id: str | None = None
        self.active_tools: dict[str, str] = {}  # run_id -> tool_call_id
        self.coalescer: StreamCoalescer | None = None
        self.last_state = "idle"

    # === Chain lifecycle (top-level only) ===

    def on_chain_start(
        self, serialized: dict[str, Any], inputs: dict[str, Any],
        run_id: UUID, parent_run_id: UUID | None = None, **kwargs
    ):
        if parent_run_id is not None:
            return  # nested chain, ignore
        user_message = inputs.get("input") or inputs.get("query") or ""
        self.session_id = self.emitter.start_session(
            summary_normal=f"Processing: {str(user_message)[:80]}",
            request_text=str(user_message),
        )

    def on_chain_end(
        self, outputs: dict[str, Any], run_id: UUID,
        parent_run_id: UUID | None = None, **kwargs
    ):
        if parent_run_id is not None or not self.session_id:
            return
        output_text = outputs.get("output") or outputs.get("answer") or ""
        self.emitter.complete_session(
            session_id=self.session_id,
            summary_normal="Response ready.",
            output_summary=str(output_text)[:200],
        )

    def on_chain_error(
        self, error: BaseException, run_id: UUID,
        parent_run_id: UUID | None = None, **kwargs
    ):
        if parent_run_id is not None or not self.session_id:
            return
        self.emitter.error_session(
            session_id=self.session_id,
            error_category=self._classify_error(error),
            summary_normal=f"Error: {type(error).__name__}",
            error_message=str(error)[:1000],
            recoverable=self._is_recoverable(error),
        )

    # === LLM lifecycle ===

    def on_llm_start(self, serialized: dict[str, Any], prompts: list[str], **kwargs):
        if not self.session_id:
            return
        if self.last_state != "thinking":
            self.emitter.state_changed(
                session_id=self.session_id,
                from_state=self.last_state,
                to_state="thinking",
                summary_normal="Considering the request.",
            )
            self.last_state = "thinking"

    def on_llm_end(self, response: LLMResult, **kwargs):
        # No state change needed here; next callback will emit transition
        pass

    def on_llm_new_token(self, token: str, **kwargs):
        if not self.session_id:
            return
        if self.coalescer is None:
            self.coalescer = StreamCoalescer(
                emitter=self.emitter,
                session_id=self.session_id,
                output_id=make_id("out"),
            )
        self.coalescer.add_token(token)

    # === Tool lifecycle ===

    def on_tool_start(
        self, serialized: dict[str, Any], input_str: str,
        run_id: UUID, **kwargs
    ):
        if not self.session_id:
            return

        if self.last_state != "calling_tool":
            self.emitter.state_changed(
                session_id=self.session_id,
                from_state=self.last_state,
                to_state="calling_tool",
                summary_normal="Calling a tool.",
            )
            self.last_state = "calling_tool"

        tool_name = serialized.get("name", "unknown_tool")
        tool_call_id = make_id("call")
        self.active_tools[str(run_id)] = tool_call_id

        self.emitter.tool_invoked(
            session_id=self.session_id,
            tool=tool_name,
            tool_call_id=tool_call_id,
            args_summary=self._safe_summary(input_str),
            risk_level=self._classify_risk(tool_name),
            irreversible=self._is_irreversible(tool_name),
        )

    def on_tool_end(self, output: str, run_id: UUID, **kwargs):
        if not self.session_id:
            return
        tool_call_id = self.active_tools.pop(str(run_id), None)
        self.emitter.tool_completed(
            session_id=self.session_id,
            tool_call_id=tool_call_id,
            status="success",
            summary_normal=self._safe_summary(output),
        )

    def on_tool_error(self, error: BaseException, run_id: UUID, **kwargs):
        if not self.session_id:
            return
        tool_call_id = self.active_tools.pop(str(run_id), None)
        self.emitter.tool_completed(
            session_id=self.session_id,
            tool_call_id=tool_call_id,
            status="error",
            error_message=str(error)[:1000],
        )

    # === Agent actions ===

    def on_agent_action(self, action: AgentAction, **kwargs):
        # LangChain agents emit this when picking a tool to use; useful for state transitions
        pass

    def on_agent_finish(self, finish: AgentFinish, **kwargs):
        if not self.session_id:
            return
        # Finish coalescer so any buffered output flushes
        if self.coalescer is not None:
            self.coalescer.finish()
            self.coalescer = None

        if self.last_state != "idle":
            self.emitter.state_changed(
                session_id=self.session_id,
                from_state=self.last_state,
                to_state="idle",
                summary_normal="Final response generated.",
            )
            self.last_state = "idle"

    # === Helpers ===

    @staticmethod
    def _classify_error(error: BaseException) -> str:
        if isinstance(error, TimeoutError):
            return "transient"
        if isinstance(error, (PermissionError, ValueError)):
            return "requires_user"
        return "unknown"

    @staticmethod
    def _is_recoverable(error: BaseException) -> bool:
        return isinstance(error, (TimeoutError, ConnectionError))

    @staticmethod
    def _safe_summary(text: str, limit: int = 200) -> str:
        """Truncate and redact obvious secrets."""
        if not isinstance(text, str):
            text = str(text)
        # Naive secret redaction (real implementation should be more thorough)
        for keyword in ("api_key", "password", "token", "secret"):
            if keyword in text.lower():
                text = "[redacted: contains sensitive keyword]"
                break
        return text[:limit]

    @staticmethod
    def _classify_risk(tool_name: str) -> str:
        # Customize per your tool registry
        high_risk = {"send_email", "transfer_funds", "delete_record", "execute_code"}
        medium_risk = {"create_record", "update_record", "publish_post"}
        if tool_name in high_risk:
            return "high"
        if tool_name in medium_risk:
            return "medium"
        return "low"

    @staticmethod
    def _is_irreversible(tool_name: str) -> bool:
        irreversible = {"send_email", "transfer_funds", "delete_record", "publish_post"}
        return tool_name in irreversible

Usage:

from langchain.agents import AgentExecutor

emitter = AAEPEmitter()
handler = AAEPCallbackHandler(emitter)

agent_executor = AgentExecutor(
    agent=my_agent,
    tools=my_tools,
    callbacks=[handler],
)

result = await agent_executor.arun("plan my retirement")

The callback gap problem

LangChain's on_tool_start fires after the tool function has been called (more accurately, after the framework decided to call it). By the time your callback runs, you can emit agent.tool.invoked, but the tool is already executing in another stack frame.

For Conformance Level 1 (notification only), this is fine.

For Conformance Level 2 with irreversible actions, this is broken: you need to emit agent.awaiting.confirmation before the tool runs, and you need to block until the user replies. Callbacks fire after-the-fact and cannot block the framework's flow.

Solution: Combine the callback pattern with the decorator pattern. Wrap each irreversible tool with @aaep_tool so confirmation happens inside the tool function before any side effect. The callback handler still emits the standard tool lifecycle events; the decorator handles the confirmation interposition.

@aaep_tool(emitter, risk_level="high", irreversible=True)
async def send_email(to: str, subject: str, body: str):
    # Decorator emits agent.awaiting.confirmation here, BLOCKS for reply,
    # only then executes the real function
    await actual_send_email(to, subject, body)

# Pass the decorated tool to LangChain
tools = [Tool(name="send_email", func=send_email, ...)]

# Callback handler emits the standard tool lifecycle events
agent_executor = AgentExecutor(
    agent=my_agent,
    tools=tools,
    callbacks=[AAEPCallbackHandler(emitter)],
)

This is the combination most production LangChain deployments use.


AutoGen variant

AutoGen v0.4+ uses message-passing rather than named callbacks. The pattern is similar but you subscribe to a message stream:

from autogen_core.base import MessageContext

class AAEPMessageObserver:
    async def on_message(self, message, context: MessageContext):
        if isinstance(message, UserMessage):
            self.session_id = self.emitter.start_session(...)
        elif isinstance(message, ToolCall):
            self.emitter.tool_invoked(...)
        elif isinstance(message, ToolResult):
            self.emitter.tool_completed(...)
        # ... etc

Common pitfalls

Mistake Consequence Fix
Treating every on_chain_start as session start Nested chains create duplicate sessions Only emit when parent_run_id is None
Emitting on_llm_new_token without coalescing Subscribers flooded Use StreamCoalescer to buffer to sentence boundaries
Not tracking run_id for tool pairing tool.completed can't be matched to tool.invoked Use run_id → tool_call_id mapping
Catching errors in callbacks and swallowing them LangChain expects exceptions to propagate Never swallow; let LangChain handle them
Forgetting to flush coalescer on on_agent_finish Last fragment of output never emitted Always call coalescer.finish()

Testing

aaep-conformance producer \
    --endpoint <your-endpoint> \
    --langchain-mode \
    --level 1

The conformance suite has a LangChain-specific test profile that exercises typical agent flows (single-turn, multi-turn, tool-heavy, streaming-heavy) and checks AAEP emission at each phase.


See also