forked from github/copilot-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_rpc.py
More file actions
236 lines (182 loc) · 8.48 KB
/
test_rpc.py
File metadata and controls
236 lines (182 loc) · 8.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""E2E RPC Tests"""
import pytest
from copilot import CopilotClient, PermissionHandler, SubprocessConfig
from copilot.generated.rpc import PingParams
from .testharness import CLI_PATH, E2ETestContext
# Module-wide pytest marker: mark the tests in this module as asyncio tests and
# request a module-scoped event loop (loop_scope="module").
# NOTE(review): several tests below also carry their own bare
# @pytest.mark.asyncio decorator -- confirm which loop scope applies to those.
pytestmark = pytest.mark.asyncio(loop_scope="module")
class TestRpc:
    """E2E tests for client-level RPC calls.

    Each test builds its own ``CopilotClient`` from a stdio ``SubprocessConfig``
    and calls ``force_stop()`` in a ``finally`` block, so teardown runs on every
    path (success, assertion failure, or skip).
    """

    @pytest.mark.asyncio
    async def test_should_call_rpc_ping_with_typed_params(self):
        """Test calling rpc.ping with typed params and result"""
        client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True))

        try:
            await client.start()

            result = await client.rpc.ping(PingParams(message="typed rpc test"))
            assert result.message == "pong: typed rpc test"
            assert isinstance(result.timestamp, (int, float))

            await client.stop()
        finally:
            await client.force_stop()

    @pytest.mark.asyncio
    async def test_should_call_rpc_models_list(self):
        """Test calling rpc.models.list with typed result"""
        client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True))

        try:
            await client.start()

            auth_status = await client.get_auth_status()
            if not auth_status.isAuthenticated:
                await client.stop()
                # Report the test as skipped rather than silently passing
                # without having called rpc.models.list at all.
                pytest.skip("client is not authenticated; rpc.models.list not exercised")

            result = await client.rpc.models.list()
            assert result.models is not None
            assert isinstance(result.models, list)

            await client.stop()
        finally:
            await client.force_stop()

    # account.getQuota is defined in schema but not yet implemented in CLI
    @pytest.mark.skip(reason="account.getQuota not yet implemented in CLI")
    @pytest.mark.asyncio
    async def test_should_call_rpc_account_get_quota(self):
        """Test calling rpc.account.getQuota when authenticated"""
        client = CopilotClient(SubprocessConfig(cli_path=CLI_PATH, use_stdio=True))

        try:
            await client.start()

            auth_status = await client.get_auth_status()
            if not auth_status.isAuthenticated:
                await client.stop()
                # Same reasoning as the models.list test: an unauthenticated
                # run must show up as skipped, not as a pass.
                pytest.skip("client is not authenticated; rpc.account.getQuota not exercised")

            result = await client.rpc.account.get_quota()
            assert result.quota_snapshots is not None
            assert isinstance(result.quota_snapshots, dict)

            await client.stop()
        finally:
            await client.force_stop()
class TestSessionRpc:
    """E2E tests for the session-level RPC calls (model, mode, plan, workspace)."""

    # session.model.getCurrent is in the schema; the CLI does not implement it yet
    @pytest.mark.skip(reason="session.model.getCurrent not yet implemented in CLI")
    async def test_should_call_session_rpc_model_get_current(self, ctx: E2ETestContext):
        """Call session.rpc.model.getCurrent and check it yields a string model id."""
        session_config = {
            "model": "claude-sonnet-4.5",
            "on_permission_request": PermissionHandler.approve_all,
        }
        session = await ctx.client.create_session(session_config)

        current = await session.rpc.model.get_current()
        assert current.model_id is not None
        assert isinstance(current.model_id, str)

    # session.model.switchTo is in the schema; the CLI does not implement it yet
    @pytest.mark.skip(reason="session.model.switchTo not yet implemented in CLI")
    async def test_should_call_session_rpc_model_switch_to(self, ctx: E2ETestContext):
        """Call session.rpc.model.switchTo and check the new model is reported afterwards."""
        from copilot.generated.rpc import SessionModelSwitchToParams

        session_config = {
            "model": "claude-sonnet-4.5",
            "on_permission_request": PermissionHandler.approve_all,
        }
        session = await ctx.client.create_session(session_config)

        # A model must be reported before any switch happens
        model_before = await session.rpc.model.get_current()
        assert model_before.model_id is not None

        # Request a different model together with a reasoning effort
        switch_params = SessionModelSwitchToParams(model_id="gpt-4.1", reasoning_effort="high")
        switched = await session.rpc.model.switch_to(switch_params)
        assert switched.model_id == "gpt-4.1"

        # A follow-up read must report the switched model
        model_after = await session.rpc.model.get_current()
        assert model_after.model_id == "gpt-4.1"

    @pytest.mark.asyncio
    async def test_get_and_set_session_mode(self):
        """Read the session mode, switch to plan, then switch back to interactive."""
        from copilot.generated.rpc import Mode, SessionModeSetParams

        subprocess_config = SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)
        client = CopilotClient(subprocess_config)

        try:
            await client.start()
            session = await client.create_session(
                {"on_permission_request": PermissionHandler.approve_all}
            )

            # The test expects a fresh session to start in interactive mode
            starting_mode = await session.rpc.mode.get()
            assert starting_mode.mode == Mode.INTERACTIVE

            # Move to plan mode
            to_plan = SessionModeSetParams(mode=Mode.PLAN)
            set_plan_reply = await session.rpc.mode.set(to_plan)
            assert set_plan_reply.mode == Mode.PLAN

            # A subsequent read must still report plan mode
            mode_after_plan = await session.rpc.mode.get()
            assert mode_after_plan.mode == Mode.PLAN

            # Return to interactive mode
            to_interactive = SessionModeSetParams(mode=Mode.INTERACTIVE)
            set_interactive_reply = await session.rpc.mode.set(to_interactive)
            assert set_interactive_reply.mode == Mode.INTERACTIVE

            await session.disconnect()
            await client.stop()
        finally:
            await client.force_stop()

    @pytest.mark.asyncio
    async def test_read_update_and_delete_plan(self):
        """Walk a plan through its lifecycle: absent, written, read back, deleted."""
        from copilot.generated.rpc import SessionPlanUpdateParams

        subprocess_config = SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)
        client = CopilotClient(subprocess_config)

        try:
            await client.start()
            session = await client.create_session(
                {"on_permission_request": PermissionHandler.approve_all}
            )

            # No plan is expected on a new session
            plan_before = await session.rpc.plan.read()
            assert plan_before.exists is False
            assert plan_before.content is None

            # Write a plan
            plan_content = "# Test Plan\n\n- Step 1\n- Step 2"
            update_params = SessionPlanUpdateParams(content=plan_content)
            await session.rpc.plan.update(update_params)

            # The plan must now exist with exactly the written content
            plan_written = await session.rpc.plan.read()
            assert plan_written.exists is True
            assert plan_written.content == plan_content

            # Remove the plan
            await session.rpc.plan.delete()

            # After deletion the plan must be reported as absent again
            plan_removed = await session.rpc.plan.read()
            assert plan_removed.exists is False
            assert plan_removed.content is None

            await session.disconnect()
            await client.stop()
        finally:
            await client.force_stop()

    @pytest.mark.asyncio
    async def test_create_list_and_read_workspace_files(self):
        """Create workspace files (top-level and nested), then list and read them."""
        from copilot.generated.rpc import (
            SessionWorkspaceCreateFileParams,
            SessionWorkspaceReadFileParams,
        )

        subprocess_config = SubprocessConfig(cli_path=CLI_PATH, use_stdio=True)
        client = CopilotClient(subprocess_config)

        try:
            await client.start()
            session = await client.create_session(
                {"on_permission_request": PermissionHandler.approve_all}
            )

            # The workspace is expected to start empty
            listing_before = await session.rpc.workspace.list_files()
            assert listing_before.files == []

            # Add a top-level file
            file_content = "Hello, workspace!"
            create_params = SessionWorkspaceCreateFileParams(
                content=file_content, path="test.txt"
            )
            await session.rpc.workspace.create_file(create_params)

            # The new file must show up in the listing
            listing_after_create = await session.rpc.workspace.list_files()
            assert "test.txt" in listing_after_create.files

            # Reading it back must return the written content
            read_params = SessionWorkspaceReadFileParams(path="test.txt")
            read_reply = await session.rpc.workspace.read_file(read_params)
            assert read_reply.content == file_content

            # Add a file inside a subdirectory
            nested_params = SessionWorkspaceCreateFileParams(
                content="Nested content", path="subdir/nested.txt"
            )
            await session.rpc.workspace.create_file(nested_params)

            listing_after_nested = await session.rpc.workspace.list_files()
            assert "test.txt" in listing_after_nested.files
            # Substring match: only the file name is checked, not the listed path form
            assert any("nested.txt" in entry for entry in listing_after_nested.files)

            await session.disconnect()
            await client.stop()
        finally:
            await client.force_stop()