Services API¶
Studiorum's service architecture provides a flexible, type-safe foundation for dependency injection and service lifecycle management.
Service Container¶
ServiceContainer¶
The central service container manages all application services with full lifecycle support.
from studiorum.core.container import ServiceContainer, get_global_container
from studiorum.core.services.lifecycle import ServiceLifecycle
from studiorum.core.services.protocols import OmnidexerProtocol
# Get global container instance
container = get_global_container()
# Register a service
container.register_service(
protocol=OmnidexerProtocol,
factory=create_omnidexer_service,
lifecycle=ServiceLifecycle.SINGLETON,
dependencies=(ConfigProtocol,),
hot_reloadable=True
)
# Resolve services (sync)
omnidexer = container.get_omnidexer_sync()
# Resolve services (async)
async def get_service_async():
omnidexer = await container.get_service(OmnidexerProtocol)
return omnidexer
Methods¶
register_service[T](protocol, factory, lifecycle, dependencies=(), **options)
Register a service with the container.
- protocol: Service protocol type
- factory: Factory function or class
- lifecycle: Service lifecycle strategy
- dependencies: Required dependency protocols
- hot_reloadable: Whether service supports hot reloading
- cleanup_priority: Cleanup order priority
def register_service[T](
self,
protocol: type[T],
factory: ServiceFactory[T] | AsyncServiceFactory[T],
lifecycle: ServiceLifecycle,
dependencies: tuple[type[Any], ...] = (),
hot_reloadable: bool = False,
cleanup_priority: CleanupPriority = CleanupPriority.NORMAL,
) -> None:
"""Register service with complete configuration."""
get_service[T](protocol) -> T
Resolve service by protocol (async).
Sync Service Access
For CLI and synchronous contexts, use dedicated sync factories:
from studiorum.core.services.factories import (
create_omnidexer_service_sync,
create_data_source_manager_service_sync
)
# Register sync factories in CLI container
container.register_service(
OmnidexerProtocol,
create_omnidexer_service_sync, # Sync factory
lifecycle=ServiceLifecycle.SINGLETON
)
# Direct sync access methods
omnidexer = container.get_omnidexer_sync()
content_resolver = container.get_content_resolver_sync()
tag_resolver = container.get_tag_resolver_sync()
Context Separation: CLI and MCP contexts use different service registration patterns:
- CLI: Uses
SINGLETON
lifecycle with sync factories for performance - MCP: Uses
ASYNC_RESOURCE
lifecycle with async factories for request isolation
Service Lifecycles¶
Services support different lifecycle strategies:
from studiorum.core.services.lifecycle import ServiceLifecycle
class ServiceLifecycle(Enum):
SINGLETON = "singleton" # One instance per container
TRANSIENT = "transient" # New instance per request
SCOPED = "scoped" # One instance per scope
ASYNC_RESOURCE = "async_resource" # Async lifecycle for MCP contexts
Core Services¶
Omnidexer Service¶
Content indexing and search service.
from studiorum.core.services.protocols import OmnidexerProtocol
@runtime_checkable
class OmnidexerProtocol(Protocol):
"""Protocol for content indexing and search."""
def is_loaded(self) -> bool:
"""Check if content is loaded."""
def load_all_data(self) -> None:
"""Load all content data."""
def get_creature_by_name(self, name: str) -> Creature | None:
"""Get creature by exact name."""
def get_spell_by_name(self, name: str) -> Spell | None:
"""Get spell by exact name."""
def get_adventure_by_name(self, name: str) -> Adventure | None:
"""Get adventure by name or abbreviation."""
def search_creatures(
self,
query: str | None = None,
*,
challenge_rating: str | None = None,
creature_type: str | None = None,
size: str | None = None,
environment: str | None = None,
) -> list[Creature]:
"""Search creatures with filters."""
def search_spells(
self,
query: str | None = None,
*,
level: int | str | None = None,
school: str | None = None,
spell_class: str | None = None,
) -> list[Spell]:
"""Search spells with filters."""
def get_all_by_type(self, content_type: str) -> list[Any]:
"""Get all content of specified type."""
def get_statistics(self) -> dict[str, Any]:
"""Get content statistics."""
Usage Examples
# Get omnidexer instance
omnidexer = container.get_omnidexer_sync()
# Load data if not already loaded
if not omnidexer.is_loaded():
omnidexer.load_all_data()
# Get specific content
dragon = omnidexer.get_creature_by_name("Ancient Red Dragon")
fireball = omnidexer.get_spell_by_name("Fireball")
lmop = omnidexer.get_adventure_by_name("LMoP")
# Search with filters
high_cr_dragons = omnidexer.search_creatures(
challenge_rating="15-30",
creature_type="dragon"
)
wizard_spells = omnidexer.search_spells(
level="1-3",
spell_class="wizard"
)
# Get statistics
stats = omnidexer.get_statistics()
print(f"Loaded {stats['creature_count']} creatures")
ContentResolver Service¶
Resolves content references and dependencies.
from studiorum.core.services.protocols import ContentResolverProtocol
@runtime_checkable
class ContentResolverProtocol(Protocol):
"""Protocol for content resolution."""
def resolve_creature(self, creature_id: str) -> Result[Creature, ResolveError]:
"""Resolve creature with validation."""
def resolve_spell(self, spell_id: str) -> Result[Spell, ResolveError]:
"""Resolve spell with validation."""
def resolve_adventure(self, adventure_id: str) -> Result[Adventure, ResolveError]:
"""Resolve adventure with all dependencies."""
def resolve_item(self, item_id: str) -> Result[Item, ResolveError]:
"""Resolve magic item with validation."""
Usage Examples
from studiorum.core.result import Success, Error
resolver = container.get_content_resolver_sync()
# Resolve with error handling
result = resolver.resolve_creature("ancient-red-dragon")
if isinstance(result, Error):
print(f"Resolution failed: {result.error}")
else:
creature = result.unwrap()
print(f"Resolved: {creature.name}")
# Chain resolutions
adventure_result = resolver.resolve_adventure("curse-of-strahd")
if isinstance(adventure_result, Success):
adventure = adventure_result.unwrap()
print(f"Adventure: {adventure.name}")
print(f"Chapters: {len(adventure.contents)}")
TagResolver Service¶
Resolves 5e content tags and cross-references.
from studiorum.core.services.protocols import TagResolverProtocol
@runtime_checkable
class TagResolverProtocol(Protocol):
"""Protocol for tag resolution."""
def resolve_tag(
self,
tag: str,
context: dict[str, Any] | None = None
) -> str:
"""Resolve a single tag to content."""
def resolve_tags_in_content(
self,
content: str,
context: dict[str, Any] | None = None
) -> str:
"""Resolve all tags in content string."""
def get_referenced_content(
self,
content: str
) -> dict[str, list[str]]:
"""Extract all content references from text."""
Usage Examples
tag_resolver = container.get_tag_resolver_sync()
# Resolve individual tags
creature_tag = "{@creature Adult Red Dragon|SRD}"
resolved = tag_resolver.resolve_tag(creature_tag)
print(resolved) # "Adult Red Dragon"
# Resolve tags in full content
content = "The {@creature hobgoblin|SRD} attacks with its {@item scimitar|SRD}."
resolved_content = tag_resolver.resolve_tags_in_content(content)
print(resolved_content) # "The hobgoblin attacks with its scimitar."
# Extract references for appendices
references = tag_resolver.get_referenced_content(adventure_text)
print(references)
Async Request Context¶
For MCP server and async operations, use AsyncRequestContext:
from studiorum.core.async_request_context import AsyncRequestContext
class AsyncRequestContext:
"""Request-scoped service context for async operations."""
async def get_service[T](self, protocol: type[T]) -> T:
"""Get service instance for this request."""
async def __aenter__(self) -> AsyncRequestContext:
"""Enter async context."""
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Clean up request-scoped resources."""
Usage in MCP Tools
from studiorum.mcp.core.decorators import mcp_tool
from studiorum.core.async_request_context import AsyncRequestContext
@mcp_tool("lookup_creature")
async def lookup_creature_tool(
ctx: AsyncRequestContext,
name: str
) -> dict[str, Any]:
"""MCP tool using async request context."""
# Get services for this request
omnidexer = await ctx.get_service(OmnidexerProtocol)
resolver = await ctx.get_service(ContentResolverProtocol)
# Use services with full type safety
creature = omnidexer.get_creature_by_name(name)
if not creature:
return {"error": f"Creature not found: {name}"}
# Resolve additional data if needed
resolution_result = resolver.resolve_creature(creature.id)
if isinstance(resolution_result, Error):
return {"error": f"Resolution failed: {resolution_result.error}"}
resolved_creature = resolution_result.unwrap()
return {
"name": resolved_creature.name,
"challenge_rating": resolved_creature.challenge_rating,
"armor_class": resolved_creature.armor_class,
# ... additional fields
}
Service Registration¶
Built-in Services¶
Register all core services:
from studiorum.core.container import register_core_services
def register_core_services(container: ServiceContainer) -> None:
"""Register all core application services."""
# Configuration service
container.register_service(
ConfigProtocol,
create_config_service,
ServiceLifecycle.SINGLETON,
hot_reloadable=True
)
# Content services
container.register_service(
OmnidexerProtocol,
create_omnidexer_service,
ServiceLifecycle.SINGLETON,
dependencies=(ConfigProtocol,)
)
container.register_service(
ContentResolverProtocol,
create_content_resolver_service,
ServiceLifecycle.SINGLETON,
dependencies=(OmnidexerProtocol,)
)
# Tag processing
container.register_service(
TagResolverProtocol,
create_tag_resolver_service,
ServiceLifecycle.SINGLETON,
dependencies=(OmnidexerProtocol, ConfigProtocol)
)
Custom Services¶
Register your own services:
from typing import Protocol, runtime_checkable
@runtime_checkable
class MyServiceProtocol(Protocol):
def process_data(self, data: str) -> str: ...
class MyService:
def __init__(self, omnidexer: OmnidexerProtocol, config: ConfigProtocol):
self.omnidexer = omnidexer
self.config = config
def process_data(self, data: str) -> str:
# Custom processing logic
return f"Processed: {data}"
def create_my_service(
omnidexer: OmnidexerProtocol,
config: ConfigProtocol
) -> MyService:
"""Factory function for custom service."""
return MyService(omnidexer, config)
# Register custom service
container.register_service(
MyServiceProtocol,
create_my_service,
ServiceLifecycle.SINGLETON,
dependencies=(OmnidexerProtocol, ConfigProtocol)
)
# Use custom service
my_service = await container.get_service(MyServiceProtocol)
result = my_service.process_data("test data")
Service Factories¶
Service factories can be functions or classes:
# Function factory
def create_simple_service() -> SimpleService:
return SimpleService()
# Function factory with dependencies
def create_complex_service(
dep1: Dependency1Protocol,
dep2: Dependency2Protocol
) -> ComplexService:
return ComplexService(dep1, dep2)
# Class factory
class ServiceFactory:
def __init__(self, config: ConfigProtocol):
self.config = config
def create(self) -> MyService:
return MyService(self.config)
# Async factory
async def create_async_service() -> AsyncService:
await initialize_async_resources()
return AsyncService()
Error Handling¶
Services use Result types for error handling:
from studiorum.core.result import Result, Success, Error
class MyService:
def risky_operation(self, input_data: str) -> Result[str, str]:
"""Operation that might fail."""
if not input_data:
return Error("Input cannot be empty")
try:
result = self._process_data(input_data)
return Success(result)
except ValueError as e:
return Error(f"Processing failed: {e}")
except Exception as e:
return Error(f"Unexpected error: {e}")
# Usage with error handling
service = await container.get_service(MyServiceProtocol)
result = service.risky_operation("test input")
if isinstance(result, Error):
logger.error(f"Operation failed: {result.error}")
else:
processed_data = result.unwrap()
logger.info(f"Success: {processed_data}")
Service Configuration¶
Services can be configured through the application config:
# Configuration model
class MyServiceConfig(BaseModel):
enabled: bool = True
batch_size: int = 100
timeout_seconds: int = 30
class ApplicationConfig(BaseModel):
# ... other config sections
my_service: MyServiceConfig = MyServiceConfig()
# Service using configuration
class ConfigurableService:
def __init__(self, config: ApplicationConfig):
self.service_config = config.my_service
self.batch_size = self.service_config.batch_size
self.timeout = self.service_config.timeout_seconds
def process_batch(self, items: list[Any]) -> list[Any]:
# Use configured batch size
batches = self._chunk_items(items, self.batch_size)
return self._process_batches(batches)
Testing Services¶
Test services in isolation:
import pytest
from unittest.mock import Mock
from studiorum.core.container import ServiceContainer, reset_global_container
class TestMyService:
def setup_method(self):
"""Reset container for each test."""
reset_global_container()
async def test_service_registration_and_resolution(self):
"""Test service container functionality."""
container = ServiceContainer()
# Register test service
container.register_service(
MyServiceProtocol,
lambda: MyService(),
ServiceLifecycle.SINGLETON
)
# Resolve and test
service = await container.get_service(MyServiceProtocol)
assert isinstance(service, MyService)
async def test_service_with_mocked_dependencies(self):
"""Test service with mocked dependencies."""
container = ServiceContainer()
# Mock dependencies
mock_omnidexer = Mock(spec=OmnidexerProtocol)
mock_config = Mock(spec=ConfigProtocol)
# Register mocks
container.register_service(
OmnidexerProtocol,
lambda: mock_omnidexer,
ServiceLifecycle.SINGLETON
)
container.register_service(
ConfigProtocol,
lambda: mock_config,
ServiceLifecycle.SINGLETON
)
# Register service under test
container.register_service(
MyServiceProtocol,
create_my_service,
ServiceLifecycle.SINGLETON,
dependencies=(OmnidexerProtocol, ConfigProtocol)
)
# Test service behavior
service = await container.get_service(MyServiceProtocol)
result = service.process_data("test")
# Verify mocks were called
assert mock_omnidexer.method_was_called
assert result.startswith("Processed:")
Performance Considerations¶
Service Caching¶
Services are automatically cached based on lifecycle:
# Singletons are cached in the container
omnidexer1 = await container.get_service(OmnidexerProtocol)
omnidexer2 = await container.get_service(OmnidexerProtocol)
assert omnidexer1 is omnidexer2 # Same cached instance
# Transient services are never cached
processor1 = await container.get_service(ProcessorProtocol)
processor2 = await container.get_service(ProcessorProtocol)
assert processor1 is not processor2 # Different instances
Lazy Loading¶
Services support lazy initialization:
class LazyService:
def __init__(self):
self._heavy_resource = None
@property
def heavy_resource(self):
"""Lazy load expensive resource."""
if self._heavy_resource is None:
self._heavy_resource = load_heavy_resource()
return self._heavy_resource
Memory Management¶
Use cleanup priorities for proper resource management:
from studiorum.core.services.cleanup import CleanupPriority
# Register service with cleanup priority
container.register_service(
DatabaseProtocol,
create_database_service,
ServiceLifecycle.SINGLETON,
cleanup_priority=CleanupPriority.HIGH # Clean up early
)
container.register_service(
CacheProtocol,
create_cache_service,
ServiceLifecycle.SINGLETON,
cleanup_priority=CleanupPriority.LOW # Clean up late
)
Next Steps¶
- Models API: Data models and validation
- Renderers API: Output rendering system
- Architecture Guide: System design patterns