Events / Streaming Guide
This guide explains how to use the SDK's Events module to deliver agent execution state to the frontend in real time.
Overview
The Events module is an asynchronous event delivery system centered around EventEmitter. Events emitted by the agent can be simultaneously delivered to three destinations: SSE (Server-Sent Events), Webhook, and DB.
Agent → EventEmitter → ┬→ SSE Handler → Frontend
├→ DB Handler → PostgreSQL
└→ Webhook Handler → External System
Event Types
A list of event types that agents can emit.
| EventType | Purpose | UI Display |
|---|---|---|
PHASE_START | Phase start | Phase name and spinner |
PROGRESS_UPDATE | Progress update | Progress bar / message |
THOUGHT_MESSAGE | Agent thought process | Thought bubble |
TOOL_START | Tool execution start | Tool name and spinner |
TOOL_RESULT | Tool execution complete | Tool result display |
FINAL_RESULT | Final result | Final response |
COMPLETION_SUCCESS | Processing complete (success) | Completion notification |
COMPLETION_FAILURE | Processing complete (failure) | Error message |
FILE_CREATED | File generation complete | Download link |
HITL_REQUIRED_BROWSER_VNC | Human intervention request for browser operation | VNC connection UI |
HITL_REQUIRED_BROWSER_CLI | Human intervention request for CLI operation | CLI input UI |
HITL_COMPLETED | Human intervention complete | Completion notification |
PERMISSION_REQUEST | Execution permission request | Approval dialog |
PERMISSION_RESPONSE | Execution permission response | Approval result |
USER_INTERACTION_REQUIRED | User input request | Input form |
UNEXPECTED_ERROR | Unexpected error | Error notification |
Basic Usage
Initializing and Using EventEmitter
emit_event() accepts message (string) and metadata (dictionary, optional).
from agenticstar_platform.events import EventEmitter, EventType, SubEventType
# Initialize EventEmitter (execution_id is required)
emitter = EventEmitter(execution_id="exec-abc-123")
# Phase start event
await emitter.emit_event(
event_type=EventType.PHASE_START,
message="Analyzing intent...",
metadata={"phase": "intent"},
)
# Streaming thought process
await emitter.emit_event(
event_type=EventType.THOUGHT_MESSAGE,
message="The user is requesting a RAG search",
sub_event_type=SubEventType.SEARCH_WEB,
)
# Tool execution
await emitter.emit_event(
event_type=EventType.TOOL_START,
message="Executing search_knowledge",
metadata={"tool_name": "search_knowledge", "input": {"query": "AI agent"}},
)
# Tool result
await emitter.emit_event(
event_type=EventType.TOOL_RESULT,
message="Search complete: 5 results",
metadata={"tool_name": "search_knowledge", "result_count": 5},
)
# Completion
await emitter.emit_event(
event_type=EventType.COMPLETION_SUCCESS,
message="Response complete",
)
# Cleanup (required)
await emitter.cleanup()
Automatic Sequence Number Management
EventEmitter automatically assigns sequence numbers to emitted events. The frontend can use these numbers to guarantee event ordering.
# Sequence number is available as a property
print(emitter.sequence_number) # 0, 1, 2, ... auto-incremented
Event Handlers
EventEmitter accepts a single handler in its constructor.
SSE Handler (Frontend Delivery)
from agenticstar_platform.events import EventEmitter, create_sse_handler
# Combine with FastAPI's StreamingResponse
async def chat_stream(request: ChatRequest):
sse_handler = create_sse_handler()
emitter = EventEmitter(execution_id="exec-123", handler=sse_handler)
# Start agent processing asynchronously
asyncio.create_task(run_agent(emitter, request))
# Return SSE stream
return StreamingResponse(
emitter.consume_events(),
media_type="text/event-stream",
)
DB Handler (Persistence)
from agenticstar_platform.events.handlers import DatabaseEventHandler
# Persist events to PostgreSQL with DB handler
db_handler = DatabaseEventHandler(
data_access=data_access,
user_id="user-001",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=db_handler)
Webhook Handler (External Notification)
from agenticstar_platform.events.handlers import WebhookEventHandler
# Send webhook notifications to external systems
webhook_handler = WebhookEventHandler(
webhook_url="https://your-system.example.com/webhook",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=webhook_handler)
Composite Handler
Multiple handlers can be combined and managed as one.
from agenticstar_platform.events.handlers import CompositeEventHandler
composite = CompositeEventHandler([
create_sse_handler(),
DatabaseEventHandler(data_access=da, user_id="u", conversation_id="c", message_id="m"),
WebhookEventHandler(webhook_url="https://...", conversation_id="c", message_id="m"),
])
emitter = EventEmitter(execution_id="exec-123", handler=composite)
Controlling UI Display with SubEventType
Even with the same EventType, you can control frontend display granularly by changing the SubEventType.
| SubEventType | Purpose |
|---|---|
SEARCH_WEB | Web search in progress |
COMMAND_EXECUTION | Command execution in progress |
FILE_OPERATION | File operation in progress |
FILE_EDITED | File edit complete |
FILE_READ | File read complete |
FILE_SEARCHED | File search complete |
BASH_EXECUTED | Bash command execution complete |
WEB_FETCHED | Web page fetch complete |
MCP_TOOL | MCP tool execution in progress |
LOCAL_ASSISTANT | Local assistant processing |
TASK_LAUNCHED | Task launched |
TODO_UPDATED | TODO updated |
VIDEO_GENERATED | Video generation complete |
IMAGE_GENERATED | Image generation complete |
SLIDE_CREATED | Slide creation complete |
MACOS_AUTOMATION | macOS automation in progress |
# Fine-grained control of tool execution display
await emitter.emit_event(
event_type=EventType.TOOL_START,
message="Executing web search...",
sub_event_type=SubEventType.SEARCH_WEB,
)
Marketplace Handler
For agents provided through the marketplace, use the dedicated handler factory. It generates a composite handler for DB + Webhook in one step.
from agenticstar_platform.events.handlers import create_marketplace_handler
handler = create_marketplace_handler(
data_access=data_access,
webhook_url="https://tenant.example.com/webhook",
user_id="user-001",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=handler)
Error Handling
try:
await run_agent_logic()
except Exception as e:
# Emit error event to notify the frontend
await emitter.emit_event(
event_type=EventType.COMPLETION_FAILURE,
message=str(e),
metadata={"error_type": type(e).__name__},
)
finally:
# Always execute cleanup
await emitter.cleanup()
Next Steps
SDK API Reference — Events Module
Complete specifications for EventEmitter / StreamingEvent / EventType
Architecture Guide
SDK module structure and design philosophy