-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_client_lifecycle_e2e.py
More file actions
259 lines (214 loc) · 8.79 KB
/
test_client_lifecycle_e2e.py
File metadata and controls
259 lines (214 loc) · 8.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
"""
Client lifecycle tests covering ``client.on(...)`` lifecycle event subscriptions
and connection-state transitions across ``start``/``stop``.
Mirrors ``dotnet/test/ClientLifecycleTests.cs`` plus the existing ``client_lifecycle``
nodejs scenarios so the YAML snapshots under ``test/snapshots/client_lifecycle/``
can be reused.
"""
from __future__ import annotations
import asyncio
import os
import pytest
from copilot import CopilotClient
from copilot.client import SubprocessConfig
from copilot.session import PermissionHandler
from .testharness import E2ETestContext
pytestmark = pytest.mark.asyncio(loop_scope="module")
async def _wait_for_condition(predicate, timeout: float = 10.0) -> None:
deadline = asyncio.get_running_loop().time() + timeout
while True:
if predicate():
return
if asyncio.get_running_loop().time() >= deadline:
raise TimeoutError("condition was not met before timeout")
await asyncio.sleep(0.05)
async def _wait_for_last_session_id(client) -> str:
last_id = None
async def poll() -> bool:
nonlocal last_id
last_id = await client.get_last_session_id()
return bool(last_id)
deadline = asyncio.get_running_loop().time() + 10.0
while True:
if await poll():
return last_id
if asyncio.get_running_loop().time() >= deadline:
raise TimeoutError("last session id was not persisted before timeout")
await asyncio.sleep(0.05)
def _make_isolated_client(ctx: E2ETestContext) -> CopilotClient:
"""Build a client with the same isolated env as ctx.client but disjoint state.
Used to exercise lifecycle tests that need a known-empty state directory
or that explicitly drive start/stop transitions.
"""
github_token = (
"fake-token-for-e2e-tests" if os.environ.get("GITHUB_ACTIONS") == "true" else None
)
return CopilotClient(
SubprocessConfig(
cli_path=ctx.cli_path,
cwd=ctx.work_dir,
env=ctx.get_env(),
github_token=github_token,
)
)
class TestClientLifecycle:
async def test_should_return_last_session_id_after_sending_a_message(self, ctx: E2ETestContext):
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
await session.send_and_wait("Say hello")
last_id = await _wait_for_last_session_id(ctx.client)
assert last_id
finally:
await session.disconnect()
async def test_should_emit_session_lifecycle_events(self, ctx: E2ETestContext):
events: list = []
unsubscribe = ctx.client.on(events.append)
try:
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
await session.send_and_wait("Say hello")
await _wait_for_condition(
lambda: any(
getattr(e, "sessionId", None) == session.session_id for e in events
),
timeout=10.0,
)
finally:
await session.disconnect()
finally:
unsubscribe()
async def test_should_receive_session_created_lifecycle_event(self, ctx: E2ETestContext):
loop = asyncio.get_event_loop()
created: asyncio.Future = loop.create_future()
def handler(event):
if event.type == "session.created" and not created.done():
created.set_result(event)
unsubscribe = ctx.client.on(handler)
try:
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
event = await asyncio.wait_for(created, 10.0)
assert event.type == "session.created"
assert event.sessionId == session.session_id
finally:
await session.disconnect()
finally:
unsubscribe()
async def test_should_filter_session_lifecycle_events_by_type(self, ctx: E2ETestContext):
loop = asyncio.get_event_loop()
created: asyncio.Future = loop.create_future()
def handler(event):
if not created.done():
created.set_result(event)
unsubscribe = ctx.client.on("session.created", handler)
try:
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
event = await asyncio.wait_for(created, 10.0)
assert event.type == "session.created"
assert event.sessionId == session.session_id
finally:
await session.disconnect()
finally:
unsubscribe()
async def test_disposing_lifecycle_subscription_stops_receiving_events(
self, ctx: E2ETestContext
):
loop = asyncio.get_event_loop()
unsubscribed_count = 0
def disposed_handler(_event):
nonlocal unsubscribed_count
unsubscribed_count += 1
unsubscribe_disposed = ctx.client.on(disposed_handler)
unsubscribe_disposed() # Immediately dispose first subscription.
active_event: asyncio.Future = loop.create_future()
unsubscribe_active = ctx.client.on(
"session.created",
lambda evt: active_event.set_result(evt) if not active_event.done() else None,
)
try:
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
event = await asyncio.wait_for(active_event, 10.0)
assert event.sessionId == session.session_id
assert unsubscribed_count == 0, "Disposed handler should not have fired"
finally:
await session.disconnect()
finally:
unsubscribe_active()
async def test_stop_disconnects_client_and_disposes_rpc_surface(self, ctx: E2ETestContext):
client = _make_isolated_client(ctx)
await client.start()
try:
assert client.get_state() == "connected"
finally:
await client.stop()
assert client.get_state() == "disconnected"
with pytest.raises(RuntimeError):
_ = client.rpc
async def test_should_receive_session_updated_lifecycle_event_for_non_ephemeral_activity(
self, ctx: E2ETestContext
):
"""Changing session mode emits a session.updated lifecycle event."""
from copilot.generated.rpc import ModeSetRequest, SessionMode
loop = asyncio.get_event_loop()
updated: asyncio.Future = loop.create_future()
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
def handler(event):
if (
event.type == "session.updated"
and event.sessionId == session.session_id
and not updated.done()
):
updated.set_result(event)
unsubscribe = ctx.client.on(handler)
try:
await session.rpc.mode.set(ModeSetRequest(mode=SessionMode.PLAN))
event = await asyncio.wait_for(updated, timeout=15.0)
assert event.type == "session.updated"
assert event.sessionId == session.session_id
finally:
unsubscribe()
await session.disconnect()
async def test_should_receive_session_deleted_lifecycle_event_when_deleted(
self, ctx: E2ETestContext
):
"""Deleting a session emits a session.deleted lifecycle event."""
loop = asyncio.get_event_loop()
deleted: asyncio.Future = loop.create_future()
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
session_id = session.session_id
# Do a turn so the session is persisted
message = await session.send_and_wait("Say SESSION_DELETED_OK exactly.", timeout=60.0)
assert message is not None
assert "SESSION_DELETED_OK" in (message.data.content or "")
def handler(event):
if (
event.type == "session.deleted"
and event.sessionId == session_id
and not deleted.done()
):
deleted.set_result(event)
unsubscribe = ctx.client.on(handler)
try:
await session.disconnect()
await ctx.client.delete_session(session_id)
event = await asyncio.wait_for(deleted, timeout=15.0)
assert event.type == "session.deleted"
assert event.sessionId == session_id
finally:
unsubscribe()