forked from github/copilot-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelper.py
More file actions
127 lines (98 loc) · 3.66 KB
/
helper.py
File metadata and controls
127 lines (98 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
Test helper functions for E2E tests.
"""
import asyncio
import os
from copilot import CopilotSession
async def get_final_assistant_message(session: CopilotSession, timeout: float = 10.0):
    """
    Wait for and return the final assistant message from a session turn.

    A listener is registered first, then the already-recorded messages are
    inspected, so a response that completed before this call is still found.
    The listener remembers the most recent "assistant.message" event and
    resolves once a "session.idle" event follows it. A "session.idle" with no
    preceding assistant message does not resolve the wait; the call keeps
    waiting until the timeout expires.

    Args:
        session: The session to wait on
        timeout: Maximum time to wait in seconds

    Returns:
        The final assistant message event

    Raises:
        TimeoutError: If no message arrives within timeout (raised by
            asyncio.wait_for as asyncio.TimeoutError, which is the same class
            as the builtin TimeoutError on Python 3.11+)
        RuntimeError: If a session error occurs
    """
    # Inside a coroutine there is always a running loop; get_running_loop()
    # is the documented way to reach it (get_event_loop() is discouraged here).
    result_future: asyncio.Future = asyncio.get_running_loop().create_future()
    final_assistant_message = None

    def on_event(event):
        nonlocal final_assistant_message
        # Ignore anything that arrives after the outcome has been decided
        if result_future.done():
            return
        event_type = event.type.value
        if event_type == "assistant.message":
            final_assistant_message = event
        elif event_type == "session.idle":
            if final_assistant_message is not None:
                result_future.set_result(final_assistant_message)
        elif event_type == "session.error":
            result_future.set_exception(RuntimeError(event.data.message or "session error"))

    # Subscribe to future events
    unsubscribe = session.on(on_event)
    try:
        # Also check existing messages in case the response already arrived
        existing = await _get_existing_final_response(session)
        if existing is not None:
            return existing
        return await asyncio.wait_for(result_future, timeout=timeout)
    finally:
        unsubscribe()
        # If the listener stored an exception but we left through the
        # existing-messages path, mark it as retrieved so asyncio does not log
        # "Future exception was never retrieved" when the future is collected.
        if result_future.done() and not result_future.cancelled():
            result_future.exception()
async def _get_existing_final_response(session: CopilotSession):
    """
    Look through the session's recorded messages for a completed response.

    Only the current turn is considered: everything from the last
    "user.message" onwards, or the whole history when there is no user message.

    Args:
        session: The session whose messages are inspected

    Returns:
        The last "assistant.message" that precedes the first "session.idle" of
        the current turn, or None if the turn has no "session.idle" yet or no
        assistant message comes before it

    Raises:
        RuntimeError: If the current turn contains a "session.error"
    """
    history = await session.get_messages()

    # Walk backwards to locate where the current turn starts
    turn_start = next(
        (
            idx
            for idx in reversed(range(len(history)))
            if history[idx].type.value == "user.message"
        ),
        None,
    )
    turn = history if turn_start is None else history[turn_start:]

    # Any error in the turn takes precedence over a response
    for entry in turn:
        if entry.type.value == "session.error":
            raise RuntimeError(entry.data.message or "session error")

    # The turn is only complete once the session has gone idle
    idle_at = next(
        (pos for pos, entry in enumerate(turn) if entry.type.value == "session.idle"),
        None,
    )
    if idle_at is None:
        return None

    # Latest assistant message recorded before the idle marker
    for entry in reversed(turn[:idle_at]):
        if entry.type.value == "assistant.message":
            return entry
    return None
def write_file(work_dir: str, filename: str, content: str) -> str:
    """
    Write content to a file in the work directory.

    The file is created or truncated and written as UTF-8, so the bytes on
    disk do not depend on the host's locale encoding.

    Args:
        work_dir: The working directory
        filename: The name of the file
        content: The content to write

    Returns:
        The full path to the created file
    """
    filepath = os.path.join(work_dir, filename)
    # Explicit encoding: the default for open() is platform/locale dependent
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    return filepath
def read_file(work_dir: str, filename: str) -> str:
    """
    Read content from a file in the work directory.

    The file is decoded as UTF-8, so the result does not depend on the host's
    locale encoding.

    Args:
        work_dir: The working directory
        filename: The name of the file

    Returns:
        The content of the file
    """
    filepath = os.path.join(work_dir, filename)
    # Explicit encoding: the default for open() is platform/locale dependent
    with open(filepath, encoding="utf-8") as f:
        return f.read()