From 0d68eef3a8de05cd16afadb7ecf88b87a478bc6d Mon Sep 17 00:00:00 2001 From: Brett Cannon Date: Mon, 16 Mar 2026 16:44:20 -0700 Subject: [PATCH 1/8] Remove `copilot.types` Along the way, simplify `copilot.__init__` to only export the high-level API. --- docs/auth/byok.md | 2 +- docs/features/custom-agents.md | 2 +- docs/features/image-input.md | 2 +- docs/features/skills.md | 2 +- docs/features/steering-and-queueing.md | 4 +- docs/getting-started.md | 15 +- docs/hooks/error-handling.md | 2 +- docs/hooks/post-tool-use.md | 2 +- docs/hooks/pre-tool-use.md | 2 +- docs/hooks/session-lifecycle.md | 4 +- docs/hooks/user-prompt-submitted.md | 2 +- docs/setup/azure-managed-identity.md | 6 +- python/README.md | 3 +- python/copilot/__init__.py | 64 +- python/copilot/client.py | 637 ++++++++- python/copilot/session.py | 545 +++++++- python/copilot/tools.py | 53 +- python/copilot/types.py | 1155 ----------------- python/e2e/test_agent_and_compact_rpc.py | 4 +- python/e2e/test_ask_user.py | 2 +- python/e2e/test_client.py | 4 +- python/e2e/test_compaction.py | 2 +- python/e2e/test_hooks.py | 2 +- python/e2e/test_mcp_and_agents.py | 2 +- python/e2e/test_multi_client.py | 13 +- python/e2e/test_permissions.py | 2 +- python/e2e/test_rpc.py | 4 +- python/e2e/test_session.py | 6 +- python/e2e/test_skills.py | 2 +- python/e2e/test_streaming_fidelity.py | 4 +- python/e2e/test_tools.py | 9 +- python/e2e/test_tools_unit.py | 4 +- python/e2e/testharness/context.py | 3 +- python/samples/chat.py | 3 +- python/test_client.py | 13 +- python/test_telemetry.py | 2 +- .../auth/byok-anthropic/python/main.py | 3 +- test/scenarios/auth/byok-azure/python/main.py | 3 +- .../scenarios/auth/byok-ollama/python/main.py | 3 +- .../scenarios/auth/byok-openai/python/main.py | 3 +- test/scenarios/auth/gh-app/python/main.py | 3 +- .../app-backend-to-server/python/main.py | 3 +- .../bundling/app-direct-server/python/main.py | 3 +- .../bundling/container-proxy/python/main.py | 3 +- .../bundling/fully-bundled/python/main.py | 3 +- test/scenarios/callbacks/hooks/python/main.py | 3 +- .../callbacks/permissions/python/main.py | 3 +- .../callbacks/user-input/python/main.py | 3 +- test/scenarios/modes/default/python/main.py | 3 +- test/scenarios/modes/minimal/python/main.py | 3 +- .../prompts/attachments/python/main.py | 3 +- .../prompts/reasoning-effort/python/main.py | 3 +- .../prompts/system-message/python/main.py | 3 +- .../concurrent-sessions/python/main.py | 3 +- .../sessions/infinite-sessions/python/main.py | 3 +- .../sessions/session-resume/python/main.py | 3 +- .../sessions/streaming/python/main.py | 3 +- .../tools/custom-agents/python/main.py | 3 +- .../tools/mcp-servers/python/main.py | 3 +- test/scenarios/tools/no-tools/python/main.py | 3 +- test/scenarios/tools/skills/python/main.py | 3 +- .../tools/tool-filtering/python/main.py | 3 +- .../tools/tool-overrides/python/main.py | 4 +- .../tools/virtual-filesystem/python/main.py | 3 +- .../transport/reconnect/python/main.py | 3 +- test/scenarios/transport/stdio/python/main.py | 3 +- test/scenarios/transport/tcp/python/main.py | 3 +- 67 files changed, 1318 insertions(+), 1359 deletions(-) delete mode 100644 python/copilot/types.py diff --git a/docs/auth/byok.md b/docs/auth/byok.md index df334508d..cdc2f4b99 100644 --- a/docs/auth/byok.md +++ b/docs/auth/byok.md @@ -338,7 +338,7 @@ const client = new CopilotClient({ ```python from copilot import CopilotClient -from copilot.types import ModelInfo, ModelCapabilities, ModelSupports, ModelLimits +from copilot.client import ModelInfo, ModelCapabilities, ModelSupports, ModelLimits client = CopilotClient({ "on_list_models": lambda: [ diff --git a/docs/features/custom-agents.md b/docs/features/custom-agents.md index f9c1a3734..49934de2a 100644 --- a/docs/features/custom-agents.md +++ b/docs/features/custom-agents.md @@ -65,7 +65,7 @@ const session = await client.createSession({ ```python from copilot import CopilotClient -from copilot.types import PermissionRequestResult +from copilot.session import PermissionRequestResult client = CopilotClient() await client.start() diff --git a/docs/features/image-input.md b/docs/features/image-input.md index aa3bf2f64..c4a4ca0f2 100644 --- a/docs/features/image-input.md +++ b/docs/features/image-input.md @@ -65,7 +65,7 @@ await session.send({ ```python from copilot import CopilotClient -from copilot.types import PermissionRequestResult +from copilot.session import PermissionRequestResult client = CopilotClient() await client.start() diff --git a/docs/features/skills.md b/docs/features/skills.md index 1d584ced1..8daf0be23 100644 --- a/docs/features/skills.md +++ b/docs/features/skills.md @@ -43,7 +43,7 @@ await session.sendAndWait({ prompt: "Review this code for security issues" }); ```python from copilot import CopilotClient -from copilot.types import PermissionRequestResult +from copilot.session import PermissionRequestResult async def main(): client = CopilotClient() diff --git a/docs/features/steering-and-queueing.md b/docs/features/steering-and-queueing.md index ad27c4ee0..8c90f0441 100644 --- a/docs/features/steering-and-queueing.md +++ b/docs/features/steering-and-queueing.md @@ -70,7 +70,7 @@ await session.send({ ```python from copilot import CopilotClient -from copilot.types import PermissionRequestResult +from copilot.session import PermissionRequestResult async def main(): client = CopilotClient() @@ -229,7 +229,7 @@ await session.send({ ```python from copilot import CopilotClient -from copilot.types import PermissionRequestResult +from copilot.session import PermissionRequestResult async def main(): client = CopilotClient() diff --git a/docs/getting-started.md b/docs/getting-started.md index 15f11e8b7..71c5cfd6d 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -129,7 +129,8 @@ Create `main.py`: ```python import asyncio -from copilot import CopilotClient, PermissionHandler +from copilot import CopilotClient +from copilot.session import PermissionHandler async def main(): client = CopilotClient() @@ -277,7 +278,8 @@ Update `main.py`: ```python import asyncio import sys -from copilot import CopilotClient, PermissionHandler +from copilot import CopilotClient +from copilot.session import PermissionHandler from copilot.generated.session_events import SessionEventType async def main(): @@ -657,7 +659,8 @@ Update `main.py`: import asyncio import random import sys -from copilot import CopilotClient, PermissionHandler +from copilot import CopilotClient +from copilot.session import PermissionHandler from copilot.tools import define_tool from copilot.generated.session_events import SessionEventType from pydantic import BaseModel, Field @@ -930,7 +933,8 @@ Create `weather_assistant.py`: import asyncio import random import sys -from copilot import CopilotClient, PermissionHandler +from copilot import CopilotClient +from copilot.session import PermissionHandler from copilot.tools import define_tool from copilot.generated.session_events import SessionEventType from pydantic import BaseModel, Field @@ -1306,7 +1310,8 @@ const session = await client.createSession({ onPermissionRequest: approveAll }); Python ```python -from copilot import CopilotClient, PermissionHandler +from copilot import CopilotClient +from copilot.session import PermissionHandler client = CopilotClient({ "cli_url": "localhost:4321" diff --git a/docs/hooks/error-handling.md b/docs/hooks/error-handling.md index 2e7848bc5..3a96b8225 100644 --- a/docs/hooks/error-handling.md +++ b/docs/hooks/error-handling.md @@ -35,7 +35,7 @@ type ErrorOccurredHandler = ( ```python -from copilot.types import ErrorOccurredHookInput, HookInvocation, ErrorOccurredHookOutput +from copilot.session import ErrorOccurredHookInput, ErrorOccurredHookOutput from typing import Callable, Awaitable ErrorOccurredHandler = Callable[ diff --git a/docs/hooks/post-tool-use.md b/docs/hooks/post-tool-use.md index 415acce9e..7bcfbaf87 100644 --- a/docs/hooks/post-tool-use.md +++ b/docs/hooks/post-tool-use.md @@ -35,7 +35,7 @@ type PostToolUseHandler = ( ```python -from copilot.types import PostToolUseHookInput, HookInvocation, PostToolUseHookOutput +from copilot.session import PostToolUseHookInput, PostToolUseHookOutput from typing import Callable, Awaitable PostToolUseHandler = Callable[ diff --git a/docs/hooks/pre-tool-use.md b/docs/hooks/pre-tool-use.md index df194aaf3..57967ac84 100644 --- a/docs/hooks/pre-tool-use.md +++ b/docs/hooks/pre-tool-use.md @@ -35,7 +35,7 @@ type PreToolUseHandler = ( ```python -from copilot.types import PreToolUseHookInput, HookInvocation, PreToolUseHookOutput +from copilot.session import PreToolUseHookInput, PreToolUseHookOutput from typing import Callable, Awaitable PreToolUseHandler = Callable[ diff --git a/docs/hooks/session-lifecycle.md b/docs/hooks/session-lifecycle.md index 93696530e..bc5e311a5 100644 --- a/docs/hooks/session-lifecycle.md +++ b/docs/hooks/session-lifecycle.md @@ -39,7 +39,7 @@ type SessionStartHandler = ( ```python -from copilot.types import SessionStartHookInput, HookInvocation, SessionStartHookOutput +from copilot.session import SessionStartHookInput, SessionStartHookOutput from typing import Callable, Awaitable SessionStartHandler = Callable[ @@ -249,7 +249,7 @@ type SessionEndHandler = ( ```python -from copilot.types import SessionEndHookInput, HookInvocation +from copilot.session import SessionEndHookInput from typing import Callable, Awaitable SessionEndHandler = Callable[ diff --git a/docs/hooks/user-prompt-submitted.md b/docs/hooks/user-prompt-submitted.md index 370c37b8c..4e729a181 100644 --- a/docs/hooks/user-prompt-submitted.md +++ b/docs/hooks/user-prompt-submitted.md @@ -35,7 +35,7 @@ type UserPromptSubmittedHandler = ( ```python -from copilot.types import UserPromptSubmittedHookInput, HookInvocation, UserPromptSubmittedHookOutput +from copilot.session import UserPromptSubmittedHookInput, UserPromptSubmittedHookOutput from typing import Callable, Awaitable UserPromptSubmittedHandler = Callable[ diff --git a/docs/setup/azure-managed-identity.md b/docs/setup/azure-managed-identity.md index b2fa15264..b92b63b18 100644 --- a/docs/setup/azure-managed-identity.md +++ b/docs/setup/azure-managed-identity.md @@ -42,7 +42,8 @@ import asyncio import os from azure.identity import DefaultAzureCredential -from copilot import CopilotClient, ProviderConfig, SessionConfig +from copilot import CopilotClient +from copilot.session import ProviderConfig, SessionConfig COGNITIVE_SERVICES_SCOPE = "https://cognitiveservices.azure.com/.default" @@ -84,7 +85,8 @@ Bearer tokens expire (typically after ~1 hour). For servers or long-running agen ```python from azure.identity import DefaultAzureCredential -from copilot import CopilotClient, ProviderConfig, SessionConfig +from copilot import CopilotClient +from copilot.session import ProviderConfig, SessionConfig COGNITIVE_SERVICES_SCOPE = "https://cognitiveservices.azure.com/.default" diff --git a/python/README.md b/python/README.md index 6d1c81281..5addc7abd 100644 --- a/python/README.md +++ b/python/README.md @@ -205,7 +205,8 @@ session = await client.create_session({ For users who prefer manual schema definition: ```python -from copilot import CopilotClient, Tool +from copilot import CopilotClient +from copilot.tools import Tool async def lookup_issue(invocation): issue_id = invocation["arguments"]["id"] diff --git a/python/copilot/__init__.py b/python/copilot/__init__.py index c25ea4021..92764c0e8 100644 --- a/python/copilot/__init__.py +++ b/python/copilot/__init__.py @@ -4,78 +4,16 @@ JSON-RPC based SDK for programmatic control of GitHub Copilot CLI """ -from .client import CopilotClient +from .client import CopilotClient, ExternalServerConfig, SubprocessConfig from .session import CopilotSession from .tools import define_tool -from .types import ( - AzureProviderOptions, - ConnectionState, - CustomAgentConfig, - ExternalServerConfig, - GetAuthStatusResponse, - GetStatusResponse, - MCPLocalServerConfig, - MCPRemoteServerConfig, - MCPServerConfig, - ModelBilling, - ModelCapabilities, - ModelInfo, - ModelPolicy, - PermissionHandler, - PermissionRequest, - PermissionRequestResult, - PingResponse, - ProviderConfig, - ResumeSessionConfig, - SessionConfig, - SessionContext, - SessionEvent, - SessionListFilter, - SessionMetadata, - StopError, - SubprocessConfig, - TelemetryConfig, - Tool, - ToolHandler, - ToolInvocation, - ToolResult, -) __version__ = "0.1.0" __all__ = [ - "AzureProviderOptions", "CopilotClient", "CopilotSession", - "ConnectionState", - "CustomAgentConfig", "ExternalServerConfig", - "GetAuthStatusResponse", - "GetStatusResponse", - "MCPLocalServerConfig", - "MCPRemoteServerConfig", - "MCPServerConfig", - "ModelBilling", - "ModelCapabilities", - "ModelInfo", - "ModelPolicy", - "PermissionHandler", - "PermissionRequest", - "PermissionRequestResult", - "PingResponse", - "ProviderConfig", - "ResumeSessionConfig", - "SessionConfig", - "SessionContext", - "SessionEvent", - "SessionListFilter", - "SessionMetadata", - "StopError", "SubprocessConfig", - "TelemetryConfig", - "Tool", - "ToolHandler", - "ToolInvocation", - "ToolResult", "define_tool", ] diff --git a/python/copilot/client.py b/python/copilot/client.py index 0d8074fe0..87b07aeb8 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -12,6 +12,8 @@ ... await session.send("Hello!") """ +from __future__ import annotations + import asyncio import inspect import os @@ -22,36 +24,619 @@ import threading import uuid from collections.abc import Awaitable, Callable +from dataclasses import KW_ONLY, dataclass, field from pathlib import Path -from typing import Any, cast, overload +from typing import Any, Literal, TypedDict, cast, overload from .generated.rpc import ServerRpc from .generated.session_events import PermissionRequest, session_event_from_dict from .jsonrpc import JsonRpcClient, ProcessExitedError from .sdk_protocol_version import get_sdk_protocol_version -from .session import CopilotSession -from .telemetry import get_trace_context, trace_context -from .types import ( - ConnectionState, +from .session import ( + CopilotSession, CustomAgentConfig, - ExternalServerConfig, - GetAuthStatusResponse, - GetStatusResponse, - ModelInfo, - PingResponse, ProviderConfig, ResumeSessionConfig, SessionConfig, - SessionLifecycleEvent, - SessionLifecycleEventType, - SessionLifecycleHandler, - SessionListFilter, - SessionMetadata, - StopError, - SubprocessConfig, - ToolInvocation, - ToolResult, ) +from .telemetry import get_trace_context, trace_context +from .tools import ToolInvocation, ToolResult + +# ============================================================================ +# Connection Types +# ============================================================================ + +ConnectionState = Literal["disconnected", "connecting", "connected", "error"] + +LogLevel = Literal["none", "error", "warning", "info", "debug", "all"] + + +class TelemetryConfig(TypedDict, total=False): + """Configuration for OpenTelemetry integration with the Copilot CLI.""" + + otlp_endpoint: str + """OTLP HTTP endpoint URL for trace/metric export. Sets OTEL_EXPORTER_OTLP_ENDPOINT.""" + file_path: str + """File path for JSON-lines trace output. Sets COPILOT_OTEL_FILE_EXPORTER_PATH.""" + exporter_type: str + """Exporter backend type: "otlp-http" or "file". Sets COPILOT_OTEL_EXPORTER_TYPE.""" + source_name: str + """Instrumentation scope name. Sets COPILOT_OTEL_SOURCE_NAME.""" + capture_content: bool + """Whether to capture message content. Sets OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT.""" # noqa: E501 + + +@dataclass +class SubprocessConfig: + """Config for spawning a local Copilot CLI subprocess. + + Example: + >>> config = SubprocessConfig(github_token="ghp_...") + >>> client = CopilotClient(config) + + >>> # Custom CLI path with TCP transport + >>> config = SubprocessConfig( + ... cli_path="/usr/local/bin/copilot", + ... use_stdio=False, + ... log_level="debug", + ... ) + """ + + cli_path: str | None = None + """Path to the Copilot CLI executable. ``None`` uses the bundled binary.""" + + cli_args: list[str] = field(default_factory=list) + """Extra arguments passed to the CLI executable (inserted before SDK-managed args).""" + + _: KW_ONLY + + cwd: str | None = None + """Working directory for the CLI process. ``None`` uses the current directory.""" + + use_stdio: bool = True + """Use stdio transport (``True``, default) or TCP (``False``).""" + + port: int = 0 + """TCP port for the CLI server (only when ``use_stdio=False``). 0 means random.""" + + log_level: LogLevel = "info" + """Log level for the CLI process.""" + + env: dict[str, str] | None = None + """Environment variables for the CLI process. ``None`` inherits the current env.""" + + github_token: str | None = None + """GitHub token for authentication. Takes priority over other auth methods.""" + + use_logged_in_user: bool | None = None + """Use the logged-in user for authentication. + + ``None`` (default) resolves to ``True`` unless ``github_token`` is set. + """ + + telemetry: TelemetryConfig | None = None + """OpenTelemetry configuration. Providing this enables telemetry — no separate flag needed.""" + + +@dataclass +class ExternalServerConfig: + """Config for connecting to an existing Copilot CLI server over TCP. + + Example: + >>> config = ExternalServerConfig(url="localhost:3000") + >>> client = CopilotClient(config) + """ + + url: str + """Server URL. Supports ``"host:port"``, ``"http://host:port"``, or just ``"port"``.""" + + +# ============================================================================ +# Response Types +# ============================================================================ + + +@dataclass +class PingResponse: + """Response from ping""" + + message: str # Echo message with "pong: " prefix + timestamp: int # Server timestamp in milliseconds + protocolVersion: int # Protocol version for SDK compatibility + + @staticmethod + def from_dict(obj: Any) -> PingResponse: + assert isinstance(obj, dict) + message = obj.get("message") + timestamp = obj.get("timestamp") + protocolVersion = obj.get("protocolVersion") + if message is None or timestamp is None or protocolVersion is None: + raise ValueError( + f"Missing required fields in PingResponse: message={message}, " + f"timestamp={timestamp}, protocolVersion={protocolVersion}" + ) + return PingResponse(str(message), int(timestamp), int(protocolVersion)) + + def to_dict(self) -> dict: + result: dict = {} + result["message"] = self.message + result["timestamp"] = self.timestamp + result["protocolVersion"] = self.protocolVersion + return result + + +@dataclass +class StopError(Exception): + """Error that occurred during client stop cleanup.""" + + message: str # Error message describing what failed during cleanup + + def __post_init__(self) -> None: + Exception.__init__(self, self.message) + + @staticmethod + def from_dict(obj: Any) -> StopError: + assert isinstance(obj, dict) + message = obj.get("message") + if message is None: + raise ValueError("Missing required field 'message' in StopError") + return StopError(str(message)) + + def to_dict(self) -> dict: + result: dict = {} + result["message"] = self.message + return result + + +@dataclass +class GetStatusResponse: + """Response from status.get""" + + version: str # Package version (e.g., "1.0.0") + protocolVersion: int # Protocol version for SDK compatibility + + @staticmethod + def from_dict(obj: Any) -> GetStatusResponse: + assert isinstance(obj, dict) + version = obj.get("version") + protocolVersion = obj.get("protocolVersion") + if version is None or protocolVersion is None: + raise ValueError( + f"Missing required fields in GetStatusResponse: version={version}, " + f"protocolVersion={protocolVersion}" + ) + return GetStatusResponse(str(version), int(protocolVersion)) + + def to_dict(self) -> dict: + result: dict = {} + result["version"] = self.version + result["protocolVersion"] = self.protocolVersion + return result + + +@dataclass +class GetAuthStatusResponse: + """Response from auth.getStatus""" + + isAuthenticated: bool # Whether the user is authenticated + authType: str | None = None # Authentication type + host: str | None = None # GitHub host URL + login: str | None = None # User login name + statusMessage: str | None = None # Human-readable status message + + @staticmethod + def from_dict(obj: Any) -> GetAuthStatusResponse: + assert isinstance(obj, dict) + isAuthenticated = obj.get("isAuthenticated") + if isAuthenticated is None: + raise ValueError("Missing required field 'isAuthenticated' in GetAuthStatusResponse") + authType = obj.get("authType") + host = obj.get("host") + login = obj.get("login") + statusMessage = obj.get("statusMessage") + return GetAuthStatusResponse( + isAuthenticated=bool(isAuthenticated), + authType=authType, + host=host, + login=login, + statusMessage=statusMessage, + ) + + def to_dict(self) -> dict: + result: dict = {} + result["isAuthenticated"] = self.isAuthenticated + if self.authType is not None: + result["authType"] = self.authType + if self.host is not None: + result["host"] = self.host + if self.login is not None: + result["login"] = self.login + if self.statusMessage is not None: + result["statusMessage"] = self.statusMessage + return result + + +# ============================================================================ +# Model Types +# ============================================================================ + + +@dataclass +class ModelVisionLimits: + """Vision-specific limits""" + + supported_media_types: list[str] | None = None + max_prompt_images: int | None = None + max_prompt_image_size: int | None = None + + @staticmethod + def from_dict(obj: Any) -> ModelVisionLimits: + assert isinstance(obj, dict) + supported_media_types = obj.get("supported_media_types") + max_prompt_images = obj.get("max_prompt_images") + max_prompt_image_size = obj.get("max_prompt_image_size") + return ModelVisionLimits( + supported_media_types=supported_media_types, + max_prompt_images=max_prompt_images, + max_prompt_image_size=max_prompt_image_size, + ) + + def to_dict(self) -> dict: + result: dict = {} + if self.supported_media_types is not None: + result["supported_media_types"] = self.supported_media_types + if self.max_prompt_images is not None: + result["max_prompt_images"] = self.max_prompt_images + if self.max_prompt_image_size is not None: + result["max_prompt_image_size"] = self.max_prompt_image_size + return result + + +@dataclass +class ModelLimits: + """Model limits""" + + max_prompt_tokens: int | None = None + max_context_window_tokens: int | None = None + vision: ModelVisionLimits | None = None + + @staticmethod + def from_dict(obj: Any) -> ModelLimits: + assert isinstance(obj, dict) + max_prompt_tokens = obj.get("max_prompt_tokens") + max_context_window_tokens = obj.get("max_context_window_tokens") + vision_dict = obj.get("vision") + vision = ModelVisionLimits.from_dict(vision_dict) if vision_dict else None + return ModelLimits( + max_prompt_tokens=max_prompt_tokens, + max_context_window_tokens=max_context_window_tokens, + vision=vision, + ) + + def to_dict(self) -> dict: + result: dict = {} + if self.max_prompt_tokens is not None: + result["max_prompt_tokens"] = self.max_prompt_tokens + if self.max_context_window_tokens is not None: + result["max_context_window_tokens"] = self.max_context_window_tokens + if self.vision is not None: + result["vision"] = self.vision.to_dict() + return result + + +@dataclass +class ModelSupports: + """Model support flags""" + + vision: bool + reasoning_effort: bool = False # Whether this model supports reasoning effort + + @staticmethod + def from_dict(obj: Any) -> ModelSupports: + assert isinstance(obj, dict) + vision = obj.get("vision") + if vision is None: + raise ValueError("Missing required field 'vision' in ModelSupports") + reasoning_effort = obj.get("reasoningEffort", False) + return ModelSupports(vision=bool(vision), reasoning_effort=bool(reasoning_effort)) + + def to_dict(self) -> dict: + result: dict = {} + result["vision"] = self.vision + result["reasoningEffort"] = self.reasoning_effort + return result + + +@dataclass +class ModelCapabilities: + """Model capabilities and limits""" + + supports: ModelSupports + limits: ModelLimits + + @staticmethod + def from_dict(obj: Any) -> ModelCapabilities: + assert isinstance(obj, dict) + supports_dict = obj.get("supports") + limits_dict = obj.get("limits") + if supports_dict is None or limits_dict is None: + raise ValueError( + f"Missing required fields in ModelCapabilities: supports={supports_dict}, " + f"limits={limits_dict}" + ) + supports = ModelSupports.from_dict(supports_dict) + limits = ModelLimits.from_dict(limits_dict) + return ModelCapabilities(supports=supports, limits=limits) + + def to_dict(self) -> dict: + result: dict = {} + result["supports"] = self.supports.to_dict() + result["limits"] = self.limits.to_dict() + return result + + +@dataclass +class ModelPolicy: + """Model policy state""" + + state: str # "enabled", "disabled", or "unconfigured" + terms: str + + @staticmethod + def from_dict(obj: Any) -> ModelPolicy: + assert isinstance(obj, dict) + state = obj.get("state") + terms = obj.get("terms") + if state is None or terms is None: + raise ValueError( + f"Missing required fields in ModelPolicy: state={state}, terms={terms}" + ) + return ModelPolicy(state=str(state), terms=str(terms)) + + def to_dict(self) -> dict: + result: dict = {} + result["state"] = self.state + result["terms"] = self.terms + return result + + +@dataclass +class ModelBilling: + """Model billing information""" + + multiplier: float + + @staticmethod + def from_dict(obj: Any) -> ModelBilling: + assert isinstance(obj, dict) + multiplier = obj.get("multiplier") + if multiplier is None: + raise ValueError("Missing required field 'multiplier' in ModelBilling") + return ModelBilling(multiplier=float(multiplier)) + + def to_dict(self) -> dict: + result: dict = {} + result["multiplier"] = self.multiplier + return result + + +@dataclass +class ModelInfo: + """Information about an available model""" + + id: str # Model identifier (e.g., "claude-sonnet-4.5") + name: str # Display name + capabilities: ModelCapabilities # Model capabilities and limits + policy: ModelPolicy | None = None # Policy state + billing: ModelBilling | None = None # Billing information + # Supported reasoning effort levels (only present if model supports reasoning effort) + supported_reasoning_efforts: list[str] | None = None + # Default reasoning effort level (only present if model supports reasoning effort) + default_reasoning_effort: str | None = None + + @staticmethod + def from_dict(obj: Any) -> ModelInfo: + assert isinstance(obj, dict) + id = obj.get("id") + name = obj.get("name") + capabilities_dict = obj.get("capabilities") + if id is None or name is None or capabilities_dict is None: + raise ValueError( + f"Missing required fields in ModelInfo: id={id}, name={name}, " + f"capabilities={capabilities_dict}" + ) + capabilities = ModelCapabilities.from_dict(capabilities_dict) + policy_dict = obj.get("policy") + policy = ModelPolicy.from_dict(policy_dict) if policy_dict else None + billing_dict = obj.get("billing") + billing = ModelBilling.from_dict(billing_dict) if billing_dict else None + supported_reasoning_efforts = obj.get("supportedReasoningEfforts") + default_reasoning_effort = obj.get("defaultReasoningEffort") + return ModelInfo( + id=str(id), + name=str(name), + capabilities=capabilities, + policy=policy, + billing=billing, + supported_reasoning_efforts=supported_reasoning_efforts, + default_reasoning_effort=default_reasoning_effort, + ) + + def to_dict(self) -> dict: + result: dict = {} + result["id"] = self.id + result["name"] = self.name + result["capabilities"] = self.capabilities.to_dict() + if self.policy is not None: + result["policy"] = self.policy.to_dict() + if self.billing is not None: + result["billing"] = self.billing.to_dict() + if self.supported_reasoning_efforts is not None: + result["supportedReasoningEfforts"] = self.supported_reasoning_efforts + if self.default_reasoning_effort is not None: + result["defaultReasoningEffort"] = self.default_reasoning_effort + return result + + +# ============================================================================ +# Session Metadata Types +# ============================================================================ + + +@dataclass +class SessionContext: + """Working directory context for a session""" + + cwd: str # Working directory where the session was created + gitRoot: str | None = None # Git repository root (if in a git repo) + repository: str | None = None # GitHub repository in "owner/repo" format + branch: str | None = None # Current git branch + + @staticmethod + def from_dict(obj: Any) -> SessionContext: + assert isinstance(obj, dict) + cwd = obj.get("cwd") + if cwd is None: + raise ValueError("Missing required field 'cwd' in SessionContext") + return SessionContext( + cwd=str(cwd), + gitRoot=obj.get("gitRoot"), + repository=obj.get("repository"), + branch=obj.get("branch"), + ) + + def to_dict(self) -> dict: + result: dict = {"cwd": self.cwd} + if self.gitRoot is not None: + result["gitRoot"] = self.gitRoot + if self.repository is not None: + result["repository"] = self.repository + if self.branch is not None: + result["branch"] = self.branch + return result + + +@dataclass +class SessionListFilter: + """Filter options for listing sessions""" + + cwd: str | None = None # Filter by exact cwd match + gitRoot: str | None = None # Filter by git root + repository: str | None = None # Filter by repository (owner/repo format) + branch: str | None = None # Filter by branch + + def to_dict(self) -> dict: + result: dict = {} + if self.cwd is not None: + result["cwd"] = self.cwd + if self.gitRoot is not None: + result["gitRoot"] = self.gitRoot + if self.repository is not None: + result["repository"] = self.repository + if self.branch is not None: + result["branch"] = self.branch + return result + + +@dataclass +class SessionMetadata: + """Metadata about a session""" + + sessionId: str # Session identifier + startTime: str # ISO 8601 timestamp when session was created + modifiedTime: str # ISO 8601 timestamp when session was last modified + isRemote: bool # Whether the session is remote + summary: str | None = None # Optional summary of the session + context: SessionContext | None = None # Working directory context + + @staticmethod + def from_dict(obj: Any) -> SessionMetadata: + assert isinstance(obj, dict) + sessionId = obj.get("sessionId") + startTime = obj.get("startTime") + modifiedTime = obj.get("modifiedTime") + isRemote = obj.get("isRemote") + if sessionId is None or startTime is None or modifiedTime is None or isRemote is None: + raise ValueError( + f"Missing required fields in SessionMetadata: sessionId={sessionId}, " + f"startTime={startTime}, modifiedTime={modifiedTime}, isRemote={isRemote}" + ) + summary = obj.get("summary") + context_dict = obj.get("context") + context = SessionContext.from_dict(context_dict) if context_dict else None + return SessionMetadata( + sessionId=str(sessionId), + startTime=str(startTime), + modifiedTime=str(modifiedTime), + isRemote=bool(isRemote), + summary=summary, + context=context, + ) + + def to_dict(self) -> dict: + result: dict = {} + result["sessionId"] = self.sessionId + result["startTime"] = self.startTime + result["modifiedTime"] = self.modifiedTime + result["isRemote"] = self.isRemote + if self.summary is not None: + result["summary"] = self.summary + if self.context is not None: + result["context"] = self.context.to_dict() + return result + + +# ============================================================================ +# Session Lifecycle Types (for TUI+server mode) +# ============================================================================ + +SessionLifecycleEventType = Literal[ + "session.created", + "session.deleted", + "session.updated", + "session.foreground", + "session.background", +] + + +@dataclass +class SessionLifecycleEventMetadata: + """Metadata for session lifecycle events.""" + + startTime: str + modifiedTime: str + summary: str | None = None + + @staticmethod + def from_dict(data: dict) -> SessionLifecycleEventMetadata: + return SessionLifecycleEventMetadata( + startTime=data.get("startTime", ""), + modifiedTime=data.get("modifiedTime", ""), + summary=data.get("summary"), + ) + + +@dataclass +class SessionLifecycleEvent: + """Session lifecycle event notification.""" + + type: SessionLifecycleEventType + sessionId: str + metadata: SessionLifecycleEventMetadata | None = None + + @staticmethod + def from_dict(data: dict) -> SessionLifecycleEvent: + metadata = None + if "metadata" in data and data["metadata"]: + metadata = SessionLifecycleEventMetadata.from_dict(data["metadata"]) + return SessionLifecycleEvent( + type=data.get("type", "session.updated"), + sessionId=data.get("sessionId", ""), + metadata=metadata, + ) + + +SessionLifecycleHandler = Callable[[SessionLifecycleEvent], None] HandlerUnsubcribe = Callable[[], None] @@ -840,7 +1425,7 @@ def get_state(self) -> ConnectionState: """ return self._state - async def ping(self, message: str | None = None) -> "PingResponse": + async def ping(self, message: str | None = None) -> PingResponse: """ Send a ping request to the server to verify connectivity. @@ -863,7 +1448,7 @@ async def ping(self, message: str | None = None) -> "PingResponse": result = await self._client.request("ping", {"message": message}) return PingResponse.from_dict(result) - async def get_status(self) -> "GetStatusResponse": + async def get_status(self) -> GetStatusResponse: """ Get CLI status including version and protocol information. @@ -883,7 +1468,7 @@ async def get_status(self) -> "GetStatusResponse": result = await self._client.request("status.get", {}) return GetStatusResponse.from_dict(result) - async def get_auth_status(self) -> "GetAuthStatusResponse": + async def get_auth_status(self) -> GetAuthStatusResponse: """ Get current authentication status. @@ -904,7 +1489,7 @@ async def get_auth_status(self) -> "GetAuthStatusResponse": result = await self._client.request("auth.getStatus", {}) return GetAuthStatusResponse.from_dict(result) - async def list_models(self) -> list["ModelInfo"]: + async def list_models(self) -> list[ModelInfo]: """ List available models with their metadata. @@ -954,9 +1539,7 @@ async def list_models(self) -> list["ModelInfo"]: return list(models) # Return a copy to prevent cache mutation - async def list_sessions( - self, filter: "SessionListFilter | None" = None - ) -> list["SessionMetadata"]: + async def list_sessions(self, filter: SessionListFilter | None = None) -> list[SessionMetadata]: """ List all available sessions known to the server. @@ -977,7 +1560,7 @@ async def list_sessions( >>> for session in sessions: ... print(f"Session: {session.sessionId}") >>> # Filter sessions by repository - >>> from copilot import SessionListFilter + >>> from copilot.client import SessionListFilter >>> filtered = await client.list_sessions(SessionListFilter(repository="owner/repo")) """ if not self._client: diff --git a/python/copilot/session.py b/python/copilot/session.py index e4a17f2f9..144743f4d 100644 --- a/python/copilot/session.py +++ b/python/copilot/session.py @@ -2,14 +2,18 @@ Copilot Session - represents a single conversation session with the Copilot CLI. This module provides the CopilotSession class for managing individual -conversation sessions with the Copilot CLI. +conversation sessions with the Copilot CLI, along with all session-related +configuration and handler types. """ +from __future__ import annotations + import asyncio import inspect import threading -from collections.abc import Callable -from typing import Any, Literal, cast +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from typing import Any, Literal, NotRequired, TypedDict, cast from .generated.rpc import ( Kind, @@ -22,26 +26,523 @@ SessionRpc, SessionToolsHandlePendingToolCallParams, ) -from .generated.session_events import SessionEvent, SessionEventType, session_event_from_dict -from .jsonrpc import JsonRpcError, ProcessExitedError -from .telemetry import get_trace_context, trace_context -from .types import ( - Attachment, +from .generated.session_events import ( PermissionRequest, - PermissionRequestResult, - SessionHooks, - Tool, - ToolHandler, - ToolInvocation, - ToolResult, - UserInputHandler, - UserInputRequest, - UserInputResponse, - _PermissionHandlerFn, -) -from .types import ( - SessionEvent as SessionEventTypeAlias, + SessionEvent, + SessionEventType, + session_event_from_dict, ) +from .jsonrpc import JsonRpcError, ProcessExitedError +from .telemetry import get_trace_context, trace_context +from .tools import Tool, ToolHandler, ToolInvocation, ToolResult + +# Re-export SessionEvent under an alias used internally +SessionEventTypeAlias = SessionEvent + +# ============================================================================ +# Reasoning Effort +# ============================================================================ + +ReasoningEffort = Literal["low", "medium", "high", "xhigh"] + +# ============================================================================ +# Attachment Types +# ============================================================================ + + +class SelectionRange(TypedDict): + line: int + character: int + + +class Selection(TypedDict): + start: SelectionRange + end: SelectionRange + + +class FileAttachment(TypedDict): + """File attachment.""" + + type: Literal["file"] + path: str + displayName: NotRequired[str] + + +class DirectoryAttachment(TypedDict): + """Directory attachment.""" + + type: Literal["directory"] + path: str + displayName: NotRequired[str] + + +class SelectionAttachment(TypedDict): + """Selection attachment with text from a file.""" + + type: Literal["selection"] + filePath: str + displayName: str + selection: NotRequired[Selection] + text: NotRequired[str] + + +Attachment = FileAttachment | DirectoryAttachment | SelectionAttachment + +# ============================================================================ +# System Message Configuration +# ============================================================================ + + +class SystemMessageAppendConfig(TypedDict, total=False): + """ + Append mode: Use CLI foundation with optional appended content. + """ + + mode: NotRequired[Literal["append"]] + content: NotRequired[str] + + +class SystemMessageReplaceConfig(TypedDict): + """ + Replace mode: Use caller-provided system message entirely. + Removes all SDK guardrails including security restrictions. + """ + + mode: Literal["replace"] + content: str + + +SystemMessageConfig = SystemMessageAppendConfig | SystemMessageReplaceConfig + +# ============================================================================ +# Permission Types +# ============================================================================ + +PermissionRequestResultKind = Literal[ + "approved", + "denied-by-rules", + "denied-by-content-exclusion-policy", + "denied-no-approval-rule-and-could-not-request-from-user", + "denied-interactively-by-user", + "no-result", +] + + +@dataclass +class PermissionRequestResult: + """Result of a permission request.""" + + kind: PermissionRequestResultKind = "denied-no-approval-rule-and-could-not-request-from-user" + rules: list[Any] | None = None + feedback: str | None = None + message: str | None = None + path: str | None = None + + +_PermissionHandlerFn = Callable[ + [PermissionRequest, dict[str, str]], + PermissionRequestResult | Awaitable[PermissionRequestResult], +] + + +class PermissionHandler: + @staticmethod + def approve_all( + request: PermissionRequest, invocation: dict[str, str] + ) -> PermissionRequestResult: + return PermissionRequestResult(kind="approved") + + +# ============================================================================ +# User Input Request Types +# ============================================================================ + + +class UserInputRequest(TypedDict, total=False): + """Request for user input from the agent (enables ask_user tool)""" + + question: str + choices: list[str] + allowFreeform: bool + + +class UserInputResponse(TypedDict): + """Response to a user input request""" + + answer: str + wasFreeform: bool + + +UserInputHandler = Callable[ + [UserInputRequest, dict[str, str]], + UserInputResponse | Awaitable[UserInputResponse], +] + +# ============================================================================ +# Hook Types +# ============================================================================ + + +class BaseHookInput(TypedDict): + """Base interface for all hook inputs""" + + timestamp: int + cwd: str + + +class PreToolUseHookInput(TypedDict): + """Input for pre-tool-use hook""" + + timestamp: int + cwd: str + toolName: str + toolArgs: Any + + +class PreToolUseHookOutput(TypedDict, total=False): + """Output for pre-tool-use hook""" + + permissionDecision: Literal["allow", "deny", "ask"] + permissionDecisionReason: str + modifiedArgs: Any + additionalContext: str + suppressOutput: bool + + +PreToolUseHandler = Callable[ + [PreToolUseHookInput, dict[str, str]], + PreToolUseHookOutput | None | Awaitable[PreToolUseHookOutput | None], +] + + +class PostToolUseHookInput(TypedDict): + """Input for post-tool-use hook""" + + timestamp: int + cwd: str + toolName: str + toolArgs: Any + toolResult: Any + + +class PostToolUseHookOutput(TypedDict, total=False): + """Output for post-tool-use hook""" + + modifiedResult: Any + additionalContext: str + suppressOutput: bool + + +PostToolUseHandler = Callable[ + [PostToolUseHookInput, dict[str, str]], + PostToolUseHookOutput | None | Awaitable[PostToolUseHookOutput | None], +] + + +class UserPromptSubmittedHookInput(TypedDict): + """Input for user-prompt-submitted hook""" + + timestamp: int + cwd: str + prompt: str + + +class UserPromptSubmittedHookOutput(TypedDict, total=False): + """Output for user-prompt-submitted hook""" + + modifiedPrompt: str + additionalContext: str + suppressOutput: bool + + +UserPromptSubmittedHandler = Callable[ + [UserPromptSubmittedHookInput, dict[str, str]], + UserPromptSubmittedHookOutput | None | Awaitable[UserPromptSubmittedHookOutput | None], +] + + +class SessionStartHookInput(TypedDict): + """Input for session-start hook""" + + timestamp: int + cwd: str + source: Literal["startup", "resume", "new"] + initialPrompt: NotRequired[str] + + +class SessionStartHookOutput(TypedDict, total=False): + """Output for session-start hook""" + + additionalContext: str + modifiedConfig: dict[str, Any] + + +SessionStartHandler = Callable[ + [SessionStartHookInput, dict[str, str]], + SessionStartHookOutput | None | Awaitable[SessionStartHookOutput | None], +] + + +class SessionEndHookInput(TypedDict): + """Input for session-end hook""" + + timestamp: int + cwd: str + reason: Literal["complete", "error", "abort", "timeout", "user_exit"] + finalMessage: NotRequired[str] + error: NotRequired[str] + + +class SessionEndHookOutput(TypedDict, total=False): + """Output for session-end hook""" + + suppressOutput: bool + cleanupActions: list[str] + sessionSummary: str + + +SessionEndHandler = Callable[ + [SessionEndHookInput, dict[str, str]], + SessionEndHookOutput | None | Awaitable[SessionEndHookOutput | None], +] + + +class ErrorOccurredHookInput(TypedDict): + """Input for error-occurred hook""" + + timestamp: int + cwd: str + error: str + errorContext: Literal["model_call", "tool_execution", "system", "user_input"] + recoverable: bool + + +class ErrorOccurredHookOutput(TypedDict, total=False): + """Output for error-occurred hook""" + + suppressOutput: bool + errorHandling: Literal["retry", "skip", "abort"] + retryCount: int + userNotification: str + + +ErrorOccurredHandler = Callable[ + [ErrorOccurredHookInput, dict[str, str]], + ErrorOccurredHookOutput | None | Awaitable[ErrorOccurredHookOutput | None], +] + + +class SessionHooks(TypedDict, total=False): + """Configuration for session hooks""" + + on_pre_tool_use: PreToolUseHandler + on_post_tool_use: PostToolUseHandler + on_user_prompt_submitted: UserPromptSubmittedHandler + on_session_start: SessionStartHandler + on_session_end: SessionEndHandler + on_error_occurred: ErrorOccurredHandler + + +# ============================================================================ +# MCP Server Configuration Types +# ============================================================================ + + +class MCPLocalServerConfig(TypedDict, total=False): + """Configuration for a local/stdio MCP server.""" + + tools: list[str] # List of tools to include. [] means none. "*" means all. + type: NotRequired[Literal["local", "stdio"]] # Server type + timeout: NotRequired[int] # Timeout in milliseconds + command: str # Command to run + args: list[str] # Command arguments + env: NotRequired[dict[str, str]] # Environment variables + cwd: NotRequired[str] # Working directory + + +class MCPRemoteServerConfig(TypedDict, total=False): + """Configuration for a remote MCP server (HTTP or SSE).""" + + tools: list[str] # List of tools to include. [] means none. "*" means all. + type: Literal["http", "sse"] # Server type + timeout: NotRequired[int] # Timeout in milliseconds + url: str # URL of the remote server + headers: NotRequired[dict[str, str]] # HTTP headers + + +MCPServerConfig = MCPLocalServerConfig | MCPRemoteServerConfig + +# ============================================================================ +# Custom Agent Configuration Types +# ============================================================================ + + +class CustomAgentConfig(TypedDict, total=False): + """Configuration for a custom agent.""" + + name: str # Unique name of the custom agent + display_name: NotRequired[str] # Display name for UI purposes + description: NotRequired[str] # Description of what the agent does + # List of tool names the agent can use + tools: NotRequired[list[str] | None] + prompt: str # The prompt content for the agent + # MCP servers specific to agent + mcp_servers: NotRequired[dict[str, MCPServerConfig]] + infer: NotRequired[bool] # Whether agent is available for model inference + + +class InfiniteSessionConfig(TypedDict, total=False): + """ + Configuration for infinite sessions with automatic context compaction + and workspace persistence. + + When enabled, sessions automatically manage context window limits through + background compaction and persist state to a workspace directory. + """ + + # Whether infinite sessions are enabled (default: True) + enabled: bool + # Context utilization threshold (0.0-1.0) at which background compaction starts. + # Compaction runs asynchronously, allowing the session to continue processing. + # Default: 0.80 + background_compaction_threshold: float + # Context utilization threshold (0.0-1.0) at which the session blocks until + # compaction completes. This prevents context overflow when compaction hasn't + # finished in time. Default: 0.95 + buffer_exhaustion_threshold: float + + +# ============================================================================ +# Session Configuration +# ============================================================================ + + +class AzureProviderOptions(TypedDict, total=False): + """Azure-specific provider configuration""" + + api_version: str # Azure API version. Defaults to "2024-10-21". + + +class ProviderConfig(TypedDict, total=False): + """Configuration for a custom API provider""" + + type: Literal["openai", "azure", "anthropic"] + wire_api: Literal["completions", "responses"] + base_url: str + api_key: str + # Bearer token for authentication. Sets the Authorization header directly. + # Use this for services requiring bearer token auth instead of API key. + # Takes precedence over api_key when both are set. + bearer_token: str + azure: AzureProviderOptions # Azure-specific options + + +class SessionConfig(TypedDict, total=False): + """Configuration for creating a session""" + + session_id: str # Optional custom session ID + # Client name to identify the application using the SDK. + # Included in the User-Agent header for API requests. + client_name: str + model: str # Model to use for this session. Use client.list_models() to see available models. + # Reasoning effort level for models that support it. + # Only valid for models where capabilities.supports.reasoning_effort is True. + reasoning_effort: ReasoningEffort + tools: list[Tool] + system_message: SystemMessageConfig # System message configuration + # List of tool names to allow (takes precedence over excluded_tools) + available_tools: list[str] + # List of tool names to disable (ignored if available_tools is set) + excluded_tools: list[str] + # Handler for permission requests from the server + on_permission_request: _PermissionHandlerFn + # Handler for user input requests from the agent (enables ask_user tool) + on_user_input_request: UserInputHandler + # Hook handlers for intercepting session lifecycle events + hooks: SessionHooks + # Working directory for the session. Tool operations will be relative to this directory. + working_directory: str + # Custom provider configuration (BYOK - Bring Your Own Key) + provider: ProviderConfig + # Enable streaming of assistant message and reasoning chunks + # When True, assistant.message_delta and assistant.reasoning_delta events + # with delta_content are sent as the response is generated + streaming: bool + # MCP server configurations for the session + mcp_servers: dict[str, MCPServerConfig] + # Custom agent configurations for the session + custom_agents: list[CustomAgentConfig] + # Name of the custom agent to activate when the session starts. + # Must match the name of one of the agents in custom_agents. + agent: str + # Override the default configuration directory location. + # When specified, the session will use this directory for storing config and state. + config_dir: str + # Directories to load skills from + skill_directories: list[str] + # List of skill names to disable + disabled_skills: list[str] + # Infinite session configuration for persistent workspaces and automatic compaction. + # When enabled (default), sessions automatically manage context limits and persist state. + # Set to {"enabled": False} to disable. + infinite_sessions: InfiniteSessionConfig + # Optional event handler that is registered on the session before the + # session.create RPC is issued, ensuring early events (e.g. session.start) + # are delivered. Equivalent to calling session.on(handler) immediately + # after creation, but executes earlier in the lifecycle so no events are missed. + on_event: Callable[[SessionEvent], None] + + +class ResumeSessionConfig(TypedDict, total=False): + """Configuration for resuming a session""" + + # Client name to identify the application using the SDK. + # Included in the User-Agent header for API requests. + client_name: str + # Model to use for this session. Can change the model when resuming. + model: str + tools: list[Tool] + system_message: SystemMessageConfig # System message configuration + # List of tool names to allow (takes precedence over excluded_tools) + available_tools: list[str] + # List of tool names to disable (ignored if available_tools is set) + excluded_tools: list[str] + provider: ProviderConfig + # Reasoning effort level for models that support it. + reasoning_effort: ReasoningEffort + on_permission_request: _PermissionHandlerFn + # Handler for user input requestsfrom the agent (enables ask_user tool) + on_user_input_request: UserInputHandler + # Hook handlers for intercepting session lifecycle events + hooks: SessionHooks + # Working directory for the session. Tool operations will be relative to this directory. + working_directory: str + # Override the default configuration directory location. + config_dir: str + # Enable streaming of assistant message chunks + streaming: bool + # MCP server configurations for the session + mcp_servers: dict[str, MCPServerConfig] + # Custom agent configurations for the session + custom_agents: list[CustomAgentConfig] + # Name of the custom agent to activate when the session starts. + # Must match the name of one of the agents in custom_agents. + agent: str + # Directories to load skills from + skill_directories: list[str] + # List of skill names to disable + disabled_skills: list[str] + # Infinite session configuration for persistent workspaces and automatic compaction. + infinite_sessions: InfiniteSessionConfig + # When True, skips emitting the session.resume event. + # Useful for reconnecting to a session without triggering resume-related side effects. + disable_resume: bool + # Optional event handler registered before the session.resume RPC is issued, + # ensuring early events are delivered. See SessionConfig.on_event. + on_event: Callable[[SessionEvent], None] + + +SessionEventHandler = Callable[[SessionEvent], None] class CopilotSession: @@ -708,7 +1209,7 @@ async def destroy(self) -> None: ) await self.disconnect() - async def __aenter__(self) -> "CopilotSession": + async def __aenter__(self) -> CopilotSession: """Enable use as an async context manager.""" return self diff --git a/python/copilot/tools.py b/python/copilot/tools.py index 58e58d97e..f559cfefe 100644 --- a/python/copilot/tools.py +++ b/python/copilot/tools.py @@ -9,12 +9,59 @@ import inspect import json -from collections.abc import Callable -from typing import Any, TypeVar, get_type_hints, overload +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from typing import Any, Literal, TypeVar, get_type_hints, overload from pydantic import BaseModel -from .types import Tool, ToolInvocation, ToolResult +ToolResultType = Literal["success", "failure", "rejected", "denied"] + + +@dataclass +class ToolBinaryResult: + """Binary content returned by a tool.""" + + data: str = "" + mime_type: str = "" + type: str = "" + description: str = "" + + +@dataclass +class ToolResult: + """Result of a tool invocation.""" + + text_result_for_llm: str = "" + result_type: ToolResultType = "success" + error: str | None = None + binary_results_for_llm: list[ToolBinaryResult] | None = None + session_log: str | None = None + tool_telemetry: dict[str, Any] | None = None + + +@dataclass +class ToolInvocation: + """Context passed to a tool handler when invoked.""" + + session_id: str = "" + tool_call_id: str = "" + tool_name: str = "" + arguments: Any = None + + +ToolHandler = Callable[[ToolInvocation], ToolResult | Awaitable[ToolResult]] + + +@dataclass +class Tool: + name: str + description: str + handler: ToolHandler + parameters: dict[str, Any] | None = None + overrides_built_in_tool: bool = False + skip_permission: bool = False + T = TypeVar("T", bound=BaseModel) R = TypeVar("R") diff --git a/python/copilot/types.py b/python/copilot/types.py deleted file mode 100644 index af124bb0a..000000000 --- a/python/copilot/types.py +++ /dev/null @@ -1,1155 +0,0 @@ -""" -Type definitions for the Copilot SDK -""" - -from __future__ import annotations - -from collections.abc import Awaitable, Callable -from dataclasses import KW_ONLY, dataclass, field -from typing import Any, Literal, NotRequired, TypedDict - -# Import generated SessionEvent types -from .generated.session_events import ( - PermissionRequest, - SessionEvent, -) - -# SessionEvent is now imported from generated types -# It provides proper type discrimination for all event types - -# Valid reasoning effort levels for models that support it -ReasoningEffort = Literal["low", "medium", "high", "xhigh"] - -# Connection state -ConnectionState = Literal["disconnected", "connecting", "connected", "error"] - -# Log level type -LogLevel = Literal["none", "error", "warning", "info", "debug", "all"] - - -# Selection range for text attachments -class SelectionRange(TypedDict): - line: int - character: int - - -class Selection(TypedDict): - start: SelectionRange - end: SelectionRange - - -# Attachment types - discriminated union based on 'type' field -class FileAttachment(TypedDict): - """File attachment.""" - - type: Literal["file"] - path: str - displayName: NotRequired[str] - - -class DirectoryAttachment(TypedDict): - """Directory attachment.""" - - type: Literal["directory"] - path: str - displayName: NotRequired[str] - - -class SelectionAttachment(TypedDict): - """Selection attachment with text from a file.""" - - type: Literal["selection"] - filePath: str - displayName: str - selection: NotRequired[Selection] - text: NotRequired[str] - - -# Attachment type - union of all attachment types -Attachment = FileAttachment | DirectoryAttachment | SelectionAttachment - - -# Configuration for OpenTelemetry integration with the Copilot CLI. -class TelemetryConfig(TypedDict, total=False): - """Configuration for OpenTelemetry integration with the Copilot CLI.""" - - otlp_endpoint: str - """OTLP HTTP endpoint URL for trace/metric export. Sets OTEL_EXPORTER_OTLP_ENDPOINT.""" - file_path: str - """File path for JSON-lines trace output. Sets COPILOT_OTEL_FILE_EXPORTER_PATH.""" - exporter_type: str - """Exporter backend type: "otlp-http" or "file". Sets COPILOT_OTEL_EXPORTER_TYPE.""" - source_name: str - """Instrumentation scope name. Sets COPILOT_OTEL_SOURCE_NAME.""" - capture_content: bool - """Whether to capture message content. Sets OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT.""" # noqa: E501 - - -# Configuration for CopilotClient connection modes - - -@dataclass -class SubprocessConfig: - """Config for spawning a local Copilot CLI subprocess. - - Example: - >>> config = SubprocessConfig(github_token="ghp_...") - >>> client = CopilotClient(config) - - >>> # Custom CLI path with TCP transport - >>> config = SubprocessConfig( - ... cli_path="/usr/local/bin/copilot", - ... use_stdio=False, - ... log_level="debug", - ... ) - """ - - cli_path: str | None = None - """Path to the Copilot CLI executable. ``None`` uses the bundled binary.""" - - cli_args: list[str] = field(default_factory=list) - """Extra arguments passed to the CLI executable (inserted before SDK-managed args).""" - - _: KW_ONLY - - cwd: str | None = None - """Working directory for the CLI process. ``None`` uses the current directory.""" - - use_stdio: bool = True - """Use stdio transport (``True``, default) or TCP (``False``).""" - - port: int = 0 - """TCP port for the CLI server (only when ``use_stdio=False``). 0 means random.""" - - log_level: LogLevel = "info" - """Log level for the CLI process.""" - - env: dict[str, str] | None = None - """Environment variables for the CLI process. ``None`` inherits the current env.""" - - github_token: str | None = None - """GitHub token for authentication. Takes priority over other auth methods.""" - - use_logged_in_user: bool | None = None - """Use the logged-in user for authentication. - - ``None`` (default) resolves to ``True`` unless ``github_token`` is set. - """ - - telemetry: TelemetryConfig | None = None - """OpenTelemetry configuration. Providing this enables telemetry — no separate flag needed.""" - - -@dataclass -class ExternalServerConfig: - """Config for connecting to an existing Copilot CLI server over TCP. - - Example: - >>> config = ExternalServerConfig(url="localhost:3000") - >>> client = CopilotClient(config) - """ - - url: str - """Server URL. Supports ``"host:port"``, ``"http://host:port"``, or just ``"port"``.""" - - -ToolResultType = Literal["success", "failure", "rejected", "denied"] - - -@dataclass -class ToolBinaryResult: - """Binary content returned by a tool.""" - - data: str = "" - mime_type: str = "" - type: str = "" - description: str = "" - - -@dataclass -class ToolResult: - """Result of a tool invocation.""" - - text_result_for_llm: str = "" - result_type: ToolResultType = "success" - error: str | None = None - binary_results_for_llm: list[ToolBinaryResult] | None = None - session_log: str | None = None - tool_telemetry: dict[str, Any] | None = None - - -@dataclass -class ToolInvocation: - """Context passed to a tool handler when invoked.""" - - session_id: str = "" - tool_call_id: str = "" - tool_name: str = "" - arguments: Any = None - - -ToolHandler = Callable[[ToolInvocation], ToolResult | Awaitable[ToolResult]] - - -@dataclass -class Tool: - name: str - description: str - handler: ToolHandler - parameters: dict[str, Any] | None = None - overrides_built_in_tool: bool = False - skip_permission: bool = False - - -# System message configuration (discriminated union) -# Use SystemMessageAppendConfig for default behavior, SystemMessageReplaceConfig for full control - - -class SystemMessageAppendConfig(TypedDict, total=False): - """ - Append mode: Use CLI foundation with optional appended content. - """ - - mode: NotRequired[Literal["append"]] - content: NotRequired[str] - - -class SystemMessageReplaceConfig(TypedDict): - """ - Replace mode: Use caller-provided system message entirely. - Removes all SDK guardrails including security restrictions. - """ - - mode: Literal["replace"] - content: str - - -# Union type - use one or the other -SystemMessageConfig = SystemMessageAppendConfig | SystemMessageReplaceConfig - - -# Permission result types - -PermissionRequestResultKind = Literal[ - "approved", - "denied-by-rules", - "denied-by-content-exclusion-policy", - "denied-no-approval-rule-and-could-not-request-from-user", - "denied-interactively-by-user", - "no-result", -] - - -@dataclass -class PermissionRequestResult: - """Result of a permission request.""" - - kind: PermissionRequestResultKind = "denied-no-approval-rule-and-could-not-request-from-user" - rules: list[Any] | None = None - feedback: str | None = None - message: str | None = None - path: str | None = None - - -_PermissionHandlerFn = Callable[ - [PermissionRequest, dict[str, str]], - PermissionRequestResult | Awaitable[PermissionRequestResult], -] - - -class PermissionHandler: - @staticmethod - def approve_all( - request: PermissionRequest, invocation: dict[str, str] - ) -> PermissionRequestResult: - return PermissionRequestResult(kind="approved") - - -# ============================================================================ -# User Input Request Types -# ============================================================================ - - -class UserInputRequest(TypedDict, total=False): - """Request for user input from the agent (enables ask_user tool)""" - - question: str - choices: list[str] - allowFreeform: bool - - -class UserInputResponse(TypedDict): - """Response to a user input request""" - - answer: str - wasFreeform: bool - - -UserInputHandler = Callable[ - [UserInputRequest, dict[str, str]], - UserInputResponse | Awaitable[UserInputResponse], -] - - -# ============================================================================ -# Hook Types -# ============================================================================ - - -class BaseHookInput(TypedDict): - """Base interface for all hook inputs""" - - timestamp: int - cwd: str - - -class PreToolUseHookInput(TypedDict): - """Input for pre-tool-use hook""" - - timestamp: int - cwd: str - toolName: str - toolArgs: Any - - -class PreToolUseHookOutput(TypedDict, total=False): - """Output for pre-tool-use hook""" - - permissionDecision: Literal["allow", "deny", "ask"] - permissionDecisionReason: str - modifiedArgs: Any - additionalContext: str - suppressOutput: bool - - -PreToolUseHandler = Callable[ - [PreToolUseHookInput, dict[str, str]], - PreToolUseHookOutput | None | Awaitable[PreToolUseHookOutput | None], -] - - -class PostToolUseHookInput(TypedDict): - """Input for post-tool-use hook""" - - timestamp: int - cwd: str - toolName: str - toolArgs: Any - toolResult: Any - - -class PostToolUseHookOutput(TypedDict, total=False): - """Output for post-tool-use hook""" - - modifiedResult: Any - additionalContext: str - suppressOutput: bool - - -PostToolUseHandler = Callable[ - [PostToolUseHookInput, dict[str, str]], - PostToolUseHookOutput | None | Awaitable[PostToolUseHookOutput | None], -] - - -class UserPromptSubmittedHookInput(TypedDict): - """Input for user-prompt-submitted hook""" - - timestamp: int - cwd: str - prompt: str - - -class UserPromptSubmittedHookOutput(TypedDict, total=False): - """Output for user-prompt-submitted hook""" - - modifiedPrompt: str - additionalContext: str - suppressOutput: bool - - -UserPromptSubmittedHandler = Callable[ - [UserPromptSubmittedHookInput, dict[str, str]], - UserPromptSubmittedHookOutput | None | Awaitable[UserPromptSubmittedHookOutput | None], -] - - -class SessionStartHookInput(TypedDict): - """Input for session-start hook""" - - timestamp: int - cwd: str - source: Literal["startup", "resume", "new"] - initialPrompt: NotRequired[str] - - -class SessionStartHookOutput(TypedDict, total=False): - """Output for session-start hook""" - - additionalContext: str - modifiedConfig: dict[str, Any] - - -SessionStartHandler = Callable[ - [SessionStartHookInput, dict[str, str]], - SessionStartHookOutput | None | Awaitable[SessionStartHookOutput | None], -] - - -class SessionEndHookInput(TypedDict): - """Input for session-end hook""" - - timestamp: int - cwd: str - reason: Literal["complete", "error", "abort", "timeout", "user_exit"] - finalMessage: NotRequired[str] - error: NotRequired[str] - - -class SessionEndHookOutput(TypedDict, total=False): - """Output for session-end hook""" - - suppressOutput: bool - cleanupActions: list[str] - sessionSummary: str - - -SessionEndHandler = Callable[ - [SessionEndHookInput, dict[str, str]], - SessionEndHookOutput | None | Awaitable[SessionEndHookOutput | None], -] - - -class ErrorOccurredHookInput(TypedDict): - """Input for error-occurred hook""" - - timestamp: int - cwd: str - error: str - errorContext: Literal["model_call", "tool_execution", "system", "user_input"] - recoverable: bool - - -class ErrorOccurredHookOutput(TypedDict, total=False): - """Output for error-occurred hook""" - - suppressOutput: bool - errorHandling: Literal["retry", "skip", "abort"] - retryCount: int - userNotification: str - - -ErrorOccurredHandler = Callable[ - [ErrorOccurredHookInput, dict[str, str]], - ErrorOccurredHookOutput | None | Awaitable[ErrorOccurredHookOutput | None], -] - - -class SessionHooks(TypedDict, total=False): - """Configuration for session hooks""" - - on_pre_tool_use: PreToolUseHandler - on_post_tool_use: PostToolUseHandler - on_user_prompt_submitted: UserPromptSubmittedHandler - on_session_start: SessionStartHandler - on_session_end: SessionEndHandler - on_error_occurred: ErrorOccurredHandler - - -# ============================================================================ -# MCP Server Configuration Types -# ============================================================================ - - -class MCPLocalServerConfig(TypedDict, total=False): - """Configuration for a local/stdio MCP server.""" - - tools: list[str] # List of tools to include. [] means none. "*" means all. - type: NotRequired[Literal["local", "stdio"]] # Server type - timeout: NotRequired[int] # Timeout in milliseconds - command: str # Command to run - args: list[str] # Command arguments - env: NotRequired[dict[str, str]] # Environment variables - cwd: NotRequired[str] # Working directory - - -class MCPRemoteServerConfig(TypedDict, total=False): - """Configuration for a remote MCP server (HTTP or SSE).""" - - tools: list[str] # List of tools to include. [] means none. "*" means all. - type: Literal["http", "sse"] # Server type - timeout: NotRequired[int] # Timeout in milliseconds - url: str # URL of the remote server - headers: NotRequired[dict[str, str]] # HTTP headers - - -MCPServerConfig = MCPLocalServerConfig | MCPRemoteServerConfig - - -# ============================================================================ -# Custom Agent Configuration Types -# ============================================================================ - - -class CustomAgentConfig(TypedDict, total=False): - """Configuration for a custom agent.""" - - name: str # Unique name of the custom agent - display_name: NotRequired[str] # Display name for UI purposes - description: NotRequired[str] # Description of what the agent does - # List of tool names the agent can use - tools: NotRequired[list[str] | None] - prompt: str # The prompt content for the agent - # MCP servers specific to agent - mcp_servers: NotRequired[dict[str, MCPServerConfig]] - infer: NotRequired[bool] # Whether agent is available for model inference - - -class InfiniteSessionConfig(TypedDict, total=False): - """ - Configuration for infinite sessions with automatic context compaction - and workspace persistence. - - When enabled, sessions automatically manage context window limits through - background compaction and persist state to a workspace directory. - """ - - # Whether infinite sessions are enabled (default: True) - enabled: bool - # Context utilization threshold (0.0-1.0) at which background compaction starts. - # Compaction runs asynchronously, allowing the session to continue processing. - # Default: 0.80 - background_compaction_threshold: float - # Context utilization threshold (0.0-1.0) at which the session blocks until - # compaction completes. This prevents context overflow when compaction hasn't - # finished in time. Default: 0.95 - buffer_exhaustion_threshold: float - - -# Configuration for creating a session -class SessionConfig(TypedDict, total=False): - """Configuration for creating a session""" - - session_id: str # Optional custom session ID - # Client name to identify the application using the SDK. - # Included in the User-Agent header for API requests. - client_name: str - model: str # Model to use for this session. Use client.list_models() to see available models. - # Reasoning effort level for models that support it. - # Only valid for models where capabilities.supports.reasoning_effort is True. - reasoning_effort: ReasoningEffort - tools: list[Tool] - system_message: SystemMessageConfig # System message configuration - # List of tool names to allow (takes precedence over excluded_tools) - available_tools: list[str] - # List of tool names to disable (ignored if available_tools is set) - excluded_tools: list[str] - # Handler for permission requests from the server - on_permission_request: _PermissionHandlerFn - # Handler for user input requests from the agent (enables ask_user tool) - on_user_input_request: UserInputHandler - # Hook handlers for intercepting session lifecycle events - hooks: SessionHooks - # Working directory for the session. Tool operations will be relative to this directory. - working_directory: str - # Custom provider configuration (BYOK - Bring Your Own Key) - provider: ProviderConfig - # Enable streaming of assistant message and reasoning chunks - # When True, assistant.message_delta and assistant.reasoning_delta events - # with delta_content are sent as the response is generated - streaming: bool - # MCP server configurations for the session - mcp_servers: dict[str, MCPServerConfig] - # Custom agent configurations for the session - custom_agents: list[CustomAgentConfig] - # Name of the custom agent to activate when the session starts. - # Must match the name of one of the agents in custom_agents. - agent: str - # Override the default configuration directory location. - # When specified, the session will use this directory for storing config and state. - config_dir: str - # Directories to load skills from - skill_directories: list[str] - # List of skill names to disable - disabled_skills: list[str] - # Infinite session configuration for persistent workspaces and automatic compaction. - # When enabled (default), sessions automatically manage context limits and persist state. - # Set to {"enabled": False} to disable. - infinite_sessions: InfiniteSessionConfig - # Optional event handler that is registered on the session before the - # session.create RPC is issued, ensuring early events (e.g. session.start) - # are delivered. Equivalent to calling session.on(handler) immediately - # after creation, but executes earlier in the lifecycle so no events are missed. - on_event: Callable[[SessionEvent], None] - - -class AzureProviderOptions(TypedDict, total=False): - """Azure-specific provider configuration""" - - api_version: str # Azure API version. Defaults to "2024-10-21". - - -# Configuration for a custom API provider -class ProviderConfig(TypedDict, total=False): - """Configuration for a custom API provider""" - - type: Literal["openai", "azure", "anthropic"] - wire_api: Literal["completions", "responses"] - base_url: str - api_key: str - # Bearer token for authentication. Sets the Authorization header directly. - # Use this for services requiring bearer token auth instead of API key. - # Takes precedence over api_key when both are set. - bearer_token: str - azure: AzureProviderOptions # Azure-specific options - - -# Configuration for resuming a session -class ResumeSessionConfig(TypedDict, total=False): - """Configuration for resuming a session""" - - # Client name to identify the application using the SDK. - # Included in the User-Agent header for API requests. - client_name: str - # Model to use for this session. Can change the model when resuming. - model: str - tools: list[Tool] - system_message: SystemMessageConfig # System message configuration - # List of tool names to allow (takes precedence over excluded_tools) - available_tools: list[str] - # List of tool names to disable (ignored if available_tools is set) - excluded_tools: list[str] - provider: ProviderConfig - # Reasoning effort level for models that support it. - reasoning_effort: ReasoningEffort - on_permission_request: _PermissionHandlerFn - # Handler for user input requestsfrom the agent (enables ask_user tool) - on_user_input_request: UserInputHandler - # Hook handlers for intercepting session lifecycle events - hooks: SessionHooks - # Working directory for the session. Tool operations will be relative to this directory. - working_directory: str - # Override the default configuration directory location. - config_dir: str - # Enable streaming of assistant message chunks - streaming: bool - # MCP server configurations for the session - mcp_servers: dict[str, MCPServerConfig] - # Custom agent configurations for the session - custom_agents: list[CustomAgentConfig] - # Name of the custom agent to activate when the session starts. - # Must match the name of one of the agents in custom_agents. - agent: str - # Directories to load skills from - skill_directories: list[str] - # List of skill names to disable - disabled_skills: list[str] - # Infinite session configuration for persistent workspaces and automatic compaction. - infinite_sessions: InfiniteSessionConfig - # When True, skips emitting the session.resume event. - # Useful for reconnecting to a session without triggering resume-related side effects. - disable_resume: bool - # Optional event handler registered before the session.resume RPC is issued, - # ensuring early events are delivered. See SessionConfig.on_event. - on_event: Callable[[SessionEvent], None] - - -# Event handler type -SessionEventHandler = Callable[[SessionEvent], None] - - -# Response from ping -@dataclass -class PingResponse: - """Response from ping""" - - message: str # Echo message with "pong: " prefix - timestamp: int # Server timestamp in milliseconds - protocolVersion: int # Protocol version for SDK compatibility - - @staticmethod - def from_dict(obj: Any) -> PingResponse: - assert isinstance(obj, dict) - message = obj.get("message") - timestamp = obj.get("timestamp") - protocolVersion = obj.get("protocolVersion") - if message is None or timestamp is None or protocolVersion is None: - raise ValueError( - f"Missing required fields in PingResponse: message={message}, " - f"timestamp={timestamp}, protocolVersion={protocolVersion}" - ) - return PingResponse(str(message), int(timestamp), int(protocolVersion)) - - def to_dict(self) -> dict: - result: dict = {} - result["message"] = self.message - result["timestamp"] = self.timestamp - result["protocolVersion"] = self.protocolVersion - return result - - -# Error information from client stop -@dataclass -class StopError(Exception): - """Error that occurred during client stop cleanup.""" - - message: str # Error message describing what failed during cleanup - - def __post_init__(self) -> None: - Exception.__init__(self, self.message) - - @staticmethod - def from_dict(obj: Any) -> StopError: - assert isinstance(obj, dict) - message = obj.get("message") - if message is None: - raise ValueError("Missing required field 'message' in StopError") - return StopError(str(message)) - - def to_dict(self) -> dict: - result: dict = {} - result["message"] = self.message - return result - - -# Response from status.get -@dataclass -class GetStatusResponse: - """Response from status.get""" - - version: str # Package version (e.g., "1.0.0") - protocolVersion: int # Protocol version for SDK compatibility - - @staticmethod - def from_dict(obj: Any) -> GetStatusResponse: - assert isinstance(obj, dict) - version = obj.get("version") - protocolVersion = obj.get("protocolVersion") - if version is None or protocolVersion is None: - raise ValueError( - f"Missing required fields in GetStatusResponse: version={version}, " - f"protocolVersion={protocolVersion}" - ) - return GetStatusResponse(str(version), int(protocolVersion)) - - def to_dict(self) -> dict: - result: dict = {} - result["version"] = self.version - result["protocolVersion"] = self.protocolVersion - return result - - -# Response from auth.getStatus -@dataclass -class GetAuthStatusResponse: - """Response from auth.getStatus""" - - isAuthenticated: bool # Whether the user is authenticated - authType: str | None = None # Authentication type - host: str | None = None # GitHub host URL - login: str | None = None # User login name - statusMessage: str | None = None # Human-readable status message - - @staticmethod - def from_dict(obj: Any) -> GetAuthStatusResponse: - assert isinstance(obj, dict) - isAuthenticated = obj.get("isAuthenticated") - if isAuthenticated is None: - raise ValueError("Missing required field 'isAuthenticated' in GetAuthStatusResponse") - authType = obj.get("authType") - host = obj.get("host") - login = obj.get("login") - statusMessage = obj.get("statusMessage") - return GetAuthStatusResponse( - isAuthenticated=bool(isAuthenticated), - authType=authType, - host=host, - login=login, - statusMessage=statusMessage, - ) - - def to_dict(self) -> dict: - result: dict = {} - result["isAuthenticated"] = self.isAuthenticated - if self.authType is not None: - result["authType"] = self.authType - if self.host is not None: - result["host"] = self.host - if self.login is not None: - result["login"] = self.login - if self.statusMessage is not None: - result["statusMessage"] = self.statusMessage - return result - - -# Model capabilities -@dataclass -class ModelVisionLimits: - """Vision-specific limits""" - - supported_media_types: list[str] | None = None - max_prompt_images: int | None = None - max_prompt_image_size: int | None = None - - @staticmethod - def from_dict(obj: Any) -> ModelVisionLimits: - assert isinstance(obj, dict) - supported_media_types = obj.get("supported_media_types") - max_prompt_images = obj.get("max_prompt_images") - max_prompt_image_size = obj.get("max_prompt_image_size") - return ModelVisionLimits( - supported_media_types=supported_media_types, - max_prompt_images=max_prompt_images, - max_prompt_image_size=max_prompt_image_size, - ) - - def to_dict(self) -> dict: - result: dict = {} - if self.supported_media_types is not None: - result["supported_media_types"] = self.supported_media_types - if self.max_prompt_images is not None: - result["max_prompt_images"] = self.max_prompt_images - if self.max_prompt_image_size is not None: - result["max_prompt_image_size"] = self.max_prompt_image_size - return result - - -@dataclass -class ModelLimits: - """Model limits""" - - max_prompt_tokens: int | None = None - max_context_window_tokens: int | None = None - vision: ModelVisionLimits | None = None - - @staticmethod - def from_dict(obj: Any) -> ModelLimits: - assert isinstance(obj, dict) - max_prompt_tokens = obj.get("max_prompt_tokens") - max_context_window_tokens = obj.get("max_context_window_tokens") - vision_dict = obj.get("vision") - vision = ModelVisionLimits.from_dict(vision_dict) if vision_dict else None - return ModelLimits( - max_prompt_tokens=max_prompt_tokens, - max_context_window_tokens=max_context_window_tokens, - vision=vision, - ) - - def to_dict(self) -> dict: - result: dict = {} - if self.max_prompt_tokens is not None: - result["max_prompt_tokens"] = self.max_prompt_tokens - if self.max_context_window_tokens is not None: - result["max_context_window_tokens"] = self.max_context_window_tokens - if self.vision is not None: - result["vision"] = self.vision.to_dict() - return result - - -@dataclass -class ModelSupports: - """Model support flags""" - - vision: bool - reasoning_effort: bool = False # Whether this model supports reasoning effort - - @staticmethod - def from_dict(obj: Any) -> ModelSupports: - assert isinstance(obj, dict) - vision = obj.get("vision") - if vision is None: - raise ValueError("Missing required field 'vision' in ModelSupports") - reasoning_effort = obj.get("reasoningEffort", False) - return ModelSupports(vision=bool(vision), reasoning_effort=bool(reasoning_effort)) - - def to_dict(self) -> dict: - result: dict = {} - result["vision"] = self.vision - result["reasoningEffort"] = self.reasoning_effort - return result - - -@dataclass -class ModelCapabilities: - """Model capabilities and limits""" - - supports: ModelSupports - limits: ModelLimits - - @staticmethod - def from_dict(obj: Any) -> ModelCapabilities: - assert isinstance(obj, dict) - supports_dict = obj.get("supports") - limits_dict = obj.get("limits") - if supports_dict is None or limits_dict is None: - raise ValueError( - f"Missing required fields in ModelCapabilities: supports={supports_dict}, " - f"limits={limits_dict}" - ) - supports = ModelSupports.from_dict(supports_dict) - limits = ModelLimits.from_dict(limits_dict) - return ModelCapabilities(supports=supports, limits=limits) - - def to_dict(self) -> dict: - result: dict = {} - result["supports"] = self.supports.to_dict() - result["limits"] = self.limits.to_dict() - return result - - -@dataclass -class ModelPolicy: - """Model policy state""" - - state: str # "enabled", "disabled", or "unconfigured" - terms: str - - @staticmethod - def from_dict(obj: Any) -> ModelPolicy: - assert isinstance(obj, dict) - state = obj.get("state") - terms = obj.get("terms") - if state is None or terms is None: - raise ValueError( - f"Missing required fields in ModelPolicy: state={state}, terms={terms}" - ) - return ModelPolicy(state=str(state), terms=str(terms)) - - def to_dict(self) -> dict: - result: dict = {} - result["state"] = self.state - result["terms"] = self.terms - return result - - -@dataclass -class ModelBilling: - """Model billing information""" - - multiplier: float - - @staticmethod - def from_dict(obj: Any) -> ModelBilling: - assert isinstance(obj, dict) - multiplier = obj.get("multiplier") - if multiplier is None: - raise ValueError("Missing required field 'multiplier' in ModelBilling") - return ModelBilling(multiplier=float(multiplier)) - - def to_dict(self) -> dict: - result: dict = {} - result["multiplier"] = self.multiplier - return result - - -@dataclass -class ModelInfo: - """Information about an available model""" - - id: str # Model identifier (e.g., "claude-sonnet-4.5") - name: str # Display name - capabilities: ModelCapabilities # Model capabilities and limits - policy: ModelPolicy | None = None # Policy state - billing: ModelBilling | None = None # Billing information - # Supported reasoning effort levels (only present if model supports reasoning effort) - supported_reasoning_efforts: list[str] | None = None - # Default reasoning effort level (only present if model supports reasoning effort) - default_reasoning_effort: str | None = None - - @staticmethod - def from_dict(obj: Any) -> ModelInfo: - assert isinstance(obj, dict) - id = obj.get("id") - name = obj.get("name") - capabilities_dict = obj.get("capabilities") - if id is None or name is None or capabilities_dict is None: - raise ValueError( - f"Missing required fields in ModelInfo: id={id}, name={name}, " - f"capabilities={capabilities_dict}" - ) - capabilities = ModelCapabilities.from_dict(capabilities_dict) - policy_dict = obj.get("policy") - policy = ModelPolicy.from_dict(policy_dict) if policy_dict else None - billing_dict = obj.get("billing") - billing = ModelBilling.from_dict(billing_dict) if billing_dict else None - supported_reasoning_efforts = obj.get("supportedReasoningEfforts") - default_reasoning_effort = obj.get("defaultReasoningEffort") - return ModelInfo( - id=str(id), - name=str(name), - capabilities=capabilities, - policy=policy, - billing=billing, - supported_reasoning_efforts=supported_reasoning_efforts, - default_reasoning_effort=default_reasoning_effort, - ) - - def to_dict(self) -> dict: - result: dict = {} - result["id"] = self.id - result["name"] = self.name - result["capabilities"] = self.capabilities.to_dict() - if self.policy is not None: - result["policy"] = self.policy.to_dict() - if self.billing is not None: - result["billing"] = self.billing.to_dict() - if self.supported_reasoning_efforts is not None: - result["supportedReasoningEfforts"] = self.supported_reasoning_efforts - if self.default_reasoning_effort is not None: - result["defaultReasoningEffort"] = self.default_reasoning_effort - return result - - -@dataclass -class SessionContext: - """Working directory context for a session""" - - cwd: str # Working directory where the session was created - gitRoot: str | None = None # Git repository root (if in a git repo) - repository: str | None = None # GitHub repository in "owner/repo" format - branch: str | None = None # Current git branch - - @staticmethod - def from_dict(obj: Any) -> SessionContext: - assert isinstance(obj, dict) - cwd = obj.get("cwd") - if cwd is None: - raise ValueError("Missing required field 'cwd' in SessionContext") - return SessionContext( - cwd=str(cwd), - gitRoot=obj.get("gitRoot"), - repository=obj.get("repository"), - branch=obj.get("branch"), - ) - - def to_dict(self) -> dict: - result: dict = {"cwd": self.cwd} - if self.gitRoot is not None: - result["gitRoot"] = self.gitRoot - if self.repository is not None: - result["repository"] = self.repository - if self.branch is not None: - result["branch"] = self.branch - return result - - -@dataclass -class SessionListFilter: - """Filter options for listing sessions""" - - cwd: str | None = None # Filter by exact cwd match - gitRoot: str | None = None # Filter by git root - repository: str | None = None # Filter by repository (owner/repo format) - branch: str | None = None # Filter by branch - - def to_dict(self) -> dict: - result: dict = {} - if self.cwd is not None: - result["cwd"] = self.cwd - if self.gitRoot is not None: - result["gitRoot"] = self.gitRoot - if self.repository is not None: - result["repository"] = self.repository - if self.branch is not None: - result["branch"] = self.branch - return result - - -@dataclass -class SessionMetadata: - """Metadata about a session""" - - sessionId: str # Session identifier - startTime: str # ISO 8601 timestamp when session was created - modifiedTime: str # ISO 8601 timestamp when session was last modified - isRemote: bool # Whether the session is remote - summary: str | None = None # Optional summary of the session - context: SessionContext | None = None # Working directory context - - @staticmethod - def from_dict(obj: Any) -> SessionMetadata: - assert isinstance(obj, dict) - sessionId = obj.get("sessionId") - startTime = obj.get("startTime") - modifiedTime = obj.get("modifiedTime") - isRemote = obj.get("isRemote") - if sessionId is None or startTime is None or modifiedTime is None or isRemote is None: - raise ValueError( - f"Missing required fields in SessionMetadata: sessionId={sessionId}, " - f"startTime={startTime}, modifiedTime={modifiedTime}, isRemote={isRemote}" - ) - summary = obj.get("summary") - context_dict = obj.get("context") - context = SessionContext.from_dict(context_dict) if context_dict else None - return SessionMetadata( - sessionId=str(sessionId), - startTime=str(startTime), - modifiedTime=str(modifiedTime), - isRemote=bool(isRemote), - summary=summary, - context=context, - ) - - def to_dict(self) -> dict: - result: dict = {} - result["sessionId"] = self.sessionId - result["startTime"] = self.startTime - result["modifiedTime"] = self.modifiedTime - result["isRemote"] = self.isRemote - if self.summary is not None: - result["summary"] = self.summary - if self.context is not None: - result["context"] = self.context.to_dict() - return result - - -# Session Lifecycle Types (for TUI+server mode) - -SessionLifecycleEventType = Literal[ - "session.created", - "session.deleted", - "session.updated", - "session.foreground", - "session.background", -] - - -@dataclass -class SessionLifecycleEventMetadata: - """Metadata for session lifecycle events.""" - - startTime: str - modifiedTime: str - summary: str | None = None - - @staticmethod - def from_dict(data: dict) -> SessionLifecycleEventMetadata: - return SessionLifecycleEventMetadata( - startTime=data.get("startTime", ""), - modifiedTime=data.get("modifiedTime", ""), - summary=data.get("summary"), - ) - - -@dataclass -class SessionLifecycleEvent: - """Session lifecycle event notification.""" - - type: SessionLifecycleEventType - sessionId: str - metadata: SessionLifecycleEventMetadata | None = None - - @staticmethod - def from_dict(data: dict) -> SessionLifecycleEvent: - metadata = None - if "metadata" in data and data["metadata"]: - metadata = SessionLifecycleEventMetadata.from_dict(data["metadata"]) - return SessionLifecycleEvent( - type=data.get("type", "session.updated"), - sessionId=data.get("sessionId", ""), - metadata=metadata, - ) - - -# Handler types for session lifecycle events -SessionLifecycleHandler = Callable[[SessionLifecycleEvent], None] diff --git a/python/e2e/test_agent_and_compact_rpc.py b/python/e2e/test_agent_and_compact_rpc.py index ec5958676..29377d1cb 100644 --- a/python/e2e/test_agent_and_compact_rpc.py +++ b/python/e2e/test_agent_and_compact_rpc.py @@ -2,8 +2,10 @@ import pytest -from copilot import CopilotClient, PermissionHandler, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig from copilot.generated.rpc import SessionAgentSelectParams +from copilot.session import PermissionHandler from .testharness import CLI_PATH, E2ETestContext diff --git a/python/e2e/test_ask_user.py b/python/e2e/test_ask_user.py index b9800156b..55073ccfc 100644 --- a/python/e2e/test_ask_user.py +++ b/python/e2e/test_ask_user.py @@ -4,7 +4,7 @@ import pytest -from copilot import PermissionHandler +from copilot.session import PermissionHandler from .testharness import E2ETestContext diff --git a/python/e2e/test_client.py b/python/e2e/test_client.py index d7ec39dcd..29fd632c1 100644 --- a/python/e2e/test_client.py +++ b/python/e2e/test_client.py @@ -2,7 +2,9 @@ import pytest -from copilot import CopilotClient, PermissionHandler, StopError, SubprocessConfig +from copilot import CopilotClient +from copilot.client import StopError, SubprocessConfig +from copilot.session import PermissionHandler from .testharness import CLI_PATH diff --git a/python/e2e/test_compaction.py b/python/e2e/test_compaction.py index 131040705..a4dcd132d 100644 --- a/python/e2e/test_compaction.py +++ b/python/e2e/test_compaction.py @@ -2,8 +2,8 @@ import pytest -from copilot import PermissionHandler from copilot.generated.session_events import SessionEventType +from copilot.session import PermissionHandler from .testharness import E2ETestContext diff --git a/python/e2e/test_hooks.py b/python/e2e/test_hooks.py index a4956482c..8aa0f1a9f 100644 --- a/python/e2e/test_hooks.py +++ b/python/e2e/test_hooks.py @@ -4,7 +4,7 @@ import pytest -from copilot import PermissionHandler +from copilot.session import PermissionHandler from .testharness import E2ETestContext from .testharness.helper import write_file diff --git a/python/e2e/test_mcp_and_agents.py b/python/e2e/test_mcp_and_agents.py index 8fffbe889..a1ab8e2c0 100644 --- a/python/e2e/test_mcp_and_agents.py +++ b/python/e2e/test_mcp_and_agents.py @@ -6,7 +6,7 @@ import pytest -from copilot import CustomAgentConfig, MCPServerConfig, PermissionHandler +from copilot.session import CustomAgentConfig, MCPServerConfig, PermissionHandler from .testharness import E2ETestContext, get_final_assistant_message diff --git a/python/e2e/test_multi_client.py b/python/e2e/test_multi_client.py index cb5d90cd2..9a087ce11 100644 --- a/python/e2e/test_multi_client.py +++ b/python/e2e/test_multi_client.py @@ -13,15 +13,10 @@ import pytest_asyncio from pydantic import BaseModel, Field -from copilot import ( - CopilotClient, - ExternalServerConfig, - PermissionHandler, - PermissionRequestResult, - SubprocessConfig, - ToolInvocation, - define_tool, -) +from copilot import CopilotClient, define_tool +from copilot.client import ExternalServerConfig, SubprocessConfig +from copilot.session import PermissionHandler, PermissionRequestResult +from copilot.tools import ToolInvocation from .testharness import get_final_assistant_message from .testharness.proxy import CapiProxy diff --git a/python/e2e/test_permissions.py b/python/e2e/test_permissions.py index d18b15b2d..8b3aaffba 100644 --- a/python/e2e/test_permissions.py +++ b/python/e2e/test_permissions.py @@ -6,7 +6,7 @@ import pytest -from copilot import PermissionHandler, PermissionRequest, PermissionRequestResult +from copilot.session import PermissionHandler, PermissionRequest, PermissionRequestResult from .testharness import E2ETestContext from .testharness.helper import read_file, write_file diff --git a/python/e2e/test_rpc.py b/python/e2e/test_rpc.py index ddf843ba4..0c140f3cc 100644 --- a/python/e2e/test_rpc.py +++ b/python/e2e/test_rpc.py @@ -2,8 +2,10 @@ import pytest -from copilot import CopilotClient, PermissionHandler, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig from copilot.generated.rpc import PingParams +from copilot.session import PermissionHandler from .testharness import CLI_PATH, E2ETestContext diff --git a/python/e2e/test_session.py b/python/e2e/test_session.py index a2bc33bdb..ef03289b7 100644 --- a/python/e2e/test_session.py +++ b/python/e2e/test_session.py @@ -4,8 +4,10 @@ import pytest -from copilot import CopilotClient, PermissionHandler, SubprocessConfig -from copilot.types import Tool, ToolResult +from copilot import CopilotClient +from copilot.client import SubprocessConfig +from copilot.session import PermissionHandler +from copilot.tools import Tool, ToolResult from .testharness import E2ETestContext, get_final_assistant_message, get_next_event_of_type diff --git a/python/e2e/test_skills.py b/python/e2e/test_skills.py index 066669f29..058c3a616 100644 --- a/python/e2e/test_skills.py +++ b/python/e2e/test_skills.py @@ -7,7 +7,7 @@ import pytest -from copilot import PermissionHandler +from copilot.session import PermissionHandler from .testharness import E2ETestContext diff --git a/python/e2e/test_streaming_fidelity.py b/python/e2e/test_streaming_fidelity.py index 7f0d47e29..ae6f44e80 100644 --- a/python/e2e/test_streaming_fidelity.py +++ b/python/e2e/test_streaming_fidelity.py @@ -4,7 +4,9 @@ import pytest -from copilot import CopilotClient, PermissionHandler, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig +from copilot.session import PermissionHandler from .testharness import E2ETestContext diff --git a/python/e2e/test_tools.py b/python/e2e/test_tools.py index 5d5823d98..5a4eec4e9 100644 --- a/python/e2e/test_tools.py +++ b/python/e2e/test_tools.py @@ -5,12 +5,9 @@ import pytest from pydantic import BaseModel, Field -from copilot import ( - PermissionHandler, - PermissionRequestResult, - ToolInvocation, - define_tool, -) +from copilot import define_tool +from copilot.session import PermissionHandler, PermissionRequestResult +from copilot.tools import ToolInvocation from .testharness import E2ETestContext, get_final_assistant_message diff --git a/python/e2e/test_tools_unit.py b/python/e2e/test_tools_unit.py index c1a9163e1..c9c996f0e 100644 --- a/python/e2e/test_tools_unit.py +++ b/python/e2e/test_tools_unit.py @@ -5,8 +5,8 @@ import pytest from pydantic import BaseModel, Field -from copilot import ToolInvocation, ToolResult, define_tool -from copilot.tools import _normalize_result +from copilot import define_tool +from copilot.tools import ToolInvocation, ToolResult, _normalize_result class TestDefineTool: diff --git a/python/e2e/testharness/context.py b/python/e2e/testharness/context.py index 27dce38a1..6a4bac6d2 100644 --- a/python/e2e/testharness/context.py +++ b/python/e2e/testharness/context.py @@ -10,7 +10,8 @@ import tempfile from pathlib import Path -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig from .proxy import CapiProxy diff --git a/python/samples/chat.py b/python/samples/chat.py index 908a125d7..f9127d195 100644 --- a/python/samples/chat.py +++ b/python/samples/chat.py @@ -1,6 +1,7 @@ import asyncio -from copilot import CopilotClient, PermissionHandler +from copilot import CopilotClient +from copilot.session import PermissionHandler BLUE = "\033[34m" RESET = "\033[0m" diff --git a/python/test_client.py b/python/test_client.py index 9b7e8eb0f..44f5b82e2 100644 --- a/python/test_client.py +++ b/python/test_client.py @@ -6,15 +6,16 @@ import pytest -from copilot import ( - CopilotClient, +from copilot import CopilotClient, define_tool +from copilot.client import ( ExternalServerConfig, - PermissionHandler, - PermissionRequestResult, + ModelCapabilities, + ModelInfo, + ModelLimits, + ModelSupports, SubprocessConfig, - define_tool, ) -from copilot.types import ModelCapabilities, ModelInfo, ModelLimits, ModelSupports +from copilot.session import PermissionHandler, PermissionRequestResult from e2e.testharness import CLI_PATH diff --git a/python/test_telemetry.py b/python/test_telemetry.py index 2b4649011..8710d166d 100644 --- a/python/test_telemetry.py +++ b/python/test_telemetry.py @@ -4,8 +4,8 @@ from unittest.mock import patch +from copilot.client import SubprocessConfig, TelemetryConfig from copilot.telemetry import get_trace_context, trace_context -from copilot.types import SubprocessConfig, TelemetryConfig class TestGetTraceContext: diff --git a/test/scenarios/auth/byok-anthropic/python/main.py b/test/scenarios/auth/byok-anthropic/python/main.py index b76a82e2a..3ad893ba5 100644 --- a/test/scenarios/auth/byok-anthropic/python/main.py +++ b/test/scenarios/auth/byok-anthropic/python/main.py @@ -1,7 +1,8 @@ import asyncio import os import sys -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY") ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-20250514") diff --git a/test/scenarios/auth/byok-azure/python/main.py b/test/scenarios/auth/byok-azure/python/main.py index f19729ab2..1ae214261 100644 --- a/test/scenarios/auth/byok-azure/python/main.py +++ b/test/scenarios/auth/byok-azure/python/main.py @@ -1,7 +1,8 @@ import asyncio import os import sys -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT") AZURE_OPENAI_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY") diff --git a/test/scenarios/auth/byok-ollama/python/main.py b/test/scenarios/auth/byok-ollama/python/main.py index 517c1bee1..78019acd7 100644 --- a/test/scenarios/auth/byok-ollama/python/main.py +++ b/test/scenarios/auth/byok-ollama/python/main.py @@ -1,7 +1,8 @@ import asyncio import os import sys -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig OLLAMA_BASE_URL = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434/v1") OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.2:3b") diff --git a/test/scenarios/auth/byok-openai/python/main.py b/test/scenarios/auth/byok-openai/python/main.py index 7717982a0..8362963b2 100644 --- a/test/scenarios/auth/byok-openai/python/main.py +++ b/test/scenarios/auth/byok-openai/python/main.py @@ -1,7 +1,8 @@ import asyncio import os import sys -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1") OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "claude-haiku-4.5") diff --git a/test/scenarios/auth/gh-app/python/main.py b/test/scenarios/auth/gh-app/python/main.py index f4ea5a2e8..afba29254 100644 --- a/test/scenarios/auth/gh-app/python/main.py +++ b/test/scenarios/auth/gh-app/python/main.py @@ -4,7 +4,8 @@ import time import urllib.request -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig DEVICE_CODE_URL = "https://github.com/login/device/code" diff --git a/test/scenarios/bundling/app-backend-to-server/python/main.py b/test/scenarios/bundling/app-backend-to-server/python/main.py index 730fba01b..2684a30b8 100644 --- a/test/scenarios/bundling/app-backend-to-server/python/main.py +++ b/test/scenarios/bundling/app-backend-to-server/python/main.py @@ -5,7 +5,8 @@ import urllib.request from flask import Flask, request, jsonify -from copilot import CopilotClient, ExternalServerConfig +from copilot import CopilotClient +from copilot.client import ExternalServerConfig app = Flask(__name__) diff --git a/test/scenarios/bundling/app-direct-server/python/main.py b/test/scenarios/bundling/app-direct-server/python/main.py index ca366d93d..b441bec51 100644 --- a/test/scenarios/bundling/app-direct-server/python/main.py +++ b/test/scenarios/bundling/app-direct-server/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, ExternalServerConfig +from copilot import CopilotClient +from copilot.client import ExternalServerConfig async def main(): diff --git a/test/scenarios/bundling/container-proxy/python/main.py b/test/scenarios/bundling/container-proxy/python/main.py index ca366d93d..b441bec51 100644 --- a/test/scenarios/bundling/container-proxy/python/main.py +++ b/test/scenarios/bundling/container-proxy/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, ExternalServerConfig +from copilot import CopilotClient +from copilot.client import ExternalServerConfig async def main(): diff --git a/test/scenarios/bundling/fully-bundled/python/main.py b/test/scenarios/bundling/fully-bundled/python/main.py index 947e698ce..39ce2bb81 100644 --- a/test/scenarios/bundling/fully-bundled/python/main.py +++ b/test/scenarios/bundling/fully-bundled/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/callbacks/hooks/python/main.py b/test/scenarios/callbacks/hooks/python/main.py index 4d0463b9d..dbfceb22a 100644 --- a/test/scenarios/callbacks/hooks/python/main.py +++ b/test/scenarios/callbacks/hooks/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig hook_log: list[str] = [] diff --git a/test/scenarios/callbacks/permissions/python/main.py b/test/scenarios/callbacks/permissions/python/main.py index 3c4cb6625..de788e5fb 100644 --- a/test/scenarios/callbacks/permissions/python/main.py +++ b/test/scenarios/callbacks/permissions/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig # Track which tools requested permission permission_log: list[str] = [] diff --git a/test/scenarios/callbacks/user-input/python/main.py b/test/scenarios/callbacks/user-input/python/main.py index 7a50431d7..0c23e6b15 100644 --- a/test/scenarios/callbacks/user-input/python/main.py +++ b/test/scenarios/callbacks/user-input/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig input_log: list[str] = [] diff --git a/test/scenarios/modes/default/python/main.py b/test/scenarios/modes/default/python/main.py index 848076792..ece50a662 100644 --- a/test/scenarios/modes/default/python/main.py +++ b/test/scenarios/modes/default/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/modes/minimal/python/main.py b/test/scenarios/modes/minimal/python/main.py index b225e6937..722c1e5e1 100644 --- a/test/scenarios/modes/minimal/python/main.py +++ b/test/scenarios/modes/minimal/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/prompts/attachments/python/main.py b/test/scenarios/prompts/attachments/python/main.py index b51f95f75..fdf259c6a 100644 --- a/test/scenarios/prompts/attachments/python/main.py +++ b/test/scenarios/prompts/attachments/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig SYSTEM_PROMPT = """You are a helpful assistant. Answer questions about attached files concisely.""" diff --git a/test/scenarios/prompts/reasoning-effort/python/main.py b/test/scenarios/prompts/reasoning-effort/python/main.py index 0900c7001..122f44895 100644 --- a/test/scenarios/prompts/reasoning-effort/python/main.py +++ b/test/scenarios/prompts/reasoning-effort/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/prompts/system-message/python/main.py b/test/scenarios/prompts/system-message/python/main.py index 1fb1337ee..b77c1e4a1 100644 --- a/test/scenarios/prompts/system-message/python/main.py +++ b/test/scenarios/prompts/system-message/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig PIRATE_PROMPT = """You are a pirate. Always respond in pirate speak. Say 'Arrr!' in every response. Use nautical terms and pirate slang throughout.""" diff --git a/test/scenarios/sessions/concurrent-sessions/python/main.py b/test/scenarios/sessions/concurrent-sessions/python/main.py index 4c053d730..a32dc5e10 100644 --- a/test/scenarios/sessions/concurrent-sessions/python/main.py +++ b/test/scenarios/sessions/concurrent-sessions/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig PIRATE_PROMPT = "You are a pirate. Always say Arrr!" ROBOT_PROMPT = "You are a robot. Always say BEEP BOOP!" diff --git a/test/scenarios/sessions/infinite-sessions/python/main.py b/test/scenarios/sessions/infinite-sessions/python/main.py index 96135df31..724dc155d 100644 --- a/test/scenarios/sessions/infinite-sessions/python/main.py +++ b/test/scenarios/sessions/infinite-sessions/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/sessions/session-resume/python/main.py b/test/scenarios/sessions/session-resume/python/main.py index 818f5adb8..ccb9c69f0 100644 --- a/test/scenarios/sessions/session-resume/python/main.py +++ b/test/scenarios/sessions/session-resume/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/sessions/streaming/python/main.py b/test/scenarios/sessions/streaming/python/main.py index 610d5f08d..e2312cd14 100644 --- a/test/scenarios/sessions/streaming/python/main.py +++ b/test/scenarios/sessions/streaming/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/tools/custom-agents/python/main.py b/test/scenarios/tools/custom-agents/python/main.py index 97762bb10..d4c45950f 100644 --- a/test/scenarios/tools/custom-agents/python/main.py +++ b/test/scenarios/tools/custom-agents/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/tools/mcp-servers/python/main.py b/test/scenarios/tools/mcp-servers/python/main.py index 5d17903dc..2fa81b82d 100644 --- a/test/scenarios/tools/mcp-servers/python/main.py +++ b/test/scenarios/tools/mcp-servers/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/tools/no-tools/python/main.py b/test/scenarios/tools/no-tools/python/main.py index 1cd2e1438..c3eeb6a17 100644 --- a/test/scenarios/tools/no-tools/python/main.py +++ b/test/scenarios/tools/no-tools/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig SYSTEM_PROMPT = """You are a minimal assistant with no tools available. You cannot execute code, read files, edit files, search, or perform any actions. diff --git a/test/scenarios/tools/skills/python/main.py b/test/scenarios/tools/skills/python/main.py index 00e8506a7..1608a7a9e 100644 --- a/test/scenarios/tools/skills/python/main.py +++ b/test/scenarios/tools/skills/python/main.py @@ -2,7 +2,8 @@ import os from pathlib import Path -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/tools/tool-filtering/python/main.py b/test/scenarios/tools/tool-filtering/python/main.py index 95c22dda1..9da4ca571 100644 --- a/test/scenarios/tools/tool-filtering/python/main.py +++ b/test/scenarios/tools/tool-filtering/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig SYSTEM_PROMPT = """You are a helpful assistant. You have access to a limited set of tools. When asked about your tools, list exactly which tools you have available.""" diff --git a/test/scenarios/tools/tool-overrides/python/main.py b/test/scenarios/tools/tool-overrides/python/main.py index 2170fbe62..c1d65baf3 100644 --- a/test/scenarios/tools/tool-overrides/python/main.py +++ b/test/scenarios/tools/tool-overrides/python/main.py @@ -3,7 +3,9 @@ from pydantic import BaseModel, Field -from copilot import CopilotClient, PermissionHandler, SubprocessConfig, define_tool +from copilot import CopilotClient, define_tool +from copilot.client import SubprocessConfig +from copilot.session import PermissionHandler class GrepParams(BaseModel): diff --git a/test/scenarios/tools/virtual-filesystem/python/main.py b/test/scenarios/tools/virtual-filesystem/python/main.py index 9aba683cc..6bb5e3d2f 100644 --- a/test/scenarios/tools/virtual-filesystem/python/main.py +++ b/test/scenarios/tools/virtual-filesystem/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig, define_tool +from copilot import CopilotClient, define_tool +from copilot.client import SubprocessConfig from pydantic import BaseModel, Field # In-memory virtual filesystem diff --git a/test/scenarios/transport/reconnect/python/main.py b/test/scenarios/transport/reconnect/python/main.py index 4c5b39b83..d1d4505a8 100644 --- a/test/scenarios/transport/reconnect/python/main.py +++ b/test/scenarios/transport/reconnect/python/main.py @@ -1,7 +1,8 @@ import asyncio import os import sys -from copilot import CopilotClient, ExternalServerConfig +from copilot import CopilotClient +from copilot.client import ExternalServerConfig async def main(): diff --git a/test/scenarios/transport/stdio/python/main.py b/test/scenarios/transport/stdio/python/main.py index 947e698ce..39ce2bb81 100644 --- a/test/scenarios/transport/stdio/python/main.py +++ b/test/scenarios/transport/stdio/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, SubprocessConfig +from copilot import CopilotClient +from copilot.client import SubprocessConfig async def main(): diff --git a/test/scenarios/transport/tcp/python/main.py b/test/scenarios/transport/tcp/python/main.py index ca366d93d..b441bec51 100644 --- a/test/scenarios/transport/tcp/python/main.py +++ b/test/scenarios/transport/tcp/python/main.py @@ -1,6 +1,7 @@ import asyncio import os -from copilot import CopilotClient, ExternalServerConfig +from copilot import CopilotClient +from copilot.client import ExternalServerConfig async def main(): From 0e7279a666b32c0e47d5a3c75b1365c150d4911d Mon Sep 17 00:00:00 2001 From: Brett Cannon Date: Thu, 19 Mar 2026 10:48:57 -0700 Subject: [PATCH 2/8] fix: reorder import statements in test_telemetry.py --- python/test_telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/test_telemetry.py b/python/test_telemetry.py index 228004700..d10ffeb9f 100644 --- a/python/test_telemetry.py +++ b/python/test_telemetry.py @@ -4,8 +4,8 @@ from unittest.mock import patch -from copilot.client import SubprocessConfig, TelemetryConfig from copilot._telemetry import get_trace_context, trace_context +from copilot.client import SubprocessConfig, TelemetryConfig class TestGetTraceContext: From 6301535cd2711f193206c887bf60c02d0dd10de4 Mon Sep 17 00:00:00 2001 From: Steve Sanderson Date: Fri, 20 Mar 2026 16:22:29 +0000 Subject: [PATCH 3/8] fix: ruff format client.py and session.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/copilot/client.py | 43 +++++++++++++++++++++++---------------- python/copilot/session.py | 6 +----- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/python/copilot/client.py b/python/copilot/client.py index 4a5632e5e..61603e597 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -686,10 +686,12 @@ class CopilotClient: >>> await client.start() >>> >>> # Create a session and send a message - >>> session = await client.create_session({ - ... "on_permission_request": PermissionHandler.approve_all, - ... "model": "gpt-4", - ... }) + >>> session = await client.create_session( + ... { + ... "on_permission_request": PermissionHandler.approve_all, + ... "model": "gpt-4", + ... } + ... ) >>> session.on(lambda event: print(event.type)) >>> await session.send("Hello!") >>> @@ -728,10 +730,12 @@ def __init__( >>> client = CopilotClient(ExternalServerConfig(url="localhost:3000")) >>> >>> # Custom CLI path with specific log level - >>> client = CopilotClient(SubprocessConfig( - ... cli_path="/usr/local/bin/copilot", - ... log_level="debug", - ... )) + >>> client = CopilotClient( + ... SubprocessConfig( + ... cli_path="/usr/local/bin/copilot", + ... log_level="debug", + ... ) + ... ) """ if config is None: config = SubprocessConfig() @@ -1033,11 +1037,13 @@ async def create_session(self, config: SessionConfig) -> CopilotSession: >>> session = await client.create_session(config) >>> >>> # Session with model and streaming - >>> session = await client.create_session({ - ... "on_permission_request": PermissionHandler.approve_all, - ... "model": "gpt-4", - ... "streaming": True - ... }) + >>> session = await client.create_session( + ... { + ... "on_permission_request": PermissionHandler.approve_all, + ... "model": "gpt-4", + ... "streaming": True, + ... } + ... ) """ if not self._client: if self._auto_start: @@ -1230,10 +1236,13 @@ async def resume_session(self, session_id: str, config: ResumeSessionConfig) -> >>> session = await client.resume_session("session-123", config) >>> >>> # Resume with new tools - >>> session = await client.resume_session("session-123", { - ... "on_permission_request": PermissionHandler.approve_all, - ... "tools": [my_new_tool] - ... }) + >>> session = await client.resume_session( + ... "session-123", + ... { + ... "on_permission_request": PermissionHandler.approve_all, + ... "tools": [my_new_tool], + ... }, + ... ) """ if not self._client: if self._auto_start: diff --git a/python/copilot/session.py b/python/copilot/session.py index 936c07d6c..602c31cb7 100644 --- a/python/copilot/session.py +++ b/python/copilot/session.py @@ -757,9 +757,7 @@ def on(self, handler: Callable[[SessionEvent], None]) -> Callable[[], None]: ... print(f"Assistant: {event.data.content}") ... elif event.type == "session.error": ... print(f"Error: {event.data.message}") - ... >>> unsubscribe = session.on(handle_event) - ... >>> # Later, to stop receiving events: >>> unsubscribe() """ @@ -1242,9 +1240,7 @@ async def abort(self) -> None: >>> import asyncio >>> >>> # Start a long-running request - >>> task = asyncio.create_task( - ... session.send("Write a very long story...") - ... ) + >>> task = asyncio.create_task(session.send("Write a very long story...")) >>> >>> # Abort after 5 seconds >>> await asyncio.sleep(5) From 355e11c5774e73bf3d78d88b7d261985a19a7df6 Mon Sep 17 00:00:00 2001 From: Steve Sanderson Date: Fri, 20 Mar 2026 16:25:07 +0000 Subject: [PATCH 4/8] fix: update PermissionHandler import path in transform test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/e2e/test_system_message_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/e2e/test_system_message_transform.py b/python/e2e/test_system_message_transform.py index 9ae170637..8c7014445 100644 --- a/python/e2e/test_system_message_transform.py +++ b/python/e2e/test_system_message_transform.py @@ -6,7 +6,7 @@ import pytest -from copilot import PermissionHandler +from copilot.session import PermissionHandler from .testharness import E2ETestContext from .testharness.helper import write_file From 7a8535f7c66472f2bb6b23eb911b8f90dbb4e6c2 Mon Sep 17 00:00:00 2001 From: Steve Sanderson Date: Fri, 20 Mar 2026 16:47:19 +0000 Subject: [PATCH 5/8] Fixes after rebase --- docs/hooks/error-handling.md | 4 +- docs/hooks/post-tool-use.md | 4 +- docs/hooks/pre-tool-use.md | 4 +- docs/hooks/session-lifecycle.md | 8 +- docs/hooks/user-prompt-submitted.md | 4 +- python/copilot/client.py | 217 +++++++++++++++++++++++----- 6 files changed, 189 insertions(+), 52 deletions(-) diff --git a/docs/hooks/error-handling.md b/docs/hooks/error-handling.md index da97d5107..0cbebcbaa 100644 --- a/docs/hooks/error-handling.md +++ b/docs/hooks/error-handling.md @@ -39,14 +39,14 @@ from copilot.session import ErrorOccurredHookInput, ErrorOccurredHookOutput from typing import Callable, Awaitable ErrorOccurredHandler = Callable[ - [ErrorOccurredHookInput, HookInvocation], + [ErrorOccurredHookInput, dict[str, str]], Awaitable[ErrorOccurredHookOutput | None] ] ``` ```python ErrorOccurredHandler = Callable[ - [ErrorOccurredHookInput, HookInvocation], + [ErrorOccurredHookInput, dict[str, str]], Awaitable[ErrorOccurredHookOutput | None] ] ``` diff --git a/docs/hooks/post-tool-use.md b/docs/hooks/post-tool-use.md index fed1af727..5c4872f83 100644 --- a/docs/hooks/post-tool-use.md +++ b/docs/hooks/post-tool-use.md @@ -39,14 +39,14 @@ from copilot.session import PostToolUseHookInput, PostToolUseHookOutput from typing import Callable, Awaitable PostToolUseHandler = Callable[ - [PostToolUseHookInput, HookInvocation], + [PostToolUseHookInput, dict[str, str]], Awaitable[PostToolUseHookOutput | None] ] ``` ```python PostToolUseHandler = Callable[ - [PostToolUseHookInput, HookInvocation], + [PostToolUseHookInput, dict[str, str]], Awaitable[PostToolUseHookOutput | None] ] ``` diff --git a/docs/hooks/pre-tool-use.md b/docs/hooks/pre-tool-use.md index df23bbd55..16d485778 100644 --- a/docs/hooks/pre-tool-use.md +++ b/docs/hooks/pre-tool-use.md @@ -39,14 +39,14 @@ from copilot.session import PreToolUseHookInput, PreToolUseHookOutput from typing import Callable, Awaitable PreToolUseHandler = Callable[ - [PreToolUseHookInput, HookInvocation], + [PreToolUseHookInput, dict[str, str]], Awaitable[PreToolUseHookOutput | None] ] ``` ```python PreToolUseHandler = Callable[ - [PreToolUseHookInput, HookInvocation], + [PreToolUseHookInput, dict[str, str]], Awaitable[PreToolUseHookOutput | None] ] ``` diff --git a/docs/hooks/session-lifecycle.md b/docs/hooks/session-lifecycle.md index e86f5d7a9..6949de66d 100644 --- a/docs/hooks/session-lifecycle.md +++ b/docs/hooks/session-lifecycle.md @@ -43,14 +43,14 @@ from copilot.session import SessionStartHookInput, SessionStartHookOutput from typing import Callable, Awaitable SessionStartHandler = Callable[ - [SessionStartHookInput, HookInvocation], + [SessionStartHookInput, dict[str, str]], Awaitable[SessionStartHookOutput | None] ] ``` ```python SessionStartHandler = Callable[ - [SessionStartHookInput, HookInvocation], + [SessionStartHookInput, dict[str, str]], Awaitable[SessionStartHookOutput | None] ] ``` @@ -253,14 +253,14 @@ from copilot.session import SessionEndHookInput from typing import Callable, Awaitable SessionEndHandler = Callable[ - [SessionEndHookInput, HookInvocation], + [SessionEndHookInput, dict[str, str]], Awaitable[None] ] ``` ```python SessionEndHandler = Callable[ - [SessionEndHookInput, HookInvocation], + [SessionEndHookInput, dict[str, str]], Awaitable[SessionEndHookOutput | None] ] ``` diff --git a/docs/hooks/user-prompt-submitted.md b/docs/hooks/user-prompt-submitted.md index 89831e34b..80f786eb6 100644 --- a/docs/hooks/user-prompt-submitted.md +++ b/docs/hooks/user-prompt-submitted.md @@ -39,14 +39,14 @@ from copilot.session import UserPromptSubmittedHookInput, UserPromptSubmittedHoo from typing import Callable, Awaitable UserPromptSubmittedHandler = Callable[ - [UserPromptSubmittedHookInput, HookInvocation], + [UserPromptSubmittedHookInput, dict[str, str]], Awaitable[UserPromptSubmittedHookOutput | None] ] ``` ```python UserPromptSubmittedHandler = Callable[ - [UserPromptSubmittedHookInput, HookInvocation], + [UserPromptSubmittedHookInput, dict[str, str]], Awaitable[UserPromptSubmittedHookOutput | None] ] ``` diff --git a/python/copilot/client.py b/python/copilot/client.py index 61603e597..9b6b21bfa 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -32,15 +32,20 @@ from ._sdk_protocol_version import get_sdk_protocol_version from ._telemetry import get_trace_context, trace_context from .generated.rpc import ServerRpc -from .generated.session_events import PermissionRequest, session_event_from_dict +from .generated.session_events import PermissionRequest, SessionEvent, session_event_from_dict from .session import ( CopilotSession, CustomAgentConfig, + InfiniteSessionConfig, + MCPServerConfig, ProviderConfig, - ResumeSessionConfig, - SessionConfig, + ReasoningEffort, + SessionHooks, + SystemMessageConfig, + UserInputHandler, + _PermissionHandlerFn, ) -from .tools import ToolInvocation, ToolResult +from .tools import Tool, ToolInvocation, ToolResult # ============================================================================ # Connection Types @@ -1013,7 +1018,32 @@ async def force_stop(self) -> None: if not self._is_external_server: self._actual_port = None - async def create_session(self, config: SessionConfig) -> CopilotSession: + async def create_session( + self, + *, + on_permission_request: _PermissionHandlerFn, + model: str | None = None, + session_id: str | None = None, + client_name: str | None = None, + reasoning_effort: ReasoningEffort | None = None, + tools: list[Tool] | None = None, + system_message: SystemMessageConfig | None = None, + available_tools: list[str] | None = None, + excluded_tools: list[str] | None = None, + on_user_input_request: UserInputHandler | None = None, + hooks: SessionHooks | None = None, + working_directory: str | None = None, + provider: ProviderConfig | None = None, + streaming: bool | None = None, + mcp_servers: dict[str, MCPServerConfig] | None = None, + custom_agents: list[CustomAgentConfig] | None = None, + agent: str | None = None, + config_dir: str | None = None, + skill_directories: list[str] | None = None, + disabled_skills: list[str] | None = None, + infinite_sessions: InfiniteSessionConfig | None = None, + on_event: Callable[[SessionEvent], None] | None = None, + ) -> CopilotSession: """ Create a new conversation session with the Copilot CLI. @@ -1022,43 +1052,84 @@ async def create_session(self, config: SessionConfig) -> CopilotSession: automatically start the connection. Args: - config: Optional configuration for the session, including model selection, - custom tools, system messages, and more. + on_permission_request: Handler for permission requests. Use + ``PermissionHandler.approve_all`` to allow all permissions. + model: The model to use for the session (e.g. ``"gpt-4"``). + session_id: Optional session ID. If not provided, a UUID is generated. + client_name: Optional client name for identification. + reasoning_effort: Reasoning effort level for the model. + tools: Custom tools to register with the session. + system_message: System message configuration. + available_tools: Allowlist of built-in tools to enable. + excluded_tools: List of built-in tools to disable. + on_user_input_request: Handler for user input requests. + hooks: Lifecycle hooks for the session. + working_directory: Working directory for the session. + provider: Provider configuration for Azure or custom endpoints. + streaming: Whether to enable streaming responses. + mcp_servers: MCP server configurations. + custom_agents: Custom agent configurations. + agent: Agent to use for the session. + config_dir: Override for the configuration directory. + skill_directories: Directories to search for skills. + disabled_skills: Skills to disable. + infinite_sessions: Infinite session configuration. + on_event: Callback for session events. Returns: A :class:`CopilotSession` instance for the new session. Raises: RuntimeError: If the client is not connected and auto_start is disabled. + ValueError: If ``on_permission_request`` is not a valid callable. Example: - >>> # Basic session - >>> config = {"on_permission_request": PermissionHandler.approve_all} - >>> session = await client.create_session(config) + >>> session = await client.create_session( + ... on_permission_request=PermissionHandler.approve_all, + ... ) >>> >>> # Session with model and streaming >>> session = await client.create_session( - ... { - ... "on_permission_request": PermissionHandler.approve_all, - ... "model": "gpt-4", - ... "streaming": True, - ... } + ... on_permission_request=PermissionHandler.approve_all, + ... model="gpt-4", + ... streaming=True, ... ) """ + if not on_permission_request or not callable(on_permission_request): + raise ValueError( + "A valid on_permission_request handler is required. " + "Use PermissionHandler.approve_all or provide a custom handler." + ) if not self._client: if self._auto_start: await self.start() else: raise RuntimeError("Client not connected. Call start() first.") - cfg = config - - if not cfg.get("on_permission_request"): - raise ValueError( - "An on_permission_request handler is required when creating a session. " - "For example, to allow all permissions, use " - '{"on_permission_request": PermissionHandler.approve_all}.' - ) + cfg: dict[str, Any] = { + "on_permission_request": on_permission_request, + "model": model, + "session_id": session_id, + "client_name": client_name, + "reasoning_effort": reasoning_effort, + "tools": tools, + "system_message": system_message, + "available_tools": available_tools, + "excluded_tools": excluded_tools, + "on_user_input_request": on_user_input_request, + "hooks": hooks, + "working_directory": working_directory, + "provider": provider, + "streaming": streaming, + "mcp_servers": mcp_servers, + "custom_agents": custom_agents, + "agent": agent, + "config_dir": config_dir, + "skill_directories": skill_directories, + "disabled_skills": disabled_skills, + "infinite_sessions": infinite_sessions, + "on_event": on_event, + } tool_defs = [] tools = cfg.get("tools") @@ -1212,7 +1283,32 @@ async def create_session(self, config: SessionConfig) -> CopilotSession: return session - async def resume_session(self, session_id: str, config: ResumeSessionConfig) -> CopilotSession: + async def resume_session( + self, + session_id: str, + *, + on_permission_request: _PermissionHandlerFn, + model: str | None = None, + client_name: str | None = None, + reasoning_effort: ReasoningEffort | None = None, + tools: list[Tool] | None = None, + system_message: SystemMessageConfig | None = None, + available_tools: list[str] | None = None, + excluded_tools: list[str] | None = None, + on_user_input_request: UserInputHandler | None = None, + hooks: SessionHooks | None = None, + working_directory: str | None = None, + provider: ProviderConfig | None = None, + streaming: bool | None = None, + mcp_servers: dict[str, MCPServerConfig] | None = None, + custom_agents: list[CustomAgentConfig] | None = None, + agent: str | None = None, + config_dir: str | None = None, + skill_directories: list[str] | None = None, + disabled_skills: list[str] | None = None, + infinite_sessions: InfiniteSessionConfig | None = None, + on_event: Callable[[SessionEvent], None] | None = None, + ) -> CopilotSession: """ Resume an existing conversation session by its ID. @@ -1222,42 +1318,83 @@ async def resume_session(self, session_id: str, config: ResumeSessionConfig) -> Args: session_id: The ID of the session to resume. - config: Optional configuration for the resumed session. + on_permission_request: Handler for permission requests. Use + ``PermissionHandler.approve_all`` to allow all permissions. + model: The model to use for the resumed session. + client_name: Optional client name for identification. + reasoning_effort: Reasoning effort level for the model. + tools: Custom tools to register with the session. + system_message: System message configuration. + available_tools: Allowlist of built-in tools to enable. + excluded_tools: List of built-in tools to disable. + on_user_input_request: Handler for user input requests. + hooks: Lifecycle hooks for the session. + working_directory: Working directory for the session. + provider: Provider configuration for Azure or custom endpoints. + streaming: Whether to enable streaming responses. + mcp_servers: MCP server configurations. + custom_agents: Custom agent configurations. + agent: Agent to use for the session. + config_dir: Override for the configuration directory. + skill_directories: Directories to search for skills. + disabled_skills: Skills to disable. + infinite_sessions: Infinite session configuration. + on_event: Callback for session events. Returns: A :class:`CopilotSession` instance for the resumed session. Raises: RuntimeError: If the session does not exist or the client is not connected. + ValueError: If ``on_permission_request`` is not a valid callable. Example: - >>> # Resume a previous session - >>> config = {"on_permission_request": PermissionHandler.approve_all} - >>> session = await client.resume_session("session-123", config) + >>> session = await client.resume_session( + ... "session-123", + ... on_permission_request=PermissionHandler.approve_all, + ... ) >>> >>> # Resume with new tools >>> session = await client.resume_session( ... "session-123", - ... { - ... "on_permission_request": PermissionHandler.approve_all, - ... "tools": [my_new_tool], - ... }, + ... on_permission_request=PermissionHandler.approve_all, + ... tools=[my_new_tool], ... ) """ + if not on_permission_request or not callable(on_permission_request): + raise ValueError( + "A valid on_permission_request handler is required. " + "Use PermissionHandler.approve_all or provide a custom handler." + ) if not self._client: if self._auto_start: await self.start() else: raise RuntimeError("Client not connected. Call start() first.") - cfg = config - - if not cfg.get("on_permission_request"): - raise ValueError( - "An on_permission_request handler is required when resuming a session. " - "For example, to allow all permissions, use " - '{"on_permission_request": PermissionHandler.approve_all}.' - ) + cfg: dict[str, Any] = { + "on_permission_request": on_permission_request, + "model": model, + "client_name": client_name, + "reasoning_effort": reasoning_effort, + "tools": tools, + "system_message": system_message, + "available_tools": available_tools, + "excluded_tools": excluded_tools, + "on_user_input_request": on_user_input_request, + "hooks": hooks, + "working_directory": working_directory, + "provider": provider, + "streaming": streaming, + "mcp_servers": mcp_servers, + "custom_agents": custom_agents, + "agent": agent, + "config_dir": config_dir, + "skill_directories": skill_directories, + "disabled_skills": disabled_skills, + "infinite_sessions": infinite_sessions, + "on_event": on_event, + } tool_defs = [] tools = cfg.get("tools") From 6df70f2331b76d16ed7df98458702102f4b6b0ed Mon Sep 17 00:00:00 2001 From: Steve Sanderson Date: Fri, 20 Mar 2026 16:53:57 +0000 Subject: [PATCH 6/8] fix: use keyword params directly in create_session/resume_session bodies Remove cfg dict intermediary and use keyword parameters directly, fixing ty type checker errors where cfg.get() returned Any | None and shadowed the typed parameter variables. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/copilot/client.py | 154 +++++---------------------------------- 1 file changed, 18 insertions(+), 136 deletions(-) diff --git a/python/copilot/client.py b/python/copilot/client.py index 9b6b21bfa..f50408eb5 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -1106,33 +1106,7 @@ async def create_session( else: raise RuntimeError("Client not connected. Call start() first.") - cfg: dict[str, Any] = { - "on_permission_request": on_permission_request, - "model": model, - "session_id": session_id, - "client_name": client_name, - "reasoning_effort": reasoning_effort, - "tools": tools, - "system_message": system_message, - "available_tools": available_tools, - "excluded_tools": excluded_tools, - "on_user_input_request": on_user_input_request, - "hooks": hooks, - "working_directory": working_directory, - "provider": provider, - "streaming": streaming, - "mcp_servers": mcp_servers, - "custom_agents": custom_agents, - "agent": agent, - "config_dir": config_dir, - "skill_directories": skill_directories, - "disabled_skills": disabled_skills, - "infinite_sessions": infinite_sessions, - "on_event": on_event, - } - tool_defs = [] - tools = cfg.get("tools") if tools: for tool in tools: definition: dict[str, Any] = { @@ -1148,92 +1122,74 @@ async def create_session( tool_defs.append(definition) payload: dict[str, Any] = {} - if cfg.get("model"): - payload["model"] = cfg["model"] - if cfg.get("client_name"): - payload["clientName"] = cfg["client_name"] - if cfg.get("reasoning_effort"): - payload["reasoningEffort"] = cfg["reasoning_effort"] + if model: + payload["model"] = model + if client_name: + payload["clientName"] = client_name + if reasoning_effort: + payload["reasoningEffort"] = reasoning_effort if tool_defs: payload["tools"] = tool_defs - # Add system message configuration if provided - system_message = cfg.get("system_message") if system_message: payload["systemMessage"] = system_message - # Add tool filtering options - available_tools = cfg.get("available_tools") if available_tools is not None: payload["availableTools"] = available_tools - excluded_tools = cfg.get("excluded_tools") if excluded_tools is not None: payload["excludedTools"] = excluded_tools - # Always enable permission request callback (deny by default if no handler provided) - on_permission_request = cfg.get("on_permission_request") + # Always enable permission request callback payload["requestPermission"] = True # Enable user input request callback if handler provided - on_user_input_request = cfg.get("on_user_input_request") if on_user_input_request: payload["requestUserInput"] = True # Enable hooks callback if any hook handler provided - hooks = cfg.get("hooks") if hooks and any(hooks.values()): payload["hooks"] = True # Add working directory if provided - working_directory = cfg.get("working_directory") if working_directory: payload["workingDirectory"] = working_directory # Add streaming option if provided - streaming = cfg.get("streaming") if streaming is not None: payload["streaming"] = streaming # Add provider configuration if provided - provider = cfg.get("provider") if provider: payload["provider"] = self._convert_provider_to_wire_format(provider) # Add MCP servers configuration if provided - mcp_servers = cfg.get("mcp_servers") if mcp_servers: payload["mcpServers"] = mcp_servers payload["envValueMode"] = "direct" # Add custom agents configuration if provided - custom_agents = cfg.get("custom_agents") if custom_agents: payload["customAgents"] = [ self._convert_custom_agent_to_wire_format(agent) for agent in custom_agents ] # Add agent selection if provided - agent = cfg.get("agent") if agent: payload["agent"] = agent # Add config directory override if provided - config_dir = cfg.get("config_dir") if config_dir: payload["configDir"] = config_dir # Add skill directories configuration if provided - skill_directories = cfg.get("skill_directories") if skill_directories: payload["skillDirectories"] = skill_directories # Add disabled skills configuration if provided - disabled_skills = cfg.get("disabled_skills") if disabled_skills: payload["disabledSkills"] = disabled_skills # Add infinite sessions configuration if provided - infinite_sessions = cfg.get("infinite_sessions") if infinite_sessions: wire_config: dict[str, Any] = {} if "enabled" in infinite_sessions: @@ -1251,8 +1207,8 @@ async def create_session( if not self._client: raise RuntimeError("Client not connected") - session_id = cfg.get("session_id") or str(uuid.uuid4()) - payload["sessionId"] = session_id + actual_session_id = session_id or str(uuid.uuid4()) + payload["sessionId"] = actual_session_id # Propagate W3C Trace Context to CLI if OpenTelemetry is active trace_ctx = get_trace_context() @@ -1260,25 +1216,24 @@ async def create_session( # Create and register the session before issuing the RPC so that # events emitted by the CLI (e.g. session.start) are not dropped. - session = CopilotSession(session_id, self._client, None) + session = CopilotSession(actual_session_id, self._client, None) session._register_tools(tools) session._register_permission_handler(on_permission_request) if on_user_input_request: session._register_user_input_handler(on_user_input_request) if hooks: session._register_hooks(hooks) - on_event = cfg.get("on_event") if on_event: session.on(on_event) with self._sessions_lock: - self._sessions[session_id] = session + self._sessions[actual_session_id] = session try: response = await self._client.request("session.create", payload) session._workspace_path = response.get("workspacePath") except BaseException: with self._sessions_lock: - self._sessions.pop(session_id, None) + self._sessions.pop(actual_session_id, None) raise return session @@ -1372,32 +1327,7 @@ async def resume_session( else: raise RuntimeError("Client not connected. Call start() first.") - cfg: dict[str, Any] = { - "on_permission_request": on_permission_request, - "model": model, - "client_name": client_name, - "reasoning_effort": reasoning_effort, - "tools": tools, - "system_message": system_message, - "available_tools": available_tools, - "excluded_tools": excluded_tools, - "on_user_input_request": on_user_input_request, - "hooks": hooks, - "working_directory": working_directory, - "provider": provider, - "streaming": streaming, - "mcp_servers": mcp_servers, - "custom_agents": custom_agents, - "agent": agent, - "config_dir": config_dir, - "skill_directories": skill_directories, - "disabled_skills": disabled_skills, - "infinite_sessions": infinite_sessions, - "on_event": on_event, - } - tool_defs = [] - tools = cfg.get("tools") if tools: for tool in tools: definition: dict[str, Any] = { @@ -1414,103 +1344,56 @@ async def resume_session( payload: dict[str, Any] = {"sessionId": session_id} - # Add client name if provided - client_name = cfg.get("client_name") if client_name: payload["clientName"] = client_name - - # Add model if provided - model = cfg.get("model") if model: payload["model"] = model - - if cfg.get("reasoning_effort"): - payload["reasoningEffort"] = cfg["reasoning_effort"] + if reasoning_effort: + payload["reasoningEffort"] = reasoning_effort if tool_defs: payload["tools"] = tool_defs - - # Add system message configuration if provided - system_message = cfg.get("system_message") if system_message: payload["systemMessage"] = system_message - - # Add available/excluded tools if provided - available_tools = cfg.get("available_tools") if available_tools is not None: payload["availableTools"] = available_tools - - excluded_tools = cfg.get("excluded_tools") if excluded_tools is not None: payload["excludedTools"] = excluded_tools - - provider = cfg.get("provider") if provider: payload["provider"] = self._convert_provider_to_wire_format(provider) - - # Add streaming option if provided - streaming = cfg.get("streaming") if streaming is not None: payload["streaming"] = streaming - # Always enable permission request callback (deny by default if no handler provided) - on_permission_request = cfg.get("on_permission_request") + # Always enable permission request callback payload["requestPermission"] = True - # Enable user input request callback if handler provided - on_user_input_request = cfg.get("on_user_input_request") if on_user_input_request: payload["requestUserInput"] = True - # Enable hooks callback if any hook handler provided - hooks = cfg.get("hooks") if hooks and any(hooks.values()): payload["hooks"] = True - # Add working directory if provided - working_directory = cfg.get("working_directory") if working_directory: payload["workingDirectory"] = working_directory - - # Add config directory if provided - config_dir = cfg.get("config_dir") if config_dir: payload["configDir"] = config_dir - # Add disable resume flag if provided - disable_resume = cfg.get("disable_resume") - if disable_resume: - payload["disableResume"] = True - - # Add MCP servers configuration if provided - mcp_servers = cfg.get("mcp_servers") + # TODO: disable_resume is not a keyword arg yet; keeping for future use if mcp_servers: payload["mcpServers"] = mcp_servers payload["envValueMode"] = "direct" - # Add custom agents configuration if provided - custom_agents = cfg.get("custom_agents") if custom_agents: payload["customAgents"] = [ - self._convert_custom_agent_to_wire_format(agent) for agent in custom_agents + self._convert_custom_agent_to_wire_format(a) for a in custom_agents ] - # Add agent selection if provided - agent = cfg.get("agent") if agent: payload["agent"] = agent - - # Add skill directories configuration if provided - skill_directories = cfg.get("skill_directories") if skill_directories: payload["skillDirectories"] = skill_directories - - # Add disabled skills configuration if provided - disabled_skills = cfg.get("disabled_skills") if disabled_skills: payload["disabledSkills"] = disabled_skills - # Add infinite sessions configuration if provided - infinite_sessions = cfg.get("infinite_sessions") if infinite_sessions: wire_config: dict[str, Any] = {} if "enabled" in infinite_sessions: @@ -1535,13 +1418,12 @@ async def resume_session( # Create and register the session before issuing the RPC so that # events emitted by the CLI (e.g. session.start) are not dropped. session = CopilotSession(session_id, self._client, None) - session._register_tools(cfg.get("tools")) + session._register_tools(tools) session._register_permission_handler(on_permission_request) if on_user_input_request: session._register_user_input_handler(on_user_input_request) if hooks: session._register_hooks(hooks) - on_event = cfg.get("on_event") if on_event: session.on(on_event) with self._sessions_lock: From 362182f547cc165828d3280a031c2459c0f1ea50 Mon Sep 17 00:00:00 2001 From: Steve Sanderson Date: Fri, 20 Mar 2026 17:05:43 +0000 Subject: [PATCH 7/8] fix: restore system message transform support lost during rebase Add back SectionTransformFn type, _extract_transform_callbacks helper, _handle_system_message_transform handler, and systemMessage.transform RPC registration that were part of PR #816 but lost during rebase. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/copilot/client.py | 70 ++++++++++++++++++++++++++++++++++++--- python/copilot/session.py | 38 +++++++++++++++++++++ 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/python/copilot/client.py b/python/copilot/client.py index f50408eb5..c3bb0b29d 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -40,6 +40,7 @@ MCPServerConfig, ProviderConfig, ReasoningEffort, + SectionTransformFn, SessionHooks, SystemMessageConfig, UserInputHandler, @@ -674,6 +675,40 @@ def _get_bundled_cli_path() -> str | None: return None +def _extract_transform_callbacks( + system_message: dict | None, +) -> tuple[dict | None, dict[str, SectionTransformFn] | None]: + """Extract function-valued actions from system message config. + + Returns a wire-safe payload (with callable actions replaced by ``"transform"``) + and a dict of transform callbacks keyed by section ID. + """ + if ( + not system_message + or system_message.get("mode") != "customize" + or not system_message.get("sections") + ): + return system_message, None + + callbacks: dict[str, SectionTransformFn] = {} + wire_sections: dict[str, dict] = {} + for section_id, override in system_message["sections"].items(): + if not override: + continue + action = override.get("action") + if callable(action): + callbacks[section_id] = action + wire_sections[section_id] = {"action": "transform"} + else: + wire_sections[section_id] = override + + if not callbacks: + return system_message, None + + wire_payload = {**system_message, "sections": wire_sections} + return wire_payload, callbacks + + class CopilotClient: """ Main client for interacting with the Copilot CLI. @@ -1131,8 +1166,9 @@ async def create_session( if tool_defs: payload["tools"] = tool_defs - if system_message: - payload["systemMessage"] = system_message + wire_system_message, transform_callbacks = _extract_transform_callbacks(system_message) + if wire_system_message: + payload["systemMessage"] = wire_system_message if available_tools is not None: payload["availableTools"] = available_tools @@ -1223,6 +1259,8 @@ async def create_session( session._register_user_input_handler(on_user_input_request) if hooks: session._register_hooks(hooks) + if transform_callbacks: + session._register_transform_callbacks(transform_callbacks) if on_event: session.on(on_event) with self._sessions_lock: @@ -1352,8 +1390,9 @@ async def resume_session( payload["reasoningEffort"] = reasoning_effort if tool_defs: payload["tools"] = tool_defs - if system_message: - payload["systemMessage"] = system_message + wire_system_message, transform_callbacks = _extract_transform_callbacks(system_message) + if wire_system_message: + payload["systemMessage"] = wire_system_message if available_tools is not None: payload["availableTools"] = available_tools if excluded_tools is not None: @@ -1424,6 +1463,8 @@ async def resume_session( session._register_user_input_handler(on_user_input_request) if hooks: session._register_hooks(hooks) + if transform_callbacks: + session._register_transform_callbacks(transform_callbacks) if on_event: session.on(on_event) with self._sessions_lock: @@ -2069,6 +2110,9 @@ def handle_notification(method: str, params: dict): self._client.set_request_handler("permission.request", self._handle_permission_request_v2) self._client.set_request_handler("userInput.request", self._handle_user_input_request) self._client.set_request_handler("hooks.invoke", self._handle_hooks_invoke) + self._client.set_request_handler( + "systemMessage.transform", self._handle_system_message_transform + ) # Start listening for messages loop = asyncio.get_running_loop() @@ -2154,6 +2198,9 @@ def handle_notification(method: str, params: dict): self._client.set_request_handler("permission.request", self._handle_permission_request_v2) self._client.set_request_handler("userInput.request", self._handle_user_input_request) self._client.set_request_handler("hooks.invoke", self._handle_hooks_invoke) + self._client.set_request_handler( + "systemMessage.transform", self._handle_system_message_transform + ) # Start listening for messages loop = asyncio.get_running_loop() @@ -2214,6 +2261,21 @@ async def _handle_hooks_invoke(self, params: dict) -> dict: output = await session._handle_hooks_invoke(hook_type, input_data) return {"output": output} + async def _handle_system_message_transform(self, params: dict) -> dict: + """Handle a systemMessage.transform request from the CLI server.""" + session_id = params.get("sessionId") + sections = params.get("sections") + + if not session_id or not sections: + raise ValueError("invalid systemMessage.transform payload") + + with self._sessions_lock: + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"unknown session {session_id}") + + return await session._handle_system_message_transform(sections) + # ======================================================================== # Protocol v2 backward-compatibility adapters # ======================================================================== diff --git a/python/copilot/session.py b/python/copilot/session.py index 602c31cb7..f2bce87af 100644 --- a/python/copilot/session.py +++ b/python/copilot/session.py @@ -150,6 +150,12 @@ class PermissionRequestResult: path: str | None = None +SectionTransformFn = Callable[[str], str | Awaitable[str]] +"""Transform callback: receives current section content, returns new content.""" + +SectionOverrideAction = Literal["replace", "remove", "append", "prepend"] | SectionTransformFn +"""Override action: a string literal for static overrides, or a callback for transforms.""" + _PermissionHandlerFn = Callable[ [PermissionRequest, dict[str, str]], PermissionRequestResult | Awaitable[PermissionRequestResult], @@ -609,6 +615,8 @@ def __init__(self, session_id: str, client: Any, workspace_path: str | None = No self._user_input_handler_lock = threading.Lock() self._hooks: SessionHooks | None = None self._hooks_lock = threading.Lock() + self._transform_callbacks: dict[str, SectionTransformFn] | None = None + self._transform_callbacks_lock = threading.Lock() self._rpc: SessionRpc | None = None @property @@ -1087,6 +1095,13 @@ async def _handle_user_input_request(self, request: dict) -> UserInputResponse: except Exception: raise + def _register_transform_callbacks( + self, callbacks: dict[str, SectionTransformFn] | None + ) -> None: + """Register transform callbacks for system message sections.""" + with self._transform_callbacks_lock: + self._transform_callbacks = callbacks + def _register_hooks(self, hooks: SessionHooks | None) -> None: """ Register hook handlers for session lifecycle events. @@ -1104,6 +1119,29 @@ def _register_hooks(self, hooks: SessionHooks | None) -> None: with self._hooks_lock: self._hooks = hooks + async def _handle_system_message_transform( + self, sections: dict[str, dict[str, str]] + ) -> dict[str, dict[str, dict[str, str]]]: + """Handle a systemMessage.transform request from the runtime.""" + with self._transform_callbacks_lock: + callbacks = self._transform_callbacks + + result: dict[str, dict[str, str]] = {} + for section_id, section_data in sections.items(): + content = section_data.get("content", "") + callback = callbacks.get(section_id) if callbacks else None + if callback: + try: + transformed = callback(content) + if inspect.isawaitable(transformed): + transformed = await transformed + result[section_id] = {"content": str(transformed)} + except Exception: + result[section_id] = {"content": content} + else: + result[section_id] = {"content": content} + return {"sections": result} + async def _handle_hooks_invoke(self, hook_type: str, input_data: Any) -> Any: """ Handle a hooks invocation from the Copilot CLI. From 2424f92d18a77a60544dda6f296849e056a34f19 Mon Sep 17 00:00:00 2001 From: Brett Cannon Date: Fri, 20 Mar 2026 12:23:46 -0700 Subject: [PATCH 8/8] fix: restore missing SystemMessageCustomizeConfig and related types The customize mode types (SystemPromptSection, SYSTEM_PROMPT_SECTIONS, SectionOverride, SystemMessageCustomizeConfig) were dropped when types.py was deleted but not re-added to session.py. This also moves SectionTransformFn and SectionOverrideAction before SectionOverride so the definitions flow in dependency order. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/copilot/session.py | 67 ++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/python/copilot/session.py b/python/copilot/session.py index f2bce87af..d57105eaa 100644 --- a/python/copilot/session.py +++ b/python/copilot/session.py @@ -13,7 +13,7 @@ import threading from collections.abc import Awaitable, Callable from dataclasses import dataclass -from typing import Any, Literal, NotRequired, TypedDict, cast +from typing import Any, Literal, NotRequired, Required, TypedDict, cast from ._jsonrpc import JsonRpcError, ProcessExitedError from ._telemetry import get_trace_context, trace_context @@ -123,7 +123,64 @@ class SystemMessageReplaceConfig(TypedDict): content: str -SystemMessageConfig = SystemMessageAppendConfig | SystemMessageReplaceConfig +# Known system prompt section identifiers for the "customize" mode. + +SectionTransformFn = Callable[[str], str | Awaitable[str]] +"""Transform callback: receives current section content, returns new content.""" + +SectionOverrideAction = Literal["replace", "remove", "append", "prepend"] | SectionTransformFn +"""Override action: a string literal for static overrides, or a callback for transforms.""" + +SystemPromptSection = Literal[ + "identity", + "tone", + "tool_efficiency", + "environment_context", + "code_change_rules", + "guidelines", + "safety", + "tool_instructions", + "custom_instructions", + "last_instructions", +] + +SYSTEM_PROMPT_SECTIONS: dict[SystemPromptSection, str] = { + "identity": "Agent identity preamble and mode statement", + "tone": "Response style, conciseness rules, output formatting preferences", + "tool_efficiency": "Tool usage patterns, parallel calling, batching guidelines", + "environment_context": "CWD, OS, git root, directory listing, available tools", + "code_change_rules": "Coding rules, linting/testing, ecosystem tools, style", + "guidelines": "Tips, behavioral best practices, behavioral guidelines", + "safety": "Environment limitations, prohibited actions, security policies", + "tool_instructions": "Per-tool usage instructions", + "custom_instructions": "Repository and organization custom instructions", + "last_instructions": ( + "End-of-prompt instructions: parallel tool calling, persistence, task completion" + ), +} + + +class SectionOverride(TypedDict, total=False): + """Override operation for a single system prompt section.""" + + action: Required[SectionOverrideAction] + content: NotRequired[str] + + +class SystemMessageCustomizeConfig(TypedDict, total=False): + """ + Customize mode: Override individual sections of the system prompt. + Keeps the SDK-managed prompt structure while allowing targeted modifications. + """ + + mode: Required[Literal["customize"]] + sections: NotRequired[dict[SystemPromptSection, SectionOverride]] + content: NotRequired[str] + + +SystemMessageConfig = ( + SystemMessageAppendConfig | SystemMessageReplaceConfig | SystemMessageCustomizeConfig +) # ============================================================================ # Permission Types @@ -150,12 +207,6 @@ class PermissionRequestResult: path: str | None = None -SectionTransformFn = Callable[[str], str | Awaitable[str]] -"""Transform callback: receives current section content, returns new content.""" - -SectionOverrideAction = Literal["replace", "remove", "append", "prepend"] | SectionTransformFn -"""Override action: a string literal for static overrides, or a callback for transforms.""" - _PermissionHandlerFn = Callable[ [PermissionRequest, dict[str, str]], PermissionRequestResult | Awaitable[PermissionRequestResult],