Task Persistence System
This document provides a comprehensive overview of the task persistence system for the Teal Agents framework, including detailed documentation of all classes, interfaces, and implementation patterns.
Overview
The task persistence system provides a flexible, pluggable architecture for storing and retrieving agent tasks and their state. The system supports multiple storage backends through a factory pattern with dependency injection.
Core Features
- Pluggable Architecture: Factory pattern with configurable implementations
- Thread-Safe Operations: Async-safe with proper locking mechanisms
- In-Memory Storage (default): Zero configuration, perfect for development
- Redis Storage (optional): Persistent, scalable, production-ready
- Custom Storage: Support for user-defined implementations
- Request ID Indexing: Fast lookups by request ID across all implementations
Folder Structure
persistence/
├── __init__.py # Package initialization (empty)
├── README.md # This documentation file
├── singleton.py # Thread-safe singleton metaclass
├── task_persistence_manager.py # Abstract base class interface
├── in_memory_persistence_manager.py # Default in-memory implementation
├── persistence_factory.py # Factory pattern with dependency injection
└── custom/ # Custom implementations directory
└── example_redis_persistence.py # Production-ready Redis implementation
Core Classes Documentation
1. TaskPersistenceManager (Abstract Base Class)
File: task_persistence_manager.py
The abstract base class that defines the interface for all persistence implementations.
Methods
async create(task: AgentTask) -> None- Creates a new task in the persistence layer
-
Should raise
PersistenceCreateErrorif task already exists or on failure -
async load(task_id: str) -> AgentTask | None - Loads a task by its unique task ID
- Returns
Noneif task not found -
Should raise
PersistenceLoadErroron failure -
async update(task: AgentTask) -> None - Updates an existing task in the persistence layer
-
Should raise
PersistenceUpdateErrorif task doesn't exist or on failure -
async delete(task_id: str) -> None - Deletes a task by its unique task ID
-
Should raise
PersistenceDeleteErrorif task doesn't exist or on failure -
async load_by_request_id(request_id: str) -> AgentTask | None - Loads a task by request ID (for tasks containing items with specific request IDs)
- Returns the first matching task if multiple exist
- Returns
Noneif no task found
2. InMemoryPersistenceManager (Default Implementation)
File: in_memory_persistence_manager.py
Production-ready in-memory implementation with thread safety and request ID indexing.
Implementation Features
- Thread Safety: Uses
asyncio.Lock()for concurrent access protection - Dual Indexing: Primary storage by task_id + secondary index by request_id
- Memory Efficient: Automatic cleanup of empty index entries
- Error Handling: Comprehensive exception handling with custom error types
Internal Data Structures
in_memory: dict[str, AgentTask]: Primary storage mapping task_id to AgentTaskitem_request_id_index: dict[str, set[str]]: Secondary index mapping request_id to set of task_ids
Thread Safety Implementation
3. PersistenceFactory (Factory Pattern)
File: persistence_factory.py
Singleton factory responsible for creating and managing persistence manager instances with dependency injection support.
Key Features
- Singleton Pattern: Ensures single instance per application lifecycle
- Dynamic Module Loading: Loads custom implementations via
ModuleLoader - Configuration-Driven: Uses environment variables for custom implementations
- Validation: Ensures custom classes inherit from
TaskPersistenceManager - Graceful Fallback: Falls back to in-memory implementation if custom module fails
Configuration Methods
_get_custom_persistence_config() -> tuple[str | None, str | None]- Retrieves module and class names from environment variables
-
Returns
(None, None)if using default configuration -
_validate_custom_class() - Validates that custom class exists and inherits from
TaskPersistenceManager - Raises appropriate exceptions for missing or invalid classes
Dependency Injection
The factory attempts to pass app_config to custom implementations:
try:
return custom_class(app_config=self.app_config)
except TypeError:
# Fallback if app_config not accepted
return custom_class()
4. Singleton (Metaclass)
File: singleton.py
Thread-safe singleton metaclass implementation using Python's threading.Lock.
Features
- Thread Safety: Uses
threading.Lock()to prevent race conditions - Metaclass Pattern: Implements singleton at the class level
- Instance Caching: Maintains
_instancesdictionary for created instances
Usage Pattern
Custom Implementations Directory
RedisTaskPersistenceManager (Production Example)
File: custom/example_redis_persistence.py
A complete, production-ready Redis-based persistence implementation demonstrating advanced patterns.
Architecture Features
- Connection Management: Robust Redis connection with retry logic
- Serialization: JSON-based task serialization using Pydantic models
- TTL Support: Configurable time-to-live for all stored data
- Index Management: Maintains request_id to task_id mapping in Redis sets
- Error Recovery: Handles corrupted data with automatic cleanup
- Health Monitoring: Built-in health check capabilities
Configuration Environment Variables
TA_REDIS_HOST # Redis server hostname (default: localhost)
TA_REDIS_PORT # Redis server port (default: 6379)
TA_REDIS_DB # Redis database number (default: 0)
TA_REDIS_TTL # Time-to-live in seconds (default: 3600)
TA_REDIS_PWD # Redis password (optional)
TA_REDIS_SSL # Enable SSL connection (default: false)
Redis Key Patterns
- Task Storage:
task_persistence:task:{task_id} - Request Index:
task_persistence:request_index:{request_id}
Advanced Methods
health_check() -> bool: Tests Redis connectivityclear_all_tasks() -> int: Utility method for testing/cleanup_serialize_task(task: AgentTask) -> str: JSON serialization_deserialize_task(task_str: str) -> AgentTask: JSON deserialization
Error Handling Strategy
try:
# Redis operation
except redis.RedisError as e:
raise PersistenceCreateError(f"Redis error: {e}") from e
except json.JSONDecodeError as e:
# Handle corrupted data with cleanup
self.redis_client.delete(task_key)
raise PersistenceLoadError(f"Corrupted data: {e}") from e
Configuration System
Environment Variables
The persistence system uses the following environment variables:
TA_PERSISTENCE_MODULE: Path to custom task persistence moduleTA_PERSISTENCE_CLASS: Class name for custom implementation
Configuration Examples
Development (Default - In-Memory)
Production with Redis
export TA_PERSISTENCE_MODULE=src/sk_agents/persistence/custom/example_redis_persistence.py
export TA_PERSISTENCE_CLASS=RedisTaskPersistenceManager
export TA_REDIS_HOST=redis.production.com
export TA_REDIS_PORT=6379
export TA_REDIS_PWD=secure_password
export TA_REDIS_SSL=true
export TA_REDIS_TTL=7200
Custom Implementation
export TA_PERSISTENCE_MODULE=my_custom_module.py
export TA_PERSISTENCE_CLASS=MyCustomPersistenceManager
Usage Patterns
Basic Usage
from ska_utils import AppConfig
from sk_agents.persistence.persistence_factory import PersistenceFactory
from sk_agents.tealagents.models import AgentTask
# Get task persistence manager
app_config = AppConfig()
factory = PersistenceFactory(app_config)
persistence_manager = factory.get_persistence_manager()
# Store, retrieve, and manage task data
await persistence_manager.create(agent_task)
retrieved_task = await persistence_manager.load("task_id_123")
await persistence_manager.update(agent_task)
await persistence_manager.delete("task_id_123")
# Load by request ID (useful for resuming workflows)
task = await persistence_manager.load_by_request_id("request_123")
Advanced Usage with Error Handling
from sk_agents.exceptions import PersistenceCreateError, PersistenceLoadError
try:
await persistence_manager.create(task)
except PersistenceCreateError as e:
logger.error(f"Failed to create task: {e.message}")
# Handle creation failure
try:
task = await persistence_manager.load(task_id)
if task is None:
logger.info(f"Task {task_id} not found")
else:
# Process loaded task
pass
except PersistenceLoadError as e:
logger.error(f"Failed to load task: {e.message}")
Creating Custom Implementations
Step 1: Implement the Interface
Create a new class that inherits from TaskPersistenceManager:
from sk_agents.persistence.task_persistence_manager import TaskPersistenceManager
from sk_agents.tealagents.models import AgentTask
from sk_agents.exceptions import (
PersistenceCreateError,
PersistenceDeleteError,
PersistenceLoadError,
PersistenceUpdateError,
)
class MyCustomTaskPersistenceManager(TaskPersistenceManager):
def __init__(self, app_config=None):
# Initialize your storage backend
self.storage = self._initialize_storage(app_config)
async def create(self, task: AgentTask) -> None:
try:
# Your implementation
if await self._task_exists(task.task_id):
raise PersistenceCreateError(
f"Task {task.task_id} already exists"
)
await self._store_task(task)
except Exception as e:
raise PersistenceCreateError(f"Create failed: {e}") from e
async def load(self, task_id: str) -> AgentTask | None:
try:
return await self._retrieve_task(task_id)
except Exception as e:
raise PersistenceLoadError(f"Load failed: {e}") from e
async def update(self, task: AgentTask) -> None:
try:
if not await self._task_exists(task.task_id):
raise PersistenceUpdateError(
f"Task {task.task_id} does not exist"
)
await self._update_task(task)
except Exception as e:
raise PersistenceUpdateError(f"Update failed: {e}") from e
async def delete(self, task_id: str) -> None:
try:
if not await self._task_exists(task_id):
raise PersistenceDeleteError(
f"Task {task_id} does not exist"
)
await self._remove_task(task_id)
except Exception as e:
raise PersistenceDeleteError(f"Delete failed: {e}") from e
async def load_by_request_id(self, request_id: str) -> AgentTask | None:
try:
task_ids = await self._find_tasks_by_request_id(request_id)
if not task_ids:
return None
return await self.load(task_ids[0])
except Exception as e:
raise PersistenceLoadError(f"Load by request_id failed: {e}") from e
Step 2: Configuration
Set the environment variables to use your custom implementation:
export TA_PERSISTENCE_MODULE=path/to/your/custom_module.py
export TA_PERSISTENCE_CLASS=MyCustomTaskPersistenceManager
Step 3: Integration
The factory will automatically load and validate your implementation when the application starts.
Error Handling
The persistence system uses custom exception types for different failure scenarios:
PersistenceCreateError: Task creation failuresPersistenceLoadError: Task retrieval failuresPersistenceUpdateError: Task update failuresPersistenceDeleteError: Task deletion failures
All implementations should raise these specific exceptions to maintain consistent error handling across the system.
Testing Considerations
When implementing custom persistence managers:
- Unit Tests: Test all CRUD operations with various edge cases
- Concurrency Tests: Verify thread safety with concurrent operations
- Error Scenarios: Test network failures, corrupted data, etc.
- Performance Tests: Measure latency and throughput under load
- Integration Tests: Test with actual AgentTask objects
Example test pattern:
import pytest
from sk_agents.tealagents.models import AgentTask
@pytest.mark.asyncio
async def test_create_and_load():
persistence_manager = MyCustomPersistenceManager()
# Create test task
task = AgentTask(task_id="test_123", ...)
await persistence_manager.create(task)
# Verify retrieval
loaded_task = await persistence_manager.load("test_123")
assert loaded_task is not None
assert loaded_task.task_id == "test_123"
Best Practices
- Thread Safety: Always implement proper locking for concurrent access
- Error Handling: Use the standard exception types for consistency
- Resource Management: Properly close connections and clean up resources
- Configuration: Support dependency injection via
app_configparameter - Logging: Include comprehensive logging for debugging and monitoring
- Validation: Validate input parameters and handle edge cases
- Documentation: Document configuration requirements and usage patterns