イベント / ストリーミングガイド
SDK の Events モジュールを使って、エージェントの実行状態をリアルタイムにフロントエンドへ配信する方法を解説します。
概要
Events モジュールは EventEmitter を中心とした非同期イベント配信システムです。エージェントが発行したイベントは SSE(Server-Sent Events)、Webhook、DB の3つの配信先に同時配信できます。
エージェント → EventEmitter → ┬→ SSE Handler → フロントエンド
├→ DB Handler → PostgreSQL
└→ Webhook Handler → 外部システム
イベントタイプ
エージェントが発行できるイベントタイプの一覧です。
| EventType | 用途 | UI 表示 |
|---|---|---|
PHASE_START | フェーズ開始 | フェーズ名とスピナー |
PROGRESS_UPDATE | 進捗更新 | プログレスバー / メッセージ |
THOUGHT_MESSAGE | エージェントの思考過程 | 思考バブル |
TOOL_START | ツール実行開始 | ツール名とスピナー |
TOOL_RESULT | ツール実行完了 | ツール結果の表示 |
FINAL_RESULT | 最終結果 | 最終レスポンス |
COMPLETION_SUCCESS | 処理完了(成功) | 完了通知 |
COMPLETION_FAILURE | 処理完了(失敗) | エラーメッセージ |
FILE_CREATED | ファイル生成完了 | ダウンロードリンク |
HITL_REQUIRED_BROWSER_VNC | ブラウザ操作の人間介入要求 | VNC 接続 UI |
HITL_REQUIRED_BROWSER_CLI | CLI 操作の人間介入要求 | CLI 入力 UI |
HITL_COMPLETED | 人間介入完了 | 完了通知 |
PERMISSION_REQUEST | 実行許可のリクエスト | 承認ダイアログ |
PERMISSION_RESPONSE | 実行許可の応答 | 承認結果 |
USER_INTERACTION_REQUIRED | ユーザー入力要求 | 入力フォーム |
UNEXPECTED_ERROR | 予期しないエラー | エラー通知 |
基本的な使い方
EventEmitter の初期化と使用
emit_event() は message(文字列)と metadata(辞書、オプション)を受け取ります。
Python
from agenticstar_platform.events import EventEmitter, EventType, SubEventType
# EventEmitter を初期化(execution_id は必須)
emitter = EventEmitter(execution_id="exec-abc-123")
# フェーズ開始イベント
await emitter.emit_event(
event_type=EventType.PHASE_START,
message="意図を解析中...",
metadata={"phase": "intent"},
)
# 思考過程のストリーミング
await emitter.emit_event(
event_type=EventType.THOUGHT_MESSAGE,
message="ユーザーはRAG検索を求めています",
sub_event_type=SubEventType.SEARCH_WEB,
)
# ツール実行
await emitter.emit_event(
event_type=EventType.TOOL_START,
message="search_knowledge を実行中",
metadata={"tool_name": "search_knowledge", "input": {"query": "AI エージェント"}},
)
# ツール結果
await emitter.emit_event(
event_type=EventType.TOOL_RESULT,
message="検索完了: 5件の結果",
metadata={"tool_name": "search_knowledge", "result_count": 5},
)
# 完了
await emitter.emit_event(
event_type=EventType.COMPLETION_SUCCESS,
message="回答が完了しました",
)
# クリーンアップ(必須)
await emitter.cleanup()
シーケンス番号の自動管理
EventEmitter は発行されたイベントに自動でシーケンス番号を付与します。フロントエンドはこの番号でイベントの順序を保証できます。
Python
# シーケンス番号はプロパティで取得
print(emitter.sequence_number) # 0, 1, 2, ... 自動インクリメント
イベントハンドラー
EventEmitter はコンストラクタで1つのハンドラーを受け取ります。
SSE ハンドラー(フロントエンド配信)
Python
from agenticstar_platform.events import EventEmitter, create_sse_handler
# FastAPI の StreamingResponse と組み合わせる
async def chat_stream(request: ChatRequest):
sse_handler = create_sse_handler()
emitter = EventEmitter(execution_id="exec-123", handler=sse_handler)
# エージェント処理を非同期で開始
asyncio.create_task(run_agent(emitter, request))
# SSE ストリームを返す
return StreamingResponse(
emitter.consume_events(),
media_type="text/event-stream",
)
DB ハンドラー(永続化)
Python
from agenticstar_platform.events.handlers import DatabaseEventHandler
# DB ハンドラーでイベントを PostgreSQL に永続化
db_handler = DatabaseEventHandler(
data_access=data_access,
user_id="user-001",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=db_handler)
Webhook ハンドラー(外部通知)
Python
from agenticstar_platform.events.handlers import WebhookEventHandler
# 外部システムに Webhook 通知
webhook_handler = WebhookEventHandler(
webhook_url="https://your-system.example.com/webhook",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=webhook_handler)
複合ハンドラー
複数のハンドラーを 1 つにまとめて管理できます。
Python
from agenticstar_platform.events.handlers import CompositeEventHandler
composite = CompositeEventHandler([
create_sse_handler(),
DatabaseEventHandler(data_access=da, user_id="u", conversation_id="c", message_id="m"),
WebhookEventHandler(webhook_url="https://...", conversation_id="c", message_id="m"),
])
emitter = EventEmitter(execution_id="exec-123", handler=composite)
SubEventType による UI 表示の制御
同じ EventType でも SubEventType を変えることで、フロントエンドの表示を細かく制御できます。
| SubEventType | 用途 |
|---|---|
SEARCH_WEB | Web 検索中 |
COMMAND_EXECUTION | コマンド実行中 |
FILE_OPERATION | ファイル操作中 |
FILE_EDITED | ファイル編集完了 |
FILE_READ | ファイル読み取り完了 |
FILE_SEARCHED | ファイル検索完了 |
BASH_EXECUTED | Bash コマンド実行完了 |
WEB_FETCHED | Web ページ取得完了 |
MCP_TOOL | MCP ツール実行中 |
LOCAL_ASSISTANT | ローカルアシスタント処理中 |
TASK_LAUNCHED | タスク起動 |
TODO_UPDATED | TODO 更新 |
VIDEO_GENERATED | 動画生成完了 |
IMAGE_GENERATED | 画像生成完了 |
SLIDE_CREATED | スライド作成完了 |
MACOS_AUTOMATION | macOS 自動化実行中 |
Python
# ツール実行の表示を細かく制御
await emitter.emit_event(
event_type=EventType.TOOL_START,
message="Web検索を実行中...",
sub_event_type=SubEventType.SEARCH_WEB,
)
マーケットプレイス向けハンドラー
マーケットプレイスで提供するエージェントには、専用のハンドラーファクトリを使用します。DB + Webhook の複合ハンドラーを一括生成します。
Python
from agenticstar_platform.events.handlers import create_marketplace_handler
handler = create_marketplace_handler(
data_access=data_access,
webhook_url="https://tenant.example.com/webhook",
user_id="user-001",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=handler)
エラーハンドリング
Python
try:
await run_agent_logic()
except Exception as e:
# エラーイベントを発行してフロントエンドに通知
await emitter.emit_event(
event_type=EventType.COMPLETION_FAILURE,
message=str(e),
metadata={"error_type": type(e).__name__},
)
finally:
# 必ずクリーンアップを実行
await emitter.cleanup()
次のステップ
SDK API リファレンス — Events Module
EventEmitter / StreamingEvent / EventType の完全仕様
アーキテクチャガイド
SDK のモジュール構成と設計思想