-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_rpc_event_log_e2e.py
More file actions
157 lines (135 loc) · 5.2 KB
/
test_rpc_event_log_e2e.py
File metadata and controls
157 lines (135 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""E2E coverage for session.eventLog RPC methods."""
from __future__ import annotations
import asyncio
import time
import uuid
from collections.abc import Awaitable, Callable
import pytest
from copilot.generated.rpc import (
EventLogReadRequest,
EventsCursorStatus,
NameSetRequest,
PlanUpdateRequest,
RegisterEventInterestParams,
ReleaseEventInterestParams,
)
from copilot.generated.session_events import (
PlanChangedOperation,
SessionPlanChangedData,
SessionTitleChangedData,
)
from copilot.session import PermissionHandler
from .testharness import E2ETestContext
pytestmark = pytest.mark.asyncio(loop_scope="module")
async def _wait_for(
predicate: Callable[[], Awaitable[bool]],
*,
timeout: float = 30.0,
message: str,
) -> None:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if await predicate():
return
await asyncio.sleep(0.2)
pytest.fail(message)
class TestRpcEventLog:
async def test_should_read_persisted_events_from_beginning(self, ctx: E2ETestContext):
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
await session.rpc.plan.update(
PlanUpdateRequest(content="# Event log E2E plan\n- persisted event")
)
observed = None
async def has_plan_event() -> bool:
nonlocal observed
observed = await session.rpc.event_log.read(EventLogReadRequest(max=100, wait_ms=0))
return any(
isinstance(evt.data, SessionPlanChangedData)
and evt.data.operation == PlanChangedOperation.CREATE
and evt.ephemeral is not True
for evt in observed.events
)
await _wait_for(
has_plan_event,
message="Timed out waiting for persisted session.plan_changed event.",
)
assert observed is not None
assert observed.cursor_status == EventsCursorStatus.OK
assert observed.cursor
assert any(
isinstance(evt.data, SessionPlanChangedData)
and evt.data.operation == PlanChangedOperation.CREATE
for evt in observed.events
)
finally:
await session.disconnect()
async def test_should_return_tail_cursor_and_read_empty_when_no_new_events(
self, ctx: E2ETestContext
):
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
tail = await session.rpc.event_log.tail()
read = await session.rpc.event_log.read(
EventLogReadRequest(cursor=tail.cursor, max=10, wait_ms=0)
)
assert tail.cursor
assert read.cursor_status == EventsCursorStatus.OK
assert read.events == []
assert read.has_more is False
finally:
await session.disconnect()
async def test_should_register_and_release_event_interest_idempotently(
self, ctx: E2ETestContext
):
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
registered = await session.rpc.event_log.register_interest(
RegisterEventInterestParams(event_type="session.title_changed")
)
assert registered.handle
released = await session.rpc.event_log.release_interest(
ReleaseEventInterestParams(handle=registered.handle)
)
assert released.success is True
released_again = await session.rpc.event_log.release_interest(
ReleaseEventInterestParams(handle=registered.handle)
)
assert released_again.success is True
finally:
await session.disconnect()
async def test_should_long_poll_with_types_filter_for_title_changed_event(
self, ctx: E2ETestContext
):
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all,
)
try:
expected_title = f"EventLogTitle-{uuid.uuid4().hex}"
tail = await session.rpc.event_log.tail()
read_task = asyncio.create_task(
session.rpc.event_log.read(
EventLogReadRequest(
cursor=tail.cursor,
max=10,
wait_ms=5000,
types=["session.title_changed"],
)
)
)
await session.rpc.name.set(NameSetRequest(name=expected_title))
read = await asyncio.wait_for(read_task, timeout=10.0)
assert read.cursor_status == EventsCursorStatus.OK
assert all(evt.type.value == "session.title_changed" for evt in read.events)
assert any(
isinstance(evt.data, SessionTitleChangedData) and evt.data.title == expected_title
for evt in read.events
)
finally:
await session.disconnect()