-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_telemetry_e2e.py
More file actions
251 lines (211 loc) · 10.2 KB
/
test_telemetry_e2e.py
File metadata and controls
251 lines (211 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""
E2E coverage for OpenTelemetry file-exporter integration.
Mirrors ``dotnet/test/TelemetryExportTests.cs`` (snapshot category ``telemetry``):
configures a dedicated client with file-based telemetry, runs a single SDK turn
that calls a custom tool, and validates the exported JSONL spans (root
``invoke_agent``, child ``chat`` and ``execute_tool`` spans, attributes).
Also includes the unit-style coverage from ``dotnet/test/TelemetryTests.cs``:
``TelemetryConfig`` defaults / setters, ``SubprocessConfig.telemetry`` default,
and W3C trace context propagation via ``copilot._telemetry``.
"""
from __future__ import annotations
import asyncio
import json
import os
import uuid
from pathlib import Path
from typing import Any
import pytest
from copilot import CopilotClient, RuntimeConnection, TelemetryConfig
from copilot._telemetry import get_trace_context, trace_context
from copilot.session import PermissionHandler
from copilot.tools import Tool, ToolInvocation, ToolResult
from .testharness import E2ETestContext, get_final_assistant_message
pytestmark = pytest.mark.asyncio(loop_scope="module")
def _string_attribute(entry: dict[str, Any], name: str) -> str | None:
    """Return the span attribute ``name`` from ``entry`` rendered as a string.

    ``None`` is returned when ``entry`` has no (or an empty) ``attributes``
    mapping, or when the attribute is absent or stored as ``None``. String
    values are passed through untouched; any other value is serialized with
    ``json.dumps`` using its default formatting.
    """
    raw = (entry.get("attributes") or {}).get(name)
    if raw is None:
        return None
    if isinstance(raw, str):
        return raw
    return json.dumps(raw)
def _is_root_span(entry: dict[str, Any]) -> bool:
    """Tell whether the span ``entry`` has no parent.

    A span is treated as a root when its ``parentSpanId`` is missing, falsy
    (``None`` or an empty string), or the 16-character all-zero id.
    """
    parent_id = entry.get("parentSpanId")
    return not parent_id or parent_id == "0000000000000000"
async def _read_telemetry_entries(
    path: Path, complete: Any, *, timeout: float = 30.0
) -> list[dict[str, Any]]:
    """Poll a JSONL telemetry file until ``complete`` accepts its contents.

    The file is re-read roughly every 100 ms. Blank lines are ignored and every
    other line is parsed as one JSON document.

    Args:
        path: JSONL file to read; it does not have to exist yet.
        complete: Callable that receives the list of parsed entries and returns
            a truthy value once enough records are present.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The parsed entries from the first snapshot that ``complete`` accepted.

    Raises:
        TimeoutError: If no acceptable snapshot was read before the deadline.
            When the most recent read failed to decode, that error is attached
            as ``__cause__``.
    """
    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    last_error: Exception | None = None
    while loop.time() < deadline:
        try:
            text = path.read_text(encoding="utf-8")
            entries = [json.loads(line) for line in text.splitlines() if line.strip()]
        except FileNotFoundError:
            # The exporter has not created the file yet; keep waiting.
            pass
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # The writer may be in the middle of a line; treat the snapshot as
            # incomplete and retry instead of failing the whole wait.
            last_error = exc
        else:
            last_error = None
            if entries and complete(entries):
                return entries
        await asyncio.sleep(0.1)
    raise TimeoutError(f"Timed out waiting for telemetry records in '{path}'.") from last_error
class TestTelemetryExport:
    """End-to-end check of the spans written by the file-based telemetry exporter."""

    async def test_should_export_file_telemetry_for_sdk_interactions(self, ctx: E2ETestContext):
        """Run one tool-calling turn and validate the exported JSONL spans.

        A dedicated client is configured with a file exporter, a single prompt
        asks the model to call a custom echo tool, and after the client stops
        the exported file is checked for: a shared instrumentation scope and
        trace id, no span in error state, a root ``invoke_agent`` span, and
        ``chat`` / ``execute_tool`` spans parented to it with the expected
        attributes.
        """
        telemetry_path = Path(ctx.work_dir) / f"telemetry-{uuid.uuid4().hex}.jsonl"
        marker = "copilot-sdk-telemetry-e2e"
        source_name = "python-sdk-telemetry-e2e"
        tool_name = "echo_telemetry_marker"
        prompt = (
            f"Use the {tool_name} tool with value '{marker}', then respond with TELEMETRY_E2E_DONE."
        )

        def echo(invocation: ToolInvocation) -> ToolResult:
            # Echo the "value" argument back so it shows up as the tool result.
            args = invocation.arguments or {}
            return ToolResult(text_result_for_llm=str(args.get("value", "")))

        # A placeholder token is only supplied when running under GitHub Actions.
        github_token = (
            "fake-token-for-e2e-tests" if os.environ.get("GITHUB_ACTIONS") == "true" else None
        )
        client = CopilotClient(
            connection=RuntimeConnection.for_stdio(path=ctx.cli_path),
            working_directory=ctx.work_dir,
            env=ctx.get_env(),
            github_token=github_token,
            telemetry=TelemetryConfig(
                file_path=str(telemetry_path),
                exporter_type="file",
                source_name=source_name,
                capture_content=True,
            ),
        )
        try:
            session = await client.create_session(
                on_permission_request=PermissionHandler.approve_all,
                tools=[
                    Tool(
                        name=tool_name,
                        description="Echoes a marker string for telemetry validation.",
                        parameters={
                            "type": "object",
                            "properties": {"value": {"type": "string", "description": "Marker"}},
                            "required": ["value"],
                        },
                        handler=echo,
                    )
                ],
            )
            session_id = session.session_id
            await session.send(prompt)
            answer = await get_final_assistant_message(session, timeout=60.0)
            assert "TELEMETRY_E2E_DONE" in (answer.data.content or "")
            await session.disconnect()
        finally:
            await client.stop()

        # Wait until the root invoke_agent span has been written to the file.
        entries = await _read_telemetry_entries(
            telemetry_path,
            lambda items: any(
                item.get("type") == "span"
                and _string_attribute(item, "gen_ai.operation.name") == "invoke_agent"
                for item in items
            ),
        )
        spans = [item for item in entries if item.get("type") == "span"]
        assert spans

        def spans_for(operation: str) -> list[dict[str, Any]]:
            # Spans whose gen_ai.operation.name equals ``operation``, in file order.
            return [
                s for s in spans if _string_attribute(s, "gen_ai.operation.name") == operation
            ]

        # Every span must come from the configured instrumentation scope.
        for span in spans:
            scope = span.get("instrumentationScope") or {}
            assert scope.get("name") == source_name

        # The whole turn must belong to a single trace.
        trace_ids = {s.get("traceId") for s in spans if s.get("traceId")}
        assert len(trace_ids) == 1

        # Status code 2 marks a span in error state.
        for span in spans:
            status = (span.get("status") or {}).get("code", 0)
            assert status != 2, f"span in error state: {span}"

        # A bare ``next(generator)`` inside a coroutine would surface a missing
        # span as "RuntimeError: coroutine raised StopIteration"; filter into a
        # list and assert so the failure names what is missing.
        invoke_agent_spans = spans_for("invoke_agent")
        assert invoke_agent_spans, "no invoke_agent span was exported"
        invoke_agent = invoke_agent_spans[0]
        assert _string_attribute(invoke_agent, "gen_ai.conversation.id") == session_id
        assert _is_root_span(invoke_agent)
        invoke_agent_span_id = invoke_agent.get("spanId")
        assert invoke_agent_span_id

        chat_spans = spans_for("chat")
        assert chat_spans
        for chat in chat_spans:
            assert chat.get("parentSpanId") == invoke_agent_span_id
        # capture_content=True, so prompt and answer must appear in the chat spans.
        assert any(
            prompt in (_string_attribute(c, "gen_ai.input.messages") or "") for c in chat_spans
        )
        assert any(
            "TELEMETRY_E2E_DONE" in (_string_attribute(c, "gen_ai.output.messages") or "")
            for c in chat_spans
        )

        tool_spans = spans_for("execute_tool")
        assert tool_spans, "no execute_tool span was exported"
        tool_span = tool_spans[0]
        assert tool_span.get("parentSpanId") == invoke_agent_span_id
        assert _string_attribute(tool_span, "gen_ai.tool.name") == tool_name
        assert (_string_attribute(tool_span, "gen_ai.tool.call.id") or "").strip()
        assert (
            _string_attribute(tool_span, "gen_ai.tool.call.arguments") == f'{{"value":"{marker}"}}'
        )
        assert _string_attribute(tool_span, "gen_ai.tool.call.result") == marker
# ---------------------------------------------------------------------------
# Unit-style tests mirroring dotnet/test/TelemetryTests.cs
# ---------------------------------------------------------------------------
class TestTelemetryConfig:
    """Mirrors TelemetryConfig_DefaultValues_AreNull / TelemetryConfig_CanSetAllProperties."""

    async def test_default_values_are_unset(self):
        """An empty ``TelemetryConfig`` reports every known field as unset."""
        # Python's TelemetryConfig is a TypedDict with total=False, so an empty
        # constructor leaves every field unset (equivalent to C#'s null defaults).
        empty: TelemetryConfig = TelemetryConfig()
        for field in (
            "otlp_endpoint",
            "file_path",
            "exporter_type",
            "source_name",
            "capture_content",
        ):
            assert empty.get(field) is None

    async def test_can_set_all_properties(self):
        """Every field passed to the constructor can be read back unchanged."""
        populated: TelemetryConfig = TelemetryConfig(
            otlp_endpoint="http://localhost:4318",
            file_path="/tmp/traces.json",
            exporter_type="otlp-http",
            source_name="my-app",
            capture_content=True,
        )
        string_fields = (
            populated["otlp_endpoint"],
            populated["file_path"],
            populated["exporter_type"],
            populated["source_name"],
        )
        assert string_fields == (
            "http://localhost:4318",
            "/tmp/traces.json",
            "otlp-http",
            "my-app",
        )
        # Identity check: the flag must be the bool True, not merely truthy.
        assert populated["capture_content"] is True
class TestTelemetryHelpers:
    """Mirrors TelemetryHelpers_Restores_W3C_Trace_Context."""

    async def test_restores_w3c_trace_context(self):
        """Capture W3C trace headers under a span and restore them afterwards.

        Verifies that ``get_trace_context`` emits a ``traceparent`` carrying the
        active span's trace id, that ``trace_context`` re-establishes that trace
        id outside the span, and that an invalid ``traceparent`` is ignored
        rather than raising.
        """
        # The helpers are a no-op if the OpenTelemetry API is not installed, and
        # this test additionally needs the SDK for a real tracer; skip when
        # either is missing to keep CI portable.
        pytest.importorskip("opentelemetry")
        pytest.importorskip("opentelemetry.sdk.trace")

        from opentelemetry import propagate
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

        # Install the W3C propagator so the helpers actually have something to
        # inject/extract; the previous one is restored in ``finally``.
        previous_propagator = propagate.get_global_textmap()
        propagate.set_global_textmap(TraceContextTextMapPropagator())
        try:
            # Use a local SDK provider instead of the global one: the global
            # tracer provider can only be set once per process, so it could
            # neither be reliably installed nor restored from inside a test.
            # NOTE(review): assumes get_trace_context reads the active span from
            # the current context, not the global provider - confirm against
            # copilot._telemetry.
            tracer = TracerProvider().get_tracer("copilot-sdk-test")
            with tracer.start_as_current_span("parent") as parent:
                ctx = get_trace_context()
                assert ctx.get("traceparent"), "expected non-empty traceparent under active span"
                expected_trace_id = format(parent.get_span_context().trace_id, "032x")
                assert expected_trace_id in ctx["traceparent"]

            # Now outside any active span, restore the captured headers and
            # verify the propagated trace id round-trips.
            captured_traceparent = ctx["traceparent"]
            captured_tracestate = ctx.get("tracestate")
            with trace_context(captured_traceparent, captured_tracestate):
                restored = get_trace_context()
                assert restored.get("traceparent")
                assert expected_trace_id in restored["traceparent"]

            # Invalid traceparents should not raise; they simply produce no
            # propagated context (matching the C# helper's null return).
            with trace_context("not-a-traceparent", None):
                bad = get_trace_context()
                assert "traceparent" not in bad
        finally:
            propagate.set_global_textmap(previous_propagator)