
SDK API Reference

Complete specifications for all modules, classes, and methods of the agenticstar-platform SDK (v0.5.3).

Installation

Install the SDK using pip.

Basic Installation
pip install agenticstar-platform==0.5.3

To install only specific modules, specify extras.

| Extra | Description |
|---|---|
| [db] | PostgreSQL database support |
| [rag] | RAG (embedding + vector search) support |
| [storage] | Cloud storage (Azure, S3, GCS) support |
| [security] | Security (content moderation, PII detection) support |
| [auth] | AGENTIC STAR Auth service integration |
| [memory] | Semantic Memory (Mem0 + Qdrant) support |
| [all] | All modules |

Install Specific Modules
# RAG module
pip install agenticstar-platform[rag]==0.5.3

# DB + RAG + Storage
pip install agenticstar-platform[db,rag,storage]==0.5.3

# All modules
pip install agenticstar-platform[all]==0.5.3

Module List

The SDK consists of the following 8 modules.

| Module | Description | Key Classes |
|---|---|---|
| events | Async event delivery system (SSE / Webhook) | EventEmitter, StreamingEvent |
| db | PostgreSQL connection + Azure AD auth | PostgreSQLManager, DataAccess |
| rag | RAG (embedding + vector search via Qdrant) | EmbeddingGenerator, QdrantManager |
| storage | Cloud storage (Azure, S3, GCS) | AzureBlobStorageClient, S3StorageClient |
| auth | AGENTIC STAR Auth API client | AgenticStarAuthClient |
| memory | Semantic memory (Mem0 + Qdrant) | SemanticMemoryClient |
| security | Content Moderation + PII Detection | AzureSecurityClient, ContentSafetyValidator |
| common | Common utilities (validation, secret masking) | SecretMasker, validate_identifier |

events Events Module

Async event delivery system. Emit events via EventEmitter and deliver them to SSE / Webhook / DB.

Enums

EventType

Event types displayed in the frontend UI.

PHASE_START = "phase_start"
PROGRESS_UPDATE = "progress_update"
THOUGHT_MESSAGE = "thought_message"
COMPLETION_SUCCESS = "completion_success"
COMPLETION_FAILURE = "completion_failure"
USER_INTERACTION_REQUIRED = "user_interaction_required"
UNEXPECTED_ERROR = "unexpected_error"
HITL_REQUIRED_BROWSER_VNC = "hitl_required_browser_vnc"
HITL_REQUIRED_BROWSER_CLI = "hitl_required_browser_cli"
HITL_COMPLETED = "hitl_completed"
FILE_CREATED = "file_created"
PERMISSION_REQUEST = "permission_request"
PERMISSION_RESPONSE = "permission_response"
TOOL_START = "tool_start"
TOOL_RESULT = "tool_result"
FINAL_RESULT = "final_result"
PROGRESS_MESSAGE = "progress_message"

SubEventType

Sub-event types for granular frontend UI control.

SEARCH_WEB = "search_web"
COMMAND_EXECUTION = "command_execution"
FILE_OPERATION = "file_operation"
LOCAL_ASSISTANT = "local_assistant"
MCP_TOOL = "mcp_tool"
A2A_TOOL = "a2a_tool"
FILE_EDITED = "file_edited"
FILE_READ = "file_read"
FILE_SEARCHED = "file_searched"
BASH_EXECUTED = "bash_executed"
WEB_FETCHED = "web_fetched"
TASK_LAUNCHED = "task_launched"
TODO_UPDATED = "todo_updated"
VIDEO_GENERATED = "video_generated"
IMAGE_GENERATED = "image_generated"
SLIDE_CREATED = "slide_created"
MACOS_AUTOMATION = "macos_automation"

Data Classes

StreamingEvent

Event data for real-time SSE delivery.

@dataclass
class StreamingEvent:
    event_type: EventType
    execution_id: str
    message: str
    timestamp: float
    metadata: Optional[Dict[str, Any]] = None
    sub_event_type: Optional[SubEventType] = None

    def to_dict() -> Dict[str, Any]: ...

SequencedEvent

Event with ordering guarantee.

@dataclass
class SequencedEvent:
    sequence: int  # >= 0
    event_type: EventType
    execution_id: str
    message: str
    timestamp: float
    metadata: Optional[Dict[str, Any]] = None
    sub_event_type: Optional[SubEventType] = None

    def __post_init__(): ...  # Validates sequence >= 0
    def to_streaming_event() -> StreamingEvent: ...
    def to_dict() -> Dict[str, Any]: ...

ExecutionMessage

Database-stored message for marketplace UI integration.

@dataclass
class ExecutionMessage:
    user_id: str
    conversation_id: str
    message_id: str
    chunk_type: str
    content_data: Dict[str, Any]

    def to_dict() -> Dict[str, Any]: ...

    @classmethod
    def from_streaming_event(
        event: StreamingEvent,
        user_id: str,
        conversation_id: str,
        message_id: str,
        chunk_type: Optional[str] = None
    ) -> ExecutionMessage: ...

Classes

EventEmitter

Async event queue. Emits and consumes events in a non-blocking manner.

class EventEmitter:
    def __init__(
        execution_id: str,
        handler: Optional[EventHandler] = None
    ): ...

    @property
    def is_completed() -> bool: ...

    @property
    def sequence_number() -> int: ...

    async def emit_event(
        event_type: EventType,
        message: str,
        metadata: Optional[Dict[str, Any]] = None,
        sub_event_type: Optional[SubEventType] = None
    ) -> None: ...

    async def consume_events(
        timeout: float = 0.1
    ) -> AsyncGenerator[str, None]: ...

    def mark_completed() -> None: ...
    async def cleanup() -> None: ...
    async def __aenter__() -> EventEmitter: ...
    async def __aexit__(...) -> None: ...

Returns

emit_event() -> None: Adds an event to the queue. If a handler is set, it is called asynchronously.
consume_events() -> AsyncGenerator[str, None]: Asynchronously yields SSE-formatted strings. Stops automatically on completion.

Context manager supported: async with EventEmitter(...) as emitter:

Emitting and Consuming Events (Python)
import asyncio

from agenticstar_platform import EventEmitter, EventType


async def main() -> None:
    emitter = EventEmitter(execution_id="exec-abc-123")

    # Emit an event
    await emitter.emit_event(
        event_type=EventType.PHASE_START,
        message="Starting document analysis",
        metadata={"phase": "analysis", "total_pages": 42},
    )

    # Mark the run as completed; consume_events() stops automatically on completion
    emitter.mark_completed()

    # Consume events (SSE streaming)
    async for chunk in emitter.consume_events():
        print(chunk)


asyncio.run(main())
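
consume_events() is described as yielding SSE-formatted strings, but this page does not show the exact framing. The sketch below assumes the common single-line form, `data: <json>` followed by a blank line, purely to illustrate what a consumer might have to parse. `to_sse` and `parse_sse` are hypothetical helpers, not SDK functions.

```python
import json
from typing import Any, Dict


def to_sse(event: Dict[str, Any]) -> str:
    """Frame one event dict as a Server-Sent Events message (assumed layout)."""
    # An SSE message is one or more "data:" lines terminated by a blank line.
    payload = json.dumps(event, ensure_ascii=False)
    return f"data: {payload}\n\n"


def parse_sse(chunk: str) -> Dict[str, Any]:
    """Inverse of to_sse for single-line data payloads."""
    line = chunk.strip()
    if not line.startswith("data: "):
        raise ValueError("not an SSE data frame")
    return json.loads(line[len("data: "):])
```

A client reading the stream could apply `parse_sse` to each chunk before dispatching on the `event_type` field, provided the real framing matches this assumption.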

DatabaseEventHandler

Persists events to the PostgreSQL execution_messages table.

class DatabaseEventHandler:
    def __init__(
        data_access: Any,
        user_id: str,
        conversation_id: str,
        message_id: str,
        table_name: str = "execution_messages",
        chunk_type_map: Optional[Dict[EventType, str]] = None
    ): ...

    async def __call__(event: StreamingEvent) -> Optional[str]: ...

WebhookEventHandler

Sends events to an HTTP webhook endpoint.

class WebhookEventHandler:
    def __init__(
        webhook_url: str,
        conversation_id: str,
        message_id: str,
        headers: Optional[Dict[str, str]] = None,
        timeout_seconds: int = 30,
        message_type: str = "append_message",
        token_provider: Optional[Callable] = None,
        chunk_type_map: Optional[Dict[EventType, str]] = None
    ): ...

    async def __call__(event: StreamingEvent) -> Optional[str]: ...

CompositeEventHandler

Executes multiple EventHandlers in parallel.

class CompositeEventHandler:
    def __init__(
        handlers: List[Callable],
        return_first_chunk: bool = True
    ): ...

    async def __call__(event: StreamingEvent) -> Optional[str]: ...
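
Running several handlers in parallel can be sketched with asyncio.gather. This is a stand-in under stated assumptions, not the SDK's implementation: `FakeEvent` and `fan_out` are hypothetical names, and `return_first_chunk` is assumed to mean "return the first non-None result".

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass
class FakeEvent:
    """Hypothetical stand-in for StreamingEvent (illustration only)."""
    message: str


Handler = Callable[[FakeEvent], Awaitable[Optional[str]]]


async def fan_out(handlers: List[Handler], event: FakeEvent,
                  return_first_chunk: bool = True) -> Optional[str]:
    # Run every handler concurrently. Exceptions are collected instead of
    # raised, so one failing handler does not cancel the others.
    results = await asyncio.gather(*(h(event) for h in handlers),
                                   return_exceptions=True)
    chunks = [r for r in results if isinstance(r, str)]
    if not chunks:
        return None
    # Assumed semantics: first non-None chunk, otherwise all chunks joined.
    return chunks[0] if return_first_chunk else "".join(chunks)


async def sse_handler(event: FakeEvent) -> Optional[str]:
    return f"data: {event.message}\n\n"


async def silent_handler(event: FakeEvent) -> Optional[str]:
    return None  # e.g. a handler that only persists the event


async def failing_handler(event: FakeEvent) -> Optional[str]:
    raise RuntimeError("webhook unreachable")
```

asyncio.gather returns results in the order the handlers were passed, so "first chunk" here means the first handler in the list that produced a string.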

Factory Functions

def create_sse_handler() -> EventHandler

Creates a handler in SSE (Server-Sent Events) format.

def create_json_handler() -> EventHandler

Creates a handler in JSON format.

def create_marketplace_handler(
    data_access: Any,
    webhook_url: str,
    user_id: str,
    conversation_id: str,
    message_id: str,
    token_provider: Optional[Callable] = None
) -> CompositeEventHandler

Creates a composite handler (DB + Webhook) for marketplace UI.

Protocol

EventHandler

Generic interface for event processing.

class EventHandler(Protocol):
    async def __call__(
        self,
        event: StreamingEvent
    ) -> Optional[str]: ...

Related

Database Module: Pass DataAccess to DatabaseEventHandler for persistence
Storage Module: Archive event logs to cloud storage

db Database Module

PostgreSQL connection pool + Azure AD auth.

Exceptions

DatabaseError (Exception)
├── ConnectionError
├── QueryError
└── ConfigError

Configuration

AzureADConfig

Azure Active Directory authentication configuration.

@dataclass
class AzureADConfig:
    tenant_id: str
    client_id: str
    client_secret: str  # hidden
    username: str = ""

    @classmethod
    def from_dict(data: Dict[str, Any]) -> AzureADConfig: ...

    @classmethod
    def from_toml(toml_path: str, section: str = "azure_ad") -> AzureADConfig: ...

PostgreSQLConfig

PostgreSQL connection configuration (with Azure AD support).

class PostgreSQLConfig:
    provider: str = "postgresql"
    host: str = "localhost"
    port: int = 5432
    database: str = "agenticstar_db"
    use_azure_ad: bool = False
    username: Optional[str] = None
    password: Optional[str] = None  # hidden
    pool_min_size: int = 5
    pool_max_size: int = 20
    command_timeout: int = 60
    pool_timeout: Optional[int] = 30
    max_overflow: Optional[int] = 10
    azure_ad: Optional[AzureADConfig] = None
    api_proxy_url: Optional[str] = None
    ssl_mode: str = "require"

    @classmethod
    def from_dict(data: Dict[str, Any]) -> PostgreSQLConfig: ...

    @classmethod
    def from_toml(toml_path: str, section: str = "database") -> PostgreSQLConfig: ...

    @classmethod
    def from_env(prefix: str = "DB_") -> PostgreSQLConfig: ...
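
from_env() takes a prefix (default "DB_"), but this page does not list the environment variable names it reads. The sketch below assumes the simplest convention, prefix plus upper-cased field name (for example DB_HOST or DB_PORT); that naming, the `FIELDS` table, and `settings_from_env` are all hypothetical and only illustrate how prefixed variables could be collected and type-converted.

```python
import os
from typing import Any, Callable, Dict, Mapping


def _to_bool(value: str) -> bool:
    return value.strip().lower() in {"1", "true", "yes", "on"}


# Hypothetical subset of PostgreSQLConfig fields with their converters.
FIELDS: Dict[str, Callable[[str], Any]] = {
    "host": str,
    "port": int,
    "database": str,
    "use_azure_ad": _to_bool,
    "pool_max_size": int,
}


def settings_from_env(prefix: str = "DB_",
                      environ: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Collect <prefix><FIELD> variables into a dict of typed values."""
    settings: Dict[str, Any] = {}
    for name, convert in FIELDS.items():
        raw = environ.get(f"{prefix}{name.upper()}")
        if raw is not None:  # unset variables keep the class defaults
            settings[name] = convert(raw)
    return settings
```

A dict built this way could then be handed to PostgreSQLConfig.from_dict(), if the real variable names match the assumed convention.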

TOML configuration example:

config.toml - PostgreSQL (TOML)
[database]
provider = "postgresql"
host = "db.example.com"
port = 5432
database = "agenticstar_db"
use_azure_ad = true
pool_min_size = 5
pool_max_size = 20
command_timeout = 60

[azure_ad]
tenant_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
client_id = "yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"
client_secret = "your-secret-here"
username = "dbuser@example.onmicrosoft.com"

Connection Management

PostgreSQLManager

Async PostgreSQL connection pool (with Azure AD token management).

class PostgreSQLManager:
    def __init__(config: PostgreSQLConfig): ...

    @property
    def config() -> PostgreSQLConfig: ...

    @property
    def pool() -> Optional[asyncpg.Pool]: ...

    async def __aenter__() -> PostgreSQLManager: ...
    async def __aexit__(...) -> None: ...
    async def initialize() -> None: ...
    async def close() -> None: ...

    async def execute_query(
        query: str,
        params: tuple = ()
    ) -> Dict[str, Any]: ...

    async def fetch_one(
        query: str,
        params: tuple = ()
    ) -> Optional[Dict[str, Any]]: ...

    async def fetch_all(
        query: str,
        params: tuple = ()
    ) -> List[Dict[str, Any]]: ...

    async def execute(
        query: str,
        params: tuple = ()
    ) -> str: ...

Returns

execute_query() -> Dict[str, Any]: {"success": bool, "data": list, "count": int}
fetch_one() -> Optional[Dict[str, Any]]: Single row result dict, or None if not found
fetch_all() -> List[Dict[str, Any]]: List of result rows (empty list if 0 rows)
execute() -> str: PostgreSQL status string (e.g., "INSERT 0 1")

Context manager supported: async with PostgreSQLManager(config) as mgr:

Database Connection and Query Execution (Python)
import asyncio

from agenticstar_platform.db import PostgreSQLConfig, PostgreSQLManager


async def main() -> None:
    config = PostgreSQLConfig.from_toml("config.toml")

    async with PostgreSQLManager(config) as mgr:
        await mgr.initialize()

        # SELECT query
        result = await mgr.fetch_all(
            "SELECT * FROM conversations WHERE user_id = $1",
            ("user-123",),
        )

        # INSERT
        await mgr.execute(
            "INSERT INTO events (event_type, data) VALUES ($1, $2)",
            ("phase_start", "{}"),
        )


asyncio.run(main())

Data Access Layer

DataAccess

Generic table operation wrapper with SQL injection prevention.

class DataAccess:
    def __init__(
        db: PostgreSQLManager | ApiPostgreSQLManager
    ): ...

    async def __aenter__() -> DataAccess: ...
    async def __aexit__(...) -> None: ...
    async def initialize() -> None: ...
    async def close() -> None: ...
    def is_initialized() -> bool: ...
    async def ensure_initialized() -> None: ...

    async def execute_query(
        query: str,
        params: tuple = ()
    ) -> Dict[str, Any]: ...

    async def insert(
        table: str,
        data: Dict[str, Any],
        returning: Optional[List[str]] = None
    ) -> Dict[str, Any]: ...

    async def upsert(
        table: str,
        data: Dict[str, Any],
        conflict_columns: List[str],
        update_columns: Optional[List[str]] = None,
        returning: Optional[List[str]] = None
    ) -> Dict[str, Any]: ...

    async def select(
        table: str,
        columns: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None
    ) -> Dict[str, Any]: ...

    async def select_one(
        table: str,
        columns: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]: ...

    async def update(
        table: str,
        data: Dict[str, Any],
        where: Dict[str, Any],
        returning: Optional[List[str]] = None
    ) -> Dict[str, Any]: ...

    async def delete(
        table: str,
        where: Dict[str, Any],
        returning: Optional[List[str]] = None
    ) -> Dict[str, Any]: ...

Returns

insert() / upsert() -> Dict[str, Any]: {"success": bool, "data": dict}. Contains column values when returning is specified
select() -> Dict[str, Any]: {"success": bool, "data": list, "count": int}
update() / delete() -> Dict[str, Any]: {"success": bool, "affected_rows": int}

Key method usage examples:

# INSERT
result = await da.insert("users", {"name": "Alice"}, returning=["id"])

# UPSERT (update on conflict)
result = await da.upsert(
    "users",
    {"email": "a@example.com", "name": "Alice"},
    conflict_columns=["email"],
)

# SELECT with WHERE + ORDER + LIMIT
result = await da.select(
    "users",
    columns=["id", "name"],
    where={"active": True},
    order_by="created_at DESC",
    limit=10,
)
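
DataAccess is described as preventing SQL injection, but its internals are not shown here. A typical way to get that property is to validate identifiers against a strict pattern and pass values only through positional placeholders. The sketch below illustrates that idea; `check_identifier` and `build_insert` are hypothetical helpers, not the SDK's code.

```python
import re
from typing import Any, Dict, List, Optional, Tuple

# Only plain identifiers are accepted: letters, digits and underscores.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def check_identifier(name: str) -> str:
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


def build_insert(table: str, data: Dict[str, Any],
                 returning: Optional[List[str]] = None) -> Tuple[str, tuple]:
    """Build a parameterized INSERT using $1, $2, ... placeholders."""
    columns = [check_identifier(c) for c in data]
    placeholders = ", ".join(f"${i}" for i in range(1, len(columns) + 1))
    sql = (f"INSERT INTO {check_identifier(table)} "
           f"({', '.join(columns)}) VALUES ({placeholders})")
    if returning:
        sql += " RETURNING " + ", ".join(check_identifier(c) for c in returning)
    # Values never enter the SQL text; they travel separately as parameters.
    return sql, tuple(data.values())
```

The returned pair has the same shape as the `query` and `params` arguments of execute_query(), so values are always bound by the driver rather than interpolated.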

ConfigAccess

Access layer for configuration information such as agents, tools, guardrails, and MCP.

class ConfigAccess:
    def __init__(db: DataAccess): ...

    async def get_agent_instructions(agent_type: str) -> Dict[str, Any]: ...
    async def get_all_agent_configs() -> Dict[str, Any]: ...
    async def get_agent_config(agent_type: str, requested_level: str = "default") -> Dict[str, Any]: ...
    async def get_tool_config(tool_name: str) -> Dict[str, Any]: ...
    async def get_all_tool_configs() -> Dict[str, Any]: ...
    async def get_banned_urls() -> Dict[str, Any]: ...
    async def get_guardrails_settings() -> Dict[str, Any]: ...
    async def get_system_settings() -> Dict[str, Any]: ...
    async def get_mcp_configurations(execution_mode: Optional[str] = None) -> Dict[str, Any]: ...

Extending Blocked URL granularity

get_banned_urls() returns the list of blocked URL patterns configured in the admin panel. The basic feature (ExtAuth Service) provides domain-level restriction, but by matching the retrieved patterns against URL strings inside your agent tools (e.g., using re.search), you can implement URL (path) level restriction as an extended capability. Settings are unified through the admin panel, so there is no duplicated operational configuration.
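
The path-level check described above can be sketched as follows. The shape of the dict returned by get_banned_urls() is not documented on this page, so the example assumes the patterns have already been extracted into a list of regular-expression strings; `is_blocked` and the sample patterns are hypothetical.

```python
import re
from typing import Iterable
from urllib.parse import urlsplit


def is_blocked(url: str, patterns: Iterable[str]) -> bool:
    """Return True if host + path of `url` matches any blocked pattern."""
    parts = urlsplit(url)
    # Match against "host/path" so a pattern can restrict a single path
    # without blocking the whole domain.
    target = f"{parts.netloc}{parts.path}"
    return any(re.search(pattern, target) for pattern in patterns)


# Hypothetical patterns, standing in for values from get_banned_urls().
BLOCKED_PATTERNS = [
    r"example\.com/admin(/|$)",
    r"^files\.example\.org/private/",
]
```

A tool would call `is_blocked(url, BLOCKED_PATTERNS)` before fetching and refuse the request when it returns True.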

ExecutionAccess

Execution message retrieval layer.

class ExecutionAccess:
    def __init__(db: DataAccess): ...

    async def get_messages(
        execution_id: str
    ) -> Optional[List[Dict[str, Any]]]: ...

TelemetryAccess

NIST audit telemetry storage and retrieval layer.

class TelemetryAccess:
    def __init__(db: DataAccess): ...

    async def save_telemetry(
        telemetry_data: Dict[str, Any]
    ) -> Dict[str, Any]: ...

    async def list_telemetry(
        conversation_id: Optional[str] = None,
        service: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> Dict[str, Any]: ...

    async def get_telemetry(
        telemetry_id: str
    ) -> Optional[Dict[str, Any]]: ...

PodRuntime

Pod lifecycle management (startup notification, shutdown notification, auto scale-down).

class PodRuntime:
    def __init__(
        db: DataAccess,
        execution_id: str,
        pod_name: str
    ): ...

    async def start() -> Dict[str, Any]: ...
    async def final(status: str = "completed") -> Dict[str, Any]: ...

ApiPostgreSQLManager

PostgreSQL access via HTTP API proxy (for CLI mode). Has the same async interface as PostgreSQLManager.

class ApiPostgreSQLManager:
    def __init__(api_url: str, token_provider: Optional[Callable] = None): ...

    async def fetch_one(query: str, params: tuple = ()) -> Optional[Dict[str, Any]]: ...
    async def fetch_all(query: str, params: tuple = ()) -> List[Dict[str, Any]]: ...
    async def execute(query: str, params: tuple = ()) -> str: ...
    async def execute_query(query: str, params: tuple = ()) -> Dict[str, Any]: ...

Factory Function

def create_postgresql_manager(
    config: PostgreSQLConfig
) -> Union[PostgreSQLManager, ApiPostgreSQLManager]

Automatically selects and returns PostgreSQLManager (direct connection) or ApiPostgreSQLManager (API proxy) based on whether config.api_proxy_url is set.

Error Handling

Database operation exception handling.

Database Error Handling (Python)
import asyncio

from agenticstar_platform.db import PostgreSQLConfig, PostgreSQLManager, DataAccess


async def main() -> None:
    config = PostgreSQLConfig.from_toml("config.toml")
    # Create the objects before the try block so that `da` is always bound
    # when the finally clause runs.
    db_manager = PostgreSQLManager(config)
    da = DataAccess(db_manager)

    try:
        await da.initialize()

        result = await da.select("users", where={"id": "user-123"})
        if result["success"] and result["data"]:
            print(f"User: {result['data'][0]['username']}")
    except Exception as e:
        print(f"Database error: {e}")
    finally:
        await da.close()


asyncio.run(main())

Related

Events Module: Persist events to DB with DatabaseEventHandler
RAG Module: Convert data retrieved from DB into Embeddings for searchability

rag RAG Module

RAG (Retrieval-Augmented Generation): Embedding + Vector Search via Qdrant.

Enums

VectorStoreProvider

QDRANT = "qdrant"
WEAVIATE = "weaviate"
MILVUS = "milvus"

Exceptions

RAGError (Exception)
├── EmbeddingError
├── QdrantError
└── SearchError

Data Classes

SearchResult

Single search result.

@dataclass
class SearchResult:
    point_id: str
    payload: Dict[str, Any]
    score: float
    similarity: float = 0.0

UpsertResult

Upsert operation result.

@dataclass
class UpsertResult:
    success: bool
    point_id: str = ""
    error: Optional[str] = None
    error_code: Optional[str] = None

SearchResponse

Search operation result.

@dataclass
class SearchResponse:
    success: bool
    results: List[SearchResult] = field(default_factory=list)
    total_found: int = 0
    query: str = ""
    error: Optional[str] = None
    error_code: Optional[str] = None

Embedding Configuration

EmbeddingConfig

Azure OpenAI embedding configuration.

class EmbeddingConfig:
    base_url: str
    api_key: str  # hidden
    model: str = "text-embedding-ada-002"
    api_version: str = "2024-02-15-preview"
    max_cache_size: int = 10000
    dimensions: int = 1536
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0

    @classmethod
    def from_dict(data: Dict[str, Any]) -> EmbeddingConfig: ...

    @classmethod
    def from_toml(toml_path: str, section: str = "rag.embedding") -> EmbeddingConfig: ...
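
EmbeddingConfig exposes max_retries, base_delay, and max_delay, but this page does not say how the wait between retries is computed. One common scheme that fits those three parameters is capped exponential backoff; the sketch below shows the resulting schedule under that assumption. `backoff_delays` is a hypothetical helper, and the SDK may use a different formula or add jitter.

```python
from typing import List


def backoff_delays(max_retries: int = 5,
                   base_delay: float = 1.0,
                   max_delay: float = 60.0) -> List[float]:
    """Delay before each retry: base_delay doubled per attempt, capped at max_delay."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]
```

With the documented defaults this gives waits of 1, 2, 4, 8, and 16 seconds; the 60-second cap only takes effect from the seventh retry onward.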

EmbeddingGenerator

Embedding generation with caching and retry logic.

class EmbeddingGenerator:
    def __init__(config: EmbeddingConfig): ...

    async def generate(text: str) -> List[float]: ...

    async def generate_batch(
        texts: List[str],
        batch_size: int = 25
    ) -> List[List[float]]: ...

    def clear_cache() -> None: ...

Returns

generate() -> List[float]: Vector (dimensions per config.dimensions, default 1536). Returns immediately if cached
generate_batch() -> List[List[float]]: List of vectors in input text order. API calls in batch_size units (default 25)

# Single text Embedding
vector = await generator.generate("Hello, world!")
# -> [0.012, -0.034, ...]

# Batch Embedding
vectors = await generator.generate_batch(["text1", "text2", "text3"])
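
The combination of caching and batch_size-sized API calls can be illustrated with a small synchronous stand-in. `embed_in_batches` and `embed_fn` are hypothetical; the real generate_batch() is async and its cache policy (for example eviction at max_cache_size) is not shown on this page.

```python
from typing import Callable, Dict, List, Optional

Vector = List[float]


def embed_in_batches(texts: List[str],
                     embed_fn: Callable[[List[str]], List[Vector]],
                     batch_size: int = 25,
                     cache: Optional[Dict[str, Vector]] = None) -> List[Vector]:
    """Embed texts in chunks of batch_size, skipping texts already cached."""
    cache = {} if cache is None else cache
    # Unique, order-preserving list of texts that still need an API call.
    missing = list(dict.fromkeys(t for t in texts if t not in cache))
    for start in range(0, len(missing), batch_size):
        batch = missing[start:start + batch_size]
        for text, vector in zip(batch, embed_fn(batch)):
            cache[text] = vector
    # Results follow the input order, duplicates included.
    return [cache[t] for t in texts]
```

Deduplicating before batching means a repeated text costs one embedding call at most, while the output list still lines up with the input.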

Vector Database

QdrantConfig

Qdrant database configuration.

class QdrantConfig:
    url: str
    collection_name: str
    vector_size: int = 1536
    distance: str = "cosine"  # cosine, euclid, dot
    on_disk_vectors: bool = False
    on_disk_payload: bool = True
    hnsw_m: int = 16
    hnsw_ef_construct: int = 256
    payload_indexes: List[PayloadIndexConfig]
    auth_token_provider: Optional[Callable[[], str]] = None

    @classmethod
    def from_dict(data: Dict[str, Any]) -> QdrantConfig: ...

    @classmethod
    def from_toml(toml_path: str, section: str = "rag.qdrant") -> QdrantConfig: ...
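
The three values accepted by `distance` correspond to standard vector metrics. The pure-Python functions below are only a reference for what each one computes; they are not part of the SDK, and Qdrant evaluates these metrics internally.

```python
import math
from typing import Sequence


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product: larger means more similar (sensitive to vector length)."""
    return sum(x * y for x, y in zip(a, b))


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity: dot product of the two vectors after normalization."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def euclid(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance: smaller means more similar."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
```

Because cosine and dot are similarities while euclid is a distance, a fixed score_threshold does not carry over between metrics.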

PayloadIndexConfig

Payload field index configuration.

@dataclass
class PayloadIndexConfig:
    field_name: str
    field_schema: str = "keyword"  # keyword, integer, float, bool

    @classmethod
    def from_dict(data: Dict[str, Any]) -> PayloadIndexConfig: ...

TOML configuration example:

config.toml - RAG (Qdrant) (TOML)
[embedding]
provider = "openai"
model = "text-embedding-3-small"
api_key = "sk-..."
dimension = 1536

[qdrant]
host = "qdrant.example.com"
port = 6333
api_key = "your-qdrant-key"
use_ssl = true

QdrantManager

Qdrant vector search client.

class QdrantManager:
    def __init__(
        config: QdrantConfig,
        embedding_generator: EmbeddingGenerator
    ): ...

    async def __aenter__() -> QdrantManager: ...
    async def __aexit__(...) -> None: ...
    def is_initialized() -> bool: ...
    async def initialize() -> None: ...
    async def ensure_initialized() -> None: ...

    async def upsert(
        point_id: str,
        text: str,
        payload: Dict[str, Any]
    ) -> Dict[str, Any]: ...

    async def batch_upsert(
        items: List[Dict[str, Any]],
        batch_size: int = 256
    ) -> Dict[str, Any]: ...

    async def search(
        query_text: str,
        limit: int = 10,
        score_threshold: float = 0.4,
        filter_conditions: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]: ...

    async def delete(
        point_ids: List[str]
    ) -> Dict[str, Any]: ...

    async def get_statistics() -> Dict[str, Any]: ...
    async def close() -> None: ...

Returns

upsert() -> Dict[str, Any]: {"success": bool, "point_id": str}
batch_upsert() -> Dict[str, Any]: {"success": bool, "upserted_count": int, "errors": list}
search() -> Dict[str, Any]: {"success": bool, "data": List[SearchResult], "total_found": int}
get_statistics() -> Dict[str, Any]: {"vectors_count": int, "indexed_vectors_count": int, "points_count": int, ...}

Context manager supported: async with QdrantManager(config, generator) as qdrant:

RAG: Embedding + Qdrant Search (Python)
import asyncio

from agenticstar_platform.rag import (
    EmbeddingConfig, EmbeddingGenerator,
    QdrantConfig, QdrantManager,
)


async def main() -> None:
    emb_config = EmbeddingConfig.from_toml("config.toml")
    qd_config = QdrantConfig.from_toml("config.toml")

    generator = EmbeddingGenerator(emb_config)

    async with QdrantManager(qd_config, generator) as qdrant:
        # Add a document
        await qdrant.upsert(
            point_id="doc-001",
            text="AGENTIC STAR is an AI agent development platform",
            payload={"source": "docs", "category": "overview"},
        )

        # Semantic search
        results = await qdrant.search(
            query_text="What is an AI platform?",
            limit=5,
            score_threshold=0.5,
        )
        # "data" holds SearchResult objects, so fields are read as attributes
        for r in results["data"]:
            print(f"Score: {r.score}, Payload: {r.payload}")


asyncio.run(main())

Related

Database Module: Feed data retrieved from DB into RAG
Storage Module: Retrieve document files from storage and convert to Embeddings
Security Module: Verify content safety before Embedding

storage Storage Module

Cloud storage integration (Azure Blob Storage / AWS S3 / Google Cloud Storage).

Enums

StorageProvider

Cloud storage provider type.

AZURE_BLOB = "azure_blob"
AWS_S3 = "aws_s3"
GCS = "gcs"

Exceptions

StorageError (Exception) - Base exception
├── StorageConfigError - Configuration error
├── StorageConnectionError - Connection error
└── StorageOperationError - Operation error with error_code attribute

Result Types

UploadResult

File upload result.

@dataclass
class UploadResult:
    success: bool  # Upload succeeded
    object_name: str = ""  # Object name
    object_url: str = ""  # Object URL
    file_size: int = 0  # File size
    content_type: str = ""  # Content type
    error: Optional[str] = None  # Error message
    error_code: Optional[str] = None  # Error code
    metadata: Dict[str, Any] = field(default_factory=dict)  # Metadata

DownloadResult

File download result.

@dataclass
class DownloadResult:
    success: bool  # Download succeeded
    object_name: str = ""  # Object name
    local_path: str = ""  # Local file path
    file_size: int = 0  # File size
    error: Optional[str] = None  # Error message
    error_code: Optional[str] = None  # Error code

ObjectInfo

Cloud object metadata.

@dataclass
class ObjectInfo:
    name: str  # Object name
    size: int  # Object size
    last_modified: Optional[str] = None  # Last modified timestamp
    content_type: Optional[str] = None  # Content type
    metadata: Dict[str, Any] = field(default_factory=dict)  # Metadata

ListResult

List operation result.

@dataclass
class ListResult:
    success: bool  # Operation succeeded
    objects: List[ObjectInfo] = field(default_factory=list)  # Objects
    count: int = 0  # Object count
    prefix: str = ""  # Search prefix
    error: Optional[str] = None  # Error message
    error_code: Optional[str] = None  # Error code

Configuration

StorageConfig

Common storage configuration (base class).

class StorageConfig:
    provider: StorageProvider  # Provider type
    bucket_name: str  # Bucket/container name
    enabled: bool = True  # Enabled flag
    max_file_size: int = 5*1024*1024*1024  # Max file size (5GB)
    auto_create_bucket: bool = False  # Auto-create bucket
    prefix: str = ""  # Object name prefix
    custom_domain: Optional[str] = None  # Custom domain
    connection_timeout: int = 30  # Connection timeout (seconds)
    read_timeout: int = 300  # Read timeout (seconds)

AzureBlobConfig

Azure Blob Storage configuration.

class AzureBlobConfig:
    bucket_name: str  # Container name
    connection_string: str  # hidden
    enabled: bool = True
    max_file_size: int = 5*1024*1024*1024
    auto_create_bucket: bool = False
    prefix: str = ""
    custom_domain: Optional[str] = None
    connection_timeout: int = 30
    read_timeout: int = 300
    max_block_size: int = 8*1024*1024  # Max block size (8MB)
    max_single_put_size: int = 256*1024*1024  # Max single upload (256MB)

    @classmethod
    def from_dict(data: Dict[str, Any]) -> AzureBlobConfig: ...

    @property
    def provider(self) -> StorageProvider:
        return StorageProvider.AZURE_BLOB
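
The two size settings suggest a simple upload plan: files up to max_single_put_size go up in one request, and larger files are split into blocks of at most max_block_size. The arithmetic is sketched below with the documented defaults. `plan_upload` is a hypothetical helper, and the exact threshold behaviour is an assumption based on how these option names are commonly used.

```python
import math
from typing import Dict, Union

MIB = 1024 * 1024


def plan_upload(file_size: int,
                max_block_size: int = 8 * MIB,
                max_single_put_size: int = 256 * MIB) -> Dict[str, Union[str, int]]:
    """Estimate how many upload requests a file of `file_size` bytes needs."""
    if file_size < 0:
        raise ValueError("file_size must be non-negative")
    if file_size <= max_single_put_size:
        return {"mode": "single_put", "requests": 1}
    # Larger files are staged block by block; the last block may be partial.
    return {"mode": "staged_blocks",
            "requests": math.ceil(file_size / max_block_size)}
```

For example, a 1 GiB file with 8 MiB blocks needs 128 block uploads, while a 100 MiB file fits in a single request.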

S3Config

AWS S3 configuration.

class S3Config:
    bucket_name: str  # Bucket name
    aws_access_key_id: str  # hidden
    aws_secret_access_key: str  # hidden
    region_name: str = "us-east-1"  # AWS region
    enabled: bool = True
    max_file_size: int = 5*1024*1024*1024
    auto_create_bucket: bool = False
    prefix: str = ""
    custom_domain: Optional[str] = None
    connection_timeout: int = 30
    read_timeout: int = 300
    endpoint_url: Optional[str] = None  # S3-compatible endpoint (MinIO, etc.)

    @classmethod
    def from_dict(data: Dict[str, Any]) -> S3Config: ...

    @property
    def provider(self) -> StorageProvider:
        return StorageProvider.AWS_S3

GCSConfig

Google Cloud Storage configuration.

class GCSConfig:
    bucket_name: str  # Bucket name
    project_id: str  # GCP project ID
    enabled: bool = True
    max_file_size: int = 5*1024*1024*1024
    auto_create_bucket: bool = False
    prefix: str = ""
    custom_domain: Optional[str] = None
    connection_timeout: int = 30
    read_timeout: int = 300
    credentials_path: Optional[str] = None  # hidden - Path to service account JSON
    credentials_json: Optional[str] = None  # hidden - Service account JSON string
    credentials_base64: Optional[str] = None  # hidden - Base64-encoded service account JSON

    @classmethod
    def from_dict(data: Dict[str, Any]) -> GCSConfig: ...

    @property
    def provider(self) -> StorageProvider:
        return StorageProvider.GCS
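
credentials_base64 carries the service account JSON as a Base64 string, which is convenient for environment variables and secret stores. The sketch below shows how such a value can be produced and decoded with the standard library. `encode_credentials` and `decode_credentials` are hypothetical helpers, and the sample dict is a placeholder rather than a real key.

```python
import base64
import json
from typing import Any, Dict


def encode_credentials(info: Dict[str, Any]) -> str:
    """Serialize a service account dict to a Base64-encoded JSON string."""
    return base64.b64encode(json.dumps(info).encode("utf-8")).decode("ascii")


def decode_credentials(credentials_base64: str) -> Dict[str, Any]:
    """Decode a Base64-encoded service account JSON string back to a dict."""
    # validate=True rejects characters outside the Base64 alphabet.
    raw = base64.b64decode(credentials_base64, validate=True)
    info = json.loads(raw.decode("utf-8"))
    if not isinstance(info, dict):
        raise ValueError("service account JSON must be an object")
    return info
```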

Storage Clients

AzureBlobStorageClient

Azure Blob Storage client. Supports async context manager (async with).

class AzureBlobStorageClient:
    def __init__(config: AzureBlobConfig): ...

    async def initialize() -> None: ...  # Initialize client
    async def ensure_initialized() -> None: ...  # Initialize if needed

    async def upload_file(
        file_path: str,
        prefix: Optional[str] = None
    ) -> UploadResult: ...

    async def upload_bytes(
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream"
    ) -> UploadResult: ...

    async def download_file(
        object_name: str,
        local_path: str
    ) -> DownloadResult: ...

    async def list_objects(prefix: str = "") -> ListResult: ...
    async def delete_object(object_name: str) -> Dict[str, Any]: ...
    async def delete_objects(object_names: List[str]) -> Dict[str, Any]: ...
    async def close() -> None: ...

Returns

upload_file() / upload_bytes() -> UploadResult: Contains success, object_url, file_size, content_type
download_file() -> DownloadResult: Contains success, local_path, file_size
list_objects() -> ListResult: Contains success, objects: List[ObjectInfo], count
delete_object() -> Dict[str, Any]: {"success": bool, "object_name": str}

S3StorageClient

AWS S3 storage client. Supports async context manager (async with).

class S3StorageClient:
    def __init__(config: S3Config): ...

    async def initialize() -> None: ...  # Initialize client
    async def ensure_initialized() -> None: ...  # Initialize if needed

    async def upload_file(
        file_path: str,
        prefix: Optional[str] = None
    ) -> UploadResult: ...

    async def upload_bytes(
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream"
    ) -> UploadResult: ...

    async def download_file(
        object_name: str,
        local_path: str
    ) -> DownloadResult: ...

    async def list_objects(prefix: str = "") -> ListResult: ...
    async def delete_object(object_name: str) -> Dict[str, Any]: ...
    async def delete_objects(object_names: List[str]) -> Dict[str, Any]: ...
    async def close() -> None: ...

GCSStorageClient

Google Cloud Storage client. Supports async context manager (async with).

class GCSStorageClient:
    def __init__(config: GCSConfig): ...

    async def initialize() -> None: ...  # Initialize client
    async def ensure_initialized() -> None: ...  # Initialize if needed

    async def upload_file(
        file_path: str,
        prefix: Optional[str] = None
    ) -> UploadResult: ...

    async def upload_bytes(
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream"
    ) -> UploadResult: ...

    async def download_file(
        object_name: str,
        local_path: str
    ) -> DownloadResult: ...

    async def list_objects(prefix: str = "") -> ListResult: ...
    async def delete_object(object_name: str) -> Dict[str, Any]: ...
    async def delete_objects(object_names: List[str]) -> Dict[str, Any]: ...
    async def close() -> None: ...

S3 Storage Usage Example (Python)
import asyncio

from agenticstar_platform.storage import S3Config, S3StorageClient


async def main() -> None:
    config = S3Config.from_dict({
        "bucket_name": "my-bucket",
        "aws_access_key_id": "AKIA...",
        "aws_secret_access_key": "secret...",
        "region_name": "ap-northeast-1",
    })

    # The client is used as an async context manager
    async with S3StorageClient(config) as storage:
        # File upload
        upload_result = await storage.upload_file(
            file_path="/tmp/report.pdf",
            prefix="documents/",
        )
        if upload_result.success:
            print(f"URL: {upload_result.object_url}")

        # Byte data upload
        await storage.upload_bytes(
            data=b"Hello, World!",
            object_name="greetings/hello.txt",
            content_type="text/plain",
        )

        # List objects
        list_result = await storage.list_objects(prefix="documents/")
        for obj in list_result.objects:
            print(f"{obj.name} ({obj.size} bytes)")

        # Download
        download = await storage.download_file(
            object_name="documents/report.pdf",
            local_path="/tmp/downloaded.pdf",
        )


asyncio.run(main())

Error Handling (Python)
from agenticstar_platform.storage import (
    StorageError, StorageConfigError,
    StorageConnectionError, StorageOperationError
)

try:
    result = await storage.upload_file("/tmp/large-file.zip")
except StorageConnectionError as e:
    print(f"Connection failed: {e}")
except StorageOperationError as e:
    print(f"Operation failed: {e}, code: {e.error_code}")
except StorageError as e:
    print(f"Storage error: {e}")

Related

RAG Module: Retrieve documents from storage and feed into Embedding
Security Module: Content moderation before upload

auth Auth Module

AGENTIC STAR authentication service integration. Manages users, tokens, and device information.

Exceptions

AuthError (Exception)
├── AuthConfigError
└── AuthAPIError
    ├── AuthUnauthorizedError (401)
    ├── AuthNotFoundError (404)
    └── AuthRateLimitError (429)
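
The hierarchy ties three subclasses to HTTP status codes. The sketch below re-declares the classes locally, so it runs without the SDK, and shows one way a client could map a response status to the matching exception. The `status_code` attribute and the `error_for_status` helper are assumptions made for illustration.

```python
from typing import Dict, Type


class AuthError(Exception):
    """Local stand-in for the SDK's base exception."""


class AuthConfigError(AuthError):
    pass


class AuthAPIError(AuthError):
    def __init__(self, message: str, status_code: int):
        super().__init__(message)
        self.status_code = status_code  # hypothetical attribute


class AuthUnauthorizedError(AuthAPIError):
    pass


class AuthNotFoundError(AuthAPIError):
    pass


class AuthRateLimitError(AuthAPIError):
    pass


_STATUS_MAP: Dict[int, Type[AuthAPIError]] = {
    401: AuthUnauthorizedError,
    404: AuthNotFoundError,
    429: AuthRateLimitError,
}


def error_for_status(status_code: int, message: str = "") -> AuthAPIError:
    """Pick the most specific exception class for an HTTP status code."""
    cls = _STATUS_MAP.get(status_code, AuthAPIError)
    return cls(message or f"HTTP {status_code}", status_code)
```

Because every class derives from AuthError, callers can catch AuthRateLimitError to retry, and fall back to AuthAPIError or AuthError for everything else.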

Configuration

AgenticStarAuthConfig

class AgenticStarAuthConfig:
    base_url: str
    api_key: str  # hidden
    auth_type: str = "api_key"
    timeout: float = 30.0

    @classmethod
    def create(
        base_url: str,
        api_key: str,
        auth_type: str = "api_key",
        timeout: float = 30.0
    ) -> AgenticStarAuthConfig: ...

    @classmethod
    def from_config(config_path: Optional[Path] = None) -> AgenticStarAuthConfig: ...

Data Models

ApiUser

Authenticated user information (Pydantic BaseModel).

class ApiUser(BaseModel):
    id: str
    username: Optional[str] = None
    email: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    display_name: Optional[str] = None
    email_verified: bool = False
    enabled: bool = True
    created_timestamp: Optional[int] = None
    attributes: Optional[Dict[str, Any]] = None
    organization: Optional[str] = None
    job: Optional[str] = None
    bio: Optional[str] = None
    last_login_at: Optional[str] = None

DeviceInfo

Device information.

class DeviceInfo(BaseModel):
    id: Optional[str] = None
    device_code: str
    device_name: Optional[str] = None
    device_type: Optional[str] = None
    device_info: Optional[Dict[str, Any]] = None
    is_active: Optional[bool] = None
    created_at: Optional[str] = None
    last_used_at: Optional[str] = None

MCPTokenInfo

MCP token information.

class MCPTokenInfo(BaseModel):
    access_token: str
    token_type: str
    expires_at: datetime
    scopes: List[str]
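
Since MCPTokenInfo carries an expires_at timestamp, callers will usually want to check it before using a token. A minimal sketch follows, assuming that naive timestamps are in UTC and that a small safety margin is wanted. `is_expired` and its `skew` parameter are hypothetical and not part of the SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_expired(expires_at: datetime,
               now: Optional[datetime] = None,
               skew: timedelta = timedelta(seconds=60)) -> bool:
    """Return True if the token is expired or will expire within `skew`."""
    now = now or datetime.now(timezone.utc)
    if expires_at.tzinfo is None:
        # Assumption: naive timestamps are expressed in UTC.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    return now + skew >= expires_at
```

The margin avoids sending a token that expires while the request is still in flight.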

Client

AgenticStarAuthClient

class AgenticStarAuthClient: def __init__(config: AgenticStarAuthConfig): ... async def __aenter__() -> AgenticStarAuthClient: ... async def __aexit__(...) -> None: ... async def get_users( page: int = 1, limit: int = 20, search: Optional[str]= None, is_approved: Optional[str]= None, sort_by: Optional[str]= None, order: Optional[str]= None, include_last_login: bool = False ) -> GetUsersResult: ... async def get_user( user_id: str ) -> GetUserResult: ... async def get_mcp_tokens( user_id: str, providers: Optional[List[str]]= None ) -> GetMCPTokensResult: ... async def close() -> None: ...
Returns
get_users() -> GetUsersResult | success, users: List[ApiUser], pagination: UserPagination
get_user() -> GetUserResult | success, user: ApiUser, devices: List[DeviceInfo], login_history
get_mcp_tokens() -> GetMCPTokensResult | success, tokens: Dict[str, MCPTokenInfo], errors

# Get user list (with pagination)
result = await auth_client.get_users(page=1, limit=20)
for user in result.users:
    print(f"{user.username} ({user.email})")

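To collect every user rather than a single page, the page and limit parameters can drive a simple loop. This is a sketch, not the SDK's own pagination helper: `FakeAuthClient` is a hypothetical stand-in that imitates get_users() so the code runs on its own, and the loop stops on a short page because the fields of UserPagination are not documented here.

```python
import asyncio
from types import SimpleNamespace


class FakeAuthClient:
    """Hypothetical stand-in that serves users in pages, like get_users()."""

    def __init__(self, total=45):
        self._users = [
            SimpleNamespace(id=f"user-{i}", username=f"name{i}") for i in range(total)
        ]

    async def get_users(self, page=1, limit=20):
        start = (page - 1) * limit
        return SimpleNamespace(success=True, users=self._users[start:start + limit])


async def fetch_all_users(client, limit=20):
    """Collect users page by page until a short or empty page is returned."""
    collected = []
    page = 1
    while True:
        result = await client.get_users(page=page, limit=limit)
        if not result.success:
            break
        collected.extend(result.users)
        if len(result.users) < limit:
            break  # last page reached
        page += 1
    return collected


all_users = asyncio.run(fetch_all_users(FakeAuthClient()))
```

If the total happens to be an exact multiple of limit, the loop issues one extra request that returns an empty page and then stops.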
User Information and MCP Tokens (Python)
from agenticstar_platform.auth import (
    AgenticStarAuthConfig, AgenticStarAuthClient
)

config = AgenticStarAuthConfig.from_config("config.toml")

async with AgenticStarAuthClient(config) as auth_client:
    # Get user information
    result = await auth_client.get_user("user-123")
    if result.success:
        print(f"User: {result.user.username}")

    # Get MCP tokens
    tokens = await auth_client.get_mcp_tokens(
        user_id="user-123",
        providers=["slack", "gitlab"],
    )
    for provider, token in tokens.tokens.items():
        print(f"{provider}: expires={token.expires_at}")

Related
Events Module | Supply auth tokens to WebhookEventHandler token_provider
Memory Module | Manage semantic memory for authenticated users

memory Memory Module

Semantic memory (Mem0 + Qdrant) integration. Recall user knowledge, preferences, and facts via vector search.

Configuration

SemanticMemoryConfig

@dataclass
class LLMProviderConfig:
    model: str  # "azure_openai/gpt-4.1"
    api_key: str  # hidden
    base_url: Optional[str] = None
    api_version: Optional[str] = None
    aws_access_key_id: Optional[str] = None  # For Bedrock
    aws_secret_access_key: Optional[str] = None  # For Bedrock
    aws_region_name: Optional[str] = None  # For Bedrock

    @classmethod
    def from_dict(data: Dict[str, Any]) -> LLMProviderConfig: ...

    @classmethod
    def from_toml(toml_path: str, section: str) -> LLMProviderConfig: ...

@dataclass
class SemanticMemoryConfig:
    llm_config: LLMProviderConfig
    embedder_config: LLMProviderConfig
    vector_store: Optional[Dict[str, Any]] = None
    rerank: Optional[Dict[str, Any]] = None
    custom_fact_extraction_prompt: Optional[str] = None

    @classmethod
    def from_dict(data: Dict[str, Any]) -> SemanticMemoryConfig: ...

    @classmethod
    def from_toml(toml_path: str, section: str = "memory") -> SemanticMemoryConfig: ...

SemanticMemoryClient

class SemanticMemoryClient:
    def __init__(config: SemanticMemoryConfig): ...

    @property
    def enabled() -> bool: ...

    # Synchronous methods (no await needed)
    def add(
        messages: List[Dict[str, str]],
        user_id: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]: ...

    def search(
        query: str,
        user_id: str,
        limit: int = 10
    ) -> Dict[str, Any]: ...

    def get_all(
        user_id: str
    ) -> Dict[str, Any]: ...

    def delete(
        memory_id: str
    ) -> Dict[str, Any]: ...

    def delete_all(
        user_id: str
    ) -> Dict[str, Any]: ...

    # Asynchronous method
    async def cleanup() -> None: ...

Returns
add() -> Dict[str, Any] | Mem0 result dict. The messages argument uses the OpenAI chat format: [{"role": "user", "content": "..."}]
search() -> Dict[str, Any] | {"results": [...]}. Each element contains memory, score, etc.
get_all() -> Dict[str, Any] | All memories stored for the user
delete() / delete_all() -> Dict[str, Any] | Deletion result
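Because search() returns a plain dict, callers usually post-process the "results" list themselves. A minimal sketch, assuming each element carries the memory and score keys mentioned above and that a higher score means a closer match (an assumption; this reference does not state the direction). The `memories_above` helper and the sample payload are hypothetical.

```python
from typing import Any, Dict, List


def memories_above(result: Dict[str, Any], min_score: float) -> List[str]:
    """Return memory texts scoring at least min_score, highest score first."""
    hits = [
        r for r in result.get("results", [])
        if r.get("score", 0.0) >= min_score
    ]
    hits.sort(key=lambda r: r.get("score", 0.0), reverse=True)
    return [r["memory"] for r in hits]


# Hypothetical payload shaped like the documented search() return value.
sample = {
    "results": [
        {"memory": "Prefers dark mode", "score": 0.41},
        {"memory": "Proficient in Python", "score": 0.92},
        {"memory": "Works in Tokyo", "score": 0.67},
    ]
}

relevant = memories_above(sample, min_score=0.6)
```

Using .get("results", []) keeps the helper safe when the dict has no results key.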

Exceptions

SemanticMemoryError (Exception)
└── SemanticMemoryConfigError

Utility Functions

def normalize_provider(provider: str) -> str

Normalize provider name (alias support, e.g., "Azure_OpenAI" -> "azure_openai").

def parse_model_string(model: str) -> tuple[str, str]

Split model string into provider and model name (e.g., "azure_openai/gpt-4.1" -> ("azure_openai", "gpt-4.1")).

def get_api_key(config: LLMProviderConfig) -> str

Get API key from LLMProviderConfig (SecretStr compatible).

def convert_llm_to_mem0(llm_config: LLMProviderConfig) -> Dict[str, Any]

Convert LLMProviderConfig to Mem0 format LLM configuration dict.

def convert_embedder_to_mem0(embedder_config: LLMProviderConfig) -> Dict[str, Any]

Convert LLMProviderConfig to Mem0 format Embedder configuration dict.
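
To make the behavior of normalize_provider and parse_model_string concrete, here is an illustrative re-implementation. It is a sketch, not the SDK source: the `_sketch` function names are hypothetical, normalization is reduced to trimming and lower-casing (the SDK's actual alias table is not documented here), and raising ValueError on a malformed string is an assumption.

```python
from typing import Tuple


def normalize_provider_sketch(provider: str) -> str:
    """Trim and lower-case a provider name, e.g. "Azure_OpenAI" -> "azure_openai"."""
    return provider.strip().lower()


def parse_model_string_sketch(model: str) -> Tuple[str, str]:
    """Split "provider/model" on the first slash into (provider, model name)."""
    provider, separator, name = model.partition("/")
    if not separator or not provider or not name:
        # Assumption: malformed input is rejected rather than guessed at.
        raise ValueError(f"expected 'provider/model', got {model!r}")
    return normalize_provider_sketch(provider), name


parsed = parse_model_string_sketch("azure_openai/gpt-4.1")
```

Splitting on the first slash only means a model name that itself contains a slash stays intact in the second element.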

Semantic Memory Usage Example (Python)
from agenticstar_platform.memory import (
    SemanticMemoryConfig, SemanticMemoryClient
)

config = SemanticMemoryConfig.from_toml("config.toml")
memory = SemanticMemoryClient(config)

# Add memory (synchronous method)
memory.add(
    messages=[
        {"role": "user", "content": "I'm proficient in Python"},
        {"role": "assistant", "content": "Understood"},
    ],
    user_id="user-123",
)

# Search memory (synchronous method)
result = memory.search(query="What are the user's technical skills?", user_id="user-123")
for r in result.get("results", []):
    print(f"{r['memory']}")

# Cleanup (asynchronous)
await memory.cleanup()

Related
Auth Module | Use authenticated user ID as memory user_id
RAG Module | Convert memory contents to Embeddings for enhanced similarity search

security Security Module

Content Moderation (Azure / AWS / GCP) + PII Detection.

Enums

SecurityProvider

AZURE = "azure"
AWS = "aws"
GCP = "gcp"

ContentCategory

Content moderation categories.

HATE = "hate"
SEXUAL = "sexual"
SELF_HARM = "self_harm"
VIOLENCE = "violence"
PROFANITY = "profanity"
INSULT = "insult"
THREAT = "threat"

PIICategory

PII (Personally Identifiable Information) categories.

PERSON_NAME = "person_name"
EMAIL = "email"
PHONE_NUMBER = "phone_number"
ADDRESS = "address"
DATE_OF_BIRTH = "date_of_birth"
NATIONAL_ID = "national_id"
PASSPORT_NUMBER = "passport_number"
SOCIAL_SECURITY = "social_security"
CREDIT_CARD = "credit_card"
IP_ADDRESS = "ip_address"
API_KEY = "api_key"
CONNECTION_STRING = "connection_string"

Exceptions

SecurityError (Exception)
├── SecurityConfigError
└── SecurityAPIError (status_code, error_code)

Result Types

ContentModerationResult

Content moderation result. Each value in categories is a severity level (0-6).

@dataclass
class ContentModerationResult:
    blocked: bool  # True if above threshold
    categories: Dict[ContentCategory, int] = {}  # severity 0-6
    threshold: int = 2  # Applied threshold
    raw_response: Dict[str, Any] = {}
    error: Optional[str] = None
    error_code: Optional[str] = None
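
The relationship between categories, threshold, and blocked can be sketched as follows. This is an illustration under an explicit assumption: "above threshold" is read here as severity >= threshold, but the SDK may use a strict comparison instead. `Category` is a local stand-in for a subset of ContentCategory, and `is_blocked` is a hypothetical helper.

```python
from enum import Enum
from typing import Dict


class Category(str, Enum):
    """Local stand-in for a subset of ContentCategory."""
    HATE = "hate"
    VIOLENCE = "violence"


def is_blocked(categories: Dict[Category, int], threshold: int = 2) -> bool:
    """Assumed rule: block when any category severity reaches the threshold."""
    return any(severity >= threshold for severity in categories.values())


blocked_example = is_blocked({Category.HATE: 0, Category.VIOLENCE: 4})
allowed_example = is_blocked({Category.HATE: 0, Category.VIOLENCE: 1})
```

Lowering the threshold makes moderation stricter; with a threshold of 0, any reported category would block under this assumed rule.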

PromptShieldResult

Prompt injection detection result.

@dataclass
class PromptShieldResult:
    attack_detected: bool
    attack_type: Optional[str] = None
    confidence: float = 0.0
    raw_response: Dict[str, Any] = {}
    error: Optional[str] = None
    error_code: Optional[str] = None

PIIEntity

Detected PII entity.

@dataclass
class PIIEntity:
    category: PIICategory
    text: str  # Detected text
    offset: int  # Start position in text
    length: int  # Length of detected text
    confidence: float
    provider_category: str = ""  # Provider-specific category name
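
The offset and length fields are enough to apply custom masking on the caller's side. The sketch below is not how the SDK builds masked_text; `EntitySpan` is a local stand-in carrying only those two fields, `mask_spans` is a hypothetical helper, and it assumes offsets are indices into the Python string (providers may count in other units, such as UTF-16 code units).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class EntitySpan:
    """Local stand-in carrying the offset and length fields of PIIEntity."""
    offset: int
    length: int


def mask_spans(text: str, entities: List[EntitySpan], mask: str = "***") -> str:
    """Replace each detected span with `mask`."""
    # Work from the rightmost span so earlier offsets stay valid after each edit.
    for entity in sorted(entities, key=lambda e: e.offset, reverse=True):
        end = entity.offset + entity.length
        text = text[:entity.offset] + mask + text[end:]
    return text


source_text = "Email: user@example.com"
masked = mask_spans(source_text, [EntitySpan(offset=7, length=16)])
```

Processing spans right to left matters because the mask and the original text usually differ in length.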

PIIDetectionResult

PII detection result.

@dataclass
class PIIDetectionResult:
    success: bool
    masked_text: str = ""
    entities: List[PIIEntity] = []
    categories_detected: List[PIICategory] = []
    raw_response: Dict[str, Any] = {}
    error: Optional[str] = None
    error_code: Optional[str] = None

SecurityCheckResult

Integrated security check result.

@dataclass
class SecurityCheckResult:
    allowed: bool  # Allowed / Blocked
    violations: List[str] = []  # List of violation reasons
    content_moderation: Optional[ContentModerationResult] = None
    prompt_shield: Optional[PromptShieldResult] = None
    pii_detection: Optional[PIIDetectionResult] = None

Configuration

AzureSecurityConfig

class AzureSecurityConfig:
    content_safety_endpoint: str
    content_safety_api_key: str  # hidden
    language_endpoint: Optional[str] = None  # For PII detection
    language_api_key: Optional[str] = None  # hidden
    enabled: bool = True
    moderation_threshold: int = 2  # severity 0-6
    pii_enabled: bool = True
    pii_confidence_threshold: float = 0.7
    timeout: float = 30.0

AWSSecurityConfig

class AWSSecurityConfig:
    aws_access_key_id: str = ""  # hidden
    aws_secret_access_key: str = ""  # hidden
    region_name: str = "us-east-1"
    guardrail_id: str = ""
    guardrail_version: str = "DRAFT"
    enabled: bool = False
    moderation_threshold: int = 2
    pii_enabled: bool = True
    timeout: float = 30.0

GCPSecurityConfig

class GCPSecurityConfig:
    project_id: str
    credentials_path: Optional[str] = None  # hidden
    model_armor_template: str = ""
    model_armor_region: str = ""
    dlp_location: str = "global"
    enabled: bool = False
    moderation_threshold: int = 2
    pii_enabled: bool = True
    timeout: float = 30.0

Clients

AzureSecurityClient

class AzureSecurityClient(SecurityClientBase):
    def __init__(config: AzureSecurityConfig): ...

    async def __aenter__() -> AzureSecurityClient: ...
    async def __aexit__(...) -> None: ...

    async def check_content_moderation(
        text: str,
        threshold: Optional[int] = None
    ) -> ContentModerationResult: ...

    async def check_prompt_shield(
        user_prompt: str,
        documents: Optional[List[str]] = None
    ) -> PromptShieldResult: ...

    async def detect_pii(
        text: str,
        mask: bool = True,
        language: str = "ja",
        confidence_threshold: Optional[float] = None
    ) -> PIIDetectionResult: ...

    async def check_security(
        text: str,
        check_moderation: bool = True,
        check_prompt_shield: bool = True,
        check_pii: bool = False,
        fail_on_error: bool = True
    ) -> SecurityCheckResult: ...

    async def close() -> None: ...

Returns
check_content_moderation() -> ContentModerationResult | blocked (block decision), categories (severity 0-6)
check_prompt_shield() -> PromptShieldResult | attack_detected, attack_type, confidence
detect_pii() -> PIIDetectionResult | success, entities (list of PIIEntity), masked_text
check_security() -> SecurityCheckResult | allowed, violations, individual detection results

# Using PII mask results
result = await security.detect_pii("Email: user@example.com", mask=True)
print(result.masked_text)  # -> "Email: ***"
print(result.entities[0].category)  # -> PIICategory.EMAIL

AWSSecurityClient / GCPSecurityClient

Same interface as Azure (inherits SecurityClientBase). Implements check_content_moderation(), check_prompt_shield(), detect_pii(), check_security(), close().

Content Moderation + PII Detection (Python)
from agenticstar_platform.security import (
    AzureSecurityConfig, AzureSecurityClient
)

config = AzureSecurityConfig(
    content_safety_endpoint="https://your-cs.cognitiveservices.azure.com/",
    content_safety_api_key="your-key",
)

async with AzureSecurityClient(config) as security:
    # Content moderation
    moderation = await security.check_content_moderation(
        text="This is a sample message",
        threshold=2,
    )

    if moderation.blocked:
        print(f"Content blocked: {moderation.categories}")

    # Integrated check
    result = await security.check_security(
        text="Contact me at user@example.com or 123-456-7890",
        check_pii=True,
    )

    if not result.allowed:
        print(f"Violations: {result.violations}")
    if result.pii_detection and result.pii_detection.entities:
        print(f"PII found: {result.pii_detection.masked_text}")

Related
RAG Module | Content validation pipeline before Embedding ingestion
Storage Module | PII screening before file upload
Events Module | Deliver moderation results as events

common Common Module

Common utilities: SQL injection prevention, secret masking, and an async context manager mixin.

SecretMasker

Secret value masking. Safely displays passwords and tokens in log output.

class SecretMasker:
    def __init__(mask_char: str = "*", visible_chars: int = 4): ...
    def mask(value: str) -> str: ...

def mask_secret(value: str, visible_chars: int = 4) -> str

Shortcut function. Equivalent to SecretMasker().mask(value).

Secret Masking (Python)
from agenticstar_platform.common import mask_secret

masked = mask_secret("my-secret-api-key-12345")
# -> "my-s********************"

Validation Functions

Validation functions for SQL injection prevention. Used automatically within DataAccess, but can also be used directly when building custom queries.

def validate_identifier(name: str) -> str

Validates table and column names. Only alphanumeric characters and underscores are allowed. Raises ValueError for invalid input.

def validate_identifiers(names: List[str]) -> List[str]

Batch validation of multiple identifiers.

def validate_order_by(order_by: str) -> str

Validates ORDER BY clauses. Only column_name ASC|DESC format is allowed.

Validation (Python)
from agenticstar_platform.common import validate_identifier, validate_order_by

table = validate_identifier("users") # OK: "users"
validate_identifier("users; DROP TABLE --") # -> ValueError

order = validate_order_by("created_at DESC") # OK: "created_at DESC"
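
For readers who want to see what such allow-list validation can look like, here is a minimal sketch written from the descriptions above. It is not the SDK's implementation: the `_sketch` function names and the regular expressions are assumptions, as is treating the ASC or DESC direction as optional.

```python
import re

# Allow-list patterns: ASCII letters, digits, and underscores only.
_IDENTIFIER = re.compile(r"[A-Za-z0-9_]+")
_ORDER_BY = re.compile(r"([A-Za-z0-9_]+)(?:\s+(ASC|DESC))?", re.IGNORECASE)


def validate_identifier_sketch(name: str) -> str:
    """Return the name unchanged if it is a safe identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"invalid identifier: {name!r}")
    return name


def validate_order_by_sketch(order_by: str) -> str:
    """Accept "column" or "column ASC|DESC"; reject anything else."""
    match = _ORDER_BY.fullmatch(order_by.strip())
    if not match:
        raise ValueError(f"invalid ORDER BY clause: {order_by!r}")
    column, direction = match.groups()
    return f"{column} {direction.upper()}" if direction else column


safe_table = validate_identifier_sketch("users")
safe_order = validate_order_by_sketch("created_at desc")
```

Rejecting everything outside a small allow-list is what makes it safe to interpolate the validated name into SQL text, where bind parameters cannot be used for identifiers.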

AsyncContextManagerMixin

Mixin class for async context managers. Used internally by the SDK to uniformly implement the async with pattern.

class AsyncContextManagerMixin:
    async def __aenter__(self) -> Self: ...
    async def __aexit__(...) -> None: ...
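
The pattern such a mixin enables can be sketched in a few lines. This is an illustration, not the SDK's class: `AsyncCloseMixin` and `DemoClient` are hypothetical names, and having `__aexit__` await the client's close() method is an assumption about how the mixin is wired.

```python
import asyncio


class AsyncCloseMixin:
    """Hypothetical mixin: entering returns self, exiting awaits close()."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when the body raises.
        await self.close()


class DemoClient(AsyncCloseMixin):
    def __init__(self):
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def demo():
    async with DemoClient() as client:
        state_inside = client.closed
    return state_inside, client.closed


closed_inside, closed_after = asyncio.run(demo())
```

Any client that defines an async close() gains async with support by inheriting the mixin, which keeps resource cleanup uniform across modules.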

Error Handling

All modules use a common exception handling pattern. Each module has its own base exception class, with subclasses to distinguish specific errors.

Exception Hierarchy

Exception
├── DatabaseError
│   ├── ConnectionError
│   ├── QueryError
│   └── ConfigError
├── RAGError (VectorStoreError)
│   ├── EmbeddingError
│   ├── QdrantError (VectorStoreConnectionError)
│   ├── SearchError
│   └── VectorStoreConfigError
├── StorageError
│   ├── StorageConfigError
│   ├── StorageConnectionError
│   └── StorageOperationError
├── AuthError
│   ├── AuthConfigError
│   └── AuthAPIError
│       ├── AuthUnauthorizedError (401)
│       ├── AuthNotFoundError (404)
│       └── AuthRateLimitError (429)
├── SemanticMemoryError
│   └── SemanticMemoryConfigError
└── SecurityError
    ├── SecurityConfigError
    └── SecurityAPIError

Common Pattern

Across all modules, catch exceptions in order: specific exception -> base exception -> Exception.

Multi-Module Error Handling (Python)
import logging

from agenticstar_platform.storage import StorageError, StorageOperationError
from agenticstar_platform.auth import AuthError, AuthAPIError
from agenticstar_platform.security import SecurityError, SecurityAPIError

logger = logging.getLogger(__name__)

# db, security, qdrant, and storage are assumed to be already-initialized SDK clients.
async def process_document(doc_id: str):
    try:
        # 1. Retrieve document from DB
        doc = await db.fetch_one(
            "SELECT * FROM documents WHERE id = $1", (doc_id,)
        )

        # 2. Security check
        moderation = await security.check_content_moderation(doc["content"])
        if moderation.blocked:
            raise ValueError("Content policy violation")

        # 3. Generate Embedding + save to vector DB
        await qdrant.upsert(
            point_id=doc_id,
            text=doc["content"],
            payload={"source": "documents"}
        )

        # 4. Save processing result to storage
        await storage.upload_bytes(
            data=doc["content"].encode(),
            object_name=f"processed/{doc_id}.txt"
        )

    except StorageOperationError as e:
        # Storage operation error -> determine by error_code
        logger.error(f"Storage op failed: {e.error_code}")
    except AuthAPIError as e:
        # Auth API error -> determine by status_code
        logger.error(f"Auth failed: {e.status_code}")
    except (StorageError, AuthError, SecurityError) as e:
        # Module base exception fallback
        logger.error(f"Module error: {type(e).__name__}: {e}")
    except Exception as e:
        # Unexpected error (includes the ValueError raised in step 2)
        logger.critical(f"Unexpected: {e}")
        raise
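
The reason the order matters is that Python uses the first matching except clause, so a base-class handler placed first would shadow the specific one. A small self-contained sketch with local stand-in classes (the constructor signature of `StorageOperationError` and the `classify` helper are hypothetical, used only to make the behavior observable):

```python
class StorageError(Exception):
    """Local stand-in for the storage module's base exception."""


class StorageOperationError(StorageError):
    """Local stand-in; the constructor signature is an assumption."""

    def __init__(self, message, error_code=None):
        super().__init__(message)
        self.error_code = error_code


def classify(exc: Exception) -> str:
    """Report which handler catches `exc` when ordered specific -> base -> Exception."""
    try:
        raise exc
    except StorageOperationError as e:
        return f"operation:{e.error_code}"
    except StorageError:
        return "module"
    except Exception:
        return "unexpected"


outcomes = [
    classify(StorageOperationError("upload failed", error_code="E1")),
    classify(StorageError("generic storage failure")),
    classify(KeyError("missing")),
]
```

If the `except StorageError` clause came first, the `StorageOperationError` case would also be reported as "module" and its error_code would never be inspected.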

Summary

AGENTIC STAR Platform SDK v0.5.3 is a comprehensive toolkit consisting of the following 8 specialized modules:

  • Events — Async event delivery (SSE / Webhook / DB)
  • Database — PostgreSQL + Azure AD auth + specialized access layers (Config / Execution / Telemetry / PodRuntime)
  • RAG — Embedding + Vector Search (Qdrant)
  • Storage — Cloud Storage (Azure / S3 / GCS)
  • Auth — AGENTIC STAR authentication service integration
  • Memory — Semantic Memory (Mem0 + Qdrant)
  • Security — Content Moderation + PII Detection (Azure / AWS / GCP multi-cloud support)
  • Common — Common utilities (SQL injection prevention, secret masking)

Each module can be installed and used independently, and the modules can be combined to build AI agent applications. The SDK provides infrastructure only; the design of the agent logic is left to the developer.