メインコンテンツまでスキップ

SDK API リファレンス

agenticstar-platform SDK(v0.5.3)の全モジュール・クラス・メソッドの完全仕様です。

インストール

pip を使用して SDK をインストールしてください。

基本インストールcurl
pip install agenticstar-platform==0.5.3

特定モジュールのみインストールする場合は、extras を指定してください。

Extra説明
[[db]]PostgreSQL database support
[[rag]]RAG (embedding + vector search) support
[[storage]]Cloud storage (Azure, S3, GCS) support
[[security]]Security (content moderation, PII detection) support
[[auth]]AGENTIC STAR Auth service integration
[[memory]]Semantic Memory (Mem0 + Qdrant) support
[[all]]全モジュール
特定モジュールのインストール例curl
# RAG モジュール
pip install agenticstar-platform[rag]==0.5.3

# DB + RAG + Storage
pip install agenticstar-platform[db,rag,storage]==0.5.3

# 全モジュール
pip install agenticstar-platform[all]==0.5.3

モジュール一覧

SDK は以下の 8 つのモジュールで構成されています。

モジュール説明主要クラス
events非同期イベント配信システム(SSE / Webhook)EventEmitter, StreamingEvent
dbPostgreSQL 接続 + Azure AD 認証PostgreSQLManager, DataAccess
ragRAG(embedding + vector search via Qdrant)EmbeddingGenerator, QdrantManager
storageクラウドストレージ(Azure, S3, GCS)AzureBlobStorageClient, S3StorageClient
authAGENTIC STAR 認証 API クライアントAgenticStarAuthClient
memoryセマンティックメモリ(Mem0 + Qdrant)SemanticMemoryClient
securityContent Moderation + PII DetectionAzureSecurityClient, ContentSafetyValidator
common共通ユーティリティ(バリデーション・シークレットマスク)SecretMasker, validate_identifier

events Events Module

非同期イベント配信システム。EventEmitter でイベントを発行し、SSE / Webhook / DB に配信します。

Enums

EventType

フロントエンド UI に表示されるイベントタイプ。

PHASE_START = "phase_start"
PROGRESS_UPDATE = "progress_update"
THOUGHT_MESSAGE = "thought_message"
COMPLETION_SUCCESS = "completion_success"
COMPLETION_FAILURE = "completion_failure"
USER_INTERACTION_REQUIRED = "user_interaction_required"
UNEXPECTED_ERROR = "unexpected_error"
HITL_REQUIRED_BROWSER_VNC = "hitl_required_browser_vnc"
HITL_REQUIRED_BROWSER_CLI = "hitl_required_browser_cli"
HITL_COMPLETED = "hitl_completed"
FILE_CREATED = "file_created"
PERMISSION_REQUEST = "permission_request"
PERMISSION_RESPONSE = "permission_response"
TOOL_START = "tool_start"
TOOL_RESULT = "tool_result"
FINAL_RESULT = "final_result"
PROGRESS_MESSAGE = "progress_message"

SubEventType

フロントエンド UI の細分化用サブイベントタイプ。

SEARCH_WEB = "search_web"
COMMAND_EXECUTION = "command_execution"
FILE_OPERATION = "file_operation"
LOCAL_ASSISTANT = "local_assistant"
MCP_TOOL = "mcp_tool"
A2A_TOOL = "a2a_tool"
FILE_EDITED = "file_edited"
FILE_READ = "file_read"
FILE_SEARCHED = "file_searched"
BASH_EXECUTED = "bash_executed"
WEB_FETCHED = "web_fetched"
TASK_LAUNCHED = "task_launched"
TODO_UPDATED = "todo_updated"
VIDEO_GENERATED = "video_generated"
IMAGE_GENERATED = "image_generated"
SLIDE_CREATED = "slide_created"
MACOS_AUTOMATION = "macos_automation"

Data Classes

StreamingEvent

リアルタイム SSE 配信用のイベントデータ。

@dataclass class StreamingEvent: event_type: EventType execution_id: str message: str timestamp: float metadata: Optional[Dict[str, Any]]= None sub_event_type: Optional[SubEventType]= None def to_dict() -> Dict[str, Any]: ...

SequencedEvent

順序保証付きイベント。

@dataclass class SequencedEvent: sequence: int # 0以上 event_type: EventType execution_id: str message: str timestamp: float metadata: Optional[Dict[str, Any]]= None sub_event_type: Optional[SubEventType]= None def __post_init__(): ... # sequence >= 0 を検証 def to_streaming_event() -> StreamingEvent: ... def to_dict() -> Dict[str, Any]: ...

ExecutionMessage

マーケットプレイス UI 統合用のデータベース格納メッセージ。

@dataclass class ExecutionMessage: user_id: str conversation_id: str message_id: str chunk_type: str content_data: Dict[str, Any] def to_dict() -> Dict[str, Any]: ... @classmethod def from_streaming_event( event: StreamingEvent, user_id: str, conversation_id: str, message_id: str, chunk_type: Optional[str]= None ) -> ExecutionMessage: ...

Classes

EventEmitter

非同期イベントキュー。イベント発行・消費を非ブロッキングで実行。

class EventEmitter: def __init__( execution_id: str, handler: Optional[EventHandler]= None ): ... @property def is_completed() -> bool: ... @property def sequence_number() -> int: ... async def emit_event( event_type: EventType, message: str, metadata: Optional[Dict[str, Any]]= None, sub_event_type: Optional[SubEventType]= None ) -> None: ... async def consume_events( timeout: float = 0.1 ) -> AsyncGenerator[str,None]: ... def mark_completed() -> None: ... async def cleanup() -> None: ... async def __aenter__() -> EventEmitter: ... async def __aexit__(...) -> None: ...
Returns
emit_event() → Noneイベントをキューに追加。ハンドラーが設定されている場合は非同期で呼び出される
consume_events() → AsyncGenerator[str, None]SSE フォーマットの文字列を非同期に yield。完了時に自動停止
Context manager 対応: async with EventEmitter(...) as emitter:
イベント発行と消費Python
from agenticstar_platform import EventEmitter, EventType

emitter = EventEmitter(execution_id="exec-abc-123")

# イベント発行
await emitter.emit_event(
event_type=EventType.PHASE_START,
message="ドキュメント分析を開始します",
metadata={"phase": "analysis", "total_pages": 42}
)

# イベント消費(SSE ストリーミング)
async for chunk in emitter.consume_events():
print(chunk)

DatabaseEventHandler

PostgreSQL の execution_messages テーブルにイベントを永続化。

class DatabaseEventHandler: def __init__( data_access: Any, user_id: str, conversation_id: str, message_id: str, table_name: str = "execution_messages", chunk_type_map: Optional[Dict[EventType, str]]= None ): ... async def __call__(event: StreamingEvent) -> Optional[str]: ...

WebhookEventHandler

HTTP webhook エンドポイントにイベントを送信。

class WebhookEventHandler: def __init__( webhook_url: str, conversation_id: str, message_id: str, headers: Optional[Dict[str, str]]= None, timeout_seconds: int = 30, message_type: str = "append_message", token_provider: Optional[Callable]= None, chunk_type_map: Optional[Dict[EventType, str]]= None ): ... async def __call__(event: StreamingEvent) -> Optional[str]: ...

CompositeEventHandler

複数の EventHandler を並列実行。

class CompositeEventHandler: def __init__( handlers: List[Callable], return_first_chunk: bool = True ): ... async def __call__(event: StreamingEvent) -> Optional[str]: ...

Factory Functions

def create_sse_handler() -> EventHandler

SSE (Server-Sent Events) 形式のハンドラーを作成。

def create_json_handler() -> EventHandler

JSON 形式のハンドラーを作成。

def create_marketplace_handler( data_access: Any, webhook_url: str, user_id: str, conversation_id: str, message_id: str, token_provider: Optional[Callable]= None ) -> CompositeEventHandler

マーケットプレイス UI 用の複合ハンドラー(DB + Webhook)を作成。

Protocol

EventHandler

イベント処理の汎用インターフェース。

class EventHandler(Protocol): async def __call__( self, event: StreamingEvent ) -> Optional[str]: ...
Related
Database ModuleDatabaseEventHandler の data_access に DataAccess を渡して永続化
Storage Moduleイベントログのクラウドストレージへのアーカイブ

db Database Module

PostgreSQL 接続プール + Azure AD 認証。

Exceptions

DatabaseError (Exception) ├── ConnectionError ├── QueryError └── ConfigError

Configuration

AzureADConfig

Azure Active Directory 認証設定。

@dataclass class AzureADConfig: tenant_id: str client_id: str client_secret: str # 非表示 username: str = "" @classmethod def from_dict(data: Dict[str, Any]) -> AzureADConfig: ... @classmethod def from_toml(toml_path: str, section: str = "azure_ad") -> AzureADConfig: ...

PostgreSQLConfig

PostgreSQL 接続設定(Azure AD サポート)。

class PostgreSQLConfig: provider: str = "postgresql" host: str = "localhost" port: int = 5432 database: str = "agenticstar_db" use_azure_ad: bool = False username: Optional[str]= None password: Optional[str]= None # 非表示 pool_min_size: int = 5 pool_max_size: int = 20 command_timeout: int = 60 pool_timeout: Optional[int]= 30 max_overflow: Optional[int]= 10 azure_ad: Optional[AzureADConfig]= None api_proxy_url: Optional[str]= None ssl_mode: str = "require" @classmethod def from_dict(data: Dict[str, Any]) -> PostgreSQLConfig: ... @classmethod def from_toml(toml_path: str, section: str = "database") -> PostgreSQLConfig: ... @classmethod def from_env(prefix: str = "DB_") -> PostgreSQLConfig: ...

TOML 設定例:

config.toml - PostgreSQLToml
[database]
provider = "postgresql"
host = "db.example.com"
port = 5432
database = "agenticstar_db"
use_azure_ad = true
pool_min_size = 5
pool_max_size = 20
command_timeout = 60

[azure_ad]
tenant_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
client_id = "yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"
client_secret = "your-secret-here"
username = "dbuser@example.onmicrosoft.com"

Connection Management

PostgreSQLManager

非同期 PostgreSQL 接続プール(Azure AD トークン管理対応)。

class PostgreSQLManager: def __init__(config: PostgreSQLConfig): ... @property def config() -> PostgreSQLConfig: ... @property def pool() -> Optional[asyncpg.Pool]: ... async def __aenter__() -> PostgreSQLManager: ... async def __aexit__(...) -> None: ... async def initialize() -> None: ... async def close() -> None: ... async def execute_query( query: str, params: tuple = () ) -> Dict[str, Any]: ... async def fetch_one( query: str, params: tuple = () ) -> Optional[Dict[str, Any]]: ... async def fetch_all( query: str, params: tuple = () ) -> List[Dict[str, Any]]: ... async def execute( query: str, params: tuple = () ) -> str: ...
Returns
execute_query() → Dict[str, Any]{"success": bool, "data": list, "count": int}
fetch_one() → Optional[Dict[str, Any]]1行の結果辞書、または該当なしで None
fetch_all() → List[Dict[str, Any]]結果行のリスト(0件の場合は空リスト)
execute() → strPostgreSQL のステータス文字列(例: "INSERT 0 1")
Context manager 対応: async with PostgreSQLManager(config) as mgr:
データベース接続と Query 実行Python
from agenticstar_platform.db import PostgreSQLConfig, PostgreSQLManager

config = PostgreSQLConfig.from_toml("config.toml")

async with PostgreSQLManager(config) as mgr:
await mgr.initialize()

# SELECT クエリ
result = await mgr.fetch_all(
"SELECT * FROM conversations WHERE user_id = $1",
("user-123",)
)

# INSERT
await mgr.execute(
"INSERT INTO events (event_type, data) VALUES ($1, $2)",
("phase_start", '{}')
)

Data Access Layer

DataAccess

SQL インジェクション防止を備えた汎用テーブル操作ラッパー。

class DataAccess: def __init__( db: PostgreSQLManager| ApiPostgreSQLManager ): ... async def __aenter__() -> DataAccess: ... async def __aexit__(...) -> None: ... async def initialize() -> None: ... async def close() -> None: ... def is_initialized() -> bool: ... async def ensure_initialized() -> None: ... async def execute_query( query: str, params: tuple = () ) -> Dict[str, Any]: ... async def insert( table: str, data: Dict[str, Any], returning: Optional[List[str]]= None ) -> Dict[str, Any]: ... async def upsert( table: str, data: Dict[str, Any], conflict_columns: List[str], update_columns: Optional[List[str]]= None, returning: Optional[List[str]]= None ) -> Dict[str, Any]: ... async def select( table: str, columns: Optional[List[str]]= None, where: Optional[Dict[str, Any]]= None, order_by: Optional[str]= None, limit: Optional[int]= None ) -> Dict[str, Any]: ... async def select_one( table: str, columns: Optional[List[str]]= None, where: Optional[Dict[str, Any]]= None ) -> Optional[Dict[str, Any]]: ... async def update( table: str, data: Dict[str, Any], where: Dict[str, Any], returning: Optional[List[str]]= None ) -> Dict[str, Any]: ... async def delete( table: str, where: Dict[str, Any], returning: Optional[List[str]]= None ) -> Dict[str, Any]: ...
Returns
insert() / upsert() → Dict[str, Any]{"success": bool, "data": dict}。returning 指定時はカラム値を含む
select() → Dict[str, Any]{"success": bool, "data": list, "count": int}
update() / delete() → Dict[str, Any]{"success": bool, "affected_rows": int}

主要メソッドの使用例:

# INSERT result = await da.insert("users", {"name": "Alice"}, returning=["id"]) # UPSERT(競合時は更新) result = await da.upsert("users", {"email": "a@example.com", "name": "Alice"}, conflict_columns=["email"]) # SELECT with WHERE + ORDER + LIMIT result = await da.select("users", columns=["id", "name"], where={"active": True}, order_by="created_at DESC", limit=10)

ConfigAccess

エージェント・ツール・ガードレール・MCP などの設定情報へのアクセス層。

class ConfigAccess: def __init__(db: DataAccess): ... async def get_agent_instructions(agent_type: str) -> Dict[str, Any]: ... async def get_all_agent_configs() -> Dict[str, Any]: ... async def get_agent_config(agent_type: str, requested_level: str = "default") -> Dict[str, Any]: ... async def get_tool_config(tool_name: str) -> Dict[str, Any]: ... async def get_all_tool_configs() -> Dict[str, Any]: ... async def get_banned_urls() -> Dict[str, Any]: ... async def get_guardrails_settings() -> Dict[str, Any]: ... async def get_system_settings() -> Dict[str, Any]: ... async def get_mcp_configurations(execution_mode: Optional[str]= None) -> Dict[str, Any]: ...
禁止 URL の制限粒度を拡張する

get_banned_urls() は管理画面で登録された「禁止 URL」一覧を取得します。基本機能(ExtAuth Service)はドメイン単位の制限ですが、本メソッドで取得したパターンをエージェントのツール内で re.search 等により URL 文字列に照合することで、URL(パス)単位の制限を拡張機能として実装できます。設定は管理画面で一元管理されるため、運用上の重複は発生しません。

ExecutionAccess

実行メッセージの取得層。

class ExecutionAccess: def __init__(db: DataAccess): ... async def get_messages( execution_id: str ) -> Optional[List[Dict[str, Any]]]: ...

TelemetryAccess

NIST 監査テレメトリの保存・取得層。

class TelemetryAccess: def __init__(db: DataAccess): ... async def save_telemetry( telemetry_data: Dict[str, Any] ) -> Dict[str, Any]: ... async def list_telemetry( conversation_id: Optional[str]= None, service: Optional[str]= None, limit: int = 100, offset: int = 0 ) -> Dict[str, Any]: ... async def get_telemetry( telemetry_id: str ) -> Optional[Dict[str, Any]]: ...

PodRuntime

Pod ライフサイクル管理(開始通知・終了通知・自動スケールダウン)。

class PodRuntime: def __init__( db: DataAccess, execution_id: str, pod_name: str ): ... async def start() -> Dict[str, Any]: ... async def final(status: str = "completed") -> Dict[str, Any]: ...

ApiPostgreSQLManager

HTTP API プロキシ経由の PostgreSQL アクセス(CLI モード用)。PostgreSQLManager と同一の非同期インターフェースを持つ。

class ApiPostgreSQLManager: def __init__(api_url: str, token_provider: Optional[Callable]= None): ... async def fetch_one(query: str, params: tuple = ()) -> Optional[Dict[str, Any]]: ... async def fetch_all(query: str, params: tuple = ()) -> List[Dict[str, Any]]: ... async def execute(query: str, params: tuple = ()) -> str: ... async def execute_query(query: str, params: tuple = ()) -> Dict[str, Any]: ...

Factory Function

def create_postgresql_manager( config: PostgreSQLConfig ) -> Union[PostgreSQLManager, ApiPostgreSQLManager]

config.api_proxy_url の有無に応じて PostgreSQLManager(直接接続)または ApiPostgreSQLManager(API プロキシ)を自動選択して返す。

Error Handling

データベース操作の例外処理。

Database Error HandlingPython
from agenticstar_platform.db import PostgreSQLConfig, PostgreSQLManager, DataAccess

config = PostgreSQLConfig.from_toml("config.toml")

try:
db_manager = PostgreSQLManager(config)
da = DataAccess(db_manager)
await da.initialize()

result = await da.select("users", where={"id": "user-123"})
if result["success"] and result["data"]:
print(f"User: {result['data'][0]['username']}")
except Exception as e:
print(f"Database error: {e}")
finally:
await da.close()
Related
Events ModuleDatabaseEventHandler でイベントを DB に永続化
RAG ModuleDB から取得したデータを Embedding 化して検索可能にする

rag RAG Module

RAG (Retrieval-Augmented Generation): Embedding + Vector Search via Qdrant。

Enums

VectorStoreProvider

QDRANT = "qdrant"
WEAVIATE = "weaviate"
MILVUS = "milvus"

Exceptions

RAGError (Exception) ├── EmbeddingError ├── QdrantError └── SearchError

Data Classes

SearchResult

単一の検索結果。

@dataclass class SearchResult: point_id: str payload: Dict[str, Any] score: float similarity: float = 0.0

UpsertResult

Upsert 操作の結果。

@dataclass class UpsertResult: success: bool point_id: str = "" error: Optional[str]= None error_code: Optional[str]= None

SearchResponse

検索操作の結果。

@dataclass class SearchResponse: success: bool results: List[SearchResult]= field(default_factory=list) total_found: int = 0 query: str = "" error: Optional[str]= None error_code: Optional[str]= None

Embedding Configuration

EmbeddingConfig

Azure OpenAI エンベッディング設定。

class EmbeddingConfig: base_url: str api_key: str # 非表示 model: str = "text-embedding-ada-002" api_version: str = "2024-02-15-preview" max_cache_size: int = 10000 dimensions: int = 1536 max_retries: int = 5 base_delay: float = 1.0 max_delay: float = 60.0 @classmethod def from_dict(data: Dict[str, Any]) -> EmbeddingConfig: ... @classmethod def from_toml(toml_path: str, section: str = "rag.embedding") -> EmbeddingConfig: ...

EmbeddingGenerator

キャッシング機能とリトライロジック付きエンベッディング生成。

class EmbeddingGenerator: def __init__(config: EmbeddingConfig): ... async def generate(text: str) -> List[float]: ... async def generate_batch( texts: List[str], batch_size: int = 25 ) -> List[List[float]]: ... def clear_cache() -> None: ...
Returns
generate() → List[float]ベクトル(次元数は config.dimensions、デフォルト 1536)。キャッシュ済みの場合は即返却
generate_batch() → List[List[float]]入力テキスト順のベクトルリスト。batch_size 単位で API 呼び出し(デフォルト 25)
# 単一テキストの Embedding vector = await generator.generate("Hello, world!") # → [0.012, -0.034, ...] # バッチ Embedding vectors = await generator.generate_batch(["text1", "text2", "text3"])

Vector Database

QdrantConfig

Qdrant データベース設定。

class QdrantConfig: url: str collection_name: str vector_size: int = 1536 distance: str = "cosine" # cosine, euclid, dot on_disk_vectors: bool = False on_disk_payload: bool = True hnsw_m: int = 16 hnsw_ef_construct: int = 256 payload_indexes: List[PayloadIndexConfig] auth_token_provider: Optional[Callable[[], str]]= None @classmethod def from_dict(data: Dict[str, Any]) -> QdrantConfig: ... @classmethod def from_toml(toml_path: str, section: str = "rag.qdrant") -> QdrantConfig: ...

PayloadIndexConfig

Payload フィールドのインデックス設定。

@dataclass class PayloadIndexConfig: field_name: str field_schema: str = "keyword" # keyword, integer, float, bool @classmethod def from_dict(data: Dict[str, Any]) -> PayloadIndexConfig: ...

TOML 設定例:

config.toml - RAG (Qdrant)Toml
[embedding]
provider = "openai"
model = "text-embedding-3-small"
api_key = "sk-..."
dimension = 1536

[qdrant]
host = "qdrant.example.com"
port = 6333
api_key = "your-qdrant-key"
use_ssl = true

QdrantManager

Qdrant ベクトル検索クライアント。

class QdrantManager: def __init__( config: QdrantConfig, embedding_generator: EmbeddingGenerator ): ... async def __aenter__() -> QdrantManager: ... async def __aexit__(...) -> None: ... def is_initialized() -> bool: ... async def initialize() -> None: ... async def ensure_initialized() -> None: ... async def upsert( point_id: str, text: str, payload: Dict[str, Any] ) -> Dict[str, Any]: ... async def batch_upsert( items: List[Dict[str, Any]], batch_size: int = 256 ) -> Dict[str, Any]: ... async def search( query_text: str, limit: int = 10, score_threshold: float = 0.4, filter_conditions: Optional[Dict[str, Any]]= None ) -> Dict[str, Any]: ... async def delete( point_ids: List[str] ) -> Dict[str, Any]: ... async def get_statistics() -> Dict[str, Any]: ... async def close() -> None: ...
Returns
upsert() → Dict[str, Any]{"success": bool, "point_id": str}
batch_upsert() → Dict[str, Any]{"success": bool, "upserted_count": int, "errors": list}
search() → Dict[str, Any]{"success": bool, "data": List[SearchResult], "total_found": int}
get_statistics() → Dict[str, Any]{"vectors_count": int, "indexed_vectors_count": int, "points_count": int, ...}
Context manager 対応: async with QdrantManager(config, generator) as qdrant:
RAG: Embedding + Qdrant SearchPython
from agenticstar_platform.rag import (
EmbeddingConfig, EmbeddingGenerator,
QdrantConfig, QdrantManager
)

emb_config = EmbeddingConfig.from_toml("config.toml")
qd_config = QdrantConfig.from_toml("config.toml")

generator = EmbeddingGenerator(emb_config)

async with QdrantManager(qd_config, generator) as qdrant:
# ドキュメントの追加
await qdrant.upsert(
point_id="doc-001",
text="AGENTIC STAR は AI エージェント開発プラットフォームです",
payload={"source": "docs", "category": "overview"}
)

# セマンティック検索
results = await qdrant.search(
query_text="AI プラットフォームとは?",
limit=5,
score_threshold=0.5
)
for r in results["data"]:
print(f"Score: {r['score']}, Text: {r['payload']['text']}")
Related
Database ModuleDB から取得したデータを RAG に投入
Storage Moduleドキュメントファイルをストレージから取得して Embedding 化
Security ModuleEmbedding 前にコンテンツの安全性を検証

storage Storage Module

クラウドストレージ統合(Azure Blob Storage / AWS S3 / Google Cloud Storage)。

Enums

StorageProvider

Cloud storage provider type.

AZURE_BLOB = "azure_blob"
AWS_S3 = "aws_s3"
GCS = "gcs"

Exceptions

StorageError (Exception) - Base exception ├── StorageConfigError - Configuration error ├── StorageConnectionError - Connection error └── StorageOperationError - Operation error with error_code attribute

Result Types

UploadResult

File upload result.

@dataclass class UploadResult: success: bool # Upload succeeded object_name: str = "" # Object name object_url: str = "" # Object URL file_size: int = 0 # File size content_type: str = "" # Content type error: Optional[str]= None # Error message error_code: Optional[str]= None # Error code metadata: Dict[str, Any]= field(default_factory=dict) # Metadata

DownloadResult

File download result.

@dataclass class DownloadResult: success: bool # Download succeeded object_name: str = "" # Object name local_path: str = "" # Local file path file_size: int = 0 # File size error: Optional[str]= None # Error message error_code: Optional[str]= None # Error code

ObjectInfo

Cloud object metadata.

@dataclass class ObjectInfo: name: str # Object name size: int # Object size last_modified: Optional[str]= None # Last modified timestamp content_type: Optional[str]= None # Content type metadata: Dict[str, Any]= field(default_factory=dict) # Metadata

ListResult

List operation result.

@dataclass class ListResult: success: bool # Operation succeeded objects: List[ObjectInfo]= field(default_factory=list) # Objects count: int = 0 # Object count prefix: str = "" # Search prefix error: Optional[str]= None # Error message error_code: Optional[str]= None # Error code

Configuration

StorageConfig

Common storage configuration (base class).

class StorageConfig: provider: StorageProvider# Provider type bucket_name: str # Bucket/container name enabled: bool = True # Enabled flag max_file_size: int = 5*1024*1024*1024 # Max file size (5GB) auto_create_bucket: bool = False # Auto-create bucket prefix: str = "" # Object name prefix custom_domain: Optional[str]= None # Custom domain connection_timeout: int = 30 # Connection timeout (seconds) read_timeout: int = 300 # Read timeout (seconds)

AzureBlobConfig

Azure Blob Storage configuration.

class AzureBlobConfig: bucket_name: str # Container name connection_string: str # 非表示 enabled: bool = True max_file_size: int = 5*1024*1024*1024 auto_create_bucket: bool = False prefix: str = "" custom_domain: Optional[str]= None connection_timeout: int = 30 read_timeout: int = 300 max_block_size: int = 8*1024*1024 # Max block size (8MB) max_single_put_size: int = 256*1024*1024 # Max single upload (256MB) @classmethod def from_dict(data: Dict[str, Any]) -> AzureBlobConfig: ... @property def provider(self) -> StorageProvider: return StorageProvider.AZURE_BLOB

S3Config

AWS S3 configuration.

class S3Config: bucket_name: str # Bucket name aws_access_key_id: str # 非表示 aws_secret_access_key: str # 非表示 region_name: str = "us-east-1" # AWS region enabled: bool = True max_file_size: int = 5*1024*1024*1024 auto_create_bucket: bool = False prefix: str = "" custom_domain: Optional[str]= None connection_timeout: int = 30 read_timeout: int = 300 endpoint_url: Optional[str]= None # S3-compatible endpoint (MinIO, etc.) @classmethod def from_dict(data: Dict[str, Any]) -> S3Config: ... @property def provider(self) -> StorageProvider: return StorageProvider.AWS_S3

GCSConfig

Google Cloud Storage configuration.

class GCSConfig: bucket_name: str # Bucket name project_id: str # GCP project ID enabled: bool = True max_file_size: int = 5*1024*1024*1024 auto_create_bucket: bool = False prefix: str = "" custom_domain: Optional[str]= None connection_timeout: int = 30 read_timeout: int = 300 credentials_path: Optional[str]= None # 非表示 - Path to service account JSON credentials_json: Optional[str]= None # 非表示 - Service account JSON string credentials_base64: Optional[str]= None # 非表示 - Base64-encoded service account JSON @classmethod def from_dict(data: Dict[str, Any]) -> GCSConfig: ... @property def provider(self) -> StorageProvider: return StorageProvider.GCS

Storage Clients

AzureBlobStorageClient

Azure Blob Storage client. Supports async context manager (async with).

class AzureBlobStorageClient: def __init__(config: AzureBlobConfig): ... async def initialize() -> None: ... # Initialize client async def ensure_initialized() -> None: ... # Initialize if needed async def upload_file( file_path: str, prefix: Optional[str]= None ) -> UploadResult: ... async def upload_bytes( data: bytes, object_name: str, content_type: str = "application/octet-stream" ) -> UploadResult: ... async def download_file( object_name: str, local_path: str ) -> DownloadResult: ... async def list_objects( prefix: str = "" ) -> ListResult: ... async def delete_object( object_name: str ) -> Dict[str, Any]: ... async def delete_objects( object_names: List[str] ) -> Dict[str, Any]: ... async def close() -> None: ...
Returns
upload_file() / upload_bytes() → UploadResultsuccess, object_url, file_size, content_type を含む
download_file() → DownloadResultsuccess, local_path, file_size を含む
list_objects() → ListResultsuccess, objects: List[ObjectInfo], count を含む
delete_object() → Dict[str, Any]{"success": bool, "object_name": str}

S3StorageClient

AWS S3 storage client. Supports async context manager (async with).

class S3StorageClient: def __init__(config: S3Config): ... async def initialize() -> None: ... # Initialize client async def ensure_initialized() -> None: ... # Initialize if needed async def upload_file( file_path: str, prefix: Optional[str]= None ) -> UploadResult: ... async def upload_bytes( data: bytes, object_name: str, content_type: str = "application/octet-stream" ) -> UploadResult: ... async def download_file( object_name: str, local_path: str ) -> DownloadResult: ... async def list_objects( prefix: str = "" ) -> ListResult: ... async def delete_object( object_name: str ) -> Dict[str, Any]: ... async def delete_objects( object_names: List[str] ) -> Dict[str, Any]: ... async def close() -> None: ...

GCSStorageClient

Google Cloud Storage client. Supports async context manager (async with).

class GCSStorageClient: def __init__(config: GCSConfig): ... async def initialize() -> None: ... # Initialize client async def ensure_initialized() -> None: ... # Initialize if needed async def upload_file( file_path: str, prefix: Optional[str]= None ) -> UploadResult: ... async def upload_bytes( data: bytes, object_name: str, content_type: str = "application/octet-stream" ) -> UploadResult: ... async def download_file( object_name: str, local_path: str ) -> DownloadResult: ... async def list_objects( prefix: str = "" ) -> ListResult: ... async def delete_object( object_name: str ) -> Dict[str, Any]: ... async def delete_objects( object_names: List[str] ) -> Dict[str, Any]: ... async def close() -> None: ...
S3 ストレージの使用例Python
from agenticstar_platform.storage import (
S3Config, S3StorageClient
)

config = S3Config.from_dict({
"bucket_name": "my-bucket",
"aws_access_key_id": "AKIA...",
"aws_secret_access_key": "secret...",
"region_name": "ap-northeast-1"
})

# async with によるコンテキストマネージャー
async with S3StorageClient(config) as storage:
# ファイルアップロード
upload_result = await storage.upload_file(
file_path="/tmp/report.pdf",
prefix="documents/"
)
if upload_result.success:
print(f"URL: {upload_result.object_url}")

# バイトデータのアップロード
await storage.upload_bytes(
data=b"Hello, World!",
object_name="greetings/hello.txt",
content_type="text/plain"
)

# オブジェクト一覧
list_result = await storage.list_objects(prefix="documents/")
for obj in list_result.objects:
print(f"{obj.name} ({obj.size} bytes)")

# ダウンロード
download = await storage.download_file(
object_name="documents/report.pdf",
local_path="/tmp/downloaded.pdf"
)
エラーハンドリングPython
from agenticstar_platform.storage import (
StorageError, StorageConfigError,
StorageConnectionError, StorageOperationError
)

try:
result = await storage.upload_file("/tmp/large-file.zip")
except StorageConnectionError as e:
print(f"Connection failed: {e}")
except StorageOperationError as e:
print(f"Operation failed: {e}, code: {e.error_code}")
except StorageError as e:
print(f"Storage error: {e}")
Related
RAG Moduleストレージのドキュメントを取得して Embedding に投入
Security Moduleアップロード前のコンテンツモデレーション

auth Auth Module

AGENTIC STAR 認証サービス統合。ユーザー・トークン・デバイス情報を管理。

Exceptions

AuthError (Exception) ├── AuthConfigError └── AuthAPIError ├── AuthUnauthorizedError (401) ├── AuthNotFoundError (404) └── AuthRateLimitError (429)

Configuration

AgenticStarAuthConfig

class AgenticStarAuthConfig: base_url: str api_key: str # 非表示 auth_type: str = "api_key" timeout: float = 30.0 @classmethod def create( base_url: str, api_key: str, auth_type: str = "api_key", timeout: float = 30.0 ) -> AgenticStarAuthConfig: ... @classmethod def from_config(config_path: Optional[Path]= None) -> AgenticStarAuthConfig: ...

Data Models

ApiUser

認証ユーザー情報(Pydantic BaseModel)。

class ApiUser(BaseModel): id: str username: Optional[str]= None email: Optional[str]= None first_name: Optional[str]= None last_name: Optional[str]= None display_name: Optional[str]= None email_verified: bool = False enabled: bool = True created_timestamp: Optional[int]= None attributes: Optional[Dict[str, Any]]= None organization: Optional[str]= None job: Optional[str]= None bio: Optional[str]= None last_login_at: Optional[str]= None

DeviceInfo

デバイス情報。

class DeviceInfo(BaseModel): id: Optional[str]= None device_code: str device_name: Optional[str]= None device_type: Optional[str]= None device_info: Optional[Dict[str, Any]]= None is_active: Optional[bool]= None created_at: Optional[str]= None last_used_at: Optional[str]= None

MCPTokenInfo

MCP トークン情報。

class MCPTokenInfo(BaseModel): access_token: str token_type: str expires_at: datetime scopes: List[str]

Client

AgenticStarAuthClient

class AgenticStarAuthClient: def __init__(config: AgenticStarAuthConfig): ... async def __aenter__() -> AgenticStarAuthClient: ... async def __aexit__(...) -> None: ... async def get_users( page: int = 1, limit: int = 20, search: Optional[str]= None, is_approved: Optional[str]= None, sort_by: Optional[str]= None, order: Optional[str]= None, include_last_login: bool = False ) -> GetUsersResult: ... async def get_user( user_id: str ) -> GetUserResult: ... async def get_mcp_tokens( user_id: str, providers: Optional[List[str]]= None ) -> GetMCPTokensResult: ... async def close() -> None: ...
Returns
get_users() → GetUsersResultsuccess, users: List[ApiUser], pagination: UserPagination
get_user() → GetUserResultsuccess, user: ApiUser, devices: List[DeviceInfo], login_history
get_mcp_tokens() → GetMCPTokensResultsuccess, tokens: Dict[str, MCPTokenInfo], errors
# ユーザー一覧の取得(ページネーション) result = await auth_client.get_users(page=1, limit=20) for user in result.users: print(f"{user.username} ({user.email})")
ユーザー情報取得と MCP トークンPython
from agenticstar_platform.auth import (
AgenticStarAuthConfig, AgenticStarAuthClient
)

config = AgenticStarAuthConfig.from_config("config.toml")

async with AgenticStarAuthClient(config) as auth_client:
# ユーザー情報取得
result = await auth_client.get_user("user-123")
if result.success:
print(f"User: {result.user.username}")

# MCP トークン取得
tokens = await auth_client.get_mcp_tokens(
user_id="user-123",
providers=["slack", "gitlab"],
)
for provider, token in tokens.tokens.items():
print(f"{provider}: expires={token.expires_at}")
Related
Events ModuleWebhookEventHandler の token_provider に認証トークンを供給
Memory Module認証済みユーザーのセマンティックメモリ管理

memory Memory Module

セマンティックメモリ(Mem0 + Qdrant)統合。ユーザーの知識・好み・事実をベクトル検索で想起。

Configuration

SemanticMemoryConfig

@dataclass class LLMProviderConfig: model: str # "azure_openai/gpt-4.1" api_key: str # 非表示 base_url: Optional[str]= None api_version: Optional[str]= None aws_access_key_id: Optional[str]= None # Bedrock 用 aws_secret_access_key: Optional[str]= None # Bedrock 用 aws_region_name: Optional[str]= None # Bedrock 用 @classmethod def from_dict(data: Dict[str, Any]) -> LLMProviderConfig: ... @classmethod def from_toml(toml_path: str, section: str) -> LLMProviderConfig: ...
@dataclass class SemanticMemoryConfig: llm_config: LLMProviderConfig embedder_config: LLMProviderConfig vector_store: Optional[Dict[str, Any]]= None rerank: Optional[Dict[str, Any]]= None custom_fact_extraction_prompt: Optional[str]= None @classmethod def from_dict(data: Dict[str, Any]) -> SemanticMemoryConfig: ... @classmethod def from_toml(toml_path: str, section: str = "memory") -> SemanticMemoryConfig: ...

SemanticMemoryClient

class SemanticMemoryClient: def __init__(config: SemanticMemoryConfig): ... @property def enabled() -> bool: ... # 同期メソッド(await 不要) def add( messages: List[Dict[str, str]], user_id: str, metadata: Optional[Dict[str, Any]]= None ) -> Dict[str, Any]: ... def search( query: str, user_id: str, limit: int = 10 ) -> Dict[str, Any]: ... def get_all( user_id: str ) -> Dict[str, Any]: ... def delete( memory_id: str ) -> Dict[str, Any]: ... def delete_all( user_id: str ) -> Dict[str, Any]: ... # 非同期メソッド async def cleanup() -> None: ...
Returns
add() → Dict[str, Any]Mem0 の結果辞書。messages は OpenAI 形式の [{"role": "user", "content": "..."}]
search() → Dict[str, Any]{"results": [...]}。各要素に memory, score 等を含む
get_all() → Dict[str, Any]ユーザーの全メモリを返す
delete() / delete_all() → Dict[str, Any]削除結果

Exceptions

SemanticMemoryError (Exception) └── SemanticMemoryConfigError

Utility Functions

def normalize_provider(provider: str) -> str

プロバイダー名を正規化(エイリアス対応。例: "Azure_OpenAI""azure_openai")。

def parse_model_string(model: str) -> tuple[str, str]

モデル文字列をプロバイダーとモデル名に分割(例: "azure_openai/gpt-4.1"("azure_openai", "gpt-4.1"))。

def get_api_key(config: LLMProviderConfig) -> str

LLMProviderConfig から API キーを取得(SecretStr 対応)。

def convert_llm_to_mem0(llm_config: LLMProviderConfig) -> Dict[str, Any]

LLMProviderConfig を Mem0 形式の LLM 設定辞書に変換。

def convert_embedder_to_mem0(embedder_config: LLMProviderConfig) -> Dict[str, Any]

LLMProviderConfig を Mem0 形式の Embedder 設定辞書に変換。

セマンティックメモリの使用例Python
from agenticstar_platform.memory import (
SemanticMemoryConfig, SemanticMemoryClient
)

config = SemanticMemoryConfig.from_toml("config.toml")
memory = SemanticMemoryClient(config)

# メモリ追加(同期メソッド)
memory.add(
messages=[
{"role": "user", "content": "私は Python が得意です"},
{"role": "assistant", "content": "承知しました"},
],
user_id="user-123",
)

# メモリ検索(同期メソッド)
result = memory.search(query="ユーザーの技術スキルは?", user_id="user-123")
for r in result.get("results", []):
print(f"{r['memory']}")

# クリーンアップ(非同期)
await memory.cleanup()
Related
Auth Module認証済みユーザー ID をメモリの user_id として使用
RAG Moduleメモリ内容を Embedding 化して類似検索を強化

security Security Module

Content Moderation(Azure / AWS / GCP)+ PII Detection。

Enums

SecurityProvider

AZURE = "azure"
AWS = "aws"
GCP = "gcp"

ContentCategory

コンテンツモデレーション分類。

HATE = "hate"
SEXUAL = "sexual"
SELF_HARM = "self_harm"
VIOLENCE = "violence"
PROFANITY = "profanity"
INSULT = "insult"
THREAT = "threat"

PIICategory

PII(個人識別情報)分類。

PERSON_NAME = "person_name"
EMAIL = "email"
PHONE_NUMBER = "phone_number"
ADDRESS = "address"
DATE_OF_BIRTH = "date_of_birth"
NATIONAL_ID = "national_id"
PASSPORT_NUMBER = "passport_number"
SOCIAL_SECURITY = "social_security"
CREDIT_CARD = "credit_card"
IP_ADDRESS = "ip_address"
API_KEY = "api_key"
CONNECTION_STRING = "connection_string"

Exceptions

SecurityError (Exception) ├── SecurityConfigError └── SecurityAPIError (status_code, error_code)

Result Types

ContentModerationResult

コンテンツモデレーション結果。categories の値は severity(0-6)。

@dataclass class ContentModerationResult: blocked: bool # しきい値以上でTrue categories: Dict[ContentCategory, int]= {} # severity 0-6 threshold: int = 2 # 適用しきい値 raw_response: Dict[str, Any]= {} error: Optional[str]= None error_code: Optional[str]= None

PromptShieldResult

プロンプトインジェクション検出結果。

@dataclass class PromptShieldResult: attack_detected: bool attack_type: Optional[str]= None confidence: float = 0.0 raw_response: Dict[str, Any]= {} error: Optional[str]= None error_code: Optional[str]= None

PIIEntity

検出された PII エンティティ。

@dataclass class PIIEntity: category: PIICategory text: str # 検出されたテキスト offset: int # テキスト内の開始位置 length: int # 検出テキストの長さ confidence: float provider_category: str = "" # プロバイダー固有のカテゴリ名

PIIDetectionResult

PII 検出結果。

@dataclass class PIIDetectionResult: success: bool masked_text: str = "" entities: List[PIIEntity]= [] categories_detected: List[PIICategory]= [] raw_response: Dict[str, Any]= {} error: Optional[str]= None error_code: Optional[str]= None

SecurityCheckResult

統合セキュリティチェック結果。

@dataclass class SecurityCheckResult: allowed: bool # 許可/ブロック violations: List[str]= [] # 違反理由リスト content_moderation: Optional[ContentModerationResult]= None prompt_shield: Optional[PromptShieldResult]= None pii_detection: Optional[PIIDetectionResult]= None

Configuration

AzureSecurityConfig

class AzureSecurityConfig: content_safety_endpoint: str content_safety_api_key: str # 非表示 language_endpoint: Optional[str]= None # PII 検出用 language_api_key: Optional[str]= None # 非表示 enabled: bool = True moderation_threshold: int = 2 # severity 0-6 pii_enabled: bool = True pii_confidence_threshold: float = 0.7 timeout: float = 30.0

AWSSecurityConfig

class AWSSecurityConfig: aws_access_key_id: str = "" # 非表示 aws_secret_access_key: str = "" # 非表示 region_name: str = "us-east-1" guardrail_id: str = "" guardrail_version: str = "DRAFT" enabled: bool = False moderation_threshold: int = 2 pii_enabled: bool = True timeout: float = 30.0

GCPSecurityConfig

class GCPSecurityConfig: project_id: str credentials_path: Optional[str]= None # 非表示 model_armor_template: str = "" model_armor_region: str = "" dlp_location: str = "global" enabled: bool = False moderation_threshold: int = 2 pii_enabled: bool = True timeout: float = 30.0

Clients

AzureSecurityClient

class AzureSecurityClient(SecurityClientBase): def __init__(config: AzureSecurityConfig): ... async def __aenter__() -> AzureSecurityClient: ... async def __aexit__(...) -> None: ... async def check_content_moderation( text: str, threshold: Optional[int]= None ) -> ContentModerationResult: ... async def check_prompt_shield( user_prompt: str, documents: Optional[List[str]]= None ) -> PromptShieldResult: ... async def detect_pii( text: str, mask: bool = True, language: str = "ja", confidence_threshold: Optional[float]= None ) -> PIIDetectionResult: ... async def check_security( text: str, check_moderation: bool = True, check_prompt_shield: bool = True, check_pii: bool = False, fail_on_error: bool = True ) -> SecurityCheckResult: ... async def close() -> None: ...
Returns
check_content_moderation() → ContentModerationResultblocked(ブロック判定), categories(severity 0-6)
check_prompt_shield() → PromptShieldResultattack_detected, attack_type, confidence
detect_pii() → PIIDetectionResultsuccess, entities(PIIEntity リスト), masked_text
check_security() → SecurityCheckResultallowed, violations, 各検出結果
# PII マスク結果の利用 result = await security.detect_pii("Email: user@example.com", mask=True) print(result.masked_text) # → "Email: ***" print(result.entities[0].category) # → PIICategory.EMAIL

AWSSecurityClient / GCPSecurityClient

Azure と同一のインターフェース(SecurityClientBase 継承)。check_content_moderation(), check_prompt_shield(), detect_pii(), check_security(), close() を実装。

コンテンツモデレーション + PII 検出Python
from agenticstar_platform.security import (
AzureSecurityConfig, AzureSecurityClient
)

config = AzureSecurityConfig(
content_safety_endpoint="https://your-cs.cognitiveservices.azure.com/",
content_safety_api_key="your-key",
)

async with AzureSecurityClient(config) as security:
# コンテンツモデレーション
moderation = await security.check_content_moderation(
text="This is a sample message",
threshold=2,
)

if moderation.blocked:
print(f"Content blocked: {moderation.categories}")

# 統合チェック
result = await security.check_security(
text="Contact me at user@example.com or 123-456-7890",
check_pii=True,
)

if not result.allowed:
print(f"Violations: {result.violations}")
if result.pii_detection and result.pii_detection.entities:
print(f"PII found: {result.pii_detection.masked_text}")
Related
RAG ModuleEmbedding 投入前のコンテンツ検証パイプライン
Storage Moduleファイルアップロード前の PII スクリーニング
Events Moduleモデレーション結果をイベントとして配信

common Common Module

共通ユーティリティ。SQL インジェクション防止・シークレットマスク・非同期コンテキストマネージャーミックスイン。

SecretMasker

シークレット値のマスク処理。ログ出力時にパスワードやトークンを安全に表示。

class SecretMasker: def __init__(mask_char: str = "*", visible_chars: int = 4): ... def mask(value: str) -> str: ...
def mask_secret(value: str, visible_chars: int = 4) -> str

ショートカット関数。SecretMasker().mask(value) と同等。

シークレットマスクPython
from agenticstar_platform.common import mask_secret

masked = mask_secret("my-secret-api-key-12345")
# → "my-s********************"

Validation Functions

SQL インジェクション防止用のバリデーション関数群。DataAccess 内部で自動的に使用されますが、カスタムクエリを構築する際にも直接使用できます。

def validate_identifier(name: str) -> str

テーブル名・カラム名のバリデーション。英数字・アンダースコアのみ許可。不正な場合は ValueError を送出。

def validate_identifiers(names: List[str]) -> List[str]

複数の識別子を一括バリデーション。

def validate_order_by(order_by: str) -> str

ORDER BY 句のバリデーション。column_name ASC|DESC 形式のみ許可。

バリデーションPython
from agenticstar_platform.common import validate_identifier, validate_order_by

table = validate_identifier("users") # OK: "users"
validate_identifier("users; DROP TABLE --") # → ValueError

order = validate_order_by("created_at DESC") # OK: "created_at DESC"

AsyncContextManagerMixin

非同期コンテキストマネージャーのミックスインクラス。SDK 内部で async with パターンを統一的に実装するために使用。

class AsyncContextManagerMixin: async def __aenter__(self) -> Self: ... async def __aexit__(...) -> None: ...

エラーハンドリング

全モジュールで共通の例外処理パターンを使用します。各モジュールはモジュール固有の基底例外クラスを持ち、サブクラスで具体的なエラーを区別します。

例外階層一覧

Exception ├── DatabaseError │ ├── ConnectionError │ ├── QueryError │ └── ConfigError ├── RAGError (VectorStoreError) │ ├── EmbeddingError │ ├── QdrantError (VectorStoreConnectionError) │ ├── SearchError │ └── VectorStoreConfigError ├── StorageError │ ├── StorageConfigError │ ├── StorageConnectionError │ └── StorageOperationError ├── AuthError │ ├── AuthConfigError │ └── AuthAPIError │ ├── AuthUnauthorizedError (401) │ ├── AuthNotFoundError (404) │ └── AuthRateLimitError (429) ├── SemanticMemoryError │ └── SemanticMemoryConfigError └── SecurityError ├── SecurityConfigError └── SecurityAPIError

共通パターン

全モジュールで「具体的な例外 → 基底例外 → Exception」の順にキャッチします。

マルチモジュール エラーハンドリングPython
from agenticstar_platform.storage import StorageError, StorageOperationError
from agenticstar_platform.auth import AuthError, AuthAPIError
from agenticstar_platform.security import SecurityError, SecurityAPIError

async def process_document(doc_id: str):
try:
# 1. DB からドキュメント取得
doc = await db.fetch_one(
"SELECT * FROM documents WHERE id = $1", (doc_id,)
)

# 2. セキュリティチェック
moderation = await security.check_content_moderation(doc["content"])
if moderation.blocked:
raise ValueError("Content policy violation")

# 3. Embedding 生成 + ベクトル DB に保存
await qdrant.upsert(
point_id=doc_id,
text=doc["content"],
payload={"source": "documents"}
)

# 4. 処理結果をストレージに保存
await storage.upload_bytes(
data=doc["content"].encode(),
object_name=f"processed/{doc_id}.txt"
)

except StorageOperationError as e:
# ストレージ操作エラー → error_code で判断
logger.error(f"Storage op failed: {e.error_code}")
except AuthAPIError as e:
# 認証 API エラー → status_code で判断
logger.error(f"Auth failed: {e.status_code}")
except (StorageError, AuthError, SecurityError) as e:
# モジュール基底例外でフォールバック
logger.error(f"Module error: {type(e).__name__}: {e}")
except Exception as e:
# 予期しないエラー
logger.critical(f"Unexpected: {e}")
raise

まとめ

AGENTIC STAR Platform SDK v0.5.3 は、以下の 8 つの専門モジュールで構成された包括的なツールキットです:

  • Events — 非同期イベント配信(SSE / Webhook / DB)
  • Database — PostgreSQL + Azure AD 認証 + 専門アクセス層(Config / Execution / Telemetry / PodRuntime)
  • RAG — Embedding + Vector Search(Qdrant)
  • Storage — Cloud Storage(Azure / S3 / GCS)
  • Auth — AGENTIC STAR 認証サービス統合
  • Memory — Semantic Memory(Mem0 + Qdrant)
  • Security — Content Moderation + PII Detection(Azure / AWS / GCP マルチクラウド対応)
  • Common — 共通ユーティリティ(SQL インジェクション防止・シークレットマスク)

各モジュールは独立してインストール・利用可能であり、組み合わせることで強力な AI エージェントアプリケーションを構築できます。SDK はインフラのみを提供し、エージェントロジックは開発者が自由に設計します。