メインコンテンツまでスキップ

イベント / ストリーミングガイド

SDK の Events モジュールを使って、エージェントの実行状態をリアルタイムにフロントエンドへ配信する方法を解説します。

概要

Events モジュールは EventEmitter を中心とした非同期イベント配信システムです。エージェントが発行したイベントは SSE(Server-Sent Events)、Webhook、DB の3つの配信先に同時配信できます。

エージェント → EventEmitter → ┬→ SSE Handler     → フロントエンド
├→ DB Handler → PostgreSQL
└→ Webhook Handler → 外部システム

イベントタイプ

エージェントが発行できるイベントタイプの一覧です。

EventType用途UI 表示
PHASE_STARTフェーズ開始フェーズ名とスピナー
PROGRESS_UPDATE進捗更新プログレスバー / メッセージ
THOUGHT_MESSAGEエージェントの思考過程思考バブル
TOOL_STARTツール実行開始ツール名とスピナー
TOOL_RESULTツール実行完了ツール結果の表示
FINAL_RESULT最終結果最終レスポンス
COMPLETION_SUCCESS処理完了(成功)完了通知
COMPLETION_FAILURE処理完了(失敗)エラーメッセージ
FILE_CREATEDファイル生成完了ダウンロードリンク
HITL_REQUIRED_BROWSER_VNCブラウザ操作の人間介入要求VNC 接続 UI
HITL_REQUIRED_BROWSER_CLICLI 操作の人間介入要求CLI 入力 UI
HITL_COMPLETED人間介入完了完了通知
PERMISSION_REQUEST実行許可のリクエスト承認ダイアログ
PERMISSION_RESPONSE実行許可の応答承認結果
USER_INTERACTION_REQUIREDユーザー入力要求入力フォーム
UNEXPECTED_ERROR予期しないエラーエラー通知

基本的な使い方

EventEmitter の初期化と使用

emit_event()message(文字列)と metadata(辞書、オプション)を受け取ります。

Python
from agenticstar_platform.events import EventEmitter, EventType, SubEventType

# EventEmitter を初期化(execution_id は必須)
emitter = EventEmitter(execution_id="exec-abc-123")

# フェーズ開始イベント
await emitter.emit_event(
event_type=EventType.PHASE_START,
message="意図を解析中...",
metadata={"phase": "intent"},
)

# 思考過程のストリーミング
await emitter.emit_event(
event_type=EventType.THOUGHT_MESSAGE,
message="ユーザーはRAG検索を求めています",
sub_event_type=SubEventType.SEARCH_WEB,
)

# ツール実行
await emitter.emit_event(
event_type=EventType.TOOL_START,
message="search_knowledge を実行中",
metadata={"tool_name": "search_knowledge", "input": {"query": "AI エージェント"}},
)

# ツール結果
await emitter.emit_event(
event_type=EventType.TOOL_RESULT,
message="検索完了: 5件の結果",
metadata={"tool_name": "search_knowledge", "result_count": 5},
)

# 完了
await emitter.emit_event(
event_type=EventType.COMPLETION_SUCCESS,
message="回答が完了しました",
)

# クリーンアップ(必須)
await emitter.cleanup()

シーケンス番号の自動管理

EventEmitter は発行されたイベントに自動でシーケンス番号を付与します。フロントエンドはこの番号でイベントの順序を保証できます。

Python
# シーケンス番号はプロパティで取得
print(emitter.sequence_number) # 0, 1, 2, ... 自動インクリメント

イベントハンドラー

EventEmitter はコンストラクタで1つのハンドラーを受け取ります。

SSE ハンドラー(フロントエンド配信)

Python
from agenticstar_platform.events import EventEmitter, create_sse_handler

# FastAPI の StreamingResponse と組み合わせる
async def chat_stream(request: ChatRequest):
sse_handler = create_sse_handler()
emitter = EventEmitter(execution_id="exec-123", handler=sse_handler)

# エージェント処理を非同期で開始
asyncio.create_task(run_agent(emitter, request))

# SSE ストリームを返す
return StreamingResponse(
emitter.consume_events(),
media_type="text/event-stream",
)

DB ハンドラー(永続化)

Python
from agenticstar_platform.events.handlers import DatabaseEventHandler

# DB ハンドラーでイベントを PostgreSQL に永続化
db_handler = DatabaseEventHandler(
data_access=data_access,
user_id="user-001",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=db_handler)

Webhook ハンドラー(外部通知)

Python
from agenticstar_platform.events.handlers import WebhookEventHandler

# 外部システムに Webhook 通知
webhook_handler = WebhookEventHandler(
webhook_url="https://your-system.example.com/webhook",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=webhook_handler)

複合ハンドラー

複数のハンドラーを 1 つにまとめて管理できます。

Python
from agenticstar_platform.events.handlers import CompositeEventHandler

composite = CompositeEventHandler([
create_sse_handler(),
DatabaseEventHandler(data_access=da, user_id="u", conversation_id="c", message_id="m"),
WebhookEventHandler(webhook_url="https://...", conversation_id="c", message_id="m"),
])
emitter = EventEmitter(execution_id="exec-123", handler=composite)

SubEventType による UI 表示の制御

同じ EventType でも SubEventType を変えることで、フロントエンドの表示を細かく制御できます。

SubEventType用途
SEARCH_WEBWeb 検索中
COMMAND_EXECUTIONコマンド実行中
FILE_OPERATIONファイル操作中
FILE_EDITEDファイル編集完了
FILE_READファイル読み取り完了
FILE_SEARCHEDファイル検索完了
BASH_EXECUTEDBash コマンド実行完了
WEB_FETCHEDWeb ページ取得完了
MCP_TOOLMCP ツール実行中
LOCAL_ASSISTANTローカルアシスタント処理中
TASK_LAUNCHEDタスク起動
TODO_UPDATEDTODO 更新
VIDEO_GENERATED動画生成完了
IMAGE_GENERATED画像生成完了
SLIDE_CREATEDスライド作成完了
MACOS_AUTOMATIONmacOS 自動化実行中
Python
# ツール実行の表示を細かく制御
await emitter.emit_event(
event_type=EventType.TOOL_START,
message="Web検索を実行中...",
sub_event_type=SubEventType.SEARCH_WEB,
)

マーケットプレイス向けハンドラー

マーケットプレイスで提供するエージェントには、専用のハンドラーファクトリを使用します。DB + Webhook の複合ハンドラーを一括生成します。

Python
from agenticstar_platform.events.handlers import create_marketplace_handler

handler = create_marketplace_handler(
data_access=data_access,
webhook_url="https://tenant.example.com/webhook",
user_id="user-001",
conversation_id="conv-001",
message_id="msg-001",
)
emitter = EventEmitter(execution_id="exec-123", handler=handler)

エラーハンドリング

Python
try:
await run_agent_logic()
except Exception as e:
# エラーイベントを発行してフロントエンドに通知
await emitter.emit_event(
event_type=EventType.COMPLETION_FAILURE,
message=str(e),
metadata={"error_type": type(e).__name__},
)
finally:
# 必ずクリーンアップを実行
await emitter.cleanup()

次のステップ

SDK API リファレンス — Events Module

EventEmitter / StreamingEvent / EventType の完全仕様

ガイドを見る

アーキテクチャガイド

SDK のモジュール構成と設計思想

ガイドを見る