-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_event_fidelity_e2e.py
More file actions
251 lines (208 loc) · 9.85 KB
/
test_event_fidelity_e2e.py
File metadata and controls
251 lines (208 loc) · 9.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""E2E tests for session event ordering and required event fields."""
from __future__ import annotations
from pathlib import Path
import pytest
from copilot.generated.session_events import (
AssistantMessageData,
AssistantUsageData,
PendingMessagesModifiedData,
SessionUsageInfoData,
ToolExecutionCompleteData,
ToolExecutionStartData,
UserMessageData,
)
from copilot.session import PermissionHandler
from .testharness import E2ETestContext
# Module-level pytest marker: applies the asyncio marker with loop_scope="module" to
# every test collected from this file.
pytestmark = pytest.mark.asyncio(loop_scope="module")
class TestEventFidelity:
    """E2E checks that a session emits well-formed events in the expected order.

    Each test creates its own session through ``ctx.client`` with every permission
    request approved, sends one prompt with ``send_and_wait``, and disconnects the
    session in a ``finally`` block so a failing assertion does not leave it open.
    """

    @staticmethod
    def _first_index(types: list[str], event_type: str) -> int:
        """Return the index of the first ``event_type`` in ``types``, or -1 if absent."""
        return types.index(event_type) if event_type in types else -1

    @staticmethod
    def _last_index(types: list[str], event_type: str) -> int:
        """Return the index of the last ``event_type`` in ``types``, or -1 if absent."""
        if event_type not in types:
            return -1
        return len(types) - 1 - types[::-1].index(event_type)

    async def test_should_emit_events_in_correct_order_for_tool_using_conversation(
        self, ctx: E2ETestContext
    ):
        """user.message precedes the last assistant.message; session.idle is last."""
        Path(ctx.work_dir, "hello.txt").write_text("Hello World", encoding="utf-8")
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        events = []
        unsubscribe = session.on(events.append)
        try:
            await session.send_and_wait("Read the file 'hello.txt' and tell me its contents.")
            types = [event.type.value for event in events]

            assert "user.message" in types
            assert "assistant.message" in types
            # Checked explicitly so a missing idle event is reported as an assertion
            # failure instead of a ValueError from list.index().
            assert "session.idle" in types, "Expected session.idle event"

            user_idx = types.index("user.message")
            assistant_idx = self._last_index(types, "assistant.message")
            assert user_idx < assistant_idx

            # The last session.idle must also be the last event captured.
            idle_idx = self._last_index(types, "session.idle")
            assert idle_idx == len(types) - 1
        finally:
            unsubscribe()
            await session.disconnect()

    async def test_should_include_valid_fields_on_all_events(self, ctx: E2ETestContext):
        """Every event has an id and a timestamp; message events carry their payload."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        events = []
        unsubscribe = session.on(events.append)
        try:
            await session.send_and_wait("What is 5+5? Reply with just the number.")

            for event in events:
                assert event.id is not None
                assert str(event.id)
                assert event.timestamp is not None

            user_event = next(
                (event for event in events if isinstance(event.data, UserMessageData)), None
            )
            assert user_event is not None
            assert user_event.data.content

            assistant_event = next(
                (event for event in events if isinstance(event.data, AssistantMessageData)),
                None,
            )
            assert assistant_event is not None
            assert assistant_event.data.message_id
            assert assistant_event.data.content is not None
        finally:
            unsubscribe()
            await session.disconnect()

    async def test_should_emit_tool_execution_events_with_correct_fields(self, ctx: E2ETestContext):
        """A file-read prompt yields tool start/complete events with their ids set."""
        Path(ctx.work_dir, "data.txt").write_text("test data", encoding="utf-8")
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        events = []
        unsubscribe = session.on(events.append)
        try:
            await session.send_and_wait("Read the file 'data.txt'.")

            tool_starts = [
                event for event in events if isinstance(event.data, ToolExecutionStartData)
            ]
            tool_completes = [
                event for event in events if isinstance(event.data, ToolExecutionCompleteData)
            ]
            assert len(tool_starts) >= 1
            assert len(tool_completes) >= 1

            assert tool_starts[0].data.tool_call_id
            assert tool_starts[0].data.tool_name
            assert tool_completes[0].data.tool_call_id
        finally:
            unsubscribe()
            await session.disconnect()

    async def test_should_emit_assistant_message_with_messageid(self, ctx: E2ETestContext):
        """The first assistant message has a message_id and contains the requested word."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        events = []
        unsubscribe = session.on(events.append)
        try:
            await session.send_and_wait("Say 'pong'.")

            assistant_events = [
                event for event in events if isinstance(event.data, AssistantMessageData)
            ]
            assert len(assistant_events) >= 1

            message = assistant_events[0]
            assert message.data.message_id
            # `or ""` keeps a None content an assertion failure rather than a TypeError,
            # matching how the other tests in this class inspect message content.
            assert "pong" in (message.data.content or "")
        finally:
            unsubscribe()
            await session.disconnect()

    async def test_should_emit_assistant_usage_event_after_model_call(self, ctx: E2ETestContext):
        """At least one assistant.usage event is emitted and the last one names a model."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        events = []
        unsubscribe = session.on(events.append)
        try:
            await session.send_and_wait("What is 5+5? Reply with just the number.")

            usage_events = [e for e in events if isinstance(e.data, AssistantUsageData)]
            assert len(usage_events) >= 1, "Expected at least one assistant.usage event"

            last_usage = usage_events[-1]
            assert last_usage.id is not None
            assert last_usage.timestamp is not None
            assert last_usage.data.model
        finally:
            unsubscribe()
            await session.disconnect()

    async def test_should_emit_session_usage_info_event_after_model_call(self, ctx: E2ETestContext):
        """The last session.usage_info event reports positive token and message counts."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        events = []
        unsubscribe = session.on(events.append)
        try:
            await session.send_and_wait("What is 5+5? Reply with just the number.")

            usage_info_events = [e for e in events if isinstance(e.data, SessionUsageInfoData)]
            assert len(usage_info_events) >= 1, "Expected at least one session.usage_info event"

            last_info = usage_info_events[-1]
            assert last_info.data.current_tokens > 0
            assert last_info.data.messages_length > 0
            assert last_info.data.token_limit > 0
        finally:
            unsubscribe()
            await session.disconnect()

    async def test_should_emit_pending_messages_modified_event_when_message_queue_changes(
        self, ctx: E2ETestContext
    ):
        """A single prompt round trip produces a pending-messages-modified event."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        events = []
        unsubscribe = session.on(events.append)
        try:
            # send_and_wait collects everything in one round trip and matches the
            # pattern of every other test in this file (and the Rust E2E equivalent),
            # avoiding the split fire-and-forget + helper pattern that previously
            # made this test prone to flakes.
            answer = await session.send_and_wait("What is 9+9? Reply with just the number.")

            pending_event = next(
                (e for e in events if isinstance(e.data, PendingMessagesModifiedData)), None
            )
            assert pending_event is not None

            assert answer is not None
            assert "18" in (answer.data.content or "")
        finally:
            unsubscribe()
            await session.disconnect()

    async def test_should_preserve_message_order_in_getmessages_after_tool_use(
        self, ctx: E2ETestContext
    ):
        """Events returned by get_events() keep the start/user/tool/assistant order."""
        Path(ctx.work_dir, "order.txt").write_text("ORDER_CONTENT_42", encoding="utf-8")
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all
        )
        try:
            await session.send_and_wait("Read the file 'order.txt' and tell me what the number is.")
            messages = await session.get_events()
            types = [m.type.value for m in messages]

            # Verify complete event ordering contract:
            # session.start → user.message → tool.execution_start → tool.execution_complete
            # → assistant.message
            session_start_idx = self._first_index(types, "session.start")
            user_msg_idx = self._first_index(types, "user.message")
            tool_start_idx = self._first_index(types, "tool.execution_start")
            tool_complete_idx = self._first_index(types, "tool.execution_complete")
            # The final assistant message is the one expected to follow the tool call.
            assistant_msg_idx = self._last_index(types, "assistant.message")

            # Presence first (-1 means absent), so ordering failures are unambiguous.
            assert session_start_idx >= 0, "Expected session.start event"
            assert user_msg_idx >= 0, "Expected user.message event"
            assert tool_start_idx >= 0, "Expected tool.execution_start event"
            assert tool_complete_idx >= 0, "Expected tool.execution_complete event"
            assert assistant_msg_idx >= 0, "Expected assistant.message event"

            assert session_start_idx < user_msg_idx, "session.start should precede user.message"
            assert user_msg_idx < tool_start_idx, "user.message should precede tool.execution_start"
            assert tool_start_idx < tool_complete_idx, (
                "tool.execution_start should precede tool.execution_complete"
            )
            assert tool_complete_idx < assistant_msg_idx, (
                "tool.execution_complete should precede final assistant.message"
            )

            # Verify user.message has our content
            user_events = [m for m in messages if isinstance(m.data, UserMessageData)]
            assert any("order.txt" in (e.data.content or "") for e in user_events)

            # Verify assistant.message references the file content
            assistant_events = [m for m in messages if isinstance(m.data, AssistantMessageData)]
            assert any("42" in (e.data.content or "") for e in assistant_events)
        finally:
            await session.disconnect()