Skip to content

sk-agents

sk_agents
sk_agents.a2a
sk_agents.a2a.redis_task_store

Redis implementation of the TaskStore interface. This implementation uses Redis as the persistent store for Task objects.

sk_agents.a2a.redis_task_store.RedisTaskStore

Bases: TaskStore

Redis implementation of the TaskStore interface.

This class provides Redis-based persistence for Task objects.

Source code in src/sk_agents/a2a/redis_task_store.py
class RedisTaskStore(TaskStore):
    """Redis implementation of the TaskStore interface.

    This class provides Redis-based persistence for Task objects.
    """

    def __init__(self, redis_client: Redis, ttl: int | None = None, key_prefix: str = "task:"):
        """Initialize the RedisTaskStore with a Redis client.

        Args:
            redis_client: An instance of Redis client
            key_prefix: Prefix used for Redis keys (default: "task:")
        """
        self._redis = redis_client
        self._ttl = ttl
        self._key_prefix = key_prefix

    def _get_key(self, task_id: str) -> str:
        """Generate a Redis key for a given task ID.

        Args:
            task_id: The ID of the task

        Returns:
            A Redis key string
        """
        return f"{self._key_prefix}{task_id}"

    async def save(self, task: Task):
        """Saves or updates a task in the Redis store.

        Args:
            task: The Task object to save
        """
        # Convert the Task object to a serializable dictionary
        task_dict = task.model_dump(mode="json")

        # Serialize the task dictionary to JSON
        task_json = json.dumps(task_dict)

        # Store the serialized task in Redis using the task ID as the key
        await self._redis.set(self._get_key(task.id), task_json, ex=self._ttl)

    async def get(self, task_id: str) -> Task | None:
        """Retrieves a task from the Redis store by ID.

        Args:
            task_id: The ID of the task to retrieve

        Returns:
            The Task object if found, None otherwise
        """
        # Get the serialized task from Redis
        task_json = await self._redis.get(self._get_key(task_id))

        if task_json is None:
            return None

        # Deserialize the JSON string to a dictionary
        task_dict = json.loads(task_json)

        # Create and return a Task object from the dictionary
        return Task.model_validate(task_dict)

    async def delete(self, task_id: str):
        """Deletes a task from the Redis store by ID.

        Args:
            task_id: The ID of the task to delete
        """
        # Delete the task from Redis
        await self._redis.delete(self._get_key(task_id))
sk_agents.a2a.redis_task_store.RedisTaskStore.__init__
__init__(
    redis_client: Redis,
    ttl: int | None = None,
    key_prefix: str = "task:",
)

Initialize the RedisTaskStore with a Redis client.

Parameters:

Name Type Description Default
redis_client Redis

An instance of Redis client

required
key_prefix str

Prefix used for Redis keys (default: "task:")

'task:'
Source code in src/sk_agents/a2a/redis_task_store.py
def __init__(self, redis_client: Redis, ttl: int | None = None, key_prefix: str = "task:"):
    """Initialize the RedisTaskStore with a Redis client.

    Args:
        redis_client: An instance of Redis client
        key_prefix: Prefix used for Redis keys (default: "task:")
    """
    self._redis = redis_client
    self._ttl = ttl
    self._key_prefix = key_prefix
sk_agents.a2a.redis_task_store.RedisTaskStore.save async
save(task: Task)

Saves or updates a task in the Redis store.

Parameters:

Name Type Description Default
task Task

The Task object to save

required
Source code in src/sk_agents/a2a/redis_task_store.py
async def save(self, task: Task):
    """Saves or updates a task in the Redis store.

    Args:
        task: The Task object to save
    """
    # Convert the Task object to a serializable dictionary
    task_dict = task.model_dump(mode="json")

    # Serialize the task dictionary to JSON
    task_json = json.dumps(task_dict)

    # Store the serialized task in Redis using the task ID as the key
    await self._redis.set(self._get_key(task.id), task_json, ex=self._ttl)
sk_agents.a2a.redis_task_store.RedisTaskStore.get async
get(task_id: str) -> Task | None

Retrieves a task from the Redis store by ID.

Parameters:

Name Type Description Default
task_id str

The ID of the task to retrieve

required

Returns:

Type Description
Task | None

The Task object if found, None otherwise

Source code in src/sk_agents/a2a/redis_task_store.py
async def get(self, task_id: str) -> Task | None:
    """Retrieves a task from the Redis store by ID.

    Args:
        task_id: The ID of the task to retrieve

    Returns:
        The Task object if found, None otherwise
    """
    # Get the serialized task from Redis
    task_json = await self._redis.get(self._get_key(task_id))

    if task_json is None:
        return None

    # Deserialize the JSON string to a dictionary
    task_dict = json.loads(task_json)

    # Create and return a Task object from the dictionary
    return Task.model_validate(task_dict)
sk_agents.a2a.redis_task_store.RedisTaskStore.delete async
delete(task_id: str)

Deletes a task from the Redis store by ID.

Parameters:

Name Type Description Default
task_id str

The ID of the task to delete

required
Source code in src/sk_agents/a2a/redis_task_store.py
async def delete(self, task_id: str):
    """Deletes a task from the Redis store by ID.

    Args:
        task_id: The ID of the task to delete
    """
    # Delete the task from Redis
    await self._redis.delete(self._get_key(task_id))
sk_agents.a2a.response_classifier
sk_agents.a2a.response_classifier.A2AResponseClassifier

A class to classify responses from the A2A agent.

Source code in src/sk_agents/a2a/response_classifier.py
class A2AResponseClassifier:
    """
    A class to classify responses from the A2A agent.
    """

    NAME = "a2a-response-classifier"
    SYSTEM_PROMPT = (
        "## System Prompt: Agent Output Classifier\n"
        "\n"
        "**You are an AI agent tasked with analyzing the output of another AI agent "
        '(referred to as the "Primary Agent") and classifying its status. Your output MUST '
        "be a JSON object.**\n"
        "\n"
        "Your goal is to determine which of the following categories best describes "
        "the Primary Agent's output and structure your response accordingly.\n"
        "\n"
        "**Possible Classification Statuses & JSON Output Structures:**\n"
        "\n"
        "1.  **Status: `completed`**\n"
        "    * The Primary Agent has successfully completed the assigned task or answered the "
        "user's query.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "completed"\n'
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "done," "completed," "finished," "success," '
        '"here is the result," "I have finished," "the task is complete," direct answers '
        "to questions, generated content that fulfills the request.\n"
        "    * Context: The output clearly indicates finality and achievement of the original "
        "goal.\n"
        "\n"
        "2.  **Status: `failed`**\n"
        "    * The Primary Agent has failed to complete the assigned task or answered the "
        "user's query.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "failed"\n'
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "failed," "unable to," "cannot complete," '
        '"error," "encountered a problem," "not possible," "I\'m sorry, I can\'t," '
        '"task aborted."\n'
        "    * Context: The output indicates an inability to proceed or a definitive negative "
        "outcome regarding the task. This includes technical errors, lack of capability, "
        "or hitting a dead end.\n"
        "\n"
        "3.  **Status: `input-required`**\n"
        "    * The Primary Agent requires additional information, clarification, or a decision "
        "from the user to continue or complete the task.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "input-required",\n'
        '          "message": "A description of what info is needed from the user and why."\n'
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "what do you mean by," "could you please '
        'specify," "which option do you prefer," "do you want to proceed," "please '
        'provide," "I need more information," questions directed at the user.\n'
        "    * Context: The output is a direct or indirect request for user interaction to "
        "resolve ambiguity, make a choice, or provide necessary data. The `message` field "
        "should summarize this request.\n"
        "\n"
        "4.  **Status: `auth-required`**\n"
        "    * The Primary Agent has indicated that it needs to perform some form of "
        "authentication (e.g., login, API key verification, permission grant) before it "
        "can proceed with the task.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "auth-required",\n'
        '          "message": "A description of what authentication is needed and why.",\n'
        '          "auth_details": {} // Likely a JSON structure extracted from the Primary '
        "Agent's output containing technical details about the auth request. Can be an "
        "empty object if no specific structure is found.\n"
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "please log in," "authentication required," '
        '"access denied," "invalid credentials," "API key needed," "sign in to continue," '
        '"verify your identity," "permissions needed."\n'
        "    * Context: The output explicitly states or strongly implies that a security or "
        "access barrier is preventing task progression. The `message` field should explain "
        "this. The `auth_details` field should attempt to capture any structured information "
        "(e.g., OAuth URLs, scopes needed, realm info) provided by the Primary Agent "
        "regarding the authentication. If the Primary Agent provides a JSON blob related to "
        "auth, try to pass that through in `auth_details`.\n"
        "\n"
        "**Your Analysis Process:**\n"
        "\n"
        "1.  **Carefully review the entire output from the Primary Agent.** Understand the "
        "context and the overall message.\n"
        "2.  **Look for explicit keywords and phrases** associated with each category.\n"
        "3.  **Consider the intent** behind the Primary Agent's message.\n"
        "4.  **Prioritize:**\n"
        "    * If authentication is mentioned as a blocker, classify as `auth-required`. "
        "Extract relevant details for the `message` and `auth_details` fields.\n"
        "    * If the agent is clearly asking the user a question to proceed (and it's not "
        "primarily an authentication request), classify as `input-required`. Formulate the "
        "`message` field.\n"
        "    * If the agent explicitly states success, classify as `completed`.\n"
        "    * If the agent explicitly states failure or an insurmountable error (not related "
        "to needing input or auth), classify as `failed`.\n"
        "5.  **Extract Information for `message` and `auth_details`:**\n"
        "    * For `input-required` and `auth-required`, the `message` should be a concise "
        "explanation derived from the Primary Agent's output.\n"
        "    * For `auth-required`, if the Primary Agent's output includes a structured "
        "(e.g., JSON) segment detailing the authentication requirements, attempt to extract "
        "and place this into the `auth_details` field. If no specific structure is found, "
        "`auth_details` can be an empty object `{}`. Do not invent details; only extract "
        "what is provided.\n"
        "6.  **If the output is ambiguous, try to infer the most likely category.** If truly "
        'unclear, you may need a default or "UNCLEAR" category (though this prompt focuses '
        "on the four defined). In such a case, defaulting to `failed` with an appropriate "
        "message might be a safe fallback if no other category fits.\n"
        "\n"
        "**Output Format:**\n"
        "\n"
        "Your output **MUST** be a single JSON object corresponding to one of the structures "
        "defined above.\n"
        "\n"
        "**Example Scenarios:**\n"
        "\n"
        "* **Primary Agent Output:** \"I've finished generating the report you asked for. "
        "It's attached below.\"\n"
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "completed"\n'
        "        }\n"
        "        ```\n"
        "* **Primary Agent Output:** \"I'm sorry, I encountered an unexpected error and cannot "
        'process your request at this time. Error code: 503. Please try again later."\n'
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "failed"\n'
        "        }\n"
        "        ```\n"
        '* **Primary Agent Output:** "To help you with that, could you please tell me which '
        'specific date range you are interested in for the sales data?"\n'
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "input-required",\n'
        '          "message": "The agent needs to know the specific date range for the sales '
        'data to proceed."\n'
        "        }\n"
        "        ```\n"
        '* **Primary Agent Output:** "Access to this API endpoint requires authentication. '
        "Please provide a valid Bearer token. Details: {'type': 'Bearer', 'realm': "
        "'[api.example.com/auth](https://api.example.com/auth)'}}\"\n"
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "auth-required",\n'
        '          "message": "Access to the API endpoint requires a valid Bearer token.",\n'
        '          "auth_details": {\n'
        '            "type": "Bearer",\n'
        '            "realm": "[api.example.com/auth](https://api.example.com/auth)"\n'
        "          }\n"
        "        }\n"
        "        ```\n"
        '* **Primary Agent Output:** "You need to sign in to your account to access your '
        'profile. Click here to login."\n'
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "auth-required",\n'
        '          "message": "User needs to sign in to their account to access their profile.",\n'
        '          "auth_details": {}\n'
        "        }\n"
        "        ```\n"
        "\n"
        "**Critical Considerations:**\n"
        "\n"
        "* Ensure your output is always valid JSON.\n"
        "* Be precise in your classification and in the information extracted for the `message` "
        "and `auth_details` fields.\n"
        "* Focus solely on the provided output from the Primary Agent.\n"
        "* Adhere to the prioritization logic.\n"
    )

    def __init__(self, app_config: AppConfig, chat_completion_builder: ChatCompletionBuilder):
        model_name = app_config.get(TA_A2A_OUTPUT_CLASSIFIER_MODEL.env_name)
        chat_completion = chat_completion_builder.get_chat_completion_for_model(
            service_id=self.NAME, model_name=model_name
        )
        kernel = Kernel()
        kernel.add_service(chat_completion)
        settings = kernel.get_prompt_execution_settings_from_service_id(self.NAME)
        settings.response_format = A2AResponseClassification
        self.agent = ChatCompletionAgent(
            kernel=kernel,
            name=self.NAME,
            instructions=self.SYSTEM_PROMPT,
            arguments=KernelArguments(settings=settings),
        )

    async def classify_response(self, response: str) -> A2AResponseClassification:
        """
        Classify the response from the A2A agent.

        Args:
            response (str): The response from the A2A agent.

        Returns:
            str: The classification of the response.
        """
        chat_history = ChatHistory()
        chat_history.add_user_message(f"Please classify the following response:\n\n{response}")
        async for content in self.agent.invoke(messages=chat_history):
            data = json.loads(str(content.content))
            return A2AResponseClassification(**data)
        return A2AResponseClassification(
            status=A2AResponseStatus.failed,
            message="No response received from response classifier.",
        )
sk_agents.a2a.response_classifier.A2AResponseClassifier.classify_response async
classify_response(
    response: str,
) -> A2AResponseClassification

Classify the response from the A2A agent.

Parameters:

Name Type Description Default
response str

The response from the A2A agent.

required

Returns:

Name Type Description
str A2AResponseClassification

The classification of the response.

Source code in src/sk_agents/a2a/response_classifier.py
async def classify_response(self, response: str) -> A2AResponseClassification:
    """
    Classify the response from the A2A agent.

    Args:
        response (str): The response from the A2A agent.

    Returns:
        str: The classification of the response.
    """
    chat_history = ChatHistory()
    chat_history.add_user_message(f"Please classify the following response:\n\n{response}")
    async for content in self.agent.invoke(messages=chat_history):
        data = json.loads(str(content.content))
        return A2AResponseClassification(**data)
    return A2AResponseClassification(
        status=A2AResponseStatus.failed,
        message="No response received from response classifier.",
    )
sk_agents.authorization
sk_agents.authorization.request_authorizer
sk_agents.authorization.request_authorizer.RequestAuthorizer

Bases: ABC

Source code in src/sk_agents/authorization/request_authorizer.py
class RequestAuthorizer(ABC):
    @abstractmethod
    def authorize_request(self, auth_header: str) -> str:
        """
        Validates the given authorization header and returns a unique identifier
        for the authenticated user.

        Parameters:
            auth_header (str): The value of the 'Authorization' HTTP header.
                Typically, this is in the format 'Bearer <token>' or some other
                scheme depending on the implementation.

        Returns:
            str: A unique string that identifies the authenticated user.
                This could be a user ID, username, email, or any other unique
                identifier suitable for tracking and authorization.
            Examples:
                "user_12345"
                "alice@example.com"

        Raises:
            ValueError: If the authorization header is missing, malformed, or invalid.
            AuthenticationError (optional): If used in your implementation, it may
                be raised to signal an authentication failure.
        """
        pass
sk_agents.authorization.request_authorizer.RequestAuthorizer.authorize_request abstractmethod
authorize_request(auth_header: str) -> str

Validates the given authorization header and returns a unique identifier for the authenticated user.

Parameters:

Name Type Description Default
auth_header str

The value of the 'Authorization' HTTP header. Typically, this is in the format 'Bearer ' or some other scheme depending on the implementation.

required

Returns:

Name Type Description
str str

A unique string that identifies the authenticated user. This could be a user ID, username, email, or any other unique identifier suitable for tracking and authorization.

Examples str

"user_12345" "alice@example.com"

Raises:

Type Description
ValueError

If the authorization header is missing, malformed, or invalid.

AuthenticationError(optional)

If used in your implementation, it may be raised to signal an authentication failure.

Source code in src/sk_agents/authorization/request_authorizer.py
@abstractmethod
def authorize_request(self, auth_header: str) -> str:
    """
    Validates the given authorization header and returns a unique identifier
    for the authenticated user.

    Parameters:
        auth_header (str): The value of the 'Authorization' HTTP header.
            Typically, this is in the format 'Bearer <token>' or some other
            scheme depending on the implementation.

    Returns:
        str: A unique string that identifies the authenticated user.
            This could be a user ID, username, email, or any other unique
            identifier suitable for tracking and authorization.
        Examples:
            "user_12345"
            "alice@example.com"

    Raises:
        ValueError: If the authorization header is missing, malformed, or invalid.
        AuthenticationError (optional): If used in your implementation, it may
            be raised to signal an authentication failure.
    """
    pass
sk_agents.exceptions
sk_agents.exceptions.AgentsException

Bases: Exception

Base class for all exception in SKagents

Source code in src/sk_agents/exceptions.py
class AgentsException(Exception):
    """Base class for all exception in SKagents"""
sk_agents.exceptions.InvalidConfigException

Bases: AgentsException

Exception raised when the provided configuration is invalid

Source code in src/sk_agents/exceptions.py
class InvalidConfigException(AgentsException):
    """Exception raised when the provided configuration is invalid"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.InvalidInputException

Bases: AgentsException

Exception raised when the provided input type is invalid

Source code in src/sk_agents/exceptions.py
class InvalidInputException(AgentsException):
    """Exception raised when the provided input type is invalid"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.AgentInvokeException

Bases: AgentsException

Exception raised when invoking an Agent failed

Source code in src/sk_agents/exceptions.py
class AgentInvokeException(AgentsException):
    """Exception raised when invoking an Agent failed"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceCreateError

Bases: AgentsException

Exception raised for errors during task creation.

Source code in src/sk_agents/exceptions.py
class PersistenceCreateError(AgentsException):
    """Exception raised for errors during task creation."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceLoadError

Bases: AgentsException

Exception raised for errors during task loading.

Source code in src/sk_agents/exceptions.py
class PersistenceLoadError(AgentsException):
    """Exception raised for errors during task loading."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceUpdateError

Bases: AgentsException

Exception raised for errors during task update.

Source code in src/sk_agents/exceptions.py
class PersistenceUpdateError(AgentsException):
    """Exception raised for errors during task update."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceDeleteError

Bases: AgentsException

Exception raised for errors during task deletion.

Source code in src/sk_agents/exceptions.py
class PersistenceDeleteError(AgentsException):
    """Exception raised for errors during task deletion."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.AuthenticationException

Bases: AgentsException

Exception raised errors when authenticating users

Source code in src/sk_agents/exceptions.py
class AuthenticationException(AgentsException):
    """Exception raised errors when authenticating users"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.ska_types
sk_agents.ska_types.HistoryMessage

Bases: BaseModel

A single interaction in a chat history.
'role' - Either 'user' (requestor) or 'assistant' (responder) indicating who sent the message.
'content' - The content of the message

Source code in src/sk_agents/ska_types.py
class HistoryMessage(BaseModel):
    """A single interaction in a chat history.<br/>
    'role' - Either 'user' (requestor) or 'assistant' (responder) indicating
    who sent the message.<br/>
    'content' - The content of the message"""

    role: Literal["user", "assistant"]
    content: str
sk_agents.ska_types.BaseInput

Bases: KernelBaseModel

The history of a chat interaction between an automated assistant and a human.

Source code in src/sk_agents/ska_types.py
class BaseInput(KernelBaseModel):
    """The history of a chat interaction between an automated assistant and a
    human."""

    chat_history: list[HistoryMessage] | None = None
sk_agents.ska_types.BaseInputWithUserContext

Bases: KernelBaseModel

The history of a chat interaction between an automated assistant and a human, along with context about the user.

Source code in src/sk_agents/ska_types.py
class BaseInputWithUserContext(KernelBaseModel):
    """The history of a chat interaction between an automated assistant and a
    human, along with context about the user."""

    chat_history: list[HistoryMessage] | None = None
    user_context: dict[str, str] | None = None
sk_agents.state
sk_agents.state.redis_state_manager

Redis implementation of the StateManager interface. This implementation uses Redis as the persistent store for task state management.

sk_agents.state.redis_state_manager.RedisStateManager

Bases: StateManager

Redis implementation of the StateManager interface.

This class provides Redis-based persistence for task state management.

Source code in src/sk_agents/state/redis_state_manager.py
class RedisStateManager(StateManager):
    """Redis implementation of the StateManager interface.

    This class provides Redis-based persistence for task state management.
    """

    def __init__(
        self,
        redis_client: Redis,
        ttl: int | None = None,
        key_prefix: str = "task_state:",
    ):
        """Initialize the RedisStateManager with a Redis client.

        Args:
            redis_client: An instance of Redis client
            key_prefix: Prefix used for Redis keys (default: "task_state:")
        """
        self._redis = redis_client
        self._key_prefix = key_prefix
        self._ttl = ttl

    def _get_message_key(self, task_id: str) -> str:
        """Generate a Redis key for a task's messages.

        Args:
            task_id: The ID of the task

        Returns:
            A Redis key string for the task's messages
        """
        return f"{self._key_prefix}{task_id}:messages"

    def _get_canceled_key(self, task_id: str) -> str:
        """Generate a Redis key for a task's canceled status.

        Args:
            task_id: The ID of the task

        Returns:
            A Redis key string for the task's canceled status
        """
        return f"{self._key_prefix}{task_id}:canceled"

    async def update_task_messages(
        self, task_id: str, new_message: HistoryMultiModalMessage
    ) -> list[HistoryMultiModalMessage]:
        """Updates the messages for a specific task.

        Appends a new message to the task's message history and returns
        the complete list of messages.

        Args:
            task_id: The ID of the task
            new_message: The new message to add to the task's history

        Returns:
            The complete list of messages for the task
        """
        # Get the Redis key for this task's messages
        message_key = self._get_message_key(task_id)

        # Serialize the new message to JSON with mode='json' to ensure enums are properly serialized
        message_json = json.dumps(new_message.model_dump(mode="json"))

        # Add the new message to the list in Redis
        await self._redis.rpush(message_key, message_json)
        if self._ttl:
            await self._redis.expire(message_key, int(self._ttl))

        # Retrieve all messages for the task
        message_jsons = await self._redis.lrange(message_key, 0, -1)

        # Deserialize each message from JSON
        messages = [
            HistoryMultiModalMessage.model_validate(json.loads(msg)) for msg in message_jsons
        ]

        return messages

    async def set_canceled(self, task_id: str) -> None:
        """Marks a task as canceled.

        Args:
            task_id: The ID of the task to mark as canceled
        """
        # Set the canceled flag for the task
        await self._redis.set(self._get_canceled_key(task_id), "1", ex=self._ttl)

    async def is_canceled(self, task_id: str) -> bool:
        """Checks if a task is marked as canceled.

        Args:
            task_id: The ID of the task to check

        Returns:
            True if the task is canceled, False otherwise
        """
        # Check if the canceled flag is set
        canceled = await self._redis.get(self._get_canceled_key(task_id))
        return canceled == "1"
sk_agents.state.redis_state_manager.RedisStateManager.__init__
__init__(
    redis_client: Redis,
    ttl: int | None = None,
    key_prefix: str = "task_state:",
)

Initialize the RedisStateManager with a Redis client.

Parameters:

Name Type Description Default
redis_client Redis

An instance of Redis client

required
key_prefix str

Prefix used for Redis keys (default: "task_state:")

'task_state:'
Source code in src/sk_agents/state/redis_state_manager.py
def __init__(
    self,
    redis_client: Redis,
    ttl: int | None = None,
    key_prefix: str = "task_state:",
):
    """Initialize the RedisStateManager with a Redis client.

    Args:
        redis_client: An instance of Redis client
        key_prefix: Prefix used for Redis keys (default: "task_state:")
    """
    self._redis = redis_client
    self._key_prefix = key_prefix
    self._ttl = ttl
sk_agents.state.redis_state_manager.RedisStateManager.update_task_messages async
update_task_messages(
    task_id: str, new_message: HistoryMultiModalMessage
) -> list[HistoryMultiModalMessage]

Updates the messages for a specific task.

Appends a new message to the task's message history and returns the complete list of messages.

Parameters:

Name Type Description Default
task_id str

The ID of the task

required
new_message HistoryMultiModalMessage

The new message to add to the task's history

required

Returns:

Type Description
list[HistoryMultiModalMessage]

The complete list of messages for the task

Source code in src/sk_agents/state/redis_state_manager.py
async def update_task_messages(
    self, task_id: str, new_message: HistoryMultiModalMessage
) -> list[HistoryMultiModalMessage]:
    """Updates the messages for a specific task.

    Appends a new message to the task's message history and returns
    the complete list of messages.

    Args:
        task_id: The ID of the task
        new_message: The new message to add to the task's history

    Returns:
        The complete list of messages for the task
    """
    # Get the Redis key for this task's messages
    message_key = self._get_message_key(task_id)

    # Serialize the new message to JSON with mode='json' to ensure enums are properly serialized
    message_json = json.dumps(new_message.model_dump(mode="json"))

    # Add the new message to the list in Redis
    await self._redis.rpush(message_key, message_json)
    if self._ttl:
        await self._redis.expire(message_key, int(self._ttl))

    # Retrieve all messages for the task
    message_jsons = await self._redis.lrange(message_key, 0, -1)

    # Deserialize each message from JSON
    messages = [
        HistoryMultiModalMessage.model_validate(json.loads(msg)) for msg in message_jsons
    ]

    return messages
sk_agents.state.redis_state_manager.RedisStateManager.set_canceled async
set_canceled(task_id: str) -> None

Marks a task as canceled.

Parameters:

Name Type Description Default
task_id str

The ID of the task to mark as canceled

required
Source code in src/sk_agents/state/redis_state_manager.py
async def set_canceled(self, task_id: str) -> None:
    """Marks a task as canceled.

    Args:
        task_id: The ID of the task to mark as canceled
    """
    # Set the canceled flag for the task
    await self._redis.set(self._get_canceled_key(task_id), "1", ex=self._ttl)
sk_agents.state.redis_state_manager.RedisStateManager.is_canceled async
is_canceled(task_id: str) -> bool

Checks if a task is marked as canceled.

Parameters:

Name Type Description Default
task_id str

The ID of the task to check

required

Returns:

Type Description
bool

True if the task is canceled, False otherwise

Source code in src/sk_agents/state/redis_state_manager.py
async def is_canceled(self, task_id: str) -> bool:
    """Checks if a task is marked as canceled.

    Args:
        task_id: The ID of the task to check

    Returns:
        True if the task is canceled, False otherwise
    """
    # Check if the canceled flag is set
    canceled = await self._redis.get(self._get_canceled_key(task_id))
    return canceled == "1"
sk_agents.tealagents
sk_agents.tealagents.v1alpha1
sk_agents.tealagents.v1alpha1.hitl_manager
sk_agents.tealagents.v1alpha1.hitl_manager.HitlInterventionRequired

Bases: Exception

Exception raised when a tool call requires human-in-the-loop intervention.

Source code in src/sk_agents/tealagents/v1alpha1/hitl_manager.py
class HitlInterventionRequired(Exception):
    """
    Exception raised when a tool call requires human-in-the-loop intervention.
    """

    def __init__(self, function_calls: list[FunctionCallContent]):
        self.function_calls = function_calls
        if function_calls:
            self.plugin_name = function_calls[0].plugin_name
            self.function_name = function_calls[0].function_name
            message = f"HITL intervention required for {self.plugin_name}.{self.function_name}"
        else:
            message = "HITL intervention required"
        super().__init__(message)
sk_agents.tealagents.v1alpha1.hitl_manager.check_for_intervention
check_for_intervention(
    tool_call: FunctionCallContent,
) -> bool

Checks if the tool call requires user consent based on predefined high-risk tools.

Returns True if the tool call matches a high-risk tool, otherwise False.

Source code in src/sk_agents/tealagents/v1alpha1/hitl_manager.py
def check_for_intervention(tool_call: FunctionCallContent) -> bool:
    """
    Checks if the tool call requires user consent based on predefined high-risk tools.

    Returns True if the tool call matches a high-risk tool, otherwise False.
    """
    is_high_risk = (tool_call.plugin_name, tool_call.function_name) in HIGH_RISK_TOOLS
    print(
        f"HITL Check: Intercepted call to {tool_call.plugin_name}.{tool_call.function_name}. "
        f"{'Requires intervention.' if is_high_risk else 'Allowing to proceed.'}"
    )

    return is_high_risk