"""E2E RPC Tests""" import pytest from copilot import CopilotClient from copilot.client import SubprocessConfig from copilot.generated.rpc import ModelsListRequest, PingRequest from copilot.session import PermissionHandler from .testharness import CLI_PATH, E2ETestContext pytestmark = pytest.mark.asyncio(loop_scope="module") class TestRpc: @pytest.mark.asyncio async def test_should_call_rpc_ping_with_typed_params(self): """Test calling rpc.ping with typed params and result""" client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)) try: await client.start() result = await client.rpc.ping(PingRequest(message="typed rpc test")) assert result.message == "pong: typed rpc test" assert result.timestamp is not None await client.stop() finally: await client.force_stop() @pytest.mark.asyncio async def test_should_call_rpc_models_list(self): """Test calling rpc.models.list with typed result""" client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)) try: await client.start() auth_status = await client.get_auth_status() if not auth_status.isAuthenticated: await client.stop() return result = await client.rpc.models.list(ModelsListRequest()) assert result.models is not None assert isinstance(result.models, list) await client.stop() finally: await client.force_stop() # account.getQuota is defined in schema but not yet implemented in CLI @pytest.mark.skip(reason="account.getQuota not yet implemented in CLI") @pytest.mark.asyncio async def test_should_call_rpc_account_get_quota(self): """Test calling rpc.account.getQuota when authenticated""" client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)) try: await client.start() auth_status = await client.get_auth_status() if not auth_status.isAuthenticated: await client.stop() return result = await client.rpc.account.get_quota() assert result.quota_snapshots is not None assert isinstance(result.quota_snapshots, dict) await client.stop() finally: await client.force_stop() class TestSessionRpc: # session.model.getCurrent is defined in schema but not yet implemented in CLI @pytest.mark.skip(reason="session.model.getCurrent not yet implemented in CLI") async def test_should_call_session_rpc_model_get_current(self, ctx: E2ETestContext): """Test calling session.rpc.model.getCurrent""" session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, model="claude-sonnet-4.5" ) result = await session.rpc.model.get_current() assert result.model_id is not None assert isinstance(result.model_id, str) # session.model.switchTo is defined in schema but not yet implemented in CLI @pytest.mark.skip(reason="session.model.switchTo not yet implemented in CLI") async def test_should_call_session_rpc_model_switch_to(self, ctx: E2ETestContext): """Test calling session.rpc.model.switchTo""" from copilot.generated.rpc import ModelSwitchToRequest session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, model="claude-sonnet-4.5" ) # Get initial model before = await session.rpc.model.get_current() assert before.model_id is not None # Switch to a different model with reasoning effort result = await session.rpc.model.switch_to( ModelSwitchToRequest(model_id="gpt-4.1", reasoning_effort="high") ) assert result.model_id == "gpt-4.1" # Verify the switch persisted after = await session.rpc.model.get_current() assert after.model_id == "gpt-4.1" @pytest.mark.asyncio async def test_get_and_set_session_mode(self): """Test getting and setting session mode""" from copilot.generated.rpc import ModeSetRequest, SessionMode client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)) try: await client.start() session = await client.create_session( on_permission_request=PermissionHandler.approve_all ) # Get initial mode (default should be interactive) initial = await session.rpc.mode.get() assert initial == SessionMode.INTERACTIVE # Switch to plan mode await session.rpc.mode.set(ModeSetRequest(mode=SessionMode.PLAN)) # Verify mode persisted after_plan = await session.rpc.mode.get() assert after_plan == SessionMode.PLAN # Switch back to interactive await session.rpc.mode.set(ModeSetRequest(mode=SessionMode.INTERACTIVE)) await session.disconnect() await client.stop() finally: await client.force_stop() @pytest.mark.asyncio async def test_read_update_and_delete_plan(self): """Test reading, updating, and deleting plan""" from copilot.generated.rpc import PlanUpdateRequest client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)) try: await client.start() session = await client.create_session( on_permission_request=PermissionHandler.approve_all ) # Initially plan should not exist initial = await session.rpc.plan.read() assert initial.exists is False assert initial.content is None # Create/update plan plan_content = "# Test Plan\n\n- Step 1\n- Step 2" await session.rpc.plan.update(PlanUpdateRequest(content=plan_content)) # Verify plan exists and has correct content after_update = await session.rpc.plan.read() assert after_update.exists is True assert after_update.content == plan_content # Delete plan await session.rpc.plan.delete() # Verify plan is deleted after_delete = await session.rpc.plan.read() assert after_delete.exists is False assert after_delete.content is None await session.disconnect() await client.stop() finally: await client.force_stop() @pytest.mark.asyncio async def test_create_list_and_read_workspace_files(self): """Test creating, listing, and reading workspace files""" from copilot.generated.rpc import ( WorkspacesCreateFileRequest, WorkspacesReadFileRequest, ) client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)) try: await client.start() session = await client.create_session( on_permission_request=PermissionHandler.approve_all ) # Initially no files initial_files = await session.rpc.workspaces.list_files() assert initial_files.files == [] # Create a file file_content = "Hello, workspace!" await session.rpc.workspaces.create_file( WorkspacesCreateFileRequest(content=file_content, path="test.txt") ) # List files after_create = await session.rpc.workspaces.list_files() assert "test.txt" in after_create.files # Read file read_result = await session.rpc.workspaces.read_file( WorkspacesReadFileRequest(path="test.txt") ) assert read_result.content == file_content # Create nested file await session.rpc.workspaces.create_file( WorkspacesCreateFileRequest(content="Nested content", path="subdir/nested.txt") ) after_nested = await session.rpc.workspaces.list_files() assert "test.txt" in after_nested.files assert any("nested.txt" in f for f in after_nested.files) await session.disconnect() await client.stop() finally: await client.force_stop()