"""E2E Session Tests""" import base64 import os import pytest from copilot import CopilotClient from copilot.client import SubprocessConfig from copilot.generated.session_events import SessionModelChangeData from copilot.session import PermissionHandler from copilot.tools import Tool, ToolResult from .testharness import E2ETestContext, get_final_assistant_message, get_next_event_of_type pytestmark = pytest.mark.asyncio(loop_scope="module") class TestSessions: async def test_should_create_and_disconnect_sessions(self, ctx: E2ETestContext): session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, model="claude-sonnet-4.5" ) assert session.session_id messages = await session.get_messages() assert len(messages) > 0 assert messages[0].type.value == "session.start" assert messages[0].data.session_id == session.session_id assert messages[0].data.selected_model == "claude-sonnet-4.5" await session.disconnect() with pytest.raises(Exception, match="Session not found"): await session.get_messages() async def test_should_have_stateful_conversation(self, ctx: E2ETestContext): session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) assistant_message = await session.send_and_wait("What is 1+1?") assert assistant_message is not None assert "2" in assistant_message.data.content second_message = await session.send_and_wait("Now if you double that, what do you get?") assert second_message is not None assert "4" in second_message.data.content async def test_should_create_a_session_with_appended_systemMessage_config( self, ctx: E2ETestContext ): system_message_suffix = "End each response with the phrase 'Have a nice day!'" session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, system_message={"mode": "append", "content": system_message_suffix}, ) await session.send("What is your full name?") assistant_message = await get_final_assistant_message(session) assert "GitHub" in assistant_message.data.content assert "Have a nice day!" in assistant_message.data.content # Also validate the underlying traffic traffic = await ctx.get_exchanges() system_message = _get_system_message(traffic[0]) assert "GitHub" in system_message assert system_message_suffix in system_message async def test_should_create_a_session_with_replaced_systemMessage_config( self, ctx: E2ETestContext ): test_system_message = "You are an assistant called Testy McTestface. Reply succinctly." session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, system_message={"mode": "replace", "content": test_system_message}, ) await session.send("What is your full name?") assistant_message = await get_final_assistant_message(session) assert "GitHub" not in assistant_message.data.content assert "Testy" in assistant_message.data.content # Also validate the underlying traffic traffic = await ctx.get_exchanges() system_message = _get_system_message(traffic[0]) assert system_message == test_system_message # Exact match async def test_should_create_a_session_with_customized_systemMessage_config( self, ctx: E2ETestContext ): custom_tone = "Respond in a warm, professional tone. Be thorough in explanations." appended_content = "Always mention quarterly earnings." session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, system_message={ "mode": "customize", "sections": { "tone": {"action": "replace", "content": custom_tone}, "code_change_rules": {"action": "remove"}, }, "content": appended_content, }, ) assistant_message = await session.send_and_wait("Who are you?") assert assistant_message is not None # Validate the system message sent to the model traffic = await ctx.get_exchanges() system_message = _get_system_message(traffic[0]) assert custom_tone in system_message assert appended_content in system_message assert "" not in system_message async def test_should_create_a_session_with_availableTools(self, ctx: E2ETestContext): session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, available_tools=["view", "edit"], ) await session.send("What is 1+1?") await get_final_assistant_message(session) # It only tells the model about the specified tools and no others traffic = await ctx.get_exchanges() tools = traffic[0]["request"]["tools"] tool_names = [t["function"]["name"] for t in tools] assert len(tool_names) == 2 assert "view" in tool_names assert "edit" in tool_names async def test_should_create_a_session_with_excludedTools(self, ctx: E2ETestContext): session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, excluded_tools=["view"] ) await session.send("What is 1+1?") await get_final_assistant_message(session) # It has other tools, but not the one we excluded traffic = await ctx.get_exchanges() tools = traffic[0]["request"]["tools"] tool_names = [t["function"]["name"] for t in tools] assert "edit" in tool_names assert "grep" in tool_names assert "view" not in tool_names # TODO: This test shows there's a race condition inside client.ts. If createSession # is called concurrently and autoStart is on, it may start multiple child processes. # This needs to be fixed. Right now it manifests as being unable to delete the temp # directories during afterAll even though we stopped all the clients. @pytest.mark.skip(reason="Known race condition - see TypeScript test") async def test_should_handle_multiple_concurrent_sessions(self, ctx: E2ETestContext): import asyncio s1, s2, s3 = await asyncio.gather( ctx.client.create_session(on_permission_request=PermissionHandler.approve_all), ctx.client.create_session(on_permission_request=PermissionHandler.approve_all), ctx.client.create_session(on_permission_request=PermissionHandler.approve_all), ) # All sessions should have unique IDs session_ids = {s1.session_id, s2.session_id, s3.session_id} assert len(session_ids) == 3 # All are connected for s in [s1, s2, s3]: messages = await s.get_messages() assert len(messages) > 0 assert messages[0].type.value == "session.start" assert messages[0].data.session_id == s.session_id # All can be disconnected await asyncio.gather(s1.disconnect(), s2.disconnect(), s3.disconnect()) for s in [s1, s2, s3]: with pytest.raises(Exception, match="Session not found"): await s.get_messages() async def test_should_resume_a_session_using_the_same_client(self, ctx: E2ETestContext): # Create initial session session1 = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) session_id = session1.session_id answer = await session1.send_and_wait("What is 1+1?") assert answer is not None assert "2" in answer.data.content # Resume using the same client session2 = await ctx.client.resume_session( session_id, on_permission_request=PermissionHandler.approve_all ) assert session2.session_id == session_id answer2 = await get_final_assistant_message(session2, already_idle=True) assert "2" in answer2.data.content # Can continue the conversation statefully answer3 = await session2.send_and_wait("Now if you double that, what do you get?") assert answer3 is not None assert "4" in answer3.data.content async def test_should_resume_a_session_using_a_new_client(self, ctx: E2ETestContext): # Create initial session session1 = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) session_id = session1.session_id answer = await session1.send_and_wait("What is 1+1?") assert answer is not None assert "2" in answer.data.content # Resume using a new client github_token = ( "fake-token-for-e2e-tests" if os.environ.get("GITHUB_ACTIONS") == "true" else None ) new_client = CopilotClient( SubprocessConfig( cli_path=ctx.cli_path, cwd=ctx.work_dir, env=ctx.get_env(), github_token=github_token, ) ) try: session2 = await new_client.resume_session( session_id, on_permission_request=PermissionHandler.approve_all ) assert session2.session_id == session_id messages = await session2.get_messages() message_types = [m.type.value for m in messages] assert "user.message" in message_types assert "session.resume" in message_types # Can continue the conversation statefully answer2 = await session2.send_and_wait("Now if you double that, what do you get?") assert answer2 is not None assert "4" in answer2.data.content finally: await new_client.force_stop() async def test_should_throw_error_resuming_nonexistent_session(self, ctx: E2ETestContext): with pytest.raises(Exception): await ctx.client.resume_session( "non-existent-session-id", on_permission_request=PermissionHandler.approve_all ) async def test_should_list_sessions(self, ctx: E2ETestContext): import asyncio # Create a couple of sessions and send messages to persist them session1 = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) await session1.send_and_wait("Say hello") session2 = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) await session2.send_and_wait("Say goodbye") # Small delay to ensure session files are written to disk await asyncio.sleep(0.2) # List sessions and verify they're included sessions = await ctx.client.list_sessions() assert isinstance(sessions, list) session_ids = [s.sessionId for s in sessions] assert session1.session_id in session_ids assert session2.session_id in session_ids # Verify session metadata structure for session_data in sessions: assert hasattr(session_data, "sessionId") assert hasattr(session_data, "startTime") assert hasattr(session_data, "modifiedTime") assert hasattr(session_data, "isRemote") # summary is optional assert isinstance(session_data.sessionId, str) assert isinstance(session_data.startTime, str) assert isinstance(session_data.modifiedTime, str) assert isinstance(session_data.isRemote, bool) # Verify context field is present for session_data in sessions: assert hasattr(session_data, "context") if session_data.context is not None: assert hasattr(session_data.context, "cwd") assert isinstance(session_data.context.cwd, str) async def test_should_delete_session(self, ctx: E2ETestContext): import asyncio # Create a session and send a message to persist it session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) await session.send_and_wait("Hello") session_id = session.session_id # Small delay to ensure session file is written to disk await asyncio.sleep(0.2) # Verify session exists in the list sessions = await ctx.client.list_sessions() session_ids = [s.sessionId for s in sessions] assert session_id in session_ids # Delete the session await ctx.client.delete_session(session_id) # Verify session no longer exists in the list sessions_after = await ctx.client.list_sessions() session_ids_after = [s.sessionId for s in sessions_after] assert session_id not in session_ids_after # Verify we cannot resume the deleted session with pytest.raises(Exception): await ctx.client.resume_session( session_id, on_permission_request=PermissionHandler.approve_all ) async def test_should_get_session_metadata(self, ctx: E2ETestContext): import asyncio # Create a session and send a message to persist it session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) await session.send_and_wait("Say hello") # Small delay to ensure session file is written to disk await asyncio.sleep(0.2) # Get metadata for the session we just created metadata = await ctx.client.get_session_metadata(session.session_id) assert metadata is not None assert metadata.sessionId == session.session_id assert isinstance(metadata.startTime, str) assert isinstance(metadata.modifiedTime, str) assert isinstance(metadata.isRemote, bool) # Verify context field is present if metadata.context is not None: assert hasattr(metadata.context, "cwd") assert isinstance(metadata.context.cwd, str) # Verify non-existent session returns None not_found = await ctx.client.get_session_metadata("non-existent-session-id") assert not_found is None async def test_should_get_last_session_id(self, ctx: E2ETestContext): import asyncio # Create a session and send a message to persist it session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) await session.send_and_wait("Say hello") # Small delay to ensure session data is flushed to disk await asyncio.sleep(0.5) last_session_id = await ctx.client.get_last_session_id() assert last_session_id == session.session_id await session.disconnect() async def test_should_create_session_with_custom_tool(self, ctx: E2ETestContext): # This test uses the low-level Tool() API to show that Pydantic is optional def get_secret_number_handler(invocation): key = invocation.arguments.get("key", "") if invocation.arguments else "" return ToolResult( text_result_for_llm="54321" if key == "ALPHA" else "unknown", result_type="success", ) session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, tools=[ Tool( name="get_secret_number", description="Gets the secret number", handler=get_secret_number_handler, parameters={ "type": "object", "properties": {"key": {"type": "string", "description": "Key"}}, "required": ["key"], }, ) ], ) answer = await session.send_and_wait("What is the secret number for key ALPHA?") assert answer is not None assert "54321" in answer.data.content async def test_should_create_session_with_custom_provider(self, ctx: E2ETestContext): session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, provider={ "type": "openai", "base_url": "https://api.openai.com/v1", "api_key": "fake-key", }, ) assert session.session_id async def test_should_create_session_with_azure_provider(self, ctx: E2ETestContext): session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, provider={ "type": "azure", "base_url": "https://my-resource.openai.azure.com", "api_key": "fake-key", "azure": { "api_version": "2024-02-15-preview", }, }, ) assert session.session_id async def test_should_resume_session_with_custom_provider(self, ctx: E2ETestContext): session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) session_id = session.session_id # Resume the session with a provider session2 = await ctx.client.resume_session( session_id, on_permission_request=PermissionHandler.approve_all, provider={ "type": "openai", "base_url": "https://api.openai.com/v1", "api_key": "fake-key", }, ) assert session2.session_id == session_id async def test_should_abort_a_session(self, ctx: E2ETestContext): import asyncio session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) # Set up event listeners BEFORE sending to avoid race conditions wait_for_tool_start = asyncio.create_task( get_next_event_of_type(session, "tool.execution_start", timeout=60.0) ) wait_for_session_idle = asyncio.create_task( get_next_event_of_type(session, "session.idle", timeout=30.0) ) # Send a message that will trigger a long-running shell command await session.send( "run the shell command 'sleep 100' (note this works on both bash and PowerShell)" ) # Wait for the tool to start executing _ = await wait_for_tool_start # Abort the session while the tool is running await session.abort() # Wait for session to become idle after abort _ = await wait_for_session_idle # The session should still be alive and usable after abort messages = await session.get_messages() assert len(messages) > 0 # Verify an abort event exists in messages abort_events = [m for m in messages if m.type.value == "abort"] assert len(abort_events) > 0, "Expected an abort event in messages" # We should be able to send another message answer = await session.send_and_wait("What is 2+2?") assert "4" in answer.data.content async def test_should_receive_session_events(self, ctx: E2ETestContext): import asyncio # Use on_event to capture events dispatched during session creation. # session.start is emitted during the session.create RPC; if the session # weren't registered in the sessions map before the RPC, it would be dropped. early_events = [] def capture_early(event): early_events.append(event) session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, on_event=capture_early, ) assert any(e.type.value == "session.start" for e in early_events) received_events = [] idle_event = asyncio.Event() def on_event(event): received_events.append(event) if event.type.value == "session.idle": idle_event.set() session.on(on_event) # Send a message to trigger events await session.send("What is 100+200?") # Wait for session to become idle try: await asyncio.wait_for(idle_event.wait(), timeout=60) except TimeoutError: pytest.fail("Timed out waiting for session.idle") # Should have received multiple events assert len(received_events) > 0 event_types = [e.type.value for e in received_events] assert "user.message" in event_types assert "assistant.message" in event_types assert "session.idle" in event_types # Verify the assistant response contains the expected answer. # session.idle is ephemeral and not in get_messages(), but we already # confirmed idle via the live event handler above. assistant_message = await get_final_assistant_message(session, already_idle=True) assert "300" in assistant_message.data.content async def test_should_create_session_with_custom_config_dir(self, ctx: E2ETestContext): import os custom_config_dir = os.path.join(ctx.home_dir, "custom-config") session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all, config_dir=custom_config_dir ) assert session.session_id # Session should work normally with custom config dir await session.send("What is 1+1?") assistant_message = await get_final_assistant_message(session) assert "2" in assistant_message.data.content async def test_session_log_emits_events_at_all_levels(self, ctx: E2ETestContext): import asyncio session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) received_events = [] def on_event(event): if event.type.value in ("session.info", "session.warning", "session.error"): received_events.append(event) session.on(on_event) await session.log("Info message") await session.log("Warning message", level="warning") await session.log("Error message", level="error") await session.log("Ephemeral message", ephemeral=True) # Poll until all 4 notification events arrive deadline = asyncio.get_event_loop().time() + 10 while len(received_events) < 4: if asyncio.get_event_loop().time() > deadline: pytest.fail( f"Timed out waiting for 4 notification events, got {len(received_events)}" ) await asyncio.sleep(0.1) by_message = {e.data.message: e for e in received_events} assert by_message["Info message"].type.value == "session.info" assert by_message["Info message"].data.info_type == "notification" assert by_message["Warning message"].type.value == "session.warning" assert by_message["Warning message"].data.warning_type == "notification" assert by_message["Error message"].type.value == "session.error" assert by_message["Error message"].data.error_type == "notification" assert by_message["Ephemeral message"].type.value == "session.info" assert by_message["Ephemeral message"].data.info_type == "notification" async def test_should_set_model_with_reasoning_effort(self, ctx: E2ETestContext): """Test that setModel passes reasoningEffort and it appears in the model_change event.""" import asyncio session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) model_change_event = asyncio.get_event_loop().create_future() def on_event(event): if model_change_event.done(): return match event.data: case SessionModelChangeData() as data: model_change_event.set_result(data) session.on(on_event) await session.set_model("gpt-4.1", reasoning_effort="high") data = await asyncio.wait_for(model_change_event, timeout=30) assert data.new_model == "gpt-4.1" assert data.reasoning_effort == "high" async def test_should_accept_blob_attachments(self, ctx: E2ETestContext): # Write the image to disk so the model can view it pixel_png = ( "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAY" "AAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhg" "GAWjR9awAAAABJRU5ErkJggg==" ) png_path = os.path.join(ctx.work_dir, "test-pixel.png") with open(png_path, "wb") as f: f.write(base64.b64decode(pixel_png)) session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) await session.send_and_wait( "Describe this image", attachments=[ { "type": "blob", "data": pixel_png, "mimeType": "image/png", "displayName": "test-pixel.png", }, ], ) await session.disconnect() def _get_system_message(exchange: dict) -> str: messages = exchange.get("request", {}).get("messages", []) for msg in messages: if msg.get("role") == "system": return msg.get("content", "") return ""