
SDK API Reference

Complete reference for every module, class, and method in the agenticstar-platform SDK (v0.3.1).

Installation

Install the SDK with pip.

Basic installation
pip install agenticstar-platform==0.3.1

To install only specific modules, specify extras.

Extra        Description
[db]         PostgreSQL database support
[rag]        RAG (embedding + vector search) support
[storage]    Cloud storage (Azure, S3, GCS) support
[security]   Security (content moderation, PII detection) support
[auth]       AgenticStar Auth service integration
[memory]     Memory (Mem0 + Graphiti) support
[all]        All modules
Example: installing specific modules
# RAG module
pip install agenticstar-platform[rag]==0.3.1

# DB + RAG + Storage
pip install agenticstar-platform[db,rag,storage]==0.3.1

# All modules
pip install agenticstar-platform[all]==0.3.1

Module Overview

The SDK consists of the following seven modules.

Module     Description                                    Key classes
events     Async event delivery (SSE / Webhook)           EventEmitter, StreamingEvent
db         PostgreSQL connections + Azure AD auth         PostgreSQLManager, DataAccess
rag        RAG (embedding + vector search via Qdrant)     EmbeddingGenerator, QdrantManager
storage    Cloud storage (Azure, S3, GCS)                 AzureBlobStorageClient, S3StorageClient
auth       AgenticStar auth API client                    AgenticStarAuthClient
memory     Semantic + episodic memory                     SemanticMemoryClient, EpisodicMemoryClient
security   Content moderation + PII detection             AzureSecurityClient

Events Module (events)

Asynchronous event delivery system. Publish events with EventEmitter and deliver them to SSE, webhooks, or the database.

Enums

EventType

Event types displayed in the frontend UI.

PHASE_START = "phase_start"
PROGRESS_UPDATE = "progress_update"
THOUGHT_MESSAGE = "thought_message"
COMPLETION_SUCCESS = "completion_success"
COMPLETION_FAILURE = "completion_failure"
USER_INTERACTION_REQUIRED = "user_interaction_required"
UNEXPECTED_ERROR = "unexpected_error"
HITL_REQUIRED_BROWSER_VNC = "hitl_required_browser_vnc"
HITL_REQUIRED_BROWSER_CLI = "hitl_required_browser_cli"
HITL_COMPLETED = "hitl_completed"
FILE_CREATED = "file_created"
PERMISSION_REQUEST = "permission_request"
PERMISSION_RESPONSE = "permission_response"
TOOL_START = "tool_start"
TOOL_RESULT = "tool_result"
FINAL_RESULT = "final_result"

SubEventType

Sub-event types that refine how events are displayed in the frontend UI.

SEARCH_WEB = "search_web"
COMMAND_EXECUTION = "command_execution"
FILE_OPERATION = "file_operation"
LOCAL_ASSISTANT = "local_assistant"
MCP_TOOL = "mcp_tool"
FILE_EDITED = "file_edited"
FILE_READ = "file_read"
FILE_SEARCHED = "file_searched"
BASH_EXECUTED = "bash_executed"
WEB_FETCHED = "web_fetched"
TASK_LAUNCHED = "task_launched"
TODO_UPDATED = "todo_updated"
VIDEO_GENERATED = "video_generated"
IMAGE_GENERATED = "image_generated"
SLIDE_CREATED = "slide_created"
MACOS_AUTOMATION = "macos_automation"

Data Classes

StreamingEvent

Event data for real-time SSE delivery.

@dataclass
class StreamingEvent:
    event_type: EventType
    execution_id: str
    message: str
    timestamp: float
    metadata: Optional[Dict[str, Any]] = None
    sub_event_type: Optional[SubEventType] = None

    def to_dict() -> Dict[str, Any]: ...

SequencedEvent

An event with an ordering guarantee.

@dataclass
class SequencedEvent:
    sequence: int  # >= 0
    event_type: EventType
    execution_id: str
    message: str
    timestamp: float
    metadata: Optional[Dict[str, Any]] = None
    sub_event_type: Optional[SubEventType] = None

    def __post_init__(): ...  # validates sequence >= 0
    def to_streaming_event() -> StreamingEvent: ...
    def to_dict() -> Dict[str, Any]: ...
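The relationship between the two dataclasses can be sketched with minimal stand-ins. This is an illustration only, not the SDK implementation: the stand-in classes below carry fewer fields than the real ones, and the exact validation message is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Minimal stand-ins for illustration; the real SDK classes carry more fields.
@dataclass
class StreamingEvent:
    event_type: str
    execution_id: str
    message: str
    timestamp: float
    metadata: Optional[Dict[str, Any]] = None

@dataclass
class SequencedEvent:
    sequence: int
    event_type: str
    execution_id: str
    message: str
    timestamp: float
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Documented invariant: sequence must be >= 0
        if self.sequence < 0:
            raise ValueError("sequence must be >= 0")

    def to_streaming_event(self) -> StreamingEvent:
        # Drop the sequence number; keep the rest of the payload
        return StreamingEvent(self.event_type, self.execution_id,
                              self.message, self.timestamp, self.metadata)

ev = SequencedEvent(0, "phase_start", "exec-abc-123", "start", 1700000000.0)
stream_ev = ev.to_streaming_event()
```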

ExecutionMessage

A database-stored message for marketplace UI integration.

@dataclass
class ExecutionMessage:
    user_id: str
    conversation_id: str
    message_id: str
    chunk_type: str
    content_data: Dict[str, Any]

    def to_dict() -> Dict[str, Any]: ...

    @classmethod
    def from_streaming_event(
        event: StreamingEvent,
        user_id: str,
        conversation_id: str,
        message_id: str,
        chunk_type: Optional[str] = None
    ) -> ExecutionMessage: ...

Classes

EventEmitter

Asynchronous event queue. Publishes and consumes events without blocking.

class EventEmitter:
    def __init__(
        execution_id: str,
        handler: Optional[EventHandler] = None
    ): ...

    @property
    def is_completed() -> bool: ...
    @property
    def sequence_number() -> int: ...

    async def emit_event(
        event_type: EventType,
        message: str,
        metadata: Optional[Dict[str, Any]] = None,
        sub_event_type: Optional[SubEventType] = None
    ) -> None: ...
    async def consume_events(
        timeout: float = 0.1
    ) -> AsyncGenerator[str, None]: ...
    def mark_completed() -> None: ...
    async def cleanup() -> None: ...
    async def __aenter__() -> EventEmitter: ...
    async def __aexit__(...) -> None: ...
Returns
emit_event() → None: adds the event to the queue; if a handler is set, it is invoked asynchronously
consume_events() → AsyncGenerator[str, None]: yields SSE-formatted strings asynchronously; stops automatically on completion
Supports the async context manager protocol: async with EventEmitter(...) as emitter:
Publishing and consuming events (Python)
from agenticstar_platform import EventEmitter, EventType

emitter = EventEmitter(execution_id="exec-abc-123")

# Publish an event
await emitter.emit_event(
    event_type=EventType.PHASE_START,
    message="Starting document analysis",
    metadata={"phase": "analysis", "total_pages": 42}
)

# Consume events (SSE streaming)
async for chunk in emitter.consume_events():
    print(chunk)

DatabaseEventHandler

Persists events to the PostgreSQL execution_messages table.

class DatabaseEventHandler:
    def __init__(
        data_access: Any,
        user_id: str,
        conversation_id: str,
        message_id: str,
        table_name: str = "execution_messages",
        chunk_type_map: Optional[Dict[EventType, str]] = None
    ): ...

    async def __call__(event: StreamingEvent) -> Optional[str]: ...

WebhookEventHandler

Sends events to an HTTP webhook endpoint.

class WebhookEventHandler:
    def __init__(
        webhook_url: str,
        conversation_id: str,
        message_id: str,
        headers: Optional[Dict[str, str]] = None,
        timeout_seconds: int = 30,
        message_type: str = "append_message",
        token_provider: Optional[Callable] = None,
        chunk_type_map: Optional[Dict[EventType, str]] = None
    ): ...

    async def __call__(event: StreamingEvent) -> Optional[str]: ...

CompositeEventHandler

Runs multiple EventHandler instances in parallel.

class CompositeEventHandler:
    def __init__(
        handlers: List[Callable],
        return_first_chunk: bool = True
    ): ...

    async def __call__(event: StreamingEvent) -> Optional[str]: ...
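The fan-out behavior can be sketched as follows. This is a hypothetical stand-in, not the SDK class: it assumes return_first_chunk=True means "return the first non-None chunk in handler order", which is one plausible reading of the parameter name.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

# Hypothetical stand-in for CompositeEventHandler: run all handlers
# concurrently and, with return_first_chunk=True, return the first
# non-None chunk in handler order.
class CompositeSketch:
    def __init__(self,
                 handlers: List[Callable[[str], Awaitable[Optional[str]]]],
                 return_first_chunk: bool = True):
        self.handlers = handlers
        self.return_first_chunk = return_first_chunk

    async def __call__(self, event: str) -> Optional[str]:
        results = await asyncio.gather(*(h(event) for h in self.handlers))
        if self.return_first_chunk:
            return next((r for r in results if r is not None), None)
        return results[-1]

async def db_handler(event: str) -> Optional[str]:
    return None  # persistence handlers typically return no chunk

async def sse_handler(event: str) -> Optional[str]:
    return f"data: {event}\n\n"

async def main() -> Optional[str]:
    composite = CompositeSketch([db_handler, sse_handler])
    return await composite("phase_start")

chunk = asyncio.run(main())
```

This is the same shape create_marketplace_handler uses: one handler persists, another produces the chunk delivered to the client.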

Factory Functions

def create_sse_handler() -> EventHandler

Creates a handler that emits SSE (Server-Sent Events) formatted output.

def create_json_handler() -> EventHandler

Creates a handler that emits JSON formatted output.

def create_marketplace_handler(
    data_access: Any,
    webhook_url: str,
    user_id: str,
    conversation_id: str,
    message_id: str,
    token_provider: Optional[Callable] = None
) -> CompositeEventHandler

Creates a composite handler (DB + webhook) for the marketplace UI.

Protocol

EventHandler

Generic interface for event handling.

class EventHandler(Protocol):
    async def __call__(
        self,
        event: StreamingEvent
    ) -> Optional[str]: ...
Related
Database Module: pass a DataAccess instance as data_access to DatabaseEventHandler for persistence
Storage Module: archive event logs to cloud storage

Database Module (db)

PostgreSQL connection pool with Azure AD authentication.

Exceptions

DatabaseError (Exception)
├── ConnectionError
├── QueryError
└── ConfigError

Configuration

AzureADConfig

Azure Active Directory authentication settings.

@dataclass
class AzureADConfig:
    tenant_id: str
    client_id: str
    client_secret: str  # hidden
    username: str = ""

    @classmethod
    def from_dict(data: Dict[str, Any]) -> AzureADConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "azure_ad") -> AzureADConfig: ...

PostgreSQLConfig

PostgreSQL connection settings (with Azure AD support).

class PostgreSQLConfig:
    provider: str = "postgresql"
    host: str = "localhost"
    port: int = 5432
    database: str = "agenticstar_db"
    use_azure_ad: bool = False
    username: Optional[str] = None
    password: Optional[str] = None  # hidden
    pool_min_size: int = 5
    pool_max_size: int = 20
    command_timeout: int = 60
    pool_timeout: Optional[int] = 30
    max_overflow: Optional[int] = 10
    azure_ad: Optional[AzureADConfig] = None
    api_proxy_url: Optional[str] = None
    ssl_mode: str = "require"

    @classmethod
    def from_dict(data: Dict[str, Any]) -> PostgreSQLConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "database") -> PostgreSQLConfig: ...
    @classmethod
    def from_env(prefix: str = "DB_") -> PostgreSQLConfig: ...

Example TOML configuration:

config.toml - PostgreSQL (TOML)
[database]
provider = "postgresql"
host = "db.example.com"
port = 5432
database = "agenticstar_db"
use_azure_ad = true
pool_min_size = 5
pool_max_size = 20
command_timeout = 60

[azure_ad]
tenant_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
client_id = "yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"
client_secret = "your-secret-here"
username = "dbuser@example.onmicrosoft.com"

Connection Management

PostgreSQLManager

Asynchronous PostgreSQL connection pool with Azure AD token management.

class PostgreSQLManager:
    def __init__(config: PostgreSQLConfig): ...

    @property
    def config() -> PostgreSQLConfig: ...
    @property
    def pool() -> Optional[asyncpg.Pool]: ...

    async def __aenter__() -> PostgreSQLManager: ...
    async def __aexit__(...) -> None: ...
    async def initialize() -> None: ...
    async def close() -> None: ...
    async def execute_query(
        query: str,
        params: tuple = ()
    ) -> Dict[str, Any]: ...
    async def fetch_one(
        query: str,
        params: tuple = ()
    ) -> Optional[Dict[str, Any]]: ...
    async def fetch_all(
        query: str,
        params: tuple = ()
    ) -> List[Dict[str, Any]]: ...
    async def execute(
        query: str,
        params: tuple = ()
    ) -> str: ...
Returns
execute_query() → Dict[str, Any]: {"success": bool, "data": list, "count": int}
fetch_one() → Optional[Dict[str, Any]]: a single result row as a dict, or None if there is no match
fetch_all() → List[Dict[str, Any]]: list of result rows (empty list when there are none)
execute() → str: PostgreSQL status string (e.g. "INSERT 0 1")
Supports the async context manager protocol: async with PostgreSQLManager(config) as mgr:
Connecting and running queries (Python)
from agenticstar_platform.db import PostgreSQLConfig, PostgreSQLManager

config = PostgreSQLConfig.from_toml("config.toml")

async with PostgreSQLManager(config) as mgr:
    await mgr.initialize()

    # SELECT query
    result = await mgr.fetch_all(
        "SELECT * FROM conversations WHERE user_id = $1",
        ("user-123",)
    )

    # INSERT
    await mgr.execute(
        "INSERT INTO events (event_type, data) VALUES ($1, $2)",
        ("phase_start", '{}')
    )

Data Access Layer

DataAccess

A generic table-operation wrapper with SQL-injection protection.

class DataAccess:
    def __init__(
        config: PostgreSQLConfig,
        *,
        use_proxy: bool = False,
        token_provider: Optional[Callable] = None
    ): ...

    async def __aenter__() -> DataAccess: ...
    async def __aexit__(...) -> None: ...
    async def initialize() -> None: ...
    async def close() -> None: ...
    def is_initialized() -> bool: ...
    async def ensure_initialized() -> None: ...
    async def execute_query(
        query: str,
        params: tuple = ()
    ) -> Dict[str, Any]: ...
    async def insert(
        table: str,
        data: Dict[str, Any],
        returning: Optional[List[str]] = None
    ) -> Dict[str, Any]: ...
    async def upsert(
        table: str,
        data: Dict[str, Any],
        conflict_columns: List[str],
        update_columns: Optional[List[str]] = None,
        returning: Optional[List[str]] = None
    ) -> Dict[str, Any]: ...
    async def select(
        table: str,
        columns: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None
    ) -> Dict[str, Any]: ...
    async def update(
        table: str,
        data: Dict[str, Any],
        where: Dict[str, Any]
    ) -> Dict[str, Any]: ...
    async def delete(
        table: str,
        where: Dict[str, Any]
    ) -> Dict[str, Any]: ...
Returns
insert() / upsert() → Dict[str, Any]: {"success": bool, "data": dict}; includes column values when returning is specified
select() → Dict[str, Any]: {"success": bool, "data": list, "count": int}
update() / delete() → Dict[str, Any]: {"success": bool, "affected_rows": int}
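The SQL-injection protection rests on parameterized queries: values are never interpolated into the SQL text. A hypothetical sketch of how insert() might build a statement (build_insert is illustrative, not an SDK function; $N placeholders follow the asyncpg style used in the raw-query examples in this document):

```python
from typing import Any, Dict, List, Optional, Tuple

def build_insert(table: str, data: Dict[str, Any],
                 returning: Optional[List[str]] = None) -> Tuple[str, tuple]:
    # Values never get spliced into the SQL string; they travel separately
    # as parameters, which is what blocks SQL injection.
    cols = list(data)
    placeholders = ", ".join(f"${i + 1}" for i in range(len(cols)))
    sql = f'INSERT INTO {table} ({", ".join(cols)}) VALUES ({placeholders})'
    if returning:
        sql += f' RETURNING {", ".join(returning)}'
    return sql, tuple(data[c] for c in cols)

sql, params = build_insert("users", {"name": "Alice"}, returning=["id"])
```

Note that table and column names cannot be parameterized in PostgreSQL, so a real implementation must still validate those identifiers.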

Example usage of the main methods:

# INSERT
result = await da.insert("users", {"name": "Alice"}, returning=["id"])

# UPSERT (update on conflict)
result = await da.upsert(
    "users",
    {"email": "a@example.com", "name": "Alice"},
    conflict_columns=["email"]
)

# SELECT with WHERE + ORDER BY + LIMIT
result = await da.select(
    "users",
    columns=["id", "name"],
    where={"active": True},
    order_by="created_at DESC",
    limit=10
)

ConfigAccess

Access layer for configuration data.

class ConfigAccess:
    async def get_agent_instructions(agent_type: str) -> Dict[str, Any]: ...

Error Handling

Exception handling for database operations.

Database error handling (Python)
from agenticstar_platform.db import (
PostgreSQLConfig, PostgreSQLManager,
DatabaseError, ConnectionError, QueryError
)

config = PostgreSQLConfig.from_toml("config.toml")

try:
    async with PostgreSQLManager(config) as db:
        result = await db.fetch_one(
            "SELECT * FROM users WHERE id = $1",
            ("user-123",)
        )
        if result:
            print(f"User: {result['username']}")
except ConnectionError as e:
    print(f"Connection failed: {e}")
    # Retry logic
except QueryError as e:
    print(f"Query error: {e}")
except DatabaseError as e:
    print(f"Database error: {e}")
Related
Events Module: persist events to the DB with DatabaseEventHandler
RAG Module: embed data fetched from the DB to make it searchable

RAG Module (rag)

RAG (Retrieval-Augmented Generation): embedding + vector search via Qdrant.

Enums

VectorStoreProvider

QDRANT = "qdrant"
WEAVIATE = "weaviate"
MILVUS = "milvus"

Exceptions

RAGError (Exception)
├── EmbeddingError
├── QdrantError
└── SearchError

Data Classes

SearchResult

A single search result.

@dataclass
class SearchResult:
    point_id: str
    payload: Dict[str, Any]
    score: float
    similarity: float = 0.0

UpsertResult

Result of an upsert operation.

@dataclass
class UpsertResult:
    success: bool
    point_id: str = ""
    error: Optional[str] = None
    error_code: Optional[str] = None

SearchResponse

Result of a search operation.

@dataclass
class SearchResponse:
    success: bool
    results: List[SearchResult] = field(default_factory=list)
    total_found: int = 0
    query: str = ""
    error: Optional[str] = None
    error_code: Optional[str] = None

Embedding Configuration

EmbeddingConfig

Azure OpenAI embedding settings.

class EmbeddingConfig:
    base_url: str
    api_key: str  # hidden
    model: str = "text-embedding-ada-002"
    api_version: str = "2024-02-15-preview"
    max_cache_size: int = 10000
    dimensions: int = 1536
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0

    @classmethod
    def from_dict(data: Dict[str, Any]) -> EmbeddingConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "rag.embedding") -> EmbeddingConfig: ...

EmbeddingGenerator

Embedding generation with caching and retry logic.

class EmbeddingGenerator:
    def __init__(config: EmbeddingConfig): ...

    async def generate(text: str) -> List[float]: ...
    async def generate_batch(
        texts: List[str],
        batch_size: int = 25
    ) -> List[List[float]]: ...
    def clear_cache() -> None: ...
Returns
generate() → List[float]: the vector (dimension = config.dimensions, default 1536); returned immediately when cached
generate_batch() → List[List[float]]: vectors in input order; API calls are issued in batch_size chunks (default 25)

# Embedding a single text
vector = await generator.generate("Hello, world!")  # → [0.012, -0.034, ...]

# Batch embedding
vectors = await generator.generate_batch(["text1", "text2", "text3"])
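The batching behavior described above can be illustrated with a small order-preserving chunking helper. This is a sketch of the presumed internal behavior, not SDK code:

```python
from typing import List

def chunk_batches(texts: List[str], batch_size: int = 25) -> List[List[str]]:
    # generate_batch() presumably issues one API call per chunk like this,
    # preserving input order so the output vectors line up with the inputs.
    return [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]

batches = chunk_batches([f"text{i}" for i in range(60)], batch_size=25)
```

For 60 inputs with batch_size=25 this produces three API calls (25, 25, 10), which is why large corpora should be embedded via generate_batch() rather than one generate() call per text.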

Vector Database

QdrantConfig

Qdrant database settings.

class QdrantConfig:
    url: str
    collection_name: str
    vector_size: int = 1536
    distance: str = "cosine"  # cosine, euclid, dot
    on_disk_vectors: bool = False
    on_disk_payload: bool = True
    hnsw_m: int = 16
    hnsw_ef_construct: int = 256
    payload_indexes: List[PayloadIndexConfig]
    auth_token_provider: Optional[Callable[[], str]] = None

    @classmethod
    def from_dict(data: Dict[str, Any]) -> QdrantConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "rag.qdrant") -> QdrantConfig: ...

PayloadIndexConfig

Index settings for payload fields.

@dataclass
class PayloadIndexConfig:
    field_name: str
    field_schema: str = "keyword"  # keyword, integer, float, bool

    @classmethod
    def from_dict(data: Dict[str, Any]) -> PayloadIndexConfig: ...

Example TOML configuration (keys aligned with the EmbeddingConfig and QdrantConfig fields and the default from_toml section names above):

config.toml - RAG (Qdrant) (TOML)
[rag.embedding]
base_url = "https://your-resource.openai.azure.com"
api_key = "sk-..."
model = "text-embedding-ada-002"
dimensions = 1536

[rag.qdrant]
url = "https://qdrant.example.com:6333"
collection_name = "documents"
vector_size = 1536
distance = "cosine"

QdrantManager

Qdrant vector-search client.

class QdrantManager:
    def __init__(
        config: QdrantConfig,
        embedding_generator: EmbeddingGenerator
    ): ...

    async def __aenter__() -> QdrantManager: ...
    async def __aexit__(...) -> None: ...
    def is_initialized() -> bool: ...
    async def initialize() -> None: ...
    async def ensure_initialized() -> None: ...
    async def upsert(
        point_id: str,
        text: str,
        payload: Dict[str, Any]
    ) -> Dict[str, Any]: ...
    async def batch_upsert(
        items: List[Dict[str, Any]],
        batch_size: int = 256
    ) -> Dict[str, Any]: ...
    async def search(
        query_text: str,
        limit: int = 10,
        score_threshold: float = 0.4,
        filter_conditions: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]: ...
    async def delete(
        point_ids: List[str]
    ) -> Dict[str, Any]: ...
    async def get_statistics() -> Dict[str, Any]: ...
    async def close() -> None: ...
Returns
upsert() → Dict[str, Any]: {"success": bool, "point_id": str}
batch_upsert() → Dict[str, Any]: {"success": bool, "upserted_count": int, "errors": list}
search() → Dict[str, Any]: {"success": bool, "data": List[SearchResult], "total_found": int}
get_statistics() → Dict[str, Any]: {"vectors_count": int, "indexed_vectors_count": int, "points_count": int, ...}
Supports the async context manager protocol: async with QdrantManager(config, generator) as qdrant:
RAG: embedding + Qdrant search (Python)
from agenticstar_platform.rag import (
EmbeddingConfig, EmbeddingGenerator,
QdrantConfig, QdrantManager
)

emb_config = EmbeddingConfig.from_toml("config.toml")
qd_config = QdrantConfig.from_toml("config.toml")

generator = EmbeddingGenerator(emb_config)

async with QdrantManager(qd_config, generator) as qdrant:
    # Add a document
    await qdrant.upsert(
        point_id="doc-001",
        text="AgenticStar is an AI agent development platform",
        payload={"source": "docs", "category": "overview"}
    )

    # Semantic search (search() returns SearchResult objects in "data")
    results = await qdrant.search(
        query_text="What is an AI platform?",
        limit=5,
        score_threshold=0.5
    )
    for r in results["data"]:
        print(f"Score: {r.score}, Payload: {r.payload}")
Related
Database Module: feed data fetched from the DB into RAG
Storage Module: fetch document files from storage and embed them
Security Module: validate content safety before embedding

Storage Module (storage)

Cloud storage integration (Azure Blob Storage / AWS S3 / Google Cloud Storage).

Enums

StorageProvider

Cloud storage provider type.

AZURE_BLOB = "azure_blob"
AWS_S3 = "aws_s3"
GCS = "gcs"

Exceptions

StorageError (Exception) - Base exception
├── StorageConfigError - Configuration error
├── StorageConnectionError - Connection error
└── StorageOperationError - Operation error with error_code attribute

Result Types

UploadResult

File upload result.

@dataclass
class UploadResult:
    success: bool                     # Upload succeeded
    object_name: str = ""             # Object name
    object_url: str = ""              # Object URL
    file_size: int = 0                # File size
    content_type: str = ""            # Content type
    error: Optional[str] = None       # Error message
    error_code: Optional[str] = None  # Error code
    metadata: Dict[str, Any] = field(default_factory=dict)  # Metadata

DownloadResult

File download result.

@dataclass
class DownloadResult:
    success: bool                     # Download succeeded
    object_name: str = ""             # Object name
    local_path: str = ""              # Local file path
    file_size: int = 0                # File size
    error: Optional[str] = None       # Error message
    error_code: Optional[str] = None  # Error code

ObjectInfo

Cloud object metadata.

@dataclass
class ObjectInfo:
    name: str                              # Object name
    size: int                              # Object size
    last_modified: Optional[str] = None    # Last modified timestamp
    content_type: Optional[str] = None     # Content type
    metadata: Dict[str, Any] = field(default_factory=dict)  # Metadata

ListResult

List operation result.

@dataclass
class ListResult:
    success: bool                     # Operation succeeded
    objects: List[ObjectInfo] = field(default_factory=list)  # Objects
    count: int = 0                    # Object count
    prefix: str = ""                  # Search prefix
    error: Optional[str] = None       # Error message
    error_code: Optional[str] = None  # Error code

Configuration

StorageConfig

Common storage configuration (base class).

class StorageConfig:
    provider: StorageProvider              # Provider type
    bucket_name: str                       # Bucket/container name
    enabled: bool = True                   # Enabled flag
    max_file_size: int = 5*1024*1024*1024  # Max file size (5GB)
    auto_create_bucket: bool = False       # Auto-create bucket
    prefix: str = ""                       # Object name prefix
    custom_domain: Optional[str] = None    # Custom domain
    connection_timeout: int = 30           # Connection timeout (seconds)
    read_timeout: int = 300                # Read timeout (seconds)

AzureBlobConfig

Azure Blob Storage configuration.

class AzureBlobConfig:
    bucket_name: str                          # Container name
    connection_string: str                    # hidden
    enabled: bool = True
    max_file_size: int = 5*1024*1024*1024
    auto_create_bucket: bool = False
    prefix: str = ""
    custom_domain: Optional[str] = None
    connection_timeout: int = 30
    read_timeout: int = 300
    max_block_size: int = 8*1024*1024         # Max block size (8MB)
    max_single_put_size: int = 256*1024*1024  # Max single upload (256MB)

    @classmethod
    def from_dict(data: Dict[str, Any]) -> AzureBlobConfig: ...

    @property
    def provider(self) -> StorageProvider:
        return StorageProvider.AZURE_BLOB

S3Config

AWS S3 configuration.

class S3Config:
    bucket_name: str                     # Bucket name
    aws_access_key_id: str               # hidden
    aws_secret_access_key: str           # hidden
    region_name: str = "us-east-1"       # AWS region
    enabled: bool = True
    max_file_size: int = 5*1024*1024*1024
    auto_create_bucket: bool = False
    prefix: str = ""
    custom_domain: Optional[str] = None
    connection_timeout: int = 30
    read_timeout: int = 300
    endpoint_url: Optional[str] = None   # S3-compatible endpoint (MinIO, etc.)

    @classmethod
    def from_dict(data: Dict[str, Any]) -> S3Config: ...

    @property
    def provider(self) -> StorageProvider:
        return StorageProvider.AWS_S3

GCSConfig

Google Cloud Storage configuration.

class GCSConfig:
    bucket_name: str                          # Bucket name
    project_id: str                           # GCP project ID
    enabled: bool = True
    max_file_size: int = 5*1024*1024*1024
    auto_create_bucket: bool = False
    prefix: str = ""
    custom_domain: Optional[str] = None
    connection_timeout: int = 30
    read_timeout: int = 300
    credentials_path: Optional[str] = None    # hidden - Path to service account JSON
    credentials_json: Optional[str] = None    # hidden - Service account JSON string
    credentials_base64: Optional[str] = None  # hidden - Base64-encoded service account JSON

    @classmethod
    def from_dict(data: Dict[str, Any]) -> GCSConfig: ...

    @property
    def provider(self) -> StorageProvider:
        return StorageProvider.GCS

Storage Clients

AzureBlobStorageClient

Azure Blob Storage client. Supports async context manager (async with).

class AzureBlobStorageClient:
    def __init__(config: AzureBlobConfig): ...

    async def initialize() -> None: ...          # Initialize client
    async def ensure_initialized() -> None: ...  # Initialize if needed
    async def upload_file(
        file_path: str,
        prefix: Optional[str] = None
    ) -> UploadResult: ...
    async def upload_bytes(
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream"
    ) -> UploadResult: ...
    async def download_file(
        object_name: str,
        local_path: str
    ) -> DownloadResult: ...
    async def list_objects(
        prefix: str = ""
    ) -> ListResult: ...
    async def delete_object(
        object_name: str
    ) -> Dict[str, Any]: ...
    async def delete_objects(
        object_names: List[str]
    ) -> Dict[str, Any]: ...
    async def close() -> None: ...
Returns
upload_file() / upload_bytes() → UploadResult: includes success, object_url, file_size, content_type
download_file() → DownloadResult: includes success, local_path, file_size
list_objects() → ListResult: includes success, objects: List[ObjectInfo], count
delete_object() → Dict[str, Any]: {"success": bool, "object_name": str}

S3StorageClient

AWS S3 storage client. Supports async context manager (async with).

class S3StorageClient:
    def __init__(config: S3Config): ...

    async def initialize() -> None: ...          # Initialize client
    async def ensure_initialized() -> None: ...  # Initialize if needed
    async def upload_file(
        file_path: str,
        prefix: Optional[str] = None
    ) -> UploadResult: ...
    async def upload_bytes(
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream"
    ) -> UploadResult: ...
    async def download_file(
        object_name: str,
        local_path: str
    ) -> DownloadResult: ...
    async def list_objects(
        prefix: str = ""
    ) -> ListResult: ...
    async def delete_object(
        object_name: str
    ) -> Dict[str, Any]: ...
    async def delete_objects(
        object_names: List[str]
    ) -> Dict[str, Any]: ...
    async def close() -> None: ...

GCSStorageClient

Google Cloud Storage client. Supports async context manager (async with).

class GCSStorageClient:
    def __init__(config: GCSConfig): ...

    async def initialize() -> None: ...          # Initialize client
    async def ensure_initialized() -> None: ...  # Initialize if needed
    async def upload_file(
        file_path: str,
        prefix: Optional[str] = None
    ) -> UploadResult: ...
    async def upload_bytes(
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream"
    ) -> UploadResult: ...
    async def download_file(
        object_name: str,
        local_path: str
    ) -> DownloadResult: ...
    async def list_objects(
        prefix: str = ""
    ) -> ListResult: ...
    async def delete_object(
        object_name: str
    ) -> Dict[str, Any]: ...
    async def delete_objects(
        object_names: List[str]
    ) -> Dict[str, Any]: ...
    async def close() -> None: ...
Using S3 storage (Python)
from agenticstar_platform.storage import (
S3Config, S3StorageClient
)

config = S3Config.from_dict({
"bucket_name": "my-bucket",
"aws_access_key_id": "AKIA...",
"aws_secret_access_key": "secret...",
"region_name": "ap-northeast-1"
})

# Use as an async context manager
async with S3StorageClient(config) as storage:
    # Upload a file
    upload_result = await storage.upload_file(
        file_path="/tmp/report.pdf",
        prefix="documents/"
    )
    if upload_result.success:
        print(f"URL: {upload_result.object_url}")

    # Upload raw bytes
    await storage.upload_bytes(
        data=b"Hello, World!",
        object_name="greetings/hello.txt",
        content_type="text/plain"
    )

    # List objects
    list_result = await storage.list_objects(prefix="documents/")
    for obj in list_result.objects:
        print(f"{obj.name} ({obj.size} bytes)")

    # Download
    download = await storage.download_file(
        object_name="documents/report.pdf",
        local_path="/tmp/downloaded.pdf"
    )
Error handling (Python)
from agenticstar_platform.storage import (
StorageError, StorageConfigError,
StorageConnectionError, StorageOperationError
)

try:
    result = await storage.upload_file("/tmp/large-file.zip")
except StorageConnectionError as e:
    print(f"Connection failed: {e}")
except StorageOperationError as e:
    print(f"Operation failed: {e}, code: {e.error_code}")
except StorageError as e:
    print(f"Storage error: {e}")
Related
RAG Module: fetch documents from storage and feed them into embeddings
Security Module: moderate content before upload

Auth Module (auth)

AgenticStar authentication service integration. Manages users, tokens, and device information.

Exceptions

AuthError (Exception)
├── AuthenticationError
├── AuthorizationError
├── ConfigError
└── TokenError

Configuration

AgenticStarAuthConfig

class AgenticStarAuthConfig:
    auth_service_url: str
    client_id: str
    client_secret: str  # hidden
    timeout_seconds: int = 30

    @classmethod
    def from_dict(data: Dict[str, Any]) -> AgenticStarAuthConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "auth") -> AgenticStarAuthConfig: ...
    @classmethod
    def from_env(prefix: str = "AUTH_") -> AgenticStarAuthConfig: ...

Data Models

ApiUser

Authenticated user information.

@dataclass
class ApiUser:
    user_id: str
    email: str
    username: str
    created_at: str  # ISO 8601
    last_login_at: Optional[str] = None
    roles: List[str] = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None

DeviceInfo

Device information.

@dataclass
class DeviceInfo:
    device_id: str
    device_name: str
    device_type: str  # "mobile", "desktop", "tablet"
    os: str
    browser: Optional[str] = None
    ip_address: str = ""
    last_active_at: Optional[str] = None

LoginHistoryEntry

A login history record.

@dataclass
class LoginHistoryEntry:
    timestamp: str  # ISO 8601
    device_info: DeviceInfo
    status: str  # "success", "failed"
    failure_reason: Optional[str] = None

MCPTokenInfo

MCP (marketplace connection protocol) token information.

@dataclass
class MCPTokenInfo:
    token_id: str
    token_name: str
    created_at: str
    expires_at: Optional[str]
    scopes: List[str]
    last_used_at: Optional[str] = None
    is_revoked: bool = False

Client

AgenticStarAuthClient

class AgenticStarAuthClient:
    def __init__(config: AgenticStarAuthConfig): ...

    async def __aenter__() -> AgenticStarAuthClient: ...
    async def __aexit__(...) -> None: ...
    async def authenticate(
        username: str,
        password: str
    ) -> Dict[str, Any]: ...
    async def get_user(
        user_id: str
    ) -> ApiUser: ...
    async def list_users(
        limit: int = 50,
        offset: int = 0
    ) -> UserPagination: ...
    async def get_user_devices(
        user_id: str
    ) -> List[DeviceInfo]: ...
    async def get_login_history(
        user_id: str,
        limit: int = 50
    ) -> List[LoginHistoryEntry]: ...
    async def create_mcp_token(
        user_id: str,
        token_name: str,
        scopes: List[str],
        expires_in_days: Optional[int] = None
    ) -> MCPTokenInfo: ...
    async def list_mcp_tokens(
        user_id: str
    ) -> GetMCPTokensResult: ...
    async def revoke_mcp_token(
        token_id: str
    ) -> None: ...
Returns
authenticate() → Dict[str, Any]: {"access_token": str, "refresh_token": str, "expires_in": int, "token_type": "Bearer"}
get_user() → ApiUser: user information (user_id, email, username, roles, etc.)
list_users() → UserPagination: paginated user list (users, total, limit, offset)
create_mcp_token() → MCPTokenInfo: the created token (token_id, scopes, expires_at)

# Listing users (paginated)
page = await auth_client.list_users(limit=20, offset=0)
for user in page.users:
    print(f"{user.username} ({user.email})")
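Since list_users() pages with limit/offset, collecting every user means looping until offset reaches the reported total. The sketch below shows that loop against a synchronous in-memory page source; fetch_page is a hypothetical stand-in for the real async auth_client.list_users() call, and it assumes the pagination result exposes a total field as documented.

```python
from typing import Dict, List

# Hypothetical in-memory page source standing in for auth_client.list_users();
# the real call is async and returns a UserPagination (users, total, limit, offset).
ALL_USERS = [{"username": f"user{i}"} for i in range(45)]

def fetch_page(limit: int, offset: int) -> Dict:
    return {"users": ALL_USERS[offset:offset + limit], "total": len(ALL_USERS)}

def list_all_users(limit: int = 20) -> List[Dict]:
    users: List[Dict] = []
    offset = 0
    while True:
        page = fetch_page(limit=limit, offset=offset)
        users.extend(page["users"])
        offset += limit
        if offset >= page["total"]:  # stop once we have paged past the total
            break
    return users

everyone = list_all_users(limit=20)
```

With the real client, the loop body becomes `page = await auth_client.list_users(limit=limit, offset=offset)` inside an async function.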
Authenticating a user and creating an MCP token (Python)
from agenticstar_platform.auth import (
AgenticStarAuthConfig, AgenticStarAuthClient
)

config = AgenticStarAuthConfig.from_toml("config.toml")

async with AgenticStarAuthClient(config) as auth_client:
    # Authenticate a user
    token_data = await auth_client.authenticate(
        username="user@example.com",
        password="secure_password"
    )

    access_token = token_data["access_token"]

    # Fetch user information
    user = await auth_client.get_user("user-123")

    # Create an MCP token
    mcp_token = await auth_client.create_mcp_token(
        user_id="user-123",
        token_name="API Integration",
        scopes=["chat:exec", "chat:file"],
        expires_in_days=90
    )

    print(f"Token ID: {mcp_token.token_id}")
Related
Events Module: supply auth tokens to WebhookEventHandler via token_provider
Memory Module: manage memories for authenticated users

Memory Module (memory)

Semantic memory (Mem0) + episodic memory (Graphiti) integration.

Semantic Memory (Mem0)

SemanticMemoryConfig

class SemanticMemoryConfig:
    api_key: str  # hidden
    base_url: str = "https://api.mem0.ai/v1"
    timeout_seconds: int = 30

    @classmethod
    def from_dict(data: Dict[str, Any]) -> SemanticMemoryConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "memory_semantic") -> SemanticMemoryConfig: ...

SemanticMemoryClient

class SemanticMemoryClient:
    def __init__(config: SemanticMemoryConfig): ...

    async def __aenter__() -> SemanticMemoryClient: ...
    async def __aexit__(...) -> None: ...
    async def add_memory(
        user_id: str,
        text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]: ...
    async def retrieve(
        user_id: str,
        query: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]: ...
    async def delete_memory(
        memory_id: str
    ) -> None: ...
    async def update_memory(
        memory_id: str,
        text: str
    ) -> Dict[str, Any]: ...
Returns
add_memory() → Dict[str, Any]: {"memory_id": str, "status": "created"}
retrieve() → List[Dict[str, Any]]: memories ordered by relevance; each entry includes memory_id, text, score

Episodic Memory (Graphiti)

EpisodicMemoryConfig

class EpisodicMemoryConfig:
    api_key: str  # hidden
    base_url: str = "https://api.graphiti.ai/v1"
    timeout_seconds: int = 30

    @classmethod
    def from_dict(data: Dict[str, Any]) -> EpisodicMemoryConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "memory_episodic") -> EpisodicMemoryConfig: ...

EpisodicMemoryClient

class EpisodicMemoryClient:
    def __init__(config: EpisodicMemoryConfig): ...

    async def __aenter__() -> EpisodicMemoryClient: ...
    async def __aexit__(...) -> None: ...
    async def add_episode(
        user_id: str,
        content: str,
        timestamp: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]: ...
    async def query_episodes(
        user_id: str,
        query: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]: ...
    async def delete_episode(
        episode_id: str
    ) -> None: ...
Returns
add_episode() → Dict[str, Any]: {"episode_id": str, "status": "created"}
query_episodes() → List[Dict[str, Any]]: time-related episodes; each entry includes episode_id, content, timestamp

Exceptions

MemoryError (Exception)
├── SemanticMemoryError
├── EpisodicMemoryError
└── ConfigError

Utility Functions

def normalize_provider(provider: str) -> str

Normalizes a provider name (lowercase, spaces removed).

def parse_model_string(model: str) -> tuple[str, str]

Parses a model string (splits it into provider and model name).

def get_embedding_dim(model_name: str) -> int

Returns the embedding dimension for a known model.

def normalize_group_id(group_id: str) -> str

Normalizes a group ID.
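The behavior of the first two utilities can be sketched from their descriptions. These are illustrative reimplementations, not the SDK functions: the "provider/model" separator and the "openai" default provider for bare model names are assumptions not stated in the docs.

```python
from typing import Tuple

def normalize_provider(provider: str) -> str:
    # Documented behavior: lowercase and strip spaces
    return provider.lower().replace(" ", "")

def parse_model_string(model: str) -> Tuple[str, str]:
    # Assumes "provider/model" syntax; a bare model name falls back to a
    # default provider ("openai" here is an assumption, not from the docs).
    if "/" in model:
        provider, _, name = model.partition("/")
        return normalize_provider(provider), name
    return "openai", model

provider, name = parse_model_string("Azure OpenAI/gpt-4o")
```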

Semantic + episodic memory (Python)
from agenticstar_platform.memory import (
SemanticMemoryConfig, SemanticMemoryClient,
EpisodicMemoryConfig, EpisodicMemoryClient
)

# Semantic memory (facts and concepts)
sem_config = SemanticMemoryConfig.from_toml("config.toml")
async with SemanticMemoryClient(sem_config) as sem_mem:
    # Add a memory
    await sem_mem.add_memory(
        user_id="user-123",
        text="The user is an AI developer proficient in Python and JavaScript"
    )

    # Search memories
    results = await sem_mem.retrieve(
        user_id="user-123",
        query="What are the user's technical skills?"
    )

# Episodic memory (time-ordered events)
ep_config = EpisodicMemoryConfig.from_toml("config.toml")
async with EpisodicMemoryClient(ep_config) as ep_mem:
    # Add an episode
    await ep_mem.add_episode(
        user_id="user-123",
        content="Reviewing the AgenticStar SDK",
        metadata={"module": "events"}
    )

    # Search episodes
    episodes = await ep_mem.query_episodes(
        user_id="user-123",
        query="recent activity"
    )
Related
Auth Module — use the authenticated user ID as the memory user_id
RAG Module — embed memory content to strengthen similarity search

security Security Module

Content Moderation (Azure / AWS / GCP) + PII Detection.

Enums

SecurityProvider

AZURE = "azure"
AWS = "aws"
GCP = "gcp"

ContentCategory

Content moderation categories.

HATE = "hate"
SELF_HARM = "self_harm"
SEXUAL = "sexual"
VIOLENCE = "violence"
PROFANITY = "profanity"
ASSAULT = "assault"
HARASSMENT = "harassment"
BULLYING = "bullying"

PIICategory

PII (personally identifiable information) categories.

EMAIL = "email"
PHONE = "phone"
SSN = "ssn"
CREDIT_CARD = "credit_card"
PERSON_NAME = "person_name"
ADDRESS = "address"
IP_ADDRESS = "ip_address"
DATE_OF_BIRTH = "date_of_birth"

Exceptions

SecurityError (Exception)
├── ModerationError
├── PIIDetectionError
├── ConfigError
└── ProviderError

Result Types

ContentModerationResult

Content moderation result.

@dataclass
class ContentModerationResult:
    is_safe: bool
    categories: Dict[ContentCategory, float]  # scores 0.0-1.0
    highest_severity_category: Optional[ContentCategory]
    reason: Optional[str]

PIIEntity

Detected PII entity.

@dataclass
class PIIEntity:
    category: PIICategory
    value: str            # detected value (may be masked)
    start_index: int
    end_index: int
    confidence: float

PIIDetectionResult

PII detection result.

@dataclass
class PIIDetectionResult:
    has_pii: bool
    entities: List[PIIEntity]
    masked_text: str
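The start_index/end_index spans on each PIIEntity are what the SDK uses to produce masked_text. To illustrate the relationship, here is a self-contained sketch with a local stand-in for PIIEntity (the SDK computes masked_text itself; this is not its implementation):

```python
from dataclasses import dataclass
from typing import List

# Local stand-in for illustration only.
@dataclass
class PIIEntity:
    category: str
    value: str
    start_index: int
    end_index: int
    confidence: float

def mask_text(text: str, entities: List[PIIEntity]) -> str:
    # Replace each detected span with a [CATEGORY] placeholder,
    # working right-to-left so earlier indices stay valid.
    for e in sorted(entities, key=lambda e: e.start_index, reverse=True):
        text = text[:e.start_index] + f"[{e.category.upper()}]" + text[e.end_index:]
    return text

text = "Email: user@example.com"
entities = [PIIEntity("email", "user@example.com", 7, 23, 0.99)]
print(mask_text(text, entities))  # → Email: [EMAIL]
```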

Configuration

AzureSecurityConfig

class AzureSecurityConfig:
    endpoint: str
    api_key: str  # hidden

    @classmethod
    def from_dict(data: Dict[str, Any]) -> AzureSecurityConfig: ...

AWSSecurityConfig

class AWSSecurityConfig:
    region: str
    access_key_id: str      # hidden
    secret_access_key: str  # hidden

    @classmethod
    def from_dict(data: Dict[str, Any]) -> AWSSecurityConfig: ...

GCPSecurityConfig

class GCPSecurityConfig:
    project_id: str
    credentials_json: str  # JSON string, hidden

    @classmethod
    def from_dict(data: Dict[str, Any]) -> GCPSecurityConfig: ...

Clients

AzureSecurityClient

class AzureSecurityClient:
    def __init__(config: AzureSecurityConfig): ...

    async def __aenter__() -> AzureSecurityClient: ...
    async def __aexit__(...) -> None: ...

    async def moderate_content(
        text: str,
        language: str = "ja"
    ) -> ContentModerationResult: ...

    async def detect_pii(
        text: str,
        mask: bool = True
    ) -> PIIDetectionResult: ...

    async def moderate_batch(
        texts: List[str],
        language: str = "ja"
    ) -> List[ContentModerationResult]: ...

    async def detect_pii_batch(
        texts: List[str],
        mask: bool = True
    ) -> List[PIIDetectionResult]: ...
Returns
moderate_content() → ContentModerationResult — is_safe (safety verdict), categories (per-category scores 0.0-1.0), highest_severity_category
detect_pii() → PIIDetectionResult — has_pii (whether PII was found), entities (list of PIIEntity), masked_text (the masked text)
moderate_batch() → List[ContentModerationResult] — results in input order; suited to bulk-checking large numbers of texts

# Using the PII masking result
result = await security.detect_pii("Email: user@example.com", mask=True)
print(result.masked_text)           # → "Email: [EMAIL]"
print(result.entities[0].category)  # → PIICategory.EMAIL

AWSSecurityClient

class AWSSecurityClient:
    def __init__(config: AWSSecurityConfig): ...

    async def moderate_content(text: str) -> ContentModerationResult: ...

    async def detect_pii(
        text: str,
        mask: bool = True
    ) -> PIIDetectionResult: ...

GCPSecurityClient

class GCPSecurityClient:
    def __init__(config: GCPSecurityConfig): ...

    async def moderate_content(text: str) -> ContentModerationResult: ...

    async def detect_pii(
        text: str,
        mask: bool = True
    ) -> PIIDetectionResult: ...
Content moderation + PII detection (Python)
from agenticstar_platform.security import (
    AzureSecurityConfig, AzureSecurityClient
)

config = AzureSecurityConfig.from_toml("config.toml")

async with AzureSecurityClient(config) as security:
    # Content moderation
    moderation = await security.moderate_content(
        text="This is a sample message",
        language="en"
    )

    if not moderation.is_safe:
        print(f"Content flagged: {moderation.highest_severity_category}")

    # PII detection (with masking)
    pii_result = await security.detect_pii(
        text="Contact me at user@example.com or 123-456-7890",
        mask=True
    )

    if pii_result.has_pii:
        print(f"Found {len(pii_result.entities)} PII entities")
        print(f"Masked text: {pii_result.masked_text}")
Related
RAG Module — content validation pipeline before embedding ingestion
Storage Module — PII screening before file uploads
Events Module — publish moderation results as events

Error Handling

All modules share a common exception-handling pattern. Each module defines its own base exception class, with subclasses distinguishing specific errors.

Exception Hierarchy

Exception
├── DatabaseError
│   ├── ConnectionError
│   ├── QueryError
│   └── ConfigError
├── RAGError (VectorStoreError)
│   ├── EmbeddingError
│   ├── QdrantError (VectorStoreConnectionError)
│   ├── SearchError
│   └── VectorStoreConfigError
├── StorageError
│   ├── StorageConfigError
│   ├── StorageConnectionError
│   └── StorageOperationError
├── AuthError
│   ├── AuthenticationError
│   ├── AuthorizationError
│   ├── ConfigError
│   └── TokenError
├── MemoryError
│   ├── SemanticMemoryError
│   └── EpisodicMemoryError
└── SecurityError
    ├── ModerationError
    ├── PIIDetectionError
    ├── ConfigError
    └── ProviderError

Common Pattern

In every module, catch exceptions in this order: specific exception → module base exception → Exception.

Multi-module error handling (Python)
from agenticstar_platform.db import DatabaseError, ConnectionError, QueryError
from agenticstar_platform.rag import VectorStoreError, EmbeddingError
from agenticstar_platform.storage import StorageError, StorageOperationError
from agenticstar_platform.auth import AuthError, AuthenticationError
from agenticstar_platform.security import SecurityError, ModerationError

async def process_document(doc_id: str):
    try:
        # 1. Fetch the document from the DB
        doc = await db.fetch_one(
            "SELECT * FROM documents WHERE id = $1", (doc_id,)
        )

        # 2. Security check
        moderation = await security.moderate_content(doc["content"])
        if not moderation.is_safe:
            raise ValueError("Content policy violation")

        # 3. Generate the embedding + store it in the vector DB
        await qdrant.upsert(
            point_id=doc_id,
            text=doc["content"],
            payload={"source": "documents"}
        )

        # 4. Save the processed result to storage
        await storage.upload_bytes(
            data=doc["content"].encode(),
            object_name=f"processed/{doc_id}.txt"
        )

    except ConnectionError:
        # DB connection error → retry
        logger.warning("DB connection lost, retrying...")
    except EmbeddingError:
        # Embedding API error → skip
        logger.error("Embedding generation failed")
    except StorageOperationError as e:
        # Storage operation error → decide based on error_code
        logger.error(f"Storage op failed: {e.error_code}")
    except (DatabaseError, VectorStoreError, StorageError, SecurityError) as e:
        # Fall back on the module base exceptions
        logger.error(f"Module error: {type(e).__name__}: {e}")
    except Exception as e:
        # Unexpected error
        logger.critical(f"Unexpected: {e}")
        raise
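The "retry on ConnectionError" branch is often factored into a small backoff helper. A hedged, self-contained sketch (the helper name, retry policy, and demo coroutine are illustrative, not part of the SDK):

```python
import asyncio

# Illustrative helper, not part of the SDK: retry an async operation with
# exponential backoff when a retryable exception is raised.
async def with_retry(make_coro, retryable=(ConnectionError,), attempts=3, base_delay=0.1):
    for attempt in range(attempts):
        try:
            return await make_coro()
        except retryable:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate to the caller
            await asyncio.sleep(base_delay * 2 ** attempt)

# Demo: a flaky operation that fails twice, then succeeds.
calls = {"n": 0}
async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(asyncio.run(with_retry(flaky)))  # → ok
```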

Summary

The AgenticStar Platform SDK v0.3.1 is a comprehensive toolkit made up of seven specialized modules:

  • Events - asynchronous event delivery (SSE / Webhook / DB)
  • Database - PostgreSQL + Azure AD authentication
  • RAG - Embedding + Vector Search (Qdrant)
  • Storage - Cloud Storage (Azure / S3 / GCS)
  • Auth - AgenticStar auth service integration
  • Memory - Semantic + Episodic Memory
  • Security - Content Moderation + PII Detection

Each module can be used on its own, and combining them lets you build powerful AI agent applications.