Skip to content

sk-agents

sk_agents
sk_agents.a2a
sk_agents.a2a.redis_task_store

Redis implementation of the TaskStore interface. This implementation uses Redis as the persistent store for Task objects.

sk_agents.a2a.redis_task_store.RedisTaskStore

Bases: TaskStore

Redis implementation of the TaskStore interface.

This class provides Redis-based persistence for Task objects.

Source code in src/sk_agents/a2a/redis_task_store.py
class RedisTaskStore(TaskStore):
    """Redis implementation of the TaskStore interface.

    This class provides Redis-based persistence for Task objects.
    """

    def __init__(self, redis_client: Redis, ttl: int | None = None, key_prefix: str = "task:"):
        """Initialize the RedisTaskStore with a Redis client.

        Args:
            redis_client: An instance of Redis client
            key_prefix: Prefix used for Redis keys (default: "task:")
        """
        self._redis = redis_client
        self._ttl = ttl
        self._key_prefix = key_prefix

    def _get_key(self, task_id: str) -> str:
        """Generate a Redis key for a given task ID.

        Args:
            task_id: The ID of the task

        Returns:
            A Redis key string
        """
        return f"{self._key_prefix}{task_id}"

    async def save(self, task: Task):
        """Saves or updates a task in the Redis store.

        Args:
            task: The Task object to save
        """
        # Convert the Task object to a serializable dictionary
        task_dict = task.model_dump(mode="json")

        # Serialize the task dictionary to JSON
        task_json = json.dumps(task_dict)

        # Store the serialized task in Redis using the task ID as the key
        await self._redis.set(self._get_key(task.id), task_json, ex=self._ttl)

    async def get(self, task_id: str) -> Task | None:
        """Retrieves a task from the Redis store by ID.

        Args:
            task_id: The ID of the task to retrieve

        Returns:
            The Task object if found, None otherwise
        """
        # Get the serialized task from Redis
        task_json = await self._redis.get(self._get_key(task_id))

        if task_json is None:
            return None

        # Deserialize the JSON string to a dictionary
        task_dict = json.loads(task_json)

        # Create and return a Task object from the dictionary
        return Task.model_validate(task_dict)

    async def delete(self, task_id: str):
        """Deletes a task from the Redis store by ID.

        Args:
            task_id: The ID of the task to delete
        """
        # Delete the task from Redis
        await self._redis.delete(self._get_key(task_id))
sk_agents.a2a.redis_task_store.RedisTaskStore.__init__
__init__(
    redis_client: Redis,
    ttl: int | None = None,
    key_prefix: str = "task:",
)

Initialize the RedisTaskStore with a Redis client.

Parameters:

Name Type Description Default
redis_client Redis

An instance of Redis client

required
key_prefix str

Prefix used for Redis keys (default: "task:")

'task:'
Source code in src/sk_agents/a2a/redis_task_store.py
def __init__(self, redis_client: Redis, ttl: int | None = None, key_prefix: str = "task:"):
    """Initialize the RedisTaskStore with a Redis client.

    Args:
        redis_client: An instance of Redis client
        key_prefix: Prefix used for Redis keys (default: "task:")
    """
    self._redis = redis_client
    self._ttl = ttl
    self._key_prefix = key_prefix
sk_agents.a2a.redis_task_store.RedisTaskStore.save async
save(task: Task)

Saves or updates a task in the Redis store.

Parameters:

Name Type Description Default
task Task

The Task object to save

required
Source code in src/sk_agents/a2a/redis_task_store.py
async def save(self, task: Task):
    """Saves or updates a task in the Redis store.

    Args:
        task: The Task object to save
    """
    # Convert the Task object to a serializable dictionary
    task_dict = task.model_dump(mode="json")

    # Serialize the task dictionary to JSON
    task_json = json.dumps(task_dict)

    # Store the serialized task in Redis using the task ID as the key
    await self._redis.set(self._get_key(task.id), task_json, ex=self._ttl)
sk_agents.a2a.redis_task_store.RedisTaskStore.get async
get(task_id: str) -> Task | None

Retrieves a task from the Redis store by ID.

Parameters:

Name Type Description Default
task_id str

The ID of the task to retrieve

required

Returns:

Type Description
Task | None

The Task object if found, None otherwise

Source code in src/sk_agents/a2a/redis_task_store.py
async def get(self, task_id: str) -> Task | None:
    """Retrieves a task from the Redis store by ID.

    Args:
        task_id: The ID of the task to retrieve

    Returns:
        The Task object if found, None otherwise
    """
    # Get the serialized task from Redis
    task_json = await self._redis.get(self._get_key(task_id))

    if task_json is None:
        return None

    # Deserialize the JSON string to a dictionary
    task_dict = json.loads(task_json)

    # Create and return a Task object from the dictionary
    return Task.model_validate(task_dict)
sk_agents.a2a.redis_task_store.RedisTaskStore.delete async
delete(task_id: str)

Deletes a task from the Redis store by ID.

Parameters:

Name Type Description Default
task_id str

The ID of the task to delete

required
Source code in src/sk_agents/a2a/redis_task_store.py
async def delete(self, task_id: str):
    """Deletes a task from the Redis store by ID.

    Args:
        task_id: The ID of the task to delete
    """
    # Delete the task from Redis
    await self._redis.delete(self._get_key(task_id))
sk_agents.a2a.response_classifier
sk_agents.a2a.response_classifier.A2AResponseClassifier

A class to classify responses from the A2A agent.

Source code in src/sk_agents/a2a/response_classifier.py
class A2AResponseClassifier:
    """
    A class to classify responses from the A2A agent.
    """

    NAME = "a2a-response-classifier"
    SYSTEM_PROMPT = (
        "## System Prompt: Agent Output Classifier\n"
        "\n"
        "**You are an AI agent tasked with analyzing the output of another AI agent "
        '(referred to as the "Primary Agent") and classifying its status. Your output MUST '
        "be a JSON object.**\n"
        "\n"
        "Your goal is to determine which of the following categories best describes "
        "the Primary Agent's output and structure your response accordingly.\n"
        "\n"
        "**Possible Classification Statuses & JSON Output Structures:**\n"
        "\n"
        "1.  **Status: `completed`**\n"
        "    * The Primary Agent has successfully completed the assigned task or answered the "
        "user's query.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "completed"\n'
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "done," "completed," "finished," "success," '
        '"here is the result," "I have finished," "the task is complete," direct answers '
        "to questions, generated content that fulfills the request.\n"
        "    * Context: The output clearly indicates finality and achievement of the original "
        "goal.\n"
        "\n"
        "2.  **Status: `failed`**\n"
        "    * The Primary Agent has failed to complete the assigned task or answered the "
        "user's query.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "failed"\n'
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "failed," "unable to," "cannot complete," '
        '"error," "encountered a problem," "not possible," "I\'m sorry, I can\'t," '
        '"task aborted."\n'
        "    * Context: The output indicates an inability to proceed or a definitive negative "
        "outcome regarding the task. This includes technical errors, lack of capability, "
        "or hitting a dead end.\n"
        "\n"
        "3.  **Status: `input-required`**\n"
        "    * The Primary Agent requires additional information, clarification, or a decision "
        "from the user to continue or complete the task.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "input-required",\n'
        '          "message": "A description of what info is needed from the user and why."\n'
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "what do you mean by," "could you please '
        'specify," "which option do you prefer," "do you want to proceed," "please '
        'provide," "I need more information," questions directed at the user.\n'
        "    * Context: The output is a direct or indirect request for user interaction to "
        "resolve ambiguity, make a choice, or provide necessary data. The `message` field "
        "should summarize this request.\n"
        "\n"
        "4.  **Status: `auth-required`**\n"
        "    * The Primary Agent has indicated that it needs to perform some form of "
        "authentication (e.g., login, API key verification, permission grant) before it "
        "can proceed with the task.\n"
        "    * **JSON Output Structure:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "auth-required",\n'
        '          "message": "A description of what authentication is needed and why.",\n'
        '          "auth_details": {} // Likely a JSON structure extracted from the Primary '
        "Agent's output containing technical details about the auth request. Can be an "
        "empty object if no specific structure is found.\n"
        "        }\n"
        "        ```\n"
        '    * Keywords/phrases to look for: "please log in," "authentication required," '
        '"access denied," "invalid credentials," "API key needed," "sign in to continue," '
        '"verify your identity," "permissions needed."\n'
        "    * Context: The output explicitly states or strongly implies that a security or "
        "access barrier is preventing task progression. The `message` field should explain "
        "this. The `auth_details` field should attempt to capture any structured information "
        "(e.g., OAuth URLs, scopes needed, realm info) provided by the Primary Agent "
        "regarding the authentication. If the Primary Agent provides a JSON blob related to "
        "auth, try to pass that through in `auth_details`.\n"
        "\n"
        "**Your Analysis Process:**\n"
        "\n"
        "1.  **Carefully review the entire output from the Primary Agent.** Understand the "
        "context and the overall message.\n"
        "2.  **Look for explicit keywords and phrases** associated with each category.\n"
        "3.  **Consider the intent** behind the Primary Agent's message.\n"
        "4.  **Prioritize:**\n"
        "    * If authentication is mentioned as a blocker, classify as `auth-required`. "
        "Extract relevant details for the `message` and `auth_details` fields.\n"
        "    * If the agent is clearly asking the user a question to proceed (and it's not "
        "primarily an authentication request), classify as `input-required`. Formulate the "
        "`message` field.\n"
        "    * If the agent explicitly states success, classify as `completed`.\n"
        "    * If the agent explicitly states failure or an insurmountable error (not related "
        "to needing input or auth), classify as `failed`.\n"
        "5.  **Extract Information for `message` and `auth_details`:**\n"
        "    * For `input-required` and `auth-required`, the `message` should be a concise "
        "explanation derived from the Primary Agent's output.\n"
        "    * For `auth-required`, if the Primary Agent's output includes a structured "
        "(e.g., JSON) segment detailing the authentication requirements, attempt to extract "
        "and place this into the `auth_details` field. If no specific structure is found, "
        "`auth_details` can be an empty object `{}`. Do not invent details; only extract "
        "what is provided.\n"
        "6.  **If the output is ambiguous, try to infer the most likely category.** If truly "
        'unclear, you may need a default or "UNCLEAR" category (though this prompt focuses '
        "on the four defined). In such a case, defaulting to `failed` with an appropriate "
        "message might be a safe fallback if no other category fits.\n"
        "\n"
        "**Output Format:**\n"
        "\n"
        "Your output **MUST** be a single JSON object corresponding to one of the structures "
        "defined above.\n"
        "\n"
        "**Example Scenarios:**\n"
        "\n"
        "* **Primary Agent Output:** \"I've finished generating the report you asked for. "
        "It's attached below.\"\n"
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "completed"\n'
        "        }\n"
        "        ```\n"
        "* **Primary Agent Output:** \"I'm sorry, I encountered an unexpected error and cannot "
        'process your request at this time. Error code: 503. Please try again later."\n'
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "failed"\n'
        "        }\n"
        "        ```\n"
        '* **Primary Agent Output:** "To help you with that, could you please tell me which '
        'specific date range you are interested in for the sales data?"\n'
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "input-required",\n'
        '          "message": "The agent needs to know the specific date range for the sales '
        'data to proceed."\n'
        "        }\n"
        "        ```\n"
        '* **Primary Agent Output:** "Access to this API endpoint requires authentication. '
        "Please provide a valid Bearer token. Details: {'type': 'Bearer', 'realm': "
        "'[api.example.com/auth](https://api.example.com/auth)'}}\"\n"
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "auth-required",\n'
        '          "message": "Access to the API endpoint requires a valid Bearer token.",\n'
        '          "auth_details": {\n'
        '            "type": "Bearer",\n'
        '            "realm": "[api.example.com/auth](https://api.example.com/auth)"\n'
        "          }\n"
        "        }\n"
        "        ```\n"
        '* **Primary Agent Output:** "You need to sign in to your account to access your '
        'profile. Click here to login."\n'
        "    * **Your JSON Output:**\n"
        "        ```json\n"
        "        {\n"
        '          "status": "auth-required",\n'
        '          "message": "User needs to sign in to their account to access their profile.",\n'
        '          "auth_details": {}\n'
        "        }\n"
        "        ```\n"
        "\n"
        "**Critical Considerations:**\n"
        "\n"
        "* Ensure your output is always valid JSON.\n"
        "* Be precise in your classification and in the information extracted for the `message` "
        "and `auth_details` fields.\n"
        "* Focus solely on the provided output from the Primary Agent.\n"
        "* Adhere to the prioritization logic.\n"
    )

    def __init__(self, app_config: AppConfig, chat_completion_builder: ChatCompletionBuilder):
        model_name = app_config.get(TA_A2A_OUTPUT_CLASSIFIER_MODEL.env_name)
        chat_completion = chat_completion_builder.get_chat_completion_for_model(
            service_id=self.NAME, model_name=model_name
        )
        kernel = Kernel()
        kernel.add_service(chat_completion)
        settings = kernel.get_prompt_execution_settings_from_service_id(self.NAME)
        settings.response_format = A2AResponseClassification
        self.agent = ChatCompletionAgent(
            kernel=kernel,
            name=self.NAME,
            instructions=self.SYSTEM_PROMPT,
            arguments=KernelArguments(settings=settings),
        )

    async def classify_response(self, response: str) -> A2AResponseClassification:
        """
        Classify the response from the A2A agent.

        Args:
            response (str): The response from the A2A agent.

        Returns:
            str: The classification of the response.
        """
        chat_history = ChatHistory()
        chat_history.add_user_message(f"Please classify the following response:\n\n{response}")
        async for content in self.agent.invoke(messages=chat_history):
            data = json.loads(str(content.content))
            return A2AResponseClassification(**data)
        return A2AResponseClassification(
            status=A2AResponseStatus.failed,
            message="No response received from response classifier.",
        )
sk_agents.a2a.response_classifier.A2AResponseClassifier.classify_response async
classify_response(
    response: str,
) -> A2AResponseClassification

Classify the response from the A2A agent.

Parameters:

Name Type Description Default
response str

The response from the A2A agent.

required

Returns:

Name Type Description
str A2AResponseClassification

The classification of the response.

Source code in src/sk_agents/a2a/response_classifier.py
async def classify_response(self, response: str) -> A2AResponseClassification:
    """
    Classify the response from the A2A agent.

    Args:
        response (str): The response from the A2A agent.

    Returns:
        str: The classification of the response.
    """
    chat_history = ChatHistory()
    chat_history.add_user_message(f"Please classify the following response:\n\n{response}")
    async for content in self.agent.invoke(messages=chat_history):
        data = json.loads(str(content.content))
        return A2AResponseClassification(**data)
    return A2AResponseClassification(
        status=A2AResponseStatus.failed,
        message="No response received from response classifier.",
    )
sk_agents.appv3
class AppV3

@staticmethod def run(name, version, app_config, config, app): pass

sk_agents.authorization
sk_agents.authorization.request_authorizer
sk_agents.authorization.request_authorizer.RequestAuthorizer

Bases: ABC

Source code in src/sk_agents/authorization/request_authorizer.py
class RequestAuthorizer(ABC):
    @abstractmethod
    async def authorize_request(self, auth_header: str) -> str:
        """
        Validates the given authorization header and returns a unique identifier
        for the authenticated user.

        Parameters:
            auth_header (str): The value of the 'Authorization' HTTP header.
                Typically, this is in the format 'Bearer <token>' or some other
                scheme depending on the implementation.

        Returns:
            str: A unique string that identifies the authenticated user.
                This could be a user ID, username, email, or any other unique
                identifier suitable for tracking and authorization.
            Examples:
                "user_12345"
                "alice@example.com"

        Raises:
            ValueError: If the authorization header is missing, malformed, or invalid.
            AuthenticationError (optional): If used in your implementation, it may
                be raised to signal an authentication failure.
        """
        pass
sk_agents.authorization.request_authorizer.RequestAuthorizer.authorize_request abstractmethod async
authorize_request(auth_header: str) -> str

Validates the given authorization header and returns a unique identifier for the authenticated user.

Parameters:

Name Type Description Default
auth_header str

The value of the 'Authorization' HTTP header. Typically, this is in the format 'Bearer ' or some other scheme depending on the implementation.

required

Returns:

Name Type Description
str str

A unique string that identifies the authenticated user. This could be a user ID, username, email, or any other unique identifier suitable for tracking and authorization.

Examples str

"user_12345" "alice@example.com"

Raises:

Type Description
ValueError

If the authorization header is missing, malformed, or invalid.

AuthenticationError(optional)

If used in your implementation, it may be raised to signal an authentication failure.

Source code in src/sk_agents/authorization/request_authorizer.py
@abstractmethod
async def authorize_request(self, auth_header: str) -> str:
    """
    Validates the given authorization header and returns a unique identifier
    for the authenticated user.

    Parameters:
        auth_header (str): The value of the 'Authorization' HTTP header.
            Typically, this is in the format 'Bearer <token>' or some other
            scheme depending on the implementation.

    Returns:
        str: A unique string that identifies the authenticated user.
            This could be a user ID, username, email, or any other unique
            identifier suitable for tracking and authorization.
        Examples:
            "user_12345"
            "alice@example.com"

    Raises:
        ValueError: If the authorization header is missing, malformed, or invalid.
        AuthenticationError (optional): If used in your implementation, it may
            be raised to signal an authentication failure.
    """
    pass
sk_agents.exceptions
sk_agents.exceptions.AgentsException

Bases: Exception

Base class for all exception in SKagents

Source code in src/sk_agents/exceptions.py
class AgentsException(Exception):
    """Base class for all exception in SKagents"""
sk_agents.exceptions.InvalidConfigException

Bases: AgentsException

Exception raised when the provided configuration is invalid

Source code in src/sk_agents/exceptions.py
class InvalidConfigException(AgentsException):
    """Exception raised when the provided configuration is invalid"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.InvalidInputException

Bases: AgentsException

Exception raised when the provided input type is invalid

Source code in src/sk_agents/exceptions.py
class InvalidInputException(AgentsException):
    """Exception raised when the provided input type is invalid"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.AgentInvokeException

Bases: AgentsException

Exception raised when invoking an Agent failed

Source code in src/sk_agents/exceptions.py
class AgentInvokeException(AgentsException):
    """Exception raised when invoking an Agent failed"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceCreateError

Bases: AgentsException

Exception raised for errors during task creation.

Source code in src/sk_agents/exceptions.py
class PersistenceCreateError(AgentsException):
    """Exception raised for errors during task creation."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceLoadError

Bases: AgentsException

Exception raised for errors during task loading.

Source code in src/sk_agents/exceptions.py
class PersistenceLoadError(AgentsException):
    """Exception raised for errors during task loading."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceUpdateError

Bases: AgentsException

Exception raised for errors during task update.

Source code in src/sk_agents/exceptions.py
class PersistenceUpdateError(AgentsException):
    """Exception raised for errors during task update."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PersistenceDeleteError

Bases: AgentsException

Exception raised for errors during task deletion.

Source code in src/sk_agents/exceptions.py
class PersistenceDeleteError(AgentsException):
    """Exception raised for errors during task deletion."""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.AuthenticationException

Bases: AgentsException

Exception raised errors when authenticating users

Source code in src/sk_agents/exceptions.py
class AuthenticationException(AgentsException):
    """Exception raised errors when authenticating users"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PluginCatalogDefinitionException

Bases: AgentsException

Exception raised when the parsed json does not match the PluginCatalogDefinition Model

Source code in src/sk_agents/exceptions.py
class PluginCatalogDefinitionException(AgentsException):
    """Exception raised when the parsed json does not match the PluginCatalogDefinition Model"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.exceptions.PluginFileReadException

Bases: AgentsException

Raise this exception when the plugin file fails to be read

Source code in src/sk_agents/exceptions.py
class PluginFileReadException(AgentsException):
    """Raise this exception when the plugin file fails to be read"""

    message: str

    def __init__(self, message: str):
        self.message = message
sk_agents.routes
sk_agents.routes.Routes
Source code in src/sk_agents/routes.py
class Routes:
    @staticmethod
    def get_url(name: str, version: str, app_config: AppConfig) -> str:
        base_url = app_config.get(TA_AGENT_BASE_URL.env_name)
        if not base_url:
            logger.exception("Base URL is not provided in the app config.")
            raise ValueError("Base URL is not provided in the app config.")
        return f"{base_url}/{name}/{version}/a2a"

    @staticmethod
    def get_provider(app_config: AppConfig) -> AgentProvider:
        return AgentProvider(
            organization=app_config.get(TA_PROVIDER_ORG.env_name),
            url=app_config.get(TA_PROVIDER_URL.env_name),
        )

    @staticmethod
    def get_agent_card(config: BaseConfig, app_config: AppConfig) -> AgentCard:
        if config.metadata is None:
            logger.exception("Agent card metadata is not provided in the config.")
            raise ValueError("Agent card metadata is not provided in the config.")

        metadata = config.metadata
        skills = [
            AgentSkill(
                id=skill.id,
                name=skill.name,
                description=skill.description,
                tags=skill.tags,
                examples=skill.examples,
                inputModes=skill.input_modes,
                outputModes=skill.output_modes,
            )
            for skill in metadata.skills
        ]
        return AgentCard(
            name=config.name,
            version=str(config.version),
            description=metadata.description,
            url=Routes.get_url(config.name, config.version, app_config),
            provider=Routes.get_provider(app_config),
            documentationUrl=config.metadata.documentation_url,
            capabilities=AgentCapabilities(
                streaming=True, pushNotifications=False, stateTransitionHistory=True
            ),
            defaultInputModes=["text"],
            defaultOutputModes=["text"],
            skills=skills,
        )

    @staticmethod
    def get_request_handler(
        config: BaseConfig,
        app_config: AppConfig,
        chat_completion_builder: ChatCompletionBuilder,
        state_manager: StateManager,
        task_store: TaskStore,
    ) -> DefaultRequestHandler:
        return DefaultRequestHandler(
            agent_executor=A2AAgentExecutor(
                config, app_config, chat_completion_builder, state_manager
            ),
            task_store=task_store,
        )

    @staticmethod
    def get_a2a_routes(
        name: str,
        version: str,
        description: str,
        config: BaseConfig,
        app_config: AppConfig,
        chat_completion_builder: ChatCompletionBuilder,
        task_store: TaskStore,
        state_manager: StateManager,
    ) -> APIRouter:
        a2a_app = A2AStarletteApplication(
            agent_card=Routes.get_agent_card(config, app_config),
            http_handler=Routes.get_request_handler(
                config, app_config, chat_completion_builder, state_manager, task_store
            ),
        )
        a2a_router = APIRouter()

        @a2a_router.post("")
        @docstring_parameter(description)
        async def handle_a2a(request: Request):
            """
            {0}

            Agent-to-Agent Invocation
            """
            return await a2a_app._handle_requests(request)

        @a2a_router.get("/.well-known/agent.json")
        @docstring_parameter(f"{name}:{version} - {description}")
        async def handle_get_agent_card(request: Request):
            """
            Retrieve agent card for {0}
            """
            return await a2a_app._handle_get_agent_card(request)

        return a2a_router

    @staticmethod
    def get_rest_routes(
        name: str,
        version: str,
        description: str,
        root_handler_name: str,
        config: BaseConfig,
        app_config: AppConfig,
        input_class: type,
        output_class: type,
    ) -> APIRouter:
        router = APIRouter()

        @router.post("")
        @docstring_parameter(description)
        async def invoke(inputs: input_class, request: Request) -> InvokeResponse[output_class]:  # type: ignore
            """
            {0}
            """
            st = get_telemetry()
            context = extract(request.headers)

            authorization = request.headers.get("authorization", None)
            with (
                st.tracer.start_as_current_span(
                    f"{name}-{version}-invoke",
                    context=context,
                )
                if st.telemetry_enabled()
                else nullcontext()
            ):
                match root_handler_name:
                    case "skagents":
                        handler: BaseHandler = skagents_handle(config, app_config, authorization)
                    case _:
                        raise ValueError(f"Unknown apiVersion: {config.apiVersion}")

                inv_inputs = inputs.__dict__
                output = await handler.invoke(inputs=inv_inputs)
                return output

        @router.post("/sse")
        @docstring_parameter(description)
        async def invoke_sse(inputs: input_class, request: Request) -> StreamingResponse:
            """
            {0}
            Initiate SSE call
            """
            st = get_telemetry()
            context = extract(request.headers)
            authorization = request.headers.get("authorization", None)
            inv_inputs = inputs.__dict__

            async def event_generator():
                with (
                    st.tracer.start_as_current_span(
                        f"{config.service_name}-{str(config.version)}-invoke_sse",
                        context=context,
                    )
                    if st.telemetry_enabled()
                    else nullcontext()
                ):
                    match root_handler_name:
                        case "skagents":
                            handler: BaseHandler = skagents_handle(
                                config, app_config, authorization
                            )
                            # noinspection PyTypeChecker
                            async for content in handler.invoke_stream(inputs=inv_inputs):
                                yield get_sse_event_for_response(content)
                        case _:
                            logger.exception(
                                "Unknown apiVersion: %s", config.apiVersion, exc_info=True
                            )
                            raise ValueError(f"Unknown apiVersion: {config.apiVersion}")

            return StreamingResponse(event_generator(), media_type="text/event-stream")

        return router

    @staticmethod
    def get_websocket_routes(
        name: str,
        version: str,
        root_handler_name: str,
        config: BaseConfig,
        app_config: AppConfig,
        input_class: type,
    ) -> APIRouter:
        router = APIRouter()

        @router.websocket("/stream")
        async def invoke_stream(websocket: WebSocket) -> None:
            await websocket.accept()
            st = get_telemetry()
            context = extract(websocket.headers)

            authorization = websocket.headers.get("authorization", None)
            try:
                data = await websocket.receive_json()
                with (
                    st.tracer.start_as_current_span(
                        f"{name}-{str(version)}-invoke_stream",
                        context=context,
                    )
                    if st.telemetry_enabled()
                    else nullcontext()
                ):
                    inputs = input_class(**data)
                    inv_inputs = inputs.__dict__
                    match root_handler_name:
                        case "skagents":
                            handler: BaseHandler = skagents_handle(
                                config, app_config, authorization
                            )
                            async for content in handler.invoke_stream(inputs=inv_inputs):
                                if isinstance(content, PartialResponse):
                                    await websocket.send_text(content.output_partial)
                            await websocket.close()
                        case _:
                            logger.exception(
                                "Unknown apiVersion: %s", config.apiVersion, exc_info=True
                            )
                            raise ValueError(f"Unknown apiVersion %s: {config.apiVersion}")
            except WebSocketDisconnect:
                logger.exception("websocket disconnected")
                print("websocket disconnected")

        return router

    @staticmethod
    def get_stateful_routes(
        name: str,
        version: str,
        description: str,
        config: BaseConfig,
        state_manager: StateManager,
        authorizer: RequestAuthorizer,
        input_class: type[UserMessage],
    ) -> APIRouter:
        """
        Get the stateful API routes for the given configuration.
        """
        router = APIRouter()

        async def get_user_id(authorization: str = Header(None)):
            user_id = await authorizer.authorize_request(authorization)
            if not user_id:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required"
                )
            return user_id

        @router.post(
            "/chat",
            response_model=StateResponse,
            summary="Send a message to the agent",
            response_description="Agent response with state identifiers",
            tags=["Agent"],
        )
        async def chat(message: input_class, user_id: str = Depends(get_user_id)) -> StateResponse:
            # Handle new task creation or task retrieval
            if message.task_id is None:
                # New task
                session_id, task_id = await state_manager.create_task(message.session_id, user_id)
                task_state = await state_manager.get_task(task_id)
            else:
                # Follow-on request
                task_id = message.task_id
                task_state = await state_manager.get_task(task_id)
                # Verify user ownership
                if task_state.user_id != user_id:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Not authorized to access this task",
                    )
                session_id = task_state.session_id

            # Create a new request
            request_id = await state_manager.create_request(task_id)

            # Return response with state identifiers
            return StateResponse(
                session_id=session_id,
                task_id=task_id,
                request_id=request_id,
                status=TaskStatus.COMPLETED,
                content="Agent response",  # Replace with actual response
            )

        return router

    @staticmethod
    def get_resume_routes() -> APIRouter:
        router = APIRouter()

        @router.post("/tealagents/v1alpha1/resume/{request_id}")
        async def resume(request_id: str, request: Request, body: ResumeRequest):
            authorization = request.headers.get("authorization", None)
            try:
                return await TealAgentsV1Alpha1Handler.resume_task(
                    request_id, authorization, body.model_dump(), stream=False
                )
            except Exception as e:
                logger.exception(f"Error in resume: {e}")
                raise HTTPException(status_code=500, detail="Internal Server Error") from e

        @router.post("/tealagents/v1alpha1/resume/{request_id}/sse")
        async def resume_sse(request_id: str, request: Request, body: ResumeRequest):
            authorization = request.headers.get("authorization", None)

            async def event_generator():
                try:
                    async for content in TealAgentsV1Alpha1Handler.resume_task(
                        request_id, authorization, body.model_dump(), stream=True
                    ):
                        yield get_sse_event_for_response(content)
                except Exception as e:
                    logger.exception(f"Error in resume_sse: {e}")
                    raise HTTPException(status_code=500, detail="Internal Server Error") from e

            return StreamingResponse(event_generator(), media_type="text/event-stream")

        return router
sk_agents.routes.Routes.get_stateful_routes staticmethod
get_stateful_routes(
    name: str,
    version: str,
    description: str,
    config: BaseConfig,
    state_manager: StateManager,
    authorizer: RequestAuthorizer,
    input_class: type[UserMessage],
) -> APIRouter

Get the stateful API routes for the given configuration.

Source code in src/sk_agents/routes.py
@staticmethod
def get_stateful_routes(
    name: str,
    version: str,
    description: str,
    config: BaseConfig,
    state_manager: StateManager,
    authorizer: RequestAuthorizer,
    input_class: type[UserMessage],
) -> APIRouter:
    """
    Get the stateful API routes for the given configuration.
    """
    router = APIRouter()

    async def get_user_id(authorization: str = Header(None)):
        user_id = await authorizer.authorize_request(authorization)
        if not user_id:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required"
            )
        return user_id

    @router.post(
        "/chat",
        response_model=StateResponse,
        summary="Send a message to the agent",
        response_description="Agent response with state identifiers",
        tags=["Agent"],
    )
    async def chat(message: input_class, user_id: str = Depends(get_user_id)) -> StateResponse:
        # Handle new task creation or task retrieval
        if message.task_id is None:
            # New task
            session_id, task_id = await state_manager.create_task(message.session_id, user_id)
            task_state = await state_manager.get_task(task_id)
        else:
            # Follow-on request
            task_id = message.task_id
            task_state = await state_manager.get_task(task_id)
            # Verify user ownership
            if task_state.user_id != user_id:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not authorized to access this task",
                )
            session_id = task_state.session_id

        # Create a new request
        request_id = await state_manager.create_request(task_id)

        # Return response with state identifiers
        return StateResponse(
            session_id=session_id,
            task_id=task_id,
            request_id=request_id,
            status=TaskStatus.COMPLETED,
            content="Agent response",  # Replace with actual response
        )

    return router
sk_agents.ska_types
sk_agents.ska_types.HistoryMessage

Bases: BaseModel

A single interaction in a chat history.
'role' - Either 'user' (requestor) or 'assistant' (responder) indicating who sent the message.
'content' - The content of the message

Source code in src/sk_agents/ska_types.py
class HistoryMessage(BaseModel):
    """A single interaction in a chat history.<br/>
    'role' - Either 'user' (requestor) or 'assistant' (responder) indicating
    who sent the message.<br/>
    'content' - The content of the message"""

    role: Literal["user", "assistant"]
    content: str
sk_agents.ska_types.BaseInput

Bases: KernelBaseModel

The history of a chat interaction between an automated assistant and a human.

Source code in src/sk_agents/ska_types.py
class BaseInput(KernelBaseModel):
    """The history of a chat interaction between an automated assistant and a
    human."""

    chat_history: list[HistoryMessage] | None = None
sk_agents.ska_types.BaseInputWithUserContext

Bases: KernelBaseModel

The history of a chat interaction between an automated assistant and a human, along with context about the user.

Source code in src/sk_agents/ska_types.py
class BaseInputWithUserContext(KernelBaseModel):
    """The history of a chat interaction between an automated assistant and a
    human, along with context about the user."""

    chat_history: list[HistoryMessage] | None = None
    user_context: dict[str, str] | None = None
sk_agents.state
sk_agents.state.redis_state_manager

Redis implementation of the StateManager interface. This implementation uses Redis as the persistent store for task state management.

sk_agents.state.redis_state_manager.RedisStateManager

Bases: StateManager

Redis implementation of the StateManager interface.

This class provides Redis-based persistence for task state management.

Source code in src/sk_agents/state/redis_state_manager.py
class RedisStateManager(StateManager):
    """Redis implementation of the StateManager interface.

    This class provides Redis-based persistence for task state management.
    """

    def __init__(
        self,
        redis_client: Redis,
        ttl: int | None = None,
        key_prefix: str = "task_state:",
    ):
        """Initialize the RedisStateManager with a Redis client.

        Args:
            redis_client: An instance of Redis client
            key_prefix: Prefix used for Redis keys (default: "task_state:")
        """
        self._redis = redis_client
        self._key_prefix = key_prefix
        self._ttl = ttl

    def _get_message_key(self, task_id: str) -> str:
        """Generate a Redis key for a task's messages.

        Args:
            task_id: The ID of the task

        Returns:
            A Redis key string for the task's messages
        """
        return f"{self._key_prefix}{task_id}:messages"

    def _get_canceled_key(self, task_id: str) -> str:
        """Generate a Redis key for a task's canceled status.

        Args:
            task_id: The ID of the task

        Returns:
            A Redis key string for the task's canceled status
        """
        return f"{self._key_prefix}{task_id}:canceled"

    async def update_task_messages(
        self, task_id: str, new_message: HistoryMultiModalMessage
    ) -> list[HistoryMultiModalMessage]:
        """Updates the messages for a specific task.

        Appends a new message to the task's message history and returns
        the complete list of messages.

        Args:
            task_id: The ID of the task
            new_message: The new message to add to the task's history

        Returns:
            The complete list of messages for the task
        """
        # Get the Redis key for this task's messages
        message_key = self._get_message_key(task_id)

        # Serialize the new message to JSON with mode='json' to ensure enums are properly serialized
        message_json = json.dumps(new_message.model_dump(mode="json"))

        # Add the new message to the list in Redis
        await self._redis.rpush(message_key, message_json)
        if self._ttl:
            await self._redis.expire(message_key, int(self._ttl))

        # Retrieve all messages for the task
        message_jsons = await self._redis.lrange(message_key, 0, -1)

        # Deserialize each message from JSON
        messages = [
            HistoryMultiModalMessage.model_validate(json.loads(msg)) for msg in message_jsons
        ]

        return messages

    async def set_canceled(self, task_id: str) -> None:
        """Marks a task as canceled.

        Args:
            task_id: The ID of the task to mark as canceled
        """
        # Set the canceled flag for the task
        await self._redis.set(self._get_canceled_key(task_id), "1", ex=self._ttl)

    async def is_canceled(self, task_id: str) -> bool:
        """Checks if a task is marked as canceled.

        Args:
            task_id: The ID of the task to check

        Returns:
            True if the task is canceled, False otherwise
        """
        # Check if the canceled flag is set
        canceled = await self._redis.get(self._get_canceled_key(task_id))
        return canceled == "1"
sk_agents.state.redis_state_manager.RedisStateManager.__init__
__init__(
    redis_client: Redis,
    ttl: int | None = None,
    key_prefix: str = "task_state:",
)

Initialize the RedisStateManager with a Redis client.

Parameters:

Name Type Description Default
redis_client Redis

An instance of Redis client

required
key_prefix str

Prefix used for Redis keys (default: "task_state:")

'task_state:'
Source code in src/sk_agents/state/redis_state_manager.py
def __init__(
    self,
    redis_client: Redis,
    ttl: int | None = None,
    key_prefix: str = "task_state:",
):
    """Initialize the RedisStateManager with a Redis client.

    Args:
        redis_client: An instance of Redis client
        key_prefix: Prefix used for Redis keys (default: "task_state:")
    """
    self._redis = redis_client
    self._key_prefix = key_prefix
    self._ttl = ttl
sk_agents.state.redis_state_manager.RedisStateManager.update_task_messages async
update_task_messages(
    task_id: str, new_message: HistoryMultiModalMessage
) -> list[HistoryMultiModalMessage]

Updates the messages for a specific task.

Appends a new message to the task's message history and returns the complete list of messages.

Parameters:

Name Type Description Default
task_id str

The ID of the task

required
new_message HistoryMultiModalMessage

The new message to add to the task's history

required

Returns:

Type Description
list[HistoryMultiModalMessage]

The complete list of messages for the task

Source code in src/sk_agents/state/redis_state_manager.py
async def update_task_messages(
    self, task_id: str, new_message: HistoryMultiModalMessage
) -> list[HistoryMultiModalMessage]:
    """Updates the messages for a specific task.

    Appends a new message to the task's message history and returns
    the complete list of messages.

    Args:
        task_id: The ID of the task
        new_message: The new message to add to the task's history

    Returns:
        The complete list of messages for the task
    """
    # Get the Redis key for this task's messages
    message_key = self._get_message_key(task_id)

    # Serialize the new message to JSON with mode='json' to ensure enums are properly serialized
    message_json = json.dumps(new_message.model_dump(mode="json"))

    # Add the new message to the list in Redis
    await self._redis.rpush(message_key, message_json)
    if self._ttl:
        await self._redis.expire(message_key, int(self._ttl))

    # Retrieve all messages for the task
    message_jsons = await self._redis.lrange(message_key, 0, -1)

    # Deserialize each message from JSON
    messages = [
        HistoryMultiModalMessage.model_validate(json.loads(msg)) for msg in message_jsons
    ]

    return messages
sk_agents.state.redis_state_manager.RedisStateManager.set_canceled async
set_canceled(task_id: str) -> None

Marks a task as canceled.

Parameters:

Name Type Description Default
task_id str

The ID of the task to mark as canceled

required
Source code in src/sk_agents/state/redis_state_manager.py
async def set_canceled(self, task_id: str) -> None:
    """Marks a task as canceled.

    Args:
        task_id: The ID of the task to mark as canceled
    """
    # Set the canceled flag for the task
    await self._redis.set(self._get_canceled_key(task_id), "1", ex=self._ttl)
sk_agents.state.redis_state_manager.RedisStateManager.is_canceled async
is_canceled(task_id: str) -> bool

Checks if a task is marked as canceled.

Parameters:

Name Type Description Default
task_id str

The ID of the task to check

required

Returns:

Type Description
bool

True if the task is canceled, False otherwise

Source code in src/sk_agents/state/redis_state_manager.py
async def is_canceled(self, task_id: str) -> bool:
    """Checks if a task is marked as canceled.

    Args:
        task_id: The ID of the task to check

    Returns:
        True if the task is canceled, False otherwise
    """
    # Check if the canceled flag is set
    canceled = await self._redis.get(self._get_canceled_key(task_id))
    return canceled == "1"
sk_agents.stateful
sk_agents.stateful.UserMessage

Bases: BaseModel

New input model for the tealagents/v1alpha1 API version. Unlike BaseMultiModalInput, chat history is maintained server-side.

Source code in src/sk_agents/stateful.py
class UserMessage(BaseModel):
    """
    New input model for the tealagents/v1alpha1 API version.
    Unlike BaseMultiModalInput, chat history is maintained server-side.
    """

    session_id: UUID4 | None = None
    task_id: UUID4 | None = None
    items: list[MultiModalItem]

    @validator("session_id", "task_id", pre=True)
    def validate_uuid(cls, v):
        if v is not None and not isinstance(v, uuid.UUID):
            try:
                return uuid.UUID(v)
            except (ValueError, AttributeError) as err:
                raise ValueError(f"Invalid UUID format: {v}") from err
        return v
sk_agents.stateful.TaskState

Bases: BaseModel

Model for the state associated with a Task ID

Source code in src/sk_agents/stateful.py
class TaskState(BaseModel):
    """Model for the state associated with a Task ID"""

    task_id: UUID4
    session_id: UUID4
    user_id: str  # User identity for authorization
    messages: list[dict[str, Any]]  # Chat history and execution trace
    status: TaskStatus = TaskStatus.RUNNING
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    metadata: dict[str, Any] = Field(default_factory=dict)
sk_agents.stateful.RequestState

Bases: BaseModel

Model for the state associated with a Request ID

Source code in src/sk_agents/stateful.py
class RequestState(BaseModel):
    """Model for the state associated with a Request ID"""

    request_id: UUID4
    task_id: UUID4
    status: TaskStatus = TaskStatus.RUNNING
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    metadata: dict[str, Any] = Field(default_factory=dict)
sk_agents.stateful.StateResponse

Bases: BaseModel

Response model including state identifiers

Source code in src/sk_agents/stateful.py
class StateResponse(BaseModel):
    """Response model including state identifiers"""

    session_id: UUID4
    task_id: UUID4
    request_id: UUID4
    status: TaskStatus
    content: Any
sk_agents.stateful.StateManager

Bases: ABC

Abstract base class for state management

Source code in src/sk_agents/stateful.py
class StateManager(ABC):
    """Abstract base class for state management"""

    @abstractmethod
    async def create_task(self, session_id: UUID4 | None, user_id: str) -> tuple[UUID4, UUID4]:
        """Create a new task and return session_id and task_id"""

    @abstractmethod
    async def get_task(self, task_id: UUID4) -> TaskState:
        """Get a task by ID"""

    @abstractmethod
    async def update_task(self, task_state: TaskState) -> None:
        """Update a task state"""

    @abstractmethod
    async def create_request(self, task_id: UUID4) -> UUID4:
        """Create a new request and return request_id"""

    @abstractmethod
    async def get_request(self, request_id: UUID4) -> RequestState:
        """Get a request by ID"""

    @abstractmethod
    async def update_request(self, request_state: RequestState) -> None:
        """Update a request state"""
sk_agents.stateful.StateManager.create_task abstractmethod async
create_task(
    session_id: UUID4 | None, user_id: str
) -> tuple[UUID4, UUID4]

Create a new task and return session_id and task_id

Source code in src/sk_agents/stateful.py
@abstractmethod
async def create_task(self, session_id: UUID4 | None, user_id: str) -> tuple[UUID4, UUID4]:
    """Create a new task and return session_id and task_id"""
sk_agents.stateful.StateManager.get_task abstractmethod async
get_task(task_id: UUID4) -> TaskState

Get a task by ID

Source code in src/sk_agents/stateful.py
@abstractmethod
async def get_task(self, task_id: UUID4) -> TaskState:
    """Get a task by ID"""
sk_agents.stateful.StateManager.update_task abstractmethod async
update_task(task_state: TaskState) -> None

Update a task state

Source code in src/sk_agents/stateful.py
@abstractmethod
async def update_task(self, task_state: TaskState) -> None:
    """Update a task state"""
sk_agents.stateful.StateManager.create_request abstractmethod async
create_request(task_id: UUID4) -> UUID4

Create a new request and return request_id

Source code in src/sk_agents/stateful.py
@abstractmethod
async def create_request(self, task_id: UUID4) -> UUID4:
    """Create a new request and return request_id"""
sk_agents.stateful.StateManager.get_request abstractmethod async
get_request(request_id: UUID4) -> RequestState

Get a request by ID

Source code in src/sk_agents/stateful.py
@abstractmethod
async def get_request(self, request_id: UUID4) -> RequestState:
    """Get a request by ID"""
sk_agents.stateful.StateManager.update_request abstractmethod async
update_request(request_state: RequestState) -> None

Update a request state

Source code in src/sk_agents/stateful.py
@abstractmethod
async def update_request(self, request_state: RequestState) -> None:
    """Update a request state"""
sk_agents.stateful.InMemoryStateManager

Bases: StateManager

In-memory implementation of state manager

Source code in src/sk_agents/stateful.py
class InMemoryStateManager(StateManager):
    """In-memory implementation of state manager"""

    def __init__(self):
        self.tasks: dict[UUID4, TaskState] = {}
        self.requests: dict[UUID4, RequestState] = {}

    async def create_task(self, session_id: UUID4 | None, user_id: str) -> tuple[UUID4, UUID4]:
        session_id = session_id or uuid.uuid4()
        task_id = uuid.uuid4()
        self.tasks[task_id] = TaskState(
            task_id=task_id, session_id=session_id, user_id=user_id, messages=[]
        )
        return session_id, task_id

    async def get_task(self, task_id: UUID4) -> TaskState:
        if task_id not in self.tasks:
            raise ValueError(f"Task not found: {task_id}")
        return self.tasks[task_id]

    async def update_task(self, task_state: TaskState) -> None:
        task_state.updated_at = datetime.utcnow()
        self.tasks[task_state.task_id] = task_state

    async def create_request(self, task_id: UUID4) -> UUID4:
        request_id = uuid.uuid4()
        self.requests[request_id] = RequestState(request_id=request_id, task_id=task_id)
        return request_id

    async def get_request(self, request_id: UUID4) -> RequestState:
        if request_id not in self.requests:
            raise ValueError(f"Request not found: {request_id}")
        return self.requests[request_id]

    async def update_request(self, request_state: RequestState) -> None:
        request_state.updated_at = datetime.utcnow()
        self.requests[request_state.request_id] = request_state
sk_agents.stateful.RedisStateManager

Bases: StateManager

Redis implementation of state manager

Source code in src/sk_agents/stateful.py
class RedisStateManager(StateManager):
    """Redis implementation of state manager"""

    def __init__(self, redis_client: Redis, ttl: int | None = None):
        self.redis = redis_client
        self.ttl = ttl  # Time-to-live in seconds

    async def create_task(self, session_id: UUID4 | None, user_id: str) -> tuple[UUID4, UUID4]:
        session_id = session_id or uuid.uuid4()
        task_id = uuid.uuid4()
        task_state = TaskState(task_id=task_id, session_id=session_id, user_id=user_id, messages=[])
        await self._set_task(task_state)
        return session_id, task_id

    async def get_task(self, task_id: UUID4) -> TaskState:
        key = f"task:{task_id}"
        data = await self.redis.get(key)
        if not data:
            raise ValueError(f"Task not found: {task_id}")
        return TaskState.parse_raw(data)

    async def update_task(self, task_state: TaskState) -> None:
        task_state.updated_at = datetime.utcnow()
        await self._set_task(task_state)

    async def _set_task(self, task_state: TaskState) -> None:
        key = f"task:{task_state.task_id}"
        await self.redis.set(key, task_state.json(), ex=self.ttl)

    async def create_request(self, task_id: UUID4) -> UUID4:
        request_id = uuid.uuid4()
        request_state = RequestState(request_id=request_id, task_id=task_id)
        await self._set_request(request_state)
        return request_id

    async def get_request(self, request_id: UUID4) -> RequestState:
        key = f"request:{request_id}"
        data = await self.redis.get(key)
        if not data:
            raise ValueError(f"Request not found: {request_id}")
        return RequestState.parse_raw(data)

    async def update_request(self, request_state: RequestState) -> None:
        request_state.updated_at = datetime.utcnow()
        await self._set_request(request_state)

    async def _set_request(self, request_state: RequestState) -> None:
        key = f"request:{request_state.request_id}"
        await self.redis.set(key, request_state.json(), ex=self.ttl)
sk_agents.stateful.AuthenticationManager

Bases: ABC

Abstract base class for authentication management

Source code in src/sk_agents/stateful.py
class AuthenticationManager(ABC):
    """Abstract base class for authentication management"""

    @abstractmethod
    async def authenticate(self, token: str) -> str:
        """Authenticate a token and return the user ID"""
        pass

    @abstractmethod
    async def validate_task_access(self, task_id: UUID4, user_id: str) -> bool:
        """Validate if a user has access to a task"""
        pass
sk_agents.stateful.AuthenticationManager.authenticate abstractmethod async
authenticate(token: str) -> str

Authenticate a token and return the user ID

Source code in src/sk_agents/stateful.py
@abstractmethod
async def authenticate(self, token: str) -> str:
    """Authenticate a token and return the user ID"""
    pass
sk_agents.stateful.AuthenticationManager.validate_task_access abstractmethod async
validate_task_access(task_id: UUID4, user_id: str) -> bool

Validate if a user has access to a task

Source code in src/sk_agents/stateful.py
@abstractmethod
async def validate_task_access(self, task_id: UUID4, user_id: str) -> bool:
    """Validate if a user has access to a task"""
    pass
sk_agents.stateful.MockAuthenticationManager

Bases: AuthenticationManager

Mock implementation of authentication manager for development

Source code in src/sk_agents/stateful.py
class MockAuthenticationManager(AuthenticationManager):
    """Mock implementation of authentication manager for development"""

    async def authenticate(self, token: str) -> str:
        # In mock implementation, just return the token as the user ID
        # In real implementation, this would validate the token with Entra ID
        return token or "anonymous-user"

    async def validate_task_access(self, task_id: UUID4, user_id: str) -> bool:
        # In mock implementation, always return True
        # In real implementation, this would check if the user owns the task
        return True
sk_agents.tealagents
sk_agents.tealagents.models
sk_agents.tealagents.models.TaskStatus

Bases: Enum

Enum representing the status of a task

Source code in src/sk_agents/tealagents/models.py
class TaskStatus(Enum):
    """Enum representing the status of a task"""

    RUNNING = "Running"
    PAUSED = "Paused"
    COMPLETED = "Completed"
    FAILED = "Failed"
sk_agents.tealagents.v1alpha1
sk_agents.tealagents.v1alpha1.agent
sk_agents.tealagents.v1alpha1.agent.handler
sk_agents.tealagents.v1alpha1.agent.handler.TealAgentsV1Alpha1Handler

Bases: BaseHandler

Source code in src/sk_agents/tealagents/v1alpha1/agent/handler.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
class TealAgentsV1Alpha1Handler(BaseHandler):
    def __init__(self, config: BaseConfig, agent_builder: AgentBuilder):
        self.version = config.version
        self.name = config.name
        if hasattr(config, "spec"):
            self.config = Config(config=config)
        else:
            raise ValueError("Invalid config")
        self.agent_builder = agent_builder
        self.state = InMemoryPersistenceManager()
        self.authorizer = DummyAuthorizer()

    @staticmethod
    async def _invoke_function(kernel, fc_content: FunctionCallContent) -> FunctionResultContent:
        """Helper to execute a single tool function call."""
        function = kernel.get_function(
            fc_content.plugin_name,
            fc_content.function_name,
        )
        function_result = await function(kernel, fc_content.to_kernel_arguments())
        return FunctionResultContent.from_function_call_content_and_result(
            fc_content, function_result
        )

    @staticmethod
    def _augment_with_user_context(inputs: UserMessage, chat_history: ChatHistory) -> None:
        if inputs.user_context:
            content = "The following user context was provided:\n"
            for key, value in inputs.user_context.items():
                content += f"  {key}: {value}\n"
            chat_history.add_message(
                ChatMessageContent(role=AuthorRole.USER, items=[TextContent(text=content)])
            )

    @staticmethod
    def _configure_agent_task(
        session_id: str,
        user_id: str,
        task_id: str,
        role: Literal["user", "assistant"],
        request_id: str,
        inputs: UserMessage,
        status: Literal["Running", "Paused", "Completed", "Failed", "Canceled"],
    ) -> AgentTask:
        agent_items = []
        for item in inputs.items:
            task_item = AgentTaskItem(
                task_id=task_id, role=role, item=item, request_id=request_id, updated=datetime.now()
            )
            agent_items.append(task_item)

        agent_task = AgentTask(
            task_id=task_id,
            session_id=session_id,
            user_id=user_id,
            items=agent_items,
            created_at=datetime.now(),
            last_updated=datetime.now(),
            status=status,
        )
        return agent_task

    async def authenticate_user(self, token: str) -> str:
        try:
            user_id = await self.authorizer.authorize_request(auth_header=token)
            return user_id
        except Exception as e:
            raise AuthenticationException(
                message=(f"Unable to authenticate user, exception message: {e}")
            ) from e

    @staticmethod
    def handle_state_id(inputs: UserMessage) -> tuple[str, str, str]:
        if inputs.session_id:
            session_id = inputs.session_id
        else:
            session_id = str(uuid.uuid4().hex)

        if inputs.task_id:
            task_id = inputs.task_id
        else:
            task_id = str(uuid.uuid4().hex)

        request_id = str(uuid.uuid4().hex)

        return session_id, task_id, request_id

    async def _manage_incoming_task(
        self, task_id: str, session_id: str, user_id: str, request_id: str, inputs: UserMessage
    ) -> AgentTask | None:
        try:
            agent_task = await self.state.load(task_id)
            if not agent_task:
                agent_task = TealAgentsV1Alpha1Handler._configure_agent_task(
                    session_id=session_id,
                    user_id=user_id,
                    task_id=task_id,
                    role="user",
                    request_id=request_id,
                    inputs=inputs,
                    status="Running",
                )
                await self.state.create(agent_task)
                return agent_task
        except Exception as e:
            raise Exception(f"Unexpected error ocurred while managing incoming task: {e}") from e

    async def _manage_agent_response_task(
        self, agent_task: AgentTask, agent_response: TealAgentsResponse
    ) -> None:
        new_item = AgentTaskItem(
            task_id=agent_response.task_id,
            role="assistant",
            item=MultiModalItem(content_type=ContentType.TEXT, content=agent_response.output),
            request_id=agent_response.request_id,
            updated=datetime.now(),
        )
        agent_task.items.append(new_item)
        agent_task.last_updated = datetime.now()
        await self.state.update(agent_task)

    @staticmethod
    def _validate_user_id(user_id: str, task_id: str, agent_task: AgentTask) -> None:
        try:
            assert user_id == agent_task.user_id
        except AssertionError as e:
            raise AgentInvokeException(
                message=(f"Invalid user ID {user_id}and task ID {task_id} provided. {e}")
            ) from e

    @staticmethod
    def _build_chat_history(agent_task: AgentTask, chat_history: ChatHistory) -> ChatHistory:
        chat_message_items: list[TextContent | ImageContent] = []
        for task_item in agent_task.items:
            chat_message_items.append(item_to_content(task_item.item))
            message_content = ChatMessageContent(role=task_item.role, items=chat_message_items)
            chat_history.add_message(message_content)
        return chat_history

    @staticmethod
    def _rejected_task_item(task_id: str, request_id: str) -> AgentTaskItem:
        return AgentTaskItem(
            task_id=task_id,
            role="user",
            item=MultiModalItem(content_type=ContentType.TEXT, content="tool execution rejected"),
            request_id=request_id,
            updated=datetime.now(),
        )

    @staticmethod
    def _approved_task_item(task_id: str, request_id: str) -> AgentTaskItem:
        return AgentTaskItem(
            task_id=task_id,
            role="user",
            item=MultiModalItem(content_type=ContentType.TEXT, content="tool execution approved"),
            request_id=request_id,
            updated=datetime.now(),
        )

    async def _manage_hitl_exception(
        self,
        agent_task: AgentTask,
        session_id: str,
        task_id: str,
        request_id: str,
        function_calls: list,
        chat_history: ChatHistory,
    ):
        agent_task.status = "Paused"
        assistant_item = AgentTaskItem(
            task_id=task_id,
            role="assistant",
            item=MultiModalItem(
                content_type=ContentType.TEXT, content="HITL intervention required."
            ),
            request_id=request_id,
            updated=datetime.now(),
            pending_tool_calls=[fc.model_dump() for fc in function_calls],
            chat_history=chat_history,
        )
        agent_task.items.append(assistant_item)
        agent_task.last_updated = datetime.now()
        await self.state.update(agent_task)

        base_url = "/tealagents/v1alpha1/resume"
        approval_url = f"{base_url}/{request_id}?action=approve"
        rejection_url = f"{base_url}/{request_id}?action=reject"

        hitl_response = HitlResponse(
            session_id=session_id,
            task_id=task_id,
            request_id=request_id,
            tool_calls=[fc.model_dump() for fc in function_calls],
            approval_url=approval_url,
            rejection_url=rejection_url,
        )
        return hitl_response

    @staticmethod
    async def _manage_function_calls(
        function_calls: list[FunctionCallContent], chat_history: ChatHistory, kernel: Kernel
    ) -> None:
        intervention_calls = []
        non_intervention_calls = []

        # Separate function calls into intervention and non-intervention
        for fc in function_calls:
            if hitl_manager.check_for_intervention(fc):
                intervention_calls.append(fc)
            else:
                non_intervention_calls.append(fc)

        # Process non-intervention function calls first
        if non_intervention_calls:
            results = await asyncio.gather(
                *[
                    TealAgentsV1Alpha1Handler._invoke_function(kernel, fc)
                    for fc in non_intervention_calls
                ]
            )

            # Add results to history
            for result in results:
                chat_history.add_message(result.to_chat_message_content())

        # Handle intervention function calls
        if intervention_calls:
            logger.info(f"Intervention required for{len(intervention_calls)} function calls.")
            raise hitl_manager.HitlInterventionRequired(intervention_calls)

    async def prepare_agent_response(
        self,
        agent_task: AgentTask,
        request_id: str,
        response: ChatMessageContent | list[str],
        token_usage: TokenUsage,
        extra_data_collector: ExtraDataCollector,
    ):
        if isinstance(response, list):
            agent_output = "".join(response)
        else:
            agent_output = response.content

        total_tokens = token_usage.total_tokens
        session_id = agent_task.session_id
        task_id = agent_task.task_id
        request_id = request_id

        agent_response = TealAgentsResponse(
            session_id=session_id,
            task_id=task_id,
            request_id=request_id,
            output=agent_output,
            source=f"{self.name}:{self.version}",
            token_usage=token_usage,
            extra_data=extra_data_collector.get_extra_data(),
        )
        await self._manage_agent_response_task(agent_task, agent_response)
        logger.info(
            f"{self.name}:{self.version}"
            f"successful invocation with {total_tokens} tokens. "
            f"Session ID: {session_id}, Task ID: {task_id},"
            f"Request ID {request_id}"
        )
        return agent_response

    async def resume_task(
        self, auth_token: str, request_id: str, action_status: ResumeRequest, stream: bool
    ) -> (
        TealAgentsResponse
        | RejectedToolResponse
        | HitlResponse
        | AsyncIterable[TealAgentsResponse | TealAgentsPartialResponse | HitlResponse]
    ):
        user_id = await self.authenticate_user(token=auth_token)
        agent_task = await self.state.load_by_request_id(request_id)
        if agent_task is None:
            raise AgentInvokeException(f"No agent task found for request ID: {request_id}")
        session_id = agent_task.session_id
        task_id = agent_task.task_id
        chat_history = agent_task.items[-1].chat_history
        if chat_history is None:
            raise AgentInvokeException(f"Chat history not found for request ID: {request_id}")

        TealAgentsV1Alpha1Handler._validate_user_id(user_id, task_id, agent_task)
        try:
            assert agent_task.status == "Paused"
        except Exception as e:
            raise Exception(f"Agent in resume request is not in Paused state: {e}") from e

        if action_status.action == "reject":
            agent_task.status = "Canceled"
            agent_task.items.append(
                TealAgentsV1Alpha1Handler._rejected_task_item(
                    task_id=task_id, request_id=request_id
                )
            )
            agent_task.last_updated = datetime.now()
            await self.state.update(agent_task)
            return RejectedToolResponse(
                task_id=task_id, session_id=agent_task.session_id, request_id=request_id
            )
        # Record Approval state
        agent_task.status = "Running"
        agent_task.items.append(
            TealAgentsV1Alpha1Handler._approved_task_item(
                task_id=agent_task.task_id, request_id=request_id
            )
        )
        agent_task.last_updated = datetime.now()
        await self.state.update(agent_task)

        # Retrieve the pending_tool_calls from the last
        # AgentTaskItem before approval/rejection item
        tool_calls_in_task_items = agent_task.items[-2].pending_tool_calls
        if tool_calls_in_task_items is None:
            raise AgentInvokeException(f"Pending tool calls no found for request ID: {request_id}")
        _pending_tools = list(tool_calls_in_task_items)  # [fc for fc in tool_calls_in_task_items]
        pending_tools = [FunctionCallContent(**function_call) for function_call in _pending_tools]

        # Execute the tool calls using asyncio.gather(),
        # just as the agent would have.
        extra_data_collector = ExtraDataCollector()
        agent = self.agent_builder.build_agent(self.config.get_agent(), extra_data_collector)
        kernel = agent.agent.kernel

        # Create ToolContent objects from the results
        results = await asyncio.gather(
            *[TealAgentsV1Alpha1Handler._invoke_function(kernel, fc) for fc in pending_tools]
        )
        # Add results to chat history
        for result in results:
            chat_history.add_message(result.to_chat_message_content())

        if stream:
            final_response_stream = self.recursion_invoke_stream(
                chat_history, session_id, task_id, request_id
            )
            return final_response_stream
        else:
            final_response_invoke = await self.recursion_invoke(
                inputs=chat_history, session_id=session_id, request_id=request_id, task_id=task_id
            )

            return final_response_invoke

    async def invoke(
        self, auth_token: str, inputs: UserMessage
    ) -> TealAgentsResponse | HitlResponse:
        # Initial setup
        user_id = await self.authenticate_user(token=auth_token)
        state_ids = TealAgentsV1Alpha1Handler.handle_state_id(inputs)
        session_id, task_id, request_id = state_ids
        agent_task = await self._manage_incoming_task(
            task_id, session_id, user_id, request_id, inputs
        )
        if agent_task is None:
            raise AgentInvokeException("Agent task not created")
        # Check user_id match request and state
        TealAgentsV1Alpha1Handler._validate_user_id(user_id, task_id, agent_task)

        chat_history = ChatHistory()
        TealAgentsV1Alpha1Handler._augment_with_user_context(
            inputs=inputs, chat_history=chat_history
        )
        TealAgentsV1Alpha1Handler._build_chat_history(agent_task, chat_history)

        final_response_invoke = await self.recursion_invoke(
            inputs=chat_history, session_id=session_id, request_id=request_id, task_id=task_id
        )

        return final_response_invoke

    async def invoke_stream(
        self, auth_token: str, inputs: UserMessage
    ) -> AsyncIterable[TealAgentsResponse | TealAgentsPartialResponse | HitlResponse]:
        # Initial setup
        user_id = await self.authenticate_user(token=auth_token)
        state_ids = TealAgentsV1Alpha1Handler.handle_state_id(inputs)
        session_id, task_id, request_id = state_ids
        agent_task = await self._manage_incoming_task(
            task_id, session_id, user_id, request_id, inputs
        )
        if agent_task is None:
            raise AgentInvokeException("Agent task not created")
        # Check user_id match request and state
        TealAgentsV1Alpha1Handler._validate_user_id(user_id, task_id, agent_task)

        chat_history = ChatHistory()
        TealAgentsV1Alpha1Handler._augment_with_user_context(
            inputs=inputs, chat_history=chat_history
        )
        TealAgentsV1Alpha1Handler._build_chat_history(agent_task, chat_history)
        final_response_stream = self.recursion_invoke_stream(
            chat_history, session_id, task_id, request_id
        )
        return final_response_stream

    async def recursion_invoke(
        self, inputs: ChatHistory, session_id: str, task_id: str, request_id: str
    ) -> TealAgentsResponse | HitlResponse:
        # Initial setup

        chat_history = inputs
        agent_task = await self.state.load_by_request_id(request_id)
        if not agent_task:
            raise PersistenceLoadError(f"Agent task with ID {task_id} not found in state.")

        extra_data_collector = ExtraDataCollector()
        agent = self.agent_builder.build_agent(self.config.get_agent(), extra_data_collector)

        # Prepare metadata
        completion_tokens: int = 0
        prompt_tokens: int = 0
        total_tokens: int = 0

        try:
            # Manual tool calling implementation (existing logic)
            kernel = agent.agent.kernel
            arguments = agent.agent.arguments
            chat_completion_service, settings = kernel.select_ai_service(
                arguments=arguments, type=ChatCompletionClientBase
            )

            assert isinstance(chat_completion_service, ChatCompletionClientBase)

            # Initial call to the LLM
            response_list = []
            responses = await chat_completion_service.get_chat_message_contents(
                chat_history=chat_history,
                settings=settings,
                kernel=kernel,
                arguments=arguments,
            )
            for response_chunk in responses:
                # response_list.extend(response_chunk)
                chat_history.add_message(response_chunk)
                response_list.append(response_chunk)

            function_calls = []
            final_response = None

            # Separate content and tool calls
            for response in response_list:
                # Update token usage
                call_usage = get_token_usage_for_response(agent.get_model_type(), response)
                completion_tokens += call_usage.completion_tokens
                prompt_tokens += call_usage.prompt_tokens
                total_tokens += call_usage.total_tokens

                # A response may have multiple items, e.g., multiple tool calls
                fc_in_response = [
                    item for item in response.items if isinstance(item, FunctionCallContent)
                ]

                if fc_in_response:
                    # chat_history.add_message(response)
                    # Add assistant's message to history
                    function_calls.extend(fc_in_response)
                else:
                    # If no function calls, it's a direct answer
                    final_response = response
            token_usage = TokenUsage(
                completion_tokens=completion_tokens,
                prompt_tokens=prompt_tokens,
                total_tokens=total_tokens,
            )
            # If tool calls were returned, execute them
            if function_calls:
                await self._manage_function_calls(function_calls, chat_history, kernel)

                # Make a recursive call to get the final response from the LLM
                recursive_response = await self.recursion_invoke(
                    inputs=chat_history,
                    session_id=session_id,
                    task_id=task_id,
                    request_id=request_id,
                )
                return recursive_response

            # No tool calls, return the direct response
            if final_response is None:
                logger.exception("No response received from LLM")
                raise AgentInvokeException("No response received from LLM")
        except hitl_manager.HitlInterventionRequired as hitl_exc:
            return await self._manage_hitl_exception(
                agent_task, session_id, task_id, request_id, hitl_exc.function_calls, chat_history
            )

        except Exception as e:
            logger.exception(
                f"Error invoking {self.name}:{self.version}"
                f"for Session ID {session_id}, Task ID {task_id},"
                f"Request ID {request_id}, Error message: {str(e)}",
                exc_info=True,
            )
            raise AgentInvokeException(
                f"Error invoking {self.name}:{self.version}"
                f"for Session ID {session_id}, Task ID {task_id},"
                f" Request ID {request_id}, Error message: {str(e)}"
            ) from e

        # Persist and return response
        return await self.prepare_agent_response(
            agent_task, request_id, final_response, token_usage, extra_data_collector
        )

    async def recursion_invoke_stream(
        self, inputs: ChatHistory, session_id: str, task_id: str, request_id: str
    ) -> AsyncIterable[TealAgentsResponse | TealAgentsPartialResponse | HitlResponse]:
        chat_history = inputs
        agent_task = await self.state.load_by_request_id(request_id)
        if not agent_task:
            raise PersistenceLoadError(f"Agent task with ID {task_id} not found in state.")

        extra_data_collector = ExtraDataCollector()
        agent = self.agent_builder.build_agent(self.config.get_agent(), extra_data_collector)

        # Prepare metadata
        final_response = []
        completion_tokens: int = 0
        prompt_tokens: int = 0
        total_tokens: int = 0

        try:
            kernel = agent.agent.kernel
            arguments = agent.agent.arguments
            kernel_configs = kernel.select_ai_service(
                arguments=arguments, type=ChatCompletionClientBase
            )
            chat_completion_service, settings = kernel_configs
            assert isinstance(chat_completion_service, ChatCompletionClientBase)

            all_responses = []
            # Stream the initial response from the LLM
            response_list = []
            responses = await chat_completion_service.get_chat_message_contents(
                chat_history=chat_history,
                settings=settings,
                kernel=kernel,
                arguments=arguments,
            )
            for response_chunk in responses:
                chat_history.add_message(response_chunk)
                response_list.append(response_chunk)

            for response in response_list:
                all_responses.append(response)
                # Calculate usage metrics
                call_usage = get_token_usage_for_response(agent.get_model_type(), response)
                completion_tokens += call_usage.completion_tokens
                prompt_tokens += call_usage.prompt_tokens
                total_tokens += call_usage.total_tokens

                if response.content:
                    try:
                        # Attempt to parse as ExtraDataPartial
                        extra_data_partial: ExtraDataPartial = ExtraDataPartial.new_from_json(
                            response.content
                        )
                        extra_data_collector.add_extra_data_items(extra_data_partial.extra_data)
                    except Exception:
                        if len(response.content) > 0:
                            # Handle and return partial response
                            final_response.append(response.content)
                            yield TealAgentsPartialResponse(
                                session_id=session_id,
                                task_id=task_id,
                                request_id=request_id,
                                output_partial=response.content,
                                source=f"{self.name}:{self.version}",
                            )

            token_usage = TokenUsage(
                completion_tokens=completion_tokens,
                prompt_tokens=prompt_tokens,
                total_tokens=total_tokens,
            )
            # Aggregate the full response to check for tool calls
            if not all_responses:
                return

            full_completion: StreamingChatMessageContent = reduce(lambda x, y: x + y, all_responses)
            function_calls = [
                item for item in full_completion.items if isinstance(item, FunctionCallContent)
            ]

            # If tool calls are present, execute them
            if function_calls:
                await self._manage_function_calls(function_calls, chat_history, kernel)
                # Make a recursive call to get the final streamed response
                async for final_response_chunk in self.recursion_invoke_stream(
                    chat_history, session_id, task_id, request_id
                ):
                    yield final_response_chunk
                return
        except hitl_manager.HitlInterventionRequired as hitl_exc:
            yield await self._manage_hitl_exception(
                agent_task, session_id, task_id, request_id, hitl_exc.function_calls, chat_history
            )
            return

        except Exception as e:
            logger.exception(
                f"Error invoking stream for {self.name}:{self.version} "
                f"for Session ID {session_id}, Task ID {task_id},"
                f" Request ID {request_id}, Error message: {str(e)}",
                exc_info=True,
            )
            raise AgentInvokeException(
                f"Error invoking stream for {self.name}:{self.version}"
                f"for Session ID {session_id}, Task ID {task_id},"
                f"Request ID {request_id}, Error message: {str(e)}"
            ) from e

        # # Persist and return response
        yield await self.prepare_agent_response(
            agent_task, request_id, final_response, token_usage, extra_data_collector
        )