-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_rpc_remote_e2e.py
More file actions
95 lines (80 loc) · 3.46 KB
/
test_rpc_remote_e2e.py
File metadata and controls
95 lines (80 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""E2E coverage for session.remote RPC methods."""
from __future__ import annotations
import asyncio
import time
import pytest
from copilot.generated.rpc import (
RemoteEnableRequest,
RemoteNotifySteerableChangedRequest,
RemoteSessionMode,
SessionsGetPersistedRemoteSteerableRequest,
)
from copilot.generated.session_events import SessionRemoteSteerableChangedData
from copilot.session import PermissionHandler
from .testharness import E2ETestContext
# Module-wide marker: every test here is an asyncio test, and the
# loop_scope="module" argument requests one event loop for the whole module.
pytestmark = pytest.mark.asyncio(loop_scope="module")
async def _wait_for_remote_steerable_event(
    session,
    expected: bool,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.2,
) -> None:
    """Poll the session's events until a matching remote-steerable change appears.

    Args:
        session: Object exposing an awaitable ``get_events()`` whose items
            carry a ``data`` attribute.
        expected: Value that ``data.remote_steerable`` must have (compared by
            identity) on a ``SessionRemoteSteerableChangedData`` event.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        None, as soon as a matching event is found.

    Fails the running test through ``pytest.fail`` when no matching event has
    been observed once the deadline has passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Every event returned by get_events() is examined on each poll.
        events = await session.get_events()
        if any(
            isinstance(evt.data, SessionRemoteSteerableChangedData)
            and evt.data.remote_steerable is expected
            for evt in events
        ):
            return
        # Check the deadline *after* polling so that the events are read at
        # least once and one final time after the last sleep; an event that
        # lands during that sleep is therefore not reported as a timeout.
        if time.monotonic() >= deadline:
            break
        await asyncio.sleep(poll_interval)
    pytest.fail(f"Timed out waiting for session.remote_steerable_changed={expected}.")
def _assert_not_unhandled(exc: Exception, method: str) -> None:
assert f"unhandled method {method}".lower() not in str(exc).lower()
class TestRpcRemote:
    """End-to-end tests that call the ``session.remote`` RPC methods."""

    async def test_remote_off_is_noop_or_implemented_error(self, ctx: E2ETestContext):
        """``remote.enable`` with mode OFF returns a non-steerable result or a handled error."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all,
        )
        try:
            response = await session.rpc.remote.enable(
                RemoteEnableRequest(mode=RemoteSessionMode.OFF)
            )
        except Exception as err:
            # Any failure is accepted unless it is an "unhandled method" error.
            _assert_not_unhandled(err, "session.remote.enable")
        else:
            assert response.remote_steerable is False
            assert not response.url
        finally:
            await session.disconnect()

    async def test_remote_disable_is_noop_or_implemented_error(self, ctx: E2ETestContext):
        """``remote.disable`` succeeds or fails with something other than "unhandled method"."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all,
        )
        try:
            await session.rpc.remote.disable()
        except Exception as err:
            # Any failure is accepted unless it is an "unhandled method" error.
            _assert_not_unhandled(err, "session.remote.disable")
        finally:
            await session.disconnect()

    async def test_notify_steerable_changed_event_and_persist_flag(self, ctx: E2ETestContext):
        """Each steerable notification yields a matching event and persisted flag."""
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all,
        )
        try:
            # Switch the flag on first, then off again, checking both the
            # emitted session event and the persisted value after each change.
            for steerable in (True, False):
                await session.rpc.remote.notify_steerable_changed(
                    RemoteNotifySteerableChangedRequest(remote_steerable=steerable)
                )
                await _wait_for_remote_steerable_event(session, steerable)
                stored = await ctx.client.rpc.sessions.get_persisted_remote_steerable(
                    SessionsGetPersistedRemoteSteerableRequest(session_id=session.session_id)
                )
                assert stored.remote_steerable is steerable
        finally:
            await session.disconnect()