
Events / Streaming Guide

This guide explains how to use the SDK's Events module to deliver agent execution state to the frontend in real time.

Overview

The Events module is an asynchronous event delivery system centered around EventEmitter. Events emitted by the agent can be simultaneously delivered to three destinations: SSE (Server-Sent Events), Webhook, and DB.

Agent → EventEmitter → ┬→ SSE Handler     → Frontend
                       ├→ DB Handler      → PostgreSQL
                       └→ Webhook Handler → External System

Event Types

Agents can emit the following event types.

| EventType | Purpose | UI Display |
|---|---|---|
| PHASE_START | Phase start | Phase name and spinner |
| PROGRESS_UPDATE | Progress update | Progress bar / message |
| THOUGHT_MESSAGE | Agent thought process | Thought bubble |
| TOOL_START | Tool execution start | Tool name and spinner |
| TOOL_RESULT | Tool execution complete | Tool result display |
| FINAL_RESULT | Final result | Final response |
| COMPLETION_SUCCESS | Processing complete (success) | Completion notification |
| COMPLETION_FAILURE | Processing complete (failure) | Error message |
| FILE_CREATED | File generation complete | Download link |
| HITL_REQUIRED_BROWSER_VNC | Human intervention request for browser operation | VNC connection UI |
| HITL_REQUIRED_BROWSER_CLI | Human intervention request for CLI operation | CLI input UI |
| HITL_COMPLETED | Human intervention complete | Completion notification |
| PERMISSION_REQUEST | Execution permission request | Approval dialog |
| PERMISSION_RESPONSE | Execution permission response | Approval result |
| USER_INTERACTION_REQUIRED | User input request | Input form |
| UNEXPECTED_ERROR | Unexpected error | Error notification |

Basic Usage

Initializing and Using EventEmitter

emit_event() takes an event_type and a message (string), plus an optional metadata dictionary and an optional sub_event_type.

Python
from agenticstar_platform.events import EventEmitter, EventType, SubEventType

# Note: the awaits below must run inside an async function (e.g. a request handler)

# Initialize EventEmitter (execution_id is required)
emitter = EventEmitter(execution_id="exec-abc-123")

# Phase start event
await emitter.emit_event(
    event_type=EventType.PHASE_START,
    message="Analyzing intent...",
    metadata={"phase": "intent"},
)

# Streaming thought process
await emitter.emit_event(
    event_type=EventType.THOUGHT_MESSAGE,
    message="The user is requesting a RAG search",
    sub_event_type=SubEventType.SEARCH_WEB,
)

# Tool execution
await emitter.emit_event(
    event_type=EventType.TOOL_START,
    message="Executing search_knowledge",
    metadata={"tool_name": "search_knowledge", "input": {"query": "AI agent"}},
)

# Tool result
await emitter.emit_event(
    event_type=EventType.TOOL_RESULT,
    message="Search complete: 5 results",
    metadata={"tool_name": "search_knowledge", "result_count": 5},
)

# Completion
await emitter.emit_event(
    event_type=EventType.COMPLETION_SUCCESS,
    message="Response complete",
)

# Cleanup (required)
await emitter.cleanup()

Automatic Sequence Number Management

EventEmitter automatically assigns an incrementing sequence number to each emitted event. The frontend can use these numbers to restore the original order when events arrive out of order and to notice gaps.

Python
# Sequence number is available as a property
print(emitter.sequence_number) # 0, 1, 2, ... auto-incremented
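
On the receiving side, sequence numbers are typically used with a small reordering buffer. The following is a minimal sketch of that idea, not part of the SDK: SequenceBuffer, push() and missing() are hypothetical names, and it assumes each delivered event carries its sequence number and that numbering starts at 0, as in the comment above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SequenceBuffer:
    """Releases events in sequence order, holding back any that arrive early."""

    next_expected: int = 0  # assumption: numbering starts at 0
    _pending: Dict[int, Any] = field(default_factory=dict)

    def push(self, sequence_number: int, event: Any) -> List[Any]:
        """Store one event and return every event that is now deliverable in order."""
        if sequence_number < self.next_expected:
            return []  # duplicate of an already-delivered event: ignore it
        self._pending[sequence_number] = event
        ready = []
        while self.next_expected in self._pending:
            ready.append(self._pending.pop(self.next_expected))
            self.next_expected += 1
        return ready

    def missing(self) -> List[int]:
        """Sequence numbers still awaited that are lower than some buffered event."""
        if not self._pending:
            return []
        highest = max(self._pending)
        return [n for n in range(self.next_expected, highest) if n not in self._pending]
```

A client would call push() for every incoming event and render whatever it returns; a non-empty missing() result after a timeout is a hint that an event was lost and the history may need to be reloaded.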

Event Handlers

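EventEmitter accepts a single handler in its constructor. To deliver to several destinations at once, wrap the handlers in a CompositeEventHandler (see Composite Handler).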

SSE Handler (Frontend Delivery)

Python
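import asyncio

from fastapi.responses import StreamingResponse

from agenticstar_platform.events import EventEmitter, create_sse_handler

# Keep references to running tasks so they are not garbage-collected mid-run
background_tasks = set()

# Combine with FastAPI's StreamingResponse
# (ChatRequest and run_agent are defined by your application)
async def chat_stream(request: ChatRequest):
    sse_handler = create_sse_handler()
    emitter = EventEmitter(execution_id="exec-123", handler=sse_handler)

    # Start agent processing asynchronously
    task = asyncio.create_task(run_agent(emitter, request))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # Return SSE stream
    return StreamingResponse(
        emitter.consume_events(),
        media_type="text/event-stream",
    )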
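
To make the pattern above easier to picture, here is a self-contained sketch of a queue-backed emitter: the agent task enqueues events while the response generator drains them as SSE frames. QueueEmitter is a hypothetical stand-in, not the SDK's EventEmitter, and the frame fields (id, event, data) follow standard SSE framing; the payload layout the SDK actually sends is an assumption here.

```python
import asyncio
import json
from typing import AsyncIterator, Optional

_DONE = object()  # sentinel that tells the consumer the stream is finished


class QueueEmitter:
    """Toy emitter: emit_event() enqueues, consume_events() yields SSE frames."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._sequence = 0

    async def emit_event(
        self, event_type: str, message: str, metadata: Optional[dict] = None
    ) -> None:
        payload = json.dumps({"message": message, "metadata": metadata or {}})
        # Standard SSE framing: field lines, then a blank line to end the event.
        frame = f"id: {self._sequence}\nevent: {event_type}\ndata: {payload}\n\n"
        self._sequence += 1
        await self._queue.put(frame)

    async def cleanup(self) -> None:
        # Closing the stream lets the HTTP response finish.
        await self._queue.put(_DONE)

    async def consume_events(self) -> AsyncIterator[str]:
        while (frame := await self._queue.get()) is not _DONE:
            yield frame
```

In this sketch the stream ends only when cleanup() is called, which illustrates one reason a cleanup step matters: without it the client connection would stay open indefinitely.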

DB Handler (Persistence)

Python
from agenticstar_platform.events import EventEmitter
from agenticstar_platform.events.handlers import DatabaseEventHandler

# Persist events to PostgreSQL with DB handler
# (data_access is assumed to be created elsewhere in your application)
db_handler = DatabaseEventHandler(
    data_access=data_access,
    user_id="user-001",
    conversation_id="conv-001",
    message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=db_handler)

Webhook Handler (External Notification)

Python
from agenticstar_platform.events import EventEmitter
from agenticstar_platform.events.handlers import WebhookEventHandler

# Send webhook notifications to external systems
webhook_handler = WebhookEventHandler(
    webhook_url="https://your-system.example.com/webhook",
    conversation_id="conv-001",
    message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=webhook_handler)

Composite Handler

Multiple handlers can be combined and managed as one.

Python
from agenticstar_platform.events import EventEmitter, create_sse_handler
from agenticstar_platform.events.handlers import (
    CompositeEventHandler,
    DatabaseEventHandler,
    WebhookEventHandler,
)

# Every event is delivered to all three handlers
composite = CompositeEventHandler([
    create_sse_handler(),
    DatabaseEventHandler(data_access=da, user_id="u", conversation_id="c", message_id="m"),
    WebhookEventHandler(webhook_url="https://...", conversation_id="c", message_id="m"),
])
emitter = EventEmitter(execution_id="exec-123", handler=composite)
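
As a rough illustration of the fan-out idea behind a composite handler, the sketch below delivers one event to several async callables concurrently and isolates failures, so a slow or failing webhook does not block the other destinations. FanOutHandler and its handle() method are hypothetical and make no claim about how CompositeEventHandler is implemented internally.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical handler shape for this sketch: an async callable taking an event dict.
Handler = Callable[[dict], Awaitable[None]]


class FanOutHandler:
    """Toy stand-in for a composite handler; not the SDK's CompositeEventHandler."""

    def __init__(self, handlers: List[Handler]) -> None:
        self.handlers = handlers

    async def handle(self, event: dict) -> List[BaseException]:
        """Deliver the event to every handler concurrently and collect failures."""
        results = await asyncio.gather(
            *(handler(event) for handler in self.handlers),
            return_exceptions=True,  # one failing destination must not cancel the others
        )
        return [r for r in results if isinstance(r, BaseException)]
```

Returning the collected exceptions instead of raising leaves it to the caller to decide whether a failed destination should be logged, retried, or treated as fatal.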

Controlling UI Display with SubEventType

Even with the same EventType, you can control frontend display granularly by changing the SubEventType.

| SubEventType | Purpose |
|---|---|
| SEARCH_WEB | Web search in progress |
| COMMAND_EXECUTION | Command execution in progress |
| FILE_OPERATION | File operation in progress |
| FILE_EDITED | File edit complete |
| FILE_READ | File read complete |
| FILE_SEARCHED | File search complete |
| BASH_EXECUTED | Bash command execution complete |
| WEB_FETCHED | Web page fetch complete |
| MCP_TOOL | MCP tool execution in progress |
| LOCAL_ASSISTANT | Local assistant processing |
| TASK_LAUNCHED | Task launched |
| TODO_UPDATED | TODO updated |
| VIDEO_GENERATED | Video generation complete |
| IMAGE_GENERATED | Image generation complete |
| SLIDE_CREATED | Slide creation complete |
| MACOS_AUTOMATION | macOS automation in progress |

Python
# Fine-grained control of tool execution display
await emitter.emit_event(
    event_type=EventType.TOOL_START,
    message="Executing web search...",
    sub_event_type=SubEventType.SEARCH_WEB,
)

Marketplace Handler

For agents provided through the marketplace, use the dedicated handler factory. It generates a composite handler for DB + Webhook in one step.

Python
from agenticstar_platform.events import EventEmitter
from agenticstar_platform.events.handlers import create_marketplace_handler

# Build the DB + Webhook composite handler in one call
handler = create_marketplace_handler(
    data_access=data_access,
    webhook_url="https://tenant.example.com/webhook",
    user_id="user-001",
    conversation_id="conv-001",
    message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=handler)

Error Handling

Python
try:
    await run_agent_logic()
except Exception as e:
    # Emit error event to notify the frontend
    await emitter.emit_event(
        event_type=EventType.COMPLETION_FAILURE,
        message=str(e),
        metadata={"error_type": type(e).__name__},
    )
finally:
    # Always execute cleanup
    await emitter.cleanup()
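
If the same try/except/finally shape is repeated in many entry points, it can be wrapped in an async context manager. This is a sketch under assumptions: emitter_session is a hypothetical helper, not an SDK function, and RecordingEmitter is only a stand-in that mimics the emit_event() and cleanup() calls used in this guide so the example runs without the SDK.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def emitter_session(emitter, failure_event_type):
    """Yield the emitter, report any exception as a failure event, and always clean up."""
    try:
        yield emitter
    except Exception as exc:
        await emitter.emit_event(
            event_type=failure_event_type,
            message=str(exc),
            metadata={"error_type": type(exc).__name__},
        )
        raise  # re-raise so the caller can still log or handle the error
    finally:
        await emitter.cleanup()


class RecordingEmitter:
    """Stand-in for EventEmitter so the sketch runs without the SDK."""

    def __init__(self):
        self.events = []
        self.cleaned_up = False

    async def emit_event(self, event_type, message, metadata=None):
        self.events.append((event_type, message, metadata))

    async def cleanup(self):
        self.cleaned_up = True
```

With the real SDK, usage would presumably look like `async with emitter_session(emitter, EventType.COMPLETION_FAILURE): await run_agent_logic()`. Unlike the snippet above, this sketch re-raises after emitting the failure event; drop the `raise` if the error should be swallowed once the frontend has been notified.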

Next Steps

SDK API Reference — Events Module

Complete specifications for EventEmitter / StreamingEvent / EventType

View guide

Architecture Guide

SDK module structure and design philosophy

View guide