Complete specifications for all modules, classes, and methods of the agenticstar-platform SDK (v0.5.3).
Installation
Install the SDK using pip.
pip install agenticstar-platform==0.5.3
To install only specific modules, specify extras.
Extras are available for the following:
PostgreSQL database support
RAG (embedding + vector search) support
Cloud storage (Azure, S3, GCS) support
Security (content moderation, PII detection) support
AGENTIC STAR Auth service integration
Semantic Memory (Mem0 + Qdrant) support
All modules
pip install agenticstar-platform[rag]==0.5.3
pip install agenticstar-platform[db,rag,storage]==0.5.3
pip install agenticstar-platform[all]==0.5.3
Module List
The SDK consists of the following 8 modules.
Module | Description | Key Classes
events | Async event delivery system (SSE / Webhook) | EventEmitter, StreamingEvent
db | PostgreSQL connection + Azure AD auth | PostgreSQLManager, DataAccess
rag | RAG (embedding + vector search via Qdrant) | EmbeddingGenerator, QdrantManager
storage | Cloud storage (Azure, S3, GCS) | AzureBlobStorageClient, S3StorageClient
auth | AGENTIC STAR Auth API client | AgenticStarAuthClient
memory | Semantic memory (Mem0 + Qdrant) | SemanticMemoryClient
security | Content Moderation + PII Detection | AzureSecurityClient, ContentSafetyValidator
common | Common utilities (validation, secret masking) | SecretMasker, validate_identifier
events Events Module
Async event delivery system. Emit events via EventEmitter and deliver them to SSE / Webhook / DB.
Enums
EventType
Event types displayed in the frontend UI.
PHASE_START = "phase_start"
PROGRESS_UPDATE = "progress_update"
THOUGHT_MESSAGE = "thought_message"
COMPLETION_SUCCESS = "completion_success"
COMPLETION_FAILURE = "completion_failure"
USER_INTERACTION_REQUIRED = "user_interaction_required"
UNEXPECTED_ERROR = "unexpected_error"
HITL_REQUIRED_BROWSER_VNC = "hitl_required_browser_vnc"
HITL_REQUIRED_BROWSER_CLI = "hitl_required_browser_cli"
HITL_COMPLETED = "hitl_completed"
FILE_CREATED = "file_created"
PERMISSION_REQUEST = "permission_request"
PERMISSION_RESPONSE = "permission_response"
TOOL_START = "tool_start"
TOOL_RESULT = "tool_result"
FINAL_RESULT = "final_result"
PROGRESS_MESSAGE = "progress_message"
SubEventType
Sub-event types for granular frontend UI control.
SEARCH_WEB = "search_web"
COMMAND_EXECUTION = "command_execution"
FILE_OPERATION = "file_operation"
LOCAL_ASSISTANT = "local_assistant"
MCP_TOOL = "mcp_tool"
A2A_TOOL = "a2a_tool"
FILE_EDITED = "file_edited"
FILE_READ = "file_read"
FILE_SEARCHED = "file_searched"
BASH_EXECUTED = "bash_executed"
WEB_FETCHED = "web_fetched"
TASK_LAUNCHED = "task_launched"
TODO_UPDATED = "todo_updated"
VIDEO_GENERATED = "video_generated"
IMAGE_GENERATED = "image_generated"
SLIDE_CREATED = "slide_created"
MACOS_AUTOMATION = "macos_automation"
Data Classes
StreamingEvent
Event data for real-time SSE delivery.
@dataclass
class StreamingEvent:
event_type: EventType
execution_id: str
message: str
timestamp: float
metadata: Optional[Dict[str, Any]] = None
sub_event_type: Optional[SubEventType] = None
def to_dict() -> Dict[str, Any] : ...
SequencedEvent
Event with ordering guarantee.
@dataclass
class SequencedEvent:
sequence: int # >= 0
event_type: EventType
execution_id: str
message: str
timestamp: float
metadata: Optional[Dict[str, Any]] = None
sub_event_type: Optional[SubEventType] = None
def __post_init__(): ... # Validates sequence >= 0
def to_streaming_event() -> StreamingEvent : ...
def to_dict() -> Dict[str, Any] : ...
ExecutionMessage
Database-stored message for marketplace UI integration.
@dataclass
class ExecutionMessage:
user_id: str
conversation_id: str
message_id: str
chunk_type: str
content_data: Dict[str, Any]
def to_dict() -> Dict[str, Any] : ...
@classmethod
def from_streaming_event(
event: StreamingEvent,
user_id: str,
conversation_id: str,
message_id: str,
chunk_type: Optional[str] = None
) -> ExecutionMessage : ...
Classes
EventEmitter
Async event queue. Emits and consumes events in a non-blocking manner.
class EventEmitter:
def __init__(
execution_id: str,
handler: Optional[EventHandler] = None
): ...
@property
def is_completed() -> bool: ...
@property
def sequence_number() -> int: ...
async def emit_event(
event_type: EventType,
message: str,
metadata: Optional[Dict[str, Any]] = None ,
sub_event_type: Optional[SubEventType] = None
) -> None : ...
async def consume_events(
timeout: float = 0.1
) -> AsyncGenerator[str, None ]: ...
def mark_completed() -> None : ...
async def cleanup() -> None : ...
async def __aenter__() -> EventEmitter : ...
async def __aexit__(...) -> None : ...
Returns
emit_event() -> None Adds an event to the queue. If a handler is set, it is called asynchronously
consume_events() -> AsyncGenerator[str, None] Asynchronously yields SSE-formatted strings. Stops automatically on completion
Context manager supported: async with EventEmitter(...) as emitter:
from agenticstar_platform import EventEmitter, EventType

emitter = EventEmitter(execution_id="exec-abc-123")

await emitter.emit_event(
    event_type=EventType.PHASE_START,
    message="Starting document analysis",
    metadata={"phase": "analysis", "total_pages": 42},
)

# Signal completion; consume_events() stops automatically on completion.
emitter.mark_completed()

async for chunk in emitter.consume_events():
    print(chunk)
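The exact string that consume_events() yields is not spelled out here. As a rough illustration of what "SSE-formatted" generally means, the sketch below serializes an event dict into a single Server-Sent Events message. The helper name format_sse and the use of the event: field are assumptions for illustration, not the SDK's implementation.

```python
import json
from typing import Any, Dict


def format_sse(event: Dict[str, Any]) -> str:
    """Serialize an event dict as one Server-Sent Events message (illustrative only)."""
    # An SSE message is a group of "field: value" lines terminated by a blank line.
    payload = json.dumps(event, ensure_ascii=False)
    return f"event: {event['event_type']}\ndata: {payload}\n\n"


chunk = format_sse(
    {
        "event_type": "phase_start",
        "execution_id": "exec-abc-123",
        "message": "Starting document analysis",
    }
)
print(chunk, end="")
```

A client reads the data: line of each message and parses it as JSON; the trailing blank line is what separates one message from the next.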
DatabaseEventHandler
Persists events to the PostgreSQL execution_messages table.
class DatabaseEventHandler:
def __init__(
data_access: Any,
user_id: str,
conversation_id: str,
message_id: str,
table_name: str = "execution_messages" ,
chunk_type_map: Optional[Dict[EventType, str]] = None
): ...
async def __call__(event: StreamingEvent ) -> Optional[str] : ...
WebhookEventHandler
Sends events to an HTTP webhook endpoint.
class WebhookEventHandler:
def __init__(
webhook_url: str,
conversation_id: str,
message_id: str,
headers: Optional[Dict[str, str]] = None ,
timeout_seconds: int = 30 ,
message_type: str = "append_message" ,
token_provider: Optional[Callable] = None ,
chunk_type_map: Optional[Dict[EventType, str]] = None
): ...
async def __call__(event: StreamingEvent ) -> Optional[str] : ...
CompositeEventHandler
Executes multiple EventHandlers in parallel.
class CompositeEventHandler:
def __init__(
handlers: List[Callable],
return_first_chunk: bool = True
): ...
async def __call__(event: StreamingEvent ) -> Optional[str] : ...
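To make "executes multiple EventHandlers in parallel" concrete, here is a minimal sketch of the general pattern using asyncio.gather. It is a stand-in written for illustration, not the SDK's code: events are plain dicts, the function name run_in_parallel is hypothetical, and returning the first non-None chunk is only one plausible reading of return_first_chunk=True.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

# Stand-in for the EventHandler protocol: an async callable that receives
# an event and optionally returns a chunk of text.
Handler = Callable[[Dict[str, Any]], Awaitable[Optional[str]]]


async def run_in_parallel(handlers: List[Handler], event: Dict[str, Any]) -> Optional[str]:
    """Run every handler concurrently and return the first non-None chunk."""
    # return_exceptions=True collects failures as values, so one failing
    # handler does not prevent the other results from being gathered.
    results = await asyncio.gather(*(h(event) for h in handlers), return_exceptions=True)
    for result in results:  # gather preserves the order of `handlers`
        if isinstance(result, str):
            return result
    return None


async def store_in_db(event: Dict[str, Any]) -> Optional[str]:
    return None  # a persistence handler has nothing to stream back


async def broken_webhook(event: Dict[str, Any]) -> Optional[str]:
    raise RuntimeError("webhook unreachable")


async def to_sse(event: Dict[str, Any]) -> Optional[str]:
    return f"data: {event['message']}\n\n"


first_chunk = asyncio.run(
    run_in_parallel([store_in_db, broken_webhook, to_sse], {"message": "hello"})
)
print(repr(first_chunk))
```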
Factory Functions
def create_sse_handler() -> EventHandler
Creates a handler in SSE (Server-Sent Events) format.
def create_json_handler() -> EventHandler
Creates a handler in JSON format.
def create_marketplace_handler(
data_access: Any,
webhook_url: str,
user_id: str,
conversation_id: str,
message_id: str,
token_provider: Optional[Callable] = None
) -> CompositeEventHandler
Creates a composite handler (DB + Webhook) for marketplace UI.
Protocol
EventHandler
Generic interface for event processing.
class EventHandler(Protocol):
async def __call__(
self , event: StreamingEvent
) -> Optional[str] : ...
db Database Module
PostgreSQL connection pool + Azure AD auth.
Exceptions
DatabaseError (Exception)
├── ConnectionError
├── QueryError
└── ConfigError
Configuration
AzureADConfig
Azure Active Directory authentication configuration.
@dataclass
class AzureADConfig:
tenant_id: str
client_id: str
client_secret: str # hidden
username: str = ""
@classmethod
def from_dict(data: Dict[str, Any] ) -> AzureADConfig : ...
@classmethod
def from_toml(toml_path: str, section: str = "azure_ad" ) -> AzureADConfig : ...
PostgreSQLConfig
PostgreSQL connection configuration (with Azure AD support).
class PostgreSQLConfig:
provider: str = "postgresql"
host: str = "localhost"
port: int = 5432
database: str = "agenticstar_db"
use_azure_ad: bool = False
username: Optional[str] = None
password: Optional[str] = None # hidden
pool_min_size: int = 5
pool_max_size: int = 20
command_timeout: int = 60
pool_timeout: Optional[int] = 30
max_overflow: Optional[int] = 10
azure_ad: Optional[AzureADConfig] = None
api_proxy_url: Optional[str] = None
ssl_mode: str = "require"
@classmethod
def from_dict(data: Dict[str, Any] ) -> PostgreSQLConfig : ...
@classmethod
def from_toml(toml_path: str, section: str = "database" ) -> PostgreSQLConfig : ...
@classmethod
def from_env(prefix: str = "DB_" ) -> PostgreSQLConfig : ...
TOML configuration example:
[database]
provider = "postgresql"
host = "db.example.com"
port = 5432
database = "agenticstar_db"
use_azure_ad = true
pool_min_size = 5
pool_max_size = 20
command_timeout = 60

[azure_ad]
tenant_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
client_id = "yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"
client_secret = "your-secret-here"
username = "dbuser@example.onmicrosoft.com"
Connection Management
PostgreSQLManager
Async PostgreSQL connection pool (with Azure AD token management).
class PostgreSQLManager:
def __init__(config: PostgreSQLConfig ): ...
@property
def config() -> PostgreSQLConfig : ...
@property
def pool() -> Optional[asyncpg .Pool]: ...
async def __aenter__() -> PostgreSQLManager : ...
async def __aexit__(...) -> None : ...
async def initialize() -> None : ...
async def close() -> None : ...
async def execute_query(
query: str,
params: tuple = ()
) -> Dict[str, Any] : ...
async def fetch_one(
query: str,
params: tuple = ()
) -> Optional[Dict[str, Any]] : ...
async def fetch_all(
query: str,
params: tuple = ()
) -> List[Dict[str, Any]] : ...
async def execute(
query: str,
params: tuple = ()
) -> str: ...
Returns
execute_query() -> Dict[str, Any] {"success": bool, "data": list, "count": int}
fetch_one() -> Optional[Dict[str, Any]] Single row result dict, or None if not found
fetch_all() -> List[Dict[str, Any]] List of result rows (empty list if 0 rows)
execute() -> str PostgreSQL status string (e.g., "INSERT 0 1")
Context manager supported: async with PostgreSQLManager(config) as mgr:
from agenticstar_platform.db import PostgreSQLConfig, PostgreSQLManager

config = PostgreSQLConfig.from_toml("config.toml")

async with PostgreSQLManager(config) as mgr:
    await mgr.initialize()
    result = await mgr.fetch_all(
        "SELECT * FROM conversations WHERE user_id = $1",
        ("user-123",),
    )
    await mgr.execute(
        "INSERT INTO events (event_type, data) VALUES ($1, $2)",
        ("phase_start", "{}"),
    )
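execute() returns the PostgreSQL status string rather than a row count. If you need the number of affected rows, a small helper along these lines can extract it. The name affected_rows is hypothetical; the sketch relies only on the standard command tag format, where INSERT, UPDATE, and DELETE end with the row count.

```python
def affected_rows(status: str) -> int:
    """Return the row count from a PostgreSQL command tag such as "INSERT 0 1"."""
    # Tags look like "INSERT <oid> <count>", "UPDATE <count>", "DELETE <count>".
    # Tags without a trailing number (e.g. "CREATE TABLE") yield 0.
    last_token = status.rsplit(" ", 1)[-1]
    return int(last_token) if last_token.isdigit() else 0


for tag in ("INSERT 0 1", "UPDATE 3", "DELETE 0", "CREATE TABLE"):
    print(tag, "->", affected_rows(tag))
```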
Data Access Layer
DataAccess
Generic table operation wrapper with SQL injection prevention.
class DataAccess:
def __init__(
db: PostgreSQLManager | ApiPostgreSQLManager
): ...
async def __aenter__() -> DataAccess : ...
async def __aexit__(...) -> None : ...
async def initialize() -> None : ...
async def close() -> None : ...
def is_initialized() -> bool: ...
async def ensure_initialized() -> None : ...
async def execute_query(
query: str,
params: tuple = ()
) -> Dict[str, Any] : ...
async def insert(
table: str,
data: Dict[str, Any],
returning: Optional[List[str]] = None
) -> Dict[str, Any] : ...
async def upsert(
table: str,
data: Dict[str, Any],
conflict_columns: List[str],
update_columns: Optional[List[str]] = None ,
returning: Optional[List[str]] = None
) -> Dict[str, Any] : ...
async def select(
table: str,
columns: Optional[List[str]] = None ,
where: Optional[Dict[str, Any]] = None ,
order_by: Optional[str] = None ,
limit: Optional[int] = None
) -> Dict[str, Any] : ...
async def select_one(
table: str,
columns: Optional[List[str]] = None ,
where: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]] : ...
async def update(
table: str,
data: Dict[str, Any],
where: Dict[str, Any],
returning: Optional[List[str]] = None
) -> Dict[str, Any] : ...
async def delete(
table: str,
where: Dict[str, Any],
returning: Optional[List[str]] = None
) -> Dict[str, Any] : ...
Returns
insert() / upsert() -> Dict[str, Any] {"success": bool, "data": dict}. Contains column values when returning is specified
select() -> Dict[str, Any] {"success": bool, "data": list, "count": int}
update() / delete() -> Dict[str, Any] {"success": bool, "affected_rows": int}
Key method usage examples:
# INSERT
result = await da.insert("users", {"name": "Alice"}, returning=["id"])
# UPSERT (update on conflict)
result = await da.upsert("users", {"email": "a@example.com", "name": "Alice"}, conflict_columns=["email"])
# SELECT with WHERE + ORDER + LIMIT
result = await da.select("users", columns=["id", "name"], where={"active": True}, order_by="created_at DESC", limit=10)
ConfigAccess
Access layer for configuration information such as agents, tools, guardrails, and MCP.
class ConfigAccess:
def __init__(db: DataAccess ): ...
async def get_agent_instructions(agent_type: str) -> Dict[str, Any] : ...
async def get_all_agent_configs() -> Dict[str, Any] : ...
async def get_agent_config(agent_type: str, requested_level: str = "default" ) -> Dict[str, Any] : ...
async def get_tool_config(tool_name: str) -> Dict[str, Any] : ...
async def get_all_tool_configs() -> Dict[str, Any] : ...
async def get_banned_urls() -> Dict[str, Any] : ...
async def get_guardrails_settings() -> Dict[str, Any] : ...
async def get_system_settings() -> Dict[str, Any] : ...
async def get_mcp_configurations(execution_mode: Optional[str] = None ) -> Dict[str, Any] : ...
Extending Blocked URL granularity
get_banned_urls() returns the list of blocked URL patterns configured in the admin panel. The basic feature (ExtAuth Service) provides domain-level restriction, but by matching the retrieved patterns against URL strings inside your agent tools (e.g., using re.search), you can implement URL (path) level restriction as an extended capability. Settings are unified through the admin panel, so there is no duplicated operational configuration.
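A path-level check of this kind could look roughly like the following. The shape of the dict returned by get_banned_urls() is not specified here, so the sketch starts from a plain list of regular-expression strings; both the pattern list and the is_blocked helper are assumptions for illustration.

```python
import re
from typing import Iterable


def is_blocked(url: str, patterns: Iterable[str]) -> bool:
    """Return True if any blocked pattern matches anywhere in the URL."""
    return any(re.search(pattern, url) for pattern in patterns)


# Hypothetical patterns, standing in for what the admin panel would supply.
blocked_patterns = [
    r"example\.com/admin",            # block one path on an otherwise allowed domain
    r"files\.example\.org/.*\.exe$",  # block executables on a file host
]

print(is_blocked("https://example.com/admin/users", blocked_patterns))
print(is_blocked("https://example.com/docs", blocked_patterns))
```

An agent tool would run this check before fetching a URL and refuse the request when it returns True.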
ExecutionAccess
Execution message retrieval layer.
class ExecutionAccess:
def __init__(db: DataAccess ): ...
async def get_messages(
execution_id: str
) -> Optional[List[Dict[str, Any]]] : ...
TelemetryAccess
NIST audit telemetry storage and retrieval layer.
class TelemetryAccess:
def __init__(db: DataAccess ): ...
async def save_telemetry(
telemetry_data: Dict[str, Any]
) -> Dict[str, Any] : ...
async def list_telemetry(
conversation_id: Optional[str] = None ,
service: Optional[str] = None ,
limit: int = 100 ,
offset: int = 0
) -> Dict[str, Any] : ...
async def get_telemetry(
telemetry_id: str
) -> Optional[Dict[str, Any]] : ...
PodRuntime
Pod lifecycle management (startup notification, shutdown notification, auto scale-down).
class PodRuntime:
def __init__(
db: DataAccess,
execution_id: str,
pod_name: str
): ...
async def start() -> Dict[str, Any] : ...
async def final(status: str = "completed" ) -> Dict[str, Any] : ...
ApiPostgreSQLManager
PostgreSQL access via HTTP API proxy (for CLI mode). Has the same async interface as PostgreSQLManager.
class ApiPostgreSQLManager:
def __init__(api_url: str, token_provider: Optional[Callable] = None ): ...
async def fetch_one(query: str, params: tuple = ()) -> Optional[Dict[str, Any]] : ...
async def fetch_all(query: str, params: tuple = ()) -> List[Dict[str, Any]] : ...
async def execute(query: str, params: tuple = ()) -> str: ...
async def execute_query(query: str, params: tuple = ()) -> Dict[str, Any] : ...
Factory Function
def create_postgresql_manager(
config: PostgreSQLConfig
) -> Union[PostgreSQLManager, ApiPostgreSQLManager]
Automatically selects and returns PostgreSQLManager (direct connection) or ApiPostgreSQLManager (API proxy) based on whether config.api_proxy_url is set.
Error Handling
Database operation exception handling.
from agenticstar_platform.db import PostgreSQLConfig, PostgreSQLManager, DataAccess

config = PostgreSQLConfig.from_toml("config.toml")
da = None  # keeps the finally block safe if construction fails

try:
    db_manager = PostgreSQLManager(config)
    da = DataAccess(db_manager)
    await da.initialize()
    result = await da.select("users", where={"id": "user-123"})
    if result["success"] and result["data"]:
        print(f"User: {result['data'][0]['username']}")
except Exception as e:
    print(f"Database error: {e}")
finally:
    if da is not None:
        await da.close()
Related
Events Module — Persist events to DB with DatabaseEventHandler
RAG Module — Convert data retrieved from DB into Embeddings for searchability
rag RAG Module
RAG (Retrieval-Augmented Generation): Embedding + Vector Search via Qdrant.
Enums
VectorStoreProvider
QDRANT = "qdrant"
WEAVIATE = "weaviate"
MILVUS = "milvus"
Exceptions
RAGError (Exception)
├── EmbeddingError
├── QdrantError
└── SearchError
Data Classes
SearchResult
Single search result.
@dataclass
class SearchResult:
point_id: str
payload: Dict[str, Any]
score: float
similarity: float = 0.0
UpsertResult
Upsert operation result.
@dataclass
class UpsertResult:
success: bool
point_id: str = ""
error: Optional[str] = None
error_code: Optional[str] = None
SearchResponse
Search operation result.
@dataclass
class SearchResponse:
success: bool
results: List[SearchResult] = field(default_factory=list)
total_found: int = 0
query: str = ""
error: Optional[str] = None
error_code: Optional[str] = None
Embedding Configuration
EmbeddingConfig
Azure OpenAI embedding configuration.
class EmbeddingConfig:
base_url: str
api_key: str # hidden
model: str = "text-embedding-ada-002"
api_version: str = "2024-02-15-preview"
max_cache_size: int = 10000
dimensions: int = 1536
max_retries: int = 5
base_delay: float = 1.0
max_delay: float = 60.0
@classmethod
def from_dict(data: Dict[str, Any] ) -> EmbeddingConfig : ...
@classmethod
def from_toml(toml_path: str, section: str = "rag.embedding" ) -> EmbeddingConfig : ...
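The max_retries, base_delay, and max_delay fields suggest a capped backoff between retries. The exact schedule is not documented here; the sketch below assumes plain exponential doubling without jitter, purely to show how the three values interact. The function name backoff_delays is hypothetical.

```python
from typing import List


def backoff_delays(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> List[float]:
    """Delay in seconds before each retry, doubling each time and capped at max_delay."""
    return [min(max_delay, base_delay * 2**attempt) for attempt in range(max_retries)]


print(backoff_delays())               # defaults from EmbeddingConfig
print(backoff_delays(max_retries=8))  # later delays hit the max_delay cap
```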
EmbeddingGenerator
Embedding generation with caching and retry logic.
class EmbeddingGenerator:
def __init__(config: EmbeddingConfig ): ...
async def generate(text: str) -> List[float] : ...
async def generate_batch(
texts: List[str],
batch_size: int = 25
) -> List[List[float]] : ...
def clear_cache() -> None : ...
Returns
generate() -> List[float] Vector (dimensions per config.dimensions, default 1536). Returns immediately if cached
generate_batch() -> List[List[float]] List of vectors in input text order. API calls in batch_size units (default 25)
# Single text Embedding
vector = await generator.generate("Hello, world!") # -> [0.012, -0.034, ...]
# Batch Embedding
vectors = await generator.generate_batch(["text1", "text2", "text3"])
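To illustrate what "API calls in batch_size units" implies for request counts, this sketch splits a list of texts into consecutive slices of at most batch_size items. It is a generic chunking helper (the name chunked is hypothetical), not the SDK's internal code.

```python
from typing import Iterator, List


def chunked(texts: List[str], batch_size: int = 25) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size texts, preserving order."""
    for start in range(0, len(texts), batch_size):
        yield texts[start : start + batch_size]


texts = [f"text{i}" for i in range(60)]
batch_sizes = [len(batch) for batch in chunked(texts)]
print(batch_sizes)
```

With the default batch_size of 25, 60 input texts would therefore be sent in three requests.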
Vector Database
QdrantConfig
Qdrant database configuration.
class QdrantConfig:
url: str
collection_name: str
vector_size: int = 1536
distance: str = "cosine" # cosine, euclid, dot
on_disk_vectors: bool = False
on_disk_payload: bool = True
hnsw_m: int = 16
hnsw_ef_construct: int = 256
payload_indexes: List[PayloadIndexConfig]
auth_token_provider: Optional[Callable[[], str]] = None
@classmethod
def from_dict(data: Dict[str, Any] ) -> QdrantConfig : ...
@classmethod
def from_toml(toml_path: str, section: str = "rag.qdrant" ) -> QdrantConfig : ...
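The distance option selects how two vectors are compared. As a quick reference, the three measures named in the comment (cosine, euclid, dot) can be computed by hand as follows. This is plain Python for illustration and does not involve Qdrant itself.

```python
import math
from typing import Sequence


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product: larger means more similar (sensitive to vector length)."""
    return sum(x * y for x, y in zip(a, b))


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity: dot product of the two vectors after normalization."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def euclid(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance: smaller means more similar."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


a = [1.0, 0.0]
b = [1.0, 1.0]
print(round(cosine(a, b), 4), euclid(a, b), dot(a, b))
```

Cosine ignores vector magnitude, which is why it is a common default for text embeddings.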
PayloadIndexConfig
Payload field index configuration.
@dataclass
class PayloadIndexConfig:
field_name: str
field_schema: str = "keyword" # keyword, integer, float, bool
@classmethod
def from_dict(data: Dict[str, Any] ) -> PayloadIndexConfig : ...
TOML configuration example:
[embedding]
provider = "openai"
model = "text-embedding-3-small"
api_key = "sk-..."
dimension = 1536

[qdrant]
host = "qdrant.example.com"
port = 6333
api_key = "your-qdrant-key"
use_ssl = true
QdrantManager
Qdrant vector search client.
class QdrantManager:
def __init__(
config: QdrantConfig,
embedding_generator: EmbeddingGenerator
): ...
async def __aenter__() -> QdrantManager : ...
async def __aexit__(...) -> None : ...
def is_initialized() -> bool: ...
async def initialize() -> None : ...
async def ensure_initialized() -> None : ...
async def upsert(
point_id: str,
text: str,
payload: Dict[str, Any]
) -> Dict[str, Any] : ...
async def batch_upsert(
items: List[Dict[str, Any]],
batch_size: int = 256
) -> Dict[str, Any] : ...
async def search(
query_text: str,
limit: int = 10 ,
score_threshold: float = 0.4 ,
filter_conditions: Optional[Dict[str, Any]] = None
) -> Dict[str, Any] : ...
async def delete(
point_ids: List[str]
) -> Dict[str, Any] : ...
async def get_statistics() -> Dict[str, Any] : ...
async def close() -> None : ...
Returns
upsert() -> Dict[str, Any] {"success": bool, "point_id": str}
batch_upsert() -> Dict[str, Any] {"success": bool, "upserted_count": int, "errors": list}
search() -> Dict[str, Any] {"success": bool, "data": List[SearchResult], "total_found": int}
get_statistics() -> Dict[str, Any] {"vectors_count": int, "indexed_vectors_count": int, "points_count": int, ...}
Context manager supported: async with QdrantManager(config, generator) as qdrant:
from agenticstar_platform.rag import (
    EmbeddingConfig,
    EmbeddingGenerator,
    QdrantConfig,
    QdrantManager,
)

emb_config = EmbeddingConfig.from_toml("config.toml")
qd_config = QdrantConfig.from_toml("config.toml")
generator = EmbeddingGenerator(emb_config)

async with QdrantManager(qd_config, generator) as qdrant:
    await qdrant.upsert(
        point_id="doc-001",
        text="AGENTIC STAR is an AI agent development platform",
        payload={"source": "docs", "category": "overview"},
    )
    results = await qdrant.search(
        query_text="What is an AI platform?",
        limit=5,
        score_threshold=0.5,
    )
    # "data" is documented as List[SearchResult], so use attribute access.
    for r in results["data"]:
        print(f"Score: {r.score}, Text: {r.payload.get('text')}")
storage Storage Module
Cloud storage integration (Azure Blob Storage / AWS S3 / Google Cloud Storage).
Enums
StorageProvider
Cloud storage provider type.
AZURE_BLOB = "azure_blob"
AWS_S3 = "aws_s3"
GCS = "gcs"
Exceptions
StorageError (Exception) - Base exception
├── StorageConfigError - Configuration error
├── StorageConnectionError - Connection error
└── StorageOperationError - Operation error with error_code attribute
Result Types
UploadResult
File upload result.
@dataclass
class UploadResult:
success: bool # Upload succeeded
object_name: str = "" # Object name
object_url: str = "" # Object URL
file_size: int = 0 # File size
content_type: str = "" # Content type
error: Optional[str] = None # Error message
error_code: Optional[str] = None # Error code
metadata: Dict[str, Any] = field(default_factory=dict) # Metadata
DownloadResult
File download result.
@dataclass
class DownloadResult:
success: bool # Download succeeded
object_name: str = "" # Object name
local_path: str = "" # Local file path
file_size: int = 0 # File size
error: Optional[str] = None # Error message
error_code: Optional[str] = None # Error code
ObjectInfo
Cloud object metadata.
@dataclass
class ObjectInfo:
name: str # Object name
size: int # Object size
last_modified: Optional[str] = None # Last modified timestamp
content_type: Optional[str] = None # Content type
metadata: Dict[str, Any] = field(default_factory=dict) # Metadata
ListResult
List operation result.
@dataclass
class ListResult:
success: bool # Operation succeeded
objects: List[ObjectInfo] = field(default_factory=list) # Objects
count: int = 0 # Object count
prefix: str = "" # Search prefix
error: Optional[str] = None # Error message
error_code: Optional[str] = None # Error code
Configuration
StorageConfig
Common storage configuration (base class).
class StorageConfig:
provider: StorageProvider # Provider type
bucket_name: str # Bucket/container name
enabled: bool = True # Enabled flag
max_file_size: int = 5 *1024 *1024 *1024 # Max file size (5GB)
auto_create_bucket: bool = False # Auto-create bucket
prefix: str = "" # Object name prefix
custom_domain: Optional[str] = None # Custom domain
connection_timeout: int = 30 # Connection timeout (seconds)
read_timeout: int = 300 # Read timeout (seconds)
AzureBlobConfig
Azure Blob Storage configuration.
class AzureBlobConfig:
bucket_name: str # Container name
connection_string: str # hidden
enabled: bool = True
max_file_size: int = 5 *1024 *1024 *1024
auto_create_bucket: bool = False
prefix: str = ""
custom_domain: Optional[str] = None
connection_timeout: int = 30
read_timeout: int = 300
max_block_size: int = 8 *1024 *1024 # Max block size (8MB)
max_single_put_size: int = 256 *1024 *1024 # Max single upload (256MB)
@classmethod
def from_dict(data: Dict[str, Any] ) -> AzureBlobConfig : ...
@property
def provider(self ) -> StorageProvider : return StorageProvider.AZURE_BLOB
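To see how max_block_size and max_single_put_size interact, the sketch below estimates how many upload requests a file of a given size would need. It assumes these fields carry the same meaning as the identically named options in the Azure Storage SDK (a single PUT up to max_single_put_size, otherwise blocks of max_block_size); the function name plan_block_count is hypothetical.

```python
MIB = 1024 * 1024


def plan_block_count(
    file_size: int,
    max_block_size: int = 8 * MIB,
    max_single_put_size: int = 256 * MIB,
) -> int:
    """Number of upload requests: 1 for a single PUT, else one per block."""
    if file_size <= max_single_put_size:
        return 1
    # Ceiling division in integer arithmetic avoids float rounding on large sizes.
    return (file_size + max_block_size - 1) // max_block_size


print(plan_block_count(100 * MIB))   # fits in a single PUT
print(plan_block_count(1024 * MIB))  # split into 8 MiB blocks
```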
S3Config
AWS S3 configuration.
class S3Config:
bucket_name: str # Bucket name
aws_access_key_id: str # hidden
aws_secret_access_key: str # hidden
region_name: str = "us-east-1" # AWS region
enabled: bool = True
max_file_size: int = 5 *1024 *1024 *1024
auto_create_bucket: bool = False
prefix: str = ""
custom_domain: Optional[str] = None
connection_timeout: int = 30
read_timeout: int = 300
endpoint_url: Optional[str] = None # S3-compatible endpoint (MinIO, etc.)
@classmethod
def from_dict(data: Dict[str, Any] ) -> S3Config : ...
@property
def provider(self ) -> StorageProvider : return StorageProvider.AWS_S3
GCSConfig
Google Cloud Storage configuration.
class GCSConfig:
bucket_name: str # Bucket name
project_id: str # GCP project ID
enabled: bool = True
max_file_size: int = 5 *1024 *1024 *1024
auto_create_bucket: bool = False
prefix: str = ""
custom_domain: Optional[str] = None
connection_timeout: int = 30
read_timeout: int = 300
credentials_path: Optional[str] = None # hidden - Path to service account JSON
credentials_json: Optional[str] = None # hidden - Service account JSON string
credentials_base64: Optional[str] = None # hidden - Base64-encoded service account JSON
@classmethod
def from_dict(data: Dict[str, Any] ) -> GCSConfig : ...
@property
def provider(self ) -> StorageProvider : return StorageProvider.GCS
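credentials_base64 is convenient when the service account JSON has to travel through an environment variable or a secret store that dislikes multi-line values. The sketch below shows how such a value can be produced and decoded with the standard library. The service account dict is a made-up placeholder, and how GCSConfig chooses among the three credential fields is not covered here.

```python
import base64
import json

# Placeholder content; a real service account file has more fields.
service_account = {"type": "service_account", "project_id": "my-project"}

# Encode: JSON text -> UTF-8 bytes -> Base64 ASCII string.
credentials_json = json.dumps(service_account)
credentials_base64 = base64.b64encode(credentials_json.encode("utf-8")).decode("ascii")

# Decode: the reverse path, as a consumer of credentials_base64 would do.
decoded = json.loads(base64.b64decode(credentials_base64).decode("utf-8"))
print(decoded["project_id"])
```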
Storage Clients
AzureBlobStorageClient
Azure Blob Storage client. Supports async context manager (async with).
class AzureBlobStorageClient:
def __init__(config: AzureBlobConfig ): ...
async def initialize() -> None : ... # Initialize client
async def ensure_initialized() -> None : ... # Initialize if needed
async def upload_file(
file_path: str,
prefix: Optional[str] = None
) -> UploadResult : ...
async def upload_bytes(
data: bytes,
object_name: str,
content_type: str = "application/octet-stream"
) -> UploadResult : ...
async def download_file(
object_name: str,
local_path: str
) -> DownloadResult : ...
async def list_objects(
prefix: str = ""
) -> ListResult : ...
async def delete_object(
object_name: str
) -> Dict[str, Any] : ...
async def delete_objects(
object_names: List[str]
) -> Dict[str, Any] : ...
async def close() -> None : ...
Returns
upload_file() / upload_bytes() -> UploadResult Contains success, object_url, file_size, content_type
download_file() -> DownloadResult Contains success, local_path, file_size
list_objects() -> ListResult Contains success, objects: List[ObjectInfo], count
delete_object() -> Dict[str, Any] {"success": bool, "object_name": str}
S3StorageClient
AWS S3 storage client. Supports async context manager (async with).
class S3StorageClient:
def __init__(config: S3Config ): ...
async def initialize() -> None : ... # Initialize client
async def ensure_initialized() -> None : ... # Initialize if needed
async def upload_file(
file_path: str,
prefix: Optional[str] = None
) -> UploadResult : ...
async def upload_bytes(
data: bytes,
object_name: str,
content_type: str = "application/octet-stream"
) -> UploadResult : ...
async def download_file(
object_name: str,
local_path: str
) -> DownloadResult : ...
async def list_objects(
prefix: str = ""
) -> ListResult : ...
async def delete_object(
object_name: str
) -> Dict[str, Any] : ...
async def delete_objects(
object_names: List[str]
) -> Dict[str, Any] : ...
async def close() -> None : ...
GCSStorageClient
Google Cloud Storage client. Supports async context manager (async with).
class GCSStorageClient:
def __init__(config: GCSConfig ): ...
async def initialize() -> None : ... # Initialize client
async def ensure_initialized() -> None : ... # Initialize if needed
async def upload_file(
file_path: str,
prefix: Optional[str] = None
) -> UploadResult : ...
async def upload_bytes(
data: bytes,
object_name: str,
content_type: str = "application/octet-stream"
) -> UploadResult : ...
async def download_file(
object_name: str,
local_path: str
) -> DownloadResult : ...
async def list_objects(
prefix: str = ""
) -> ListResult : ...
async def delete_object(
object_name: str
) -> Dict[str, Any] : ...
async def delete_objects(
object_names: List[str]
) -> Dict[str, Any] : ...
async def close() -> None : ...
from agenticstar_platform.storage import S3Config, S3StorageClient

config = S3Config.from_dict({
    "bucket_name": "my-bucket",
    "aws_access_key_id": "AKIA...",
    "aws_secret_access_key": "secret...",
    "region_name": "ap-northeast-1",
})

async with S3StorageClient(config) as storage:
    upload_result = await storage.upload_file(
        file_path="/tmp/report.pdf",
        prefix="documents/",
    )
    if upload_result.success:
        print(f"URL: {upload_result.object_url}")

    await storage.upload_bytes(
        data=b"Hello, World!",
        object_name="greetings/hello.txt",
        content_type="text/plain",
    )

    list_result = await storage.list_objects(prefix="documents/")
    for obj in list_result.objects:
        print(f"{obj.name} ({obj.size} bytes)")

    download = await storage.download_file(
        object_name="documents/report.pdf",
        local_path="/tmp/downloaded.pdf",
    )
from agenticstar_platform.storage import (
    StorageError,
    StorageConfigError,
    StorageConnectionError,
    StorageOperationError,
)

try:
    result = await storage.upload_file("/tmp/large-file.zip")
except StorageConnectionError as e:
    print(f"Connection failed: {e}")
except StorageOperationError as e:
    print(f"Operation failed: {e}, code: {e.error_code}")
except StorageError as e:
    print(f"Storage error: {e}")
Related
RAG Module — Retrieve documents from storage and feed into Embedding
Security Module — Content moderation before upload
auth Auth Module
AGENTIC STAR authentication service integration. Manages users, tokens, and device information.
Exceptions
AuthError (Exception)
├── AuthConfigError
└── AuthAPIError
├── AuthUnauthorizedError (401)
├── AuthNotFoundError (404)
└── AuthRateLimitError (429)
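The hierarchy above maps HTTP status codes to dedicated exception types. The sketch below rebuilds that tree with stand-in classes to show how a status code could be turned into the most specific exception, and why except clauses should list subclasses before their base class. The constructor signature and the error_for_status helper are hypothetical; the real SDK classes may differ.

```python
from typing import Dict, Type


# Stand-in classes mirroring the documented hierarchy.
class AuthError(Exception):
    pass


class AuthAPIError(AuthError):
    def __init__(self, message: str, status_code: int) -> None:
        super().__init__(message)
        self.status_code = status_code


class AuthUnauthorizedError(AuthAPIError):
    pass


class AuthNotFoundError(AuthAPIError):
    pass


class AuthRateLimitError(AuthAPIError):
    pass


_STATUS_TO_ERROR: Dict[int, Type[AuthAPIError]] = {
    401: AuthUnauthorizedError,
    404: AuthNotFoundError,
    429: AuthRateLimitError,
}


def error_for_status(status_code: int, message: str) -> AuthAPIError:
    """Pick the most specific exception type, falling back to AuthAPIError."""
    error_type = _STATUS_TO_ERROR.get(status_code, AuthAPIError)
    return error_type(message, status_code)


try:
    raise error_for_status(429, "too many requests")
except AuthRateLimitError as e:  # most specific first
    outcome = f"retry later ({e.status_code})"
except AuthAPIError as e:  # any other API failure
    outcome = f"API error ({e.status_code})"
print(outcome)
```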
Configuration
AgenticStarAuthConfig
class AgenticStarAuthConfig:
base_url: str
api_key: str # hidden
auth_type: str = "api_key"
timeout: float = 30.0
@classmethod
def create(
base_url: str,
api_key: str,
auth_type: str = "api_key" ,
timeout: float = 30.0
) -> AgenticStarAuthConfig : ...
@classmethod
def from_config(config_path: Optional[Path] = None ) -> AgenticStarAuthConfig : ...
Data Models
ApiUser
Authenticated user information (Pydantic BaseModel).
class ApiUser(BaseModel):
id: str
username: Optional[str] = None
email: Optional[str] = None
first_name: Optional[str] = None
last_name: Optional[str] = None
display_name: Optional[str] = None
email_verified: bool = False
enabled: bool = True
created_timestamp: Optional[int] = None
attributes: Optional[Dict[str, Any]] = None
organization: Optional[str] = None
job: Optional[str] = None
bio: Optional[str] = None
last_login_at: Optional[str] = None
DeviceInfo
Device information.
class DeviceInfo(BaseModel):
id: Optional[str] = None
device_code: str
device_name: Optional[str] = None
device_type: Optional[str] = None
device_info: Optional[Dict[str, Any]] = None
is_active: Optional[bool] = None
created_at: Optional[str] = None
last_used_at: Optional[str] = None
MCPTokenInfo
MCP token information.
class MCPTokenInfo(BaseModel):
access_token: str
token_type: str
expires_at: datetime
scopes: List[str]
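Because expires_at is a datetime, callers can check whether a token is still usable before passing it on. A minimal sketch follows, assuming that a naive expires_at should be read as UTC and that a small safety margin is wanted; the is_expired helper and the 30-second default are illustrative choices, not SDK behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_expired(
    expires_at: datetime,
    now: Optional[datetime] = None,
    leeway: timedelta = timedelta(seconds=30),
) -> bool:
    """Return True if the token expires within `leeway` of `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    if expires_at.tzinfo is None:
        # Assumption: timestamps without a timezone are treated as UTC.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    return now + leeway >= expires_at


fixed_now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(is_expired(fixed_now + timedelta(minutes=10), now=fixed_now))
print(is_expired(fixed_now + timedelta(seconds=10), now=fixed_now))
```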
Client
AgenticStarAuthClient
class AgenticStarAuthClient:
def __init__(config: AgenticStarAuthConfig ): ...
async def __aenter__() -> AgenticStarAuthClient : ...
async def __aexit__(...) -> None : ...
async def get_users(
page: int = 1 ,
limit: int = 20 ,
search: Optional[str] = None ,
is_approved: Optional[str] = None ,
sort_by: Optional[str] = None ,
order: Optional[str] = None ,
include_last_login: bool = False
) -> GetUsersResult : ...
async def get_user(
user_id: str
) -> GetUserResult : ...
async def get_mcp_tokens(
user_id: str,
providers: Optional[List[str]] = None
) -> GetMCPTokensResult : ...
async def close() -> None : ...
Returns
get_users() -> GetUsersResult: success, users (List[ApiUser]), pagination (UserPagination)
get_user() -> GetUserResult: success, user (ApiUser), devices (List[DeviceInfo]), login_history
get_mcp_tokens() -> GetMCPTokensResult: success, tokens (Dict[str, MCPTokenInfo]), errors
# Get user list (with pagination)
result = await auth_client.get_users(page=1, limit=20)
for user in result.users:
    print(f"{user.username} ({user.email})")
from agenticstar_platform.auth import (
    AgenticStarAuthConfig,
    AgenticStarAuthClient,
)

config = AgenticStarAuthConfig.from_config("config.toml")

async with AgenticStarAuthClient(config) as auth_client:
    result = await auth_client.get_user("user-123")
    if result.success:
        print(f"User: {result.user.username}")

    tokens = await auth_client.get_mcp_tokens(
        user_id="user-123",
        providers=["slack", "gitlab"],
    )
    for provider, token in tokens.tokens.items():
        print(f"{provider}: expires={token.expires_at}")
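Since get_users() is paginated through page and limit, listing every user means requesting pages in a loop. The sketch below shows one way to do that. The fields of UserPagination are not documented here, so the loop stops on the first page that returns fewer than limit users, which is an assumption. iter_all_users, collect_usernames, and FakeAuthClient are hypothetical names; the fake client only mimics the documented get_users(page, limit) call so the example runs without the SDK.

```python
import asyncio
from types import SimpleNamespace


async def iter_all_users(client, limit=20):
    """Yield every user by requesting pages until a short page comes back."""
    page = 1
    while True:
        result = await client.get_users(page=page, limit=limit)
        if not result.success:
            break
        for user in result.users:
            yield user
        # Assumption: a page with fewer than `limit` users is the last one.
        if len(result.users) < limit:
            break
        page += 1


class FakeAuthClient:
    """Hypothetical in-memory stand-in for AgenticStarAuthClient."""

    def __init__(self, usernames):
        self._usernames = usernames

    async def get_users(self, page=1, limit=20):
        start = (page - 1) * limit
        chunk = self._usernames[start:start + limit]
        users = [SimpleNamespace(username=name) for name in chunk]
        return SimpleNamespace(success=True, users=users)


async def collect_usernames(client, limit):
    return [user.username async for user in iter_all_users(client, limit=limit)]


names = asyncio.run(
    collect_usernames(FakeAuthClient(["a", "b", "c", "d", "e"]), limit=2)
)
print(names)
```

With a real client, the same generator would be used inside the async with block shown above.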
Related
Events Module — Supply auth tokens to WebhookEventHandler token_provider
Memory Module — Manage semantic memory for authenticated users
memory Memory Module
Semantic memory (Mem0 + Qdrant) integration. Recall user knowledge, preferences, and facts via vector search.
Configuration
SemanticMemoryConfig
@dataclass
class LLMProviderConfig:
    model: str  # "azure_openai/gpt-4.1"
    api_key: str  # hidden
    base_url: Optional[str] = None
    api_version: Optional[str] = None
    aws_access_key_id: Optional[str] = None  # For Bedrock
    aws_secret_access_key: Optional[str] = None  # For Bedrock
    aws_region_name: Optional[str] = None  # For Bedrock
    @classmethod
    def from_dict(data: Dict[str, Any]) -> LLMProviderConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str) -> LLMProviderConfig: ...
@dataclass
class SemanticMemoryConfig:
    llm_config: LLMProviderConfig
    embedder_config: LLMProviderConfig
    vector_store: Optional[Dict[str, Any]] = None
    rerank: Optional[Dict[str, Any]] = None
    custom_fact_extraction_prompt: Optional[str] = None
    @classmethod
    def from_dict(data: Dict[str, Any]) -> SemanticMemoryConfig: ...
    @classmethod
    def from_toml(toml_path: str, section: str = "memory") -> SemanticMemoryConfig: ...
SemanticMemoryClient
class SemanticMemoryClient:
    def __init__(config: SemanticMemoryConfig): ...
    @property
    def enabled() -> bool: ...
    # Synchronous methods (no await needed)
    def add(
        messages: List[Dict[str, str]],
        user_id: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]: ...
    def search(
        query: str,
        user_id: str,
        limit: int = 10
    ) -> Dict[str, Any]: ...
    def get_all(
        user_id: str
    ) -> Dict[str, Any]: ...
    def delete(
        memory_id: str
    ) -> Dict[str, Any]: ...
    def delete_all(
        user_id: str
    ) -> Dict[str, Any]: ...
    # Asynchronous method
    async def cleanup() -> None: ...
Returns
add() -> Dict[str, Any]: Mem0 result dict. The messages argument uses the OpenAI format [{"role": "user", "content": "..."}].
search() -> Dict[str, Any]: {"results": [...]}. Each element contains memory, score, etc.
get_all() -> Dict[str, Any]: all memories for the user.
delete() / delete_all() -> Dict[str, Any]: deletion result.
Exceptions
SemanticMemoryError (Exception)
└── SemanticMemoryConfigError
Utility Functions
def normalize_provider(provider: str) -> str
Normalize provider name (alias support, e.g., "Azure_OpenAI" -> "azure_openai").
def parse_model_string(model: str) -> tuple[str, str]
Split model string into provider and model name (e.g., "azure_openai/gpt-4.1" -> ("azure_openai", "gpt-4.1")).
def get_api_key(config: LLMProviderConfig) -> str
Get API key from LLMProviderConfig (SecretStr compatible).
def convert_llm_to_mem0(llm_config: LLMProviderConfig) -> Dict[str, Any]
Convert LLMProviderConfig to Mem0 format LLM configuration dict.
def convert_embedder_to_mem0(embedder_config: LLMProviderConfig) -> Dict[str, Any]
Convert LLMProviderConfig to Mem0 format Embedder configuration dict.
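To make the "provider/model" convention concrete, here is a small illustrative re-implementation of the two string helpers. It is a sketch under assumptions rather than the SDK's code. The alias table behind normalize_provider is not documented, so this version only trims and lower-cases, and raising ValueError on a string without a slash is a guess. The _sketch suffix marks both functions as hypothetical.

```python
from typing import Tuple


def normalize_provider_sketch(provider: str) -> str:
    """Trim and lower-case a provider name, e.g. "Azure_OpenAI" -> "azure_openai".

    Assumption: the SDK may map further aliases; none are modeled here.
    """
    return provider.strip().lower()


def parse_model_string_sketch(model: str) -> Tuple[str, str]:
    """Split "provider/model" on the first slash into (provider, model)."""
    provider, sep, name = model.partition("/")
    if not sep or not provider or not name:
        # Assumption: the SDK's behavior for malformed strings is not documented.
        raise ValueError(f"expected 'provider/model', got {model!r}")
    return normalize_provider_sketch(provider), name


print(parse_model_string_sketch("azure_openai/gpt-4.1"))
```

Splitting on the first slash only keeps any later slashes inside the model name.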
from agenticstar_platform.memory import (
    SemanticMemoryConfig,
    SemanticMemoryClient,
)

config = SemanticMemoryConfig.from_toml("config.toml")
memory = SemanticMemoryClient(config)

# add() and search() are synchronous: no await
memory.add(
    messages=[
        {"role": "user", "content": "I'm proficient in Python"},
        {"role": "assistant", "content": "Understood"},
    ],
    user_id="user-123",
)

result = memory.search(
    query="What are the user's technical skills?",
    user_id="user-123",
)
for r in result.get("results", []):
    print(f"{r['memory']}")

# cleanup() is the only coroutine; call it from async code
await memory.cleanup()
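Because add(), search(), and the other memory methods are synchronous, calling them directly from a coroutine may block the event loop while the underlying request runs. One common pattern is to push the call onto a worker thread with asyncio.to_thread (Python 3.9+). The sketch below assumes the client is safe to call from another thread, which the specification does not confirm. FakeMemory and search_in_thread are hypothetical names used so the example runs without the SDK.

```python
import asyncio


class FakeMemory:
    """Hypothetical stand-in exposing the synchronous search() signature."""

    def search(self, query, user_id, limit=10):
        results = [{"memory": "Proficient in Python", "score": 0.9}]
        return {"results": results[:limit]}


async def search_in_thread(memory, query, user_id, limit=10):
    """Run the blocking search() call in a worker thread."""
    return await asyncio.to_thread(memory.search, query, user_id, limit)


result = asyncio.run(
    search_in_thread(FakeMemory(), "What are the user's skills?", "user-123")
)
print(result["results"][0]["memory"])
```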
Related
Auth Module — Use authenticated user ID as memory user_id
RAG Module — Convert memory contents to Embeddings for enhanced similarity search
security Security Module
Content Moderation (Azure / AWS / GCP) + PII Detection.
Enums
SecurityProvider
AZURE = "azure"
AWS = "aws"
GCP = "gcp"
ContentCategory
Content moderation categories.
HATE = "hate"
SEXUAL = "sexual"
SELF_HARM = "self_harm"
VIOLENCE = "violence"
PROFANITY = "profanity"
INSULT = "insult"
THREAT = "threat"
PIICategory
PII (Personally Identifiable Information) categories.
PERSON_NAME = "person_name"
EMAIL = "email"
PHONE_NUMBER = "phone_number"
ADDRESS = "address"
DATE_OF_BIRTH = "date_of_birth"
NATIONAL_ID = "national_id"
PASSPORT_NUMBER = "passport_number"
SOCIAL_SECURITY = "social_security"
CREDIT_CARD = "credit_card"
IP_ADDRESS = "ip_address"
API_KEY = "api_key"
CONNECTION_STRING = "connection_string"
Exceptions
SecurityError (Exception)
├── SecurityConfigError
└── SecurityAPIError (status_code, error_code)
Result Types
ContentModerationResult
Content moderation result. The values in categories are severity levels (0-6).
@dataclass
class ContentModerationResult:
    blocked: bool  # True if above threshold
    categories: Dict[ContentCategory, int] = {}  # severity 0-6
    threshold: int = 2  # Applied threshold
    raw_response: Dict[str, Any] = {}
    error: Optional[str] = None
    error_code: Optional[str] = None
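When a result comes back blocked, it is often useful to report which categories triggered the decision. The sketch below filters a categories mapping against a threshold. The specification says only "above threshold" and does not state whether a severity equal to the threshold counts, so the comparison used here (>=) is an assumption. Category and flagged_categories are hypothetical names, and Category mirrors just two members of ContentCategory.

```python
from enum import Enum
from typing import Dict, List


class Category(Enum):
    """Hypothetical subset of ContentCategory, for illustration only."""
    HATE = "hate"
    VIOLENCE = "violence"


def flagged_categories(
    categories: Dict[Category, int], threshold: int = 2
) -> List[Category]:
    """Return categories whose severity (0-6) reaches the threshold.

    Assumption: severity == threshold is treated as a violation.
    """
    return [cat for cat, severity in categories.items() if severity >= threshold]


print(flagged_categories({Category.HATE: 0, Category.VIOLENCE: 4}))
```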
PromptShieldResult
Prompt injection detection result.
@dataclass
class PromptShieldResult:
    attack_detected: bool
    attack_type: Optional[str] = None
    confidence: float = 0.0
    raw_response: Dict[str, Any] = {}
    error: Optional[str] = None
    error_code: Optional[str] = None
PIIEntity
Detected PII entity.
@dataclass
class PIIEntity:
    category: PIICategory
    text: str  # Detected text
    offset: int  # Start position in text
    length: int  # Length of detected text
    confidence: float
    provider_category: str = ""  # Provider-specific category name
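The offset and length fields make it possible to apply custom masking instead of relying on masked_text. The following sketch replaces each detected span with a mask character. It assumes offsets are counted in Python string positions (code points), which may not hold for every provider. EntitySpan and mask_spans are hypothetical names, and EntitySpan carries only the two fields the example needs.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class EntitySpan:
    """Hypothetical stand-in carrying PIIEntity's offset and length fields."""
    offset: int
    length: int


def mask_spans(text: str, entities: List[EntitySpan], mask_char: str = "*") -> str:
    """Overwrite every entity span in `text` with `mask_char`."""
    chars = list(text)
    for entity in entities:
        # Clamp the end so a span running past the text cannot raise IndexError.
        end = min(entity.offset + entity.length, len(chars))
        for i in range(entity.offset, end):
            chars[i] = mask_char
    return "".join(chars)


masked = mask_spans("Email: user@example.com", [EntitySpan(offset=7, length=16)])
print(masked)
```

Masking character by character keeps the string length unchanged, so the offsets of other entities stay valid.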
PIIDetectionResult
PII detection result.
@dataclass
class PIIDetectionResult:
    success: bool
    masked_text: str = ""
    entities: List[PIIEntity] = []
    categories_detected: List[PIICategory] = []
    raw_response: Dict[str, Any] = {}
    error: Optional[str] = None
    error_code: Optional[str] = None
SecurityCheckResult
Integrated security check result.
@dataclass
class SecurityCheckResult:
    allowed: bool  # Allowed / Blocked
    violations: List[str] = []  # List of violation reasons
    content_moderation: Optional[ContentModerationResult] = None
    prompt_shield: Optional[PromptShieldResult] = None
    pii_detection: Optional[PIIDetectionResult] = None
Configuration
AzureSecurityConfig
class AzureSecurityConfig:
    content_safety_endpoint: str
    content_safety_api_key: str  # hidden
    language_endpoint: Optional[str] = None  # For PII detection
    language_api_key: Optional[str] = None  # hidden
    enabled: bool = True
    moderation_threshold: int = 2  # severity 0-6
    pii_enabled: bool = True
    pii_confidence_threshold: float = 0.7
    timeout: float = 30.0
AWSSecurityConfig
class AWSSecurityConfig:
    aws_access_key_id: str = ""  # hidden
    aws_secret_access_key: str = ""  # hidden
    region_name: str = "us-east-1"
    guardrail_id: str = ""
    guardrail_version: str = "DRAFT"
    enabled: bool = False
    moderation_threshold: int = 2
    pii_enabled: bool = True
    timeout: float = 30.0
GCPSecurityConfig
class GCPSecurityConfig:
    project_id: str
    credentials_path: Optional[str] = None  # hidden
    model_armor_template: str = ""
    model_armor_region: str = ""
    dlp_location: str = "global"
    enabled: bool = False
    moderation_threshold: int = 2
    pii_enabled: bool = True
    timeout: float = 30.0
Clients
AzureSecurityClient
class AzureSecurityClient(SecurityClientBase):
    def __init__(config: AzureSecurityConfig): ...
    async def __aenter__() -> AzureSecurityClient: ...
    async def __aexit__(...) -> None: ...
    async def check_content_moderation(
        text: str,
        threshold: Optional[int] = None
    ) -> ContentModerationResult: ...
    async def check_prompt_shield(
        user_prompt: str,
        documents: Optional[List[str]] = None
    ) -> PromptShieldResult: ...
    async def detect_pii(
        text: str,
        mask: bool = True,
        language: str = "ja",
        confidence_threshold: Optional[float] = None
    ) -> PIIDetectionResult: ...
    async def check_security(
        text: str,
        check_moderation: bool = True,
        check_prompt_shield: bool = True,
        check_pii: bool = False,
        fail_on_error: bool = True
    ) -> SecurityCheckResult: ...
    async def close() -> None: ...
Returns
check_content_moderation() -> ContentModerationResult: blocked (block decision), categories (severity 0-6)
check_prompt_shield() -> PromptShieldResult: attack_detected, attack_type, confidence
detect_pii() -> PIIDetectionResult: success, entities (list of PIIEntity), masked_text
check_security() -> SecurityCheckResult: allowed, violations, individual detection results
# Using PII mask results
result = await security.detect_pii("Email: user@example.com", mask=True)
print(result.masked_text) # -> "Email: ***"
print(result.entities[0].category) # -> PIICategory.EMAIL
AWSSecurityClient / GCPSecurityClient
Same interface as Azure (inherits SecurityClientBase). Implements check_content_moderation(), check_prompt_shield(), detect_pii(), check_security(), close().
from agenticstar_platform.security import (
    AzureSecurityConfig,
    AzureSecurityClient,
)

config = AzureSecurityConfig(
    content_safety_endpoint="https://your-cs.cognitiveservices.azure.com/",
    content_safety_api_key="your-key",
)

async with AzureSecurityClient(config) as security:
    moderation = await security.check_content_moderation(
        text="This is a sample message",
        threshold=2,
    )
    if moderation.blocked:
        print(f"Content blocked: {moderation.categories}")

    result = await security.check_security(
        text="Contact me at user@example.com or 123-456-7890",
        check_pii=True,
    )
    if not result.allowed:
        print(f"Violations: {result.violations}")
    if result.pii_detection and result.pii_detection.entities:
        print(f"PII found: {result.pii_detection.masked_text}")
common Common Module
Common utilities: SQL injection prevention, secret masking, and an async context manager mixin.
SecretMasker
Secret value masking. Safely displays passwords and tokens in log output.
class SecretMasker:
    def __init__(mask_char: str = "*", visible_chars: int = 4): ...
    def mask(value: str) -> str: ...
def mask_secret(value: str, visible_chars: int = 4) -> str
Shortcut function. Equivalent to SecretMasker().mask(value).
from agenticstar_platform.common import mask_secret

masked = mask_secret("my-secret-api-key-12345")
Validation Functions
Validation functions for SQL injection prevention. Used automatically within DataAccess, but can also be used directly when building custom queries.
def validate_identifier(name: str) -> str
Validates table and column names. Only alphanumeric characters and underscores are allowed. Raises ValueError for invalid input.
def validate_identifiers(names: List[str]) -> List[str]
Batch validation of multiple identifiers.
def validate_order_by(order_by: str) -> str
Validates ORDER BY clauses. Only column_name ASC|DESC format is allowed.
from agenticstar_platform.common import validate_identifier, validate_order_by

table = validate_identifier("users")
validate_identifier("users; DROP TABLE --")  # raises ValueError
order = validate_order_by("created_at DESC")
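The rules described above (alphanumerics and underscores only, and a "column_name ASC|DESC" shape for ORDER BY) can be expressed as whitelist regular expressions. The sketch below illustrates that technique and is not the SDK's implementation. Whether the SDK accepts lower-case directions or a missing direction is not documented, so this version requires a direction and accepts either case. The _sketch function names are hypothetical.

```python
import re

# Whitelist patterns: anything not matched in full is rejected.
_IDENTIFIER = re.compile(r"[A-Za-z0-9_]+")
_ORDER_BY = re.compile(r"([A-Za-z0-9_]+)\s+(ASC|DESC)", re.IGNORECASE)


def validate_identifier_sketch(name: str) -> str:
    """Return `name` unchanged if it is a safe identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"invalid identifier: {name!r}")
    return name


def validate_order_by_sketch(order_by: str) -> str:
    """Accept only "<column> ASC|DESC" and return it with an upper-case direction."""
    match = _ORDER_BY.fullmatch(order_by.strip())
    if match is None:
        raise ValueError(f"invalid ORDER BY clause: {order_by!r}")
    column, direction = match.groups()
    return f"{column} {direction.upper()}"


print(validate_identifier_sketch("users"))
print(validate_order_by_sketch("created_at desc"))
```

Using fullmatch rather than search means a valid prefix followed by injected SQL is rejected as a whole.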
AsyncContextManagerMixin
Mixin class for async context managers. Used internally by the SDK to uniformly implement the async with pattern.
class AsyncContextManagerMixin:
    async def __aenter__(self) -> Self: ...
    async def __aexit__(...) -> None: ...
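As an illustration of the async with pattern such a mixin provides, here is a minimal sketch. It assumes the mixin returns self on entry and awaits the host class's close() on exit; the SDK's actual behavior is not specified beyond the signatures above. ClosingMixin and DemoClient are hypothetical names.

```python
import asyncio


class ClosingMixin:
    """Hypothetical mixin: entering returns self, exiting awaits close()."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returning None lets any exception from the with-body propagate.
        await self.close()


class DemoClient(ClosingMixin):
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def demo():
    async with DemoClient() as client:
        closed_inside = client.closed
    return closed_inside, client.closed


state = asyncio.run(demo())
print(state)
```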
Error Handling
All modules use a common exception handling pattern. Each module has its own base exception class, with subclasses to distinguish specific errors.
Exception Hierarchy
Exception
├── DatabaseError
│ ├── ConnectionError
│ ├── QueryError
│ └── ConfigError
├── RAGError (VectorStoreError)
│ ├── EmbeddingError
│ ├── QdrantError (VectorStoreConnectionError)
│ ├── SearchError
│ └── VectorStoreConfigError
├── StorageError
│ ├── StorageConfigError
│ ├── StorageConnectionError
│ └── StorageOperationError
├── AuthError
│ ├── AuthConfigError
│ └── AuthAPIError
│ ├── AuthUnauthorizedError (401)
│ ├── AuthNotFoundError (404)
│ └── AuthRateLimitError (429)
├── SemanticMemoryError
│ └── SemanticMemoryConfigError
└── SecurityError
├── SecurityConfigError
└── SecurityAPIError
Common Pattern
Across all modules, catch exceptions in order: specific exception -> base exception -> Exception.
from agenticstar_platform.storage import StorageError, StorageOperationError
from agenticstar_platform.auth import AuthError, AuthAPIError
from agenticstar_platform.security import SecurityError, SecurityAPIError

async def process_document(doc_id: str):
    try:
        doc = await db.fetch_one(
            "SELECT * FROM documents WHERE id = $1", (doc_id,)
        )
        moderation = await security.check_content_moderation(doc["content"])
        if moderation.blocked:
            raise ValueError("Content policy violation")
        await qdrant.upsert(
            point_id=doc_id,
            text=doc["content"],
            payload={"source": "documents"},
        )
        await storage.upload_bytes(
            data=doc["content"].encode(),
            object_name=f"processed/{doc_id}.txt",
        )
    except StorageOperationError as e:
        logger.error(f"Storage op failed: {e.error_code}")
    except AuthAPIError as e:
        logger.error(f"Auth failed: {e.status_code}")
    except (StorageError, AuthError, SecurityError) as e:
        logger.error(f"Module error: {type(e).__name__}: {e}")
    except Exception as e:
        logger.critical(f"Unexpected: {e}")
        raise
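The ordering matters because Python uses the first except clause that matches: if the base class came first, the more specific handler would never run. The self-contained sketch below demonstrates this with stand-in exception classes. ModuleError, ModuleAPIError, and classify are hypothetical names that mirror the shape of the hierarchy above and are not SDK classes.

```python
class ModuleError(Exception):
    """Hypothetical module base exception."""


class ModuleAPIError(ModuleError):
    """Hypothetical specific exception carrying an HTTP status code."""

    def __init__(self, message, status_code):
        super().__init__(message)
        self.status_code = status_code


def classify(exc):
    """Raise `exc` and report which handler caught it."""
    try:
        raise exc
    except ModuleAPIError as e:   # most specific first
        return f"api:{e.status_code}"
    except ModuleError:           # then the module base class
        return "module"
    except Exception:             # finally anything else
        return "unexpected"


print(classify(ModuleAPIError("rate limited", 429)))
print(classify(ModuleError("bad config")))
print(classify(KeyError("missing")))
```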
Summary
AGENTIC STAR Platform SDK v0.5.3 is a comprehensive toolkit consisting of the following 8 specialized modules:
Events — Async event delivery (SSE / Webhook / DB)
Database — PostgreSQL + Azure AD auth + specialized access layers (Config / Execution / Telemetry / PodRuntime)
RAG — Embedding + Vector Search (Qdrant)
Storage — Cloud Storage (Azure / S3 / GCS)
Auth — AGENTIC STAR authentication service integration
Memory — Semantic Memory (Mem0 + Qdrant)
Security — Content Moderation + PII Detection (Azure / AWS / GCP multi-cloud support)
Common — Common utilities (SQL injection prevention, secret masking)
Each module can be installed and used independently, and the modules can be combined to build AI agent applications. The SDK provides infrastructure only; the design of the agent logic is left to the developer.