-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_canvas_e2e.py
More file actions
169 lines (143 loc) · 5.77 KB
/
test_canvas_e2e.py
File metadata and controls
169 lines (143 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""E2E tests for canvas RPCs."""
from __future__ import annotations
import pytest
from copilot import (
CanvasAction,
CanvasDeclaration,
CanvasHandler,
)
from copilot.generated.rpc import (
CanvasActionInvokeRequest,
CanvasCloseRequest,
CanvasOpenRequest,
CanvasProviderCloseRequest,
CanvasProviderInvokeActionRequest,
CanvasProviderOpenRequest,
CanvasProviderOpenResult,
)
from copilot.session import CopilotSession, PermissionHandler
from .testharness import E2ETestContext
# Apply the pytest-asyncio marker to every test in this module, requesting a
# module-scoped event loop (loop_scope="module") rather than one per test.
pytestmark = pytest.mark.asyncio(loop_scope="module")
class _CounterCanvasHandler(CanvasHandler):
    """Canvas handler test double that records every request it receives.

    Each callback appends its request object to a dedicated list so the tests
    can assert on exactly what was delivered. Responses are fixed values.
    """

    def __init__(self) -> None:
        # One log per callback kind, in arrival order.
        self.open_calls: list[CanvasProviderOpenRequest] = []
        self.action_calls: list[CanvasProviderInvokeActionRequest] = []
        self.close_calls: list[CanvasProviderCloseRequest] = []

    async def on_open(self, ctx: CanvasProviderOpenRequest) -> CanvasProviderOpenResult:
        """Record the open request and return a fixed "ready" result."""
        self.open_calls.append(ctx)
        canned_result = CanvasProviderOpenResult(
            url="https://example.test/counter",
            title="Counter Canvas",
            status="ready",
        )
        return canned_result

    async def on_close(self, ctx: CanvasProviderCloseRequest) -> None:
        """Record the close request; nothing is returned."""
        self.close_calls.append(ctx)

    async def on_action(self, ctx: CanvasProviderInvokeActionRequest) -> dict[str, int]:
        """Record the action request and return a fixed payload."""
        self.action_calls.append(ctx)
        canned_payload = {"newValue": 42}
        return canned_payload
def _counter_canvas() -> CanvasDeclaration:
    """Build the declaration of the "counter" canvas used by these tests.

    The canvas accepts an optional numeric ``startValue`` input and exposes a
    single ``increment`` action taking a numeric ``amount``.
    """
    open_input_schema = {
        "type": "object",
        "properties": {"startValue": {"type": "number"}},
    }
    increment_input_schema = {
        "type": "object",
        "properties": {"amount": {"type": "number"}},
    }
    increment_action = CanvasAction(
        name="increment",
        description="Increment the counter",
        input_schema=increment_input_schema,
    )
    return CanvasDeclaration(
        id="counter",
        display_name="Counter",
        description="A simple counter canvas for e2e testing",
        input_schema=open_input_schema,
        actions=[increment_action],
    )
async def _create_counter_session(
    ctx: E2ETestContext,
) -> tuple[_CounterCanvasHandler, CopilotSession]:
    """Create a session that declares the counter canvas.

    A fresh recording handler is attached to the session and every permission
    request is approved via ``PermissionHandler.approve_all``.

    Returns:
        The ``(handler, session)`` pair; the caller is responsible for
        disconnecting the session.
    """
    recording_handler = _CounterCanvasHandler()
    counter_session = await ctx.client.create_session(
        on_permission_request=PermissionHandler.approve_all,
        canvases=[_counter_canvas()],
        canvas_handler=recording_handler,
    )
    return recording_handler, counter_session
class TestCanvasRpc:
    """End-to-end checks of the session canvas RPCs: list, open, action, close."""

    async def test_should_list_canvases(self, ctx: E2ETestContext):
        """The declared counter canvas is the only entry returned by ``list``."""
        _, session = await _create_counter_session(ctx)
        try:
            listing = await session.rpc.canvas.list()

            assert len(listing.canvases) == 1
            declared = listing.canvases[0]
            assert declared.canvas_id == "counter"
            assert declared.display_name == "Counter"
            assert declared.description == "A simple counter canvas for e2e testing"
        finally:
            await session.disconnect()

    async def test_should_round_trip_canvas_open(self, ctx: E2ETestContext):
        """``open`` reaches the handler and the instance shows up in ``list_open``."""
        handler, session = await _create_counter_session(ctx)
        try:
            open_request = CanvasOpenRequest(
                canvas_id="counter",
                instance_id="counter-1",
                input={"startValue": 10},
            )
            opened = await session.rpc.canvas.open(open_request)

            # The RPC result carries the values the handler returned.
            assert opened.url == "https://example.test/counter"
            assert opened.title == "Counter Canvas"
            assert opened.status == "ready"

            # The handler saw exactly one open call with the request's fields.
            assert len(handler.open_calls) == 1
            seen_open = handler.open_calls[0]
            assert seen_open.canvas_id == "counter"
            assert seen_open.instance_id == "counter-1"
            assert seen_open.input == {"startValue": 10}

            currently_open = await session.rpc.canvas.list_open()
            assert len(currently_open.open_canvases) == 1
            assert currently_open.open_canvases[0].instance_id == "counter-1"
        finally:
            await session.disconnect()

    async def test_should_invoke_canvas_action(self, ctx: E2ETestContext):
        """Invoking ``increment`` reaches the handler and returns its payload."""
        handler, session = await _create_counter_session(ctx)
        try:
            open_request = CanvasOpenRequest(
                canvas_id="counter",
                instance_id="counter-2",
                input={},
            )
            await session.rpc.canvas.open(open_request)

            invoke_request = CanvasActionInvokeRequest(
                action_name="increment",
                instance_id="counter-2",
                input={"amount": 5},
            )
            outcome = await session.rpc.canvas.action.invoke(invoke_request)

            # The handler's payload is expected under a "result" key.
            assert outcome == {"result": {"newValue": 42}}

            assert len(handler.action_calls) == 1
            seen_action = handler.action_calls[0]
            assert seen_action.canvas_id == "counter"
            assert seen_action.instance_id == "counter-2"
            assert seen_action.action_name == "increment"
            assert seen_action.input == {"amount": 5}
        finally:
            await session.disconnect()

    async def test_should_run_close_lifecycle(self, ctx: E2ETestContext):
        """Closing an open instance notifies the handler and empties ``list_open``."""
        handler, session = await _create_counter_session(ctx)
        try:
            open_request = CanvasOpenRequest(
                canvas_id="counter",
                instance_id="counter-3",
                input={},
            )
            await session.rpc.canvas.open(open_request)

            close_request = CanvasCloseRequest(instance_id="counter-3")
            await session.rpc.canvas.close(close_request)

            assert len(handler.close_calls) == 1
            seen_close = handler.close_calls[0]
            assert seen_close.canvas_id == "counter"
            assert seen_close.instance_id == "counter-3"

            remaining = await session.rpc.canvas.list_open()
            assert remaining.open_canvases == []
        finally:
            await session.disconnect()