-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunner.py
More file actions
591 lines (524 loc) · 20.2 KB
/
Copy pathrunner.py
File metadata and controls
591 lines (524 loc) · 20.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
"""Orchestrate running an experiment: variants x trials -> result artifacts."""
from __future__ import annotations
import os
import subprocess
import tempfile
from collections.abc import Callable
from pathlib import Path
from ._util import (
force_rmtree,
iso,
new_run_id,
new_session_id,
read_json,
utcnow,
write_json,
write_text,
)
from .analysis import analyze_events
from .auth import INJECTED_TOKEN_ENV_VAR, secret_env_names
from .index import connect, index_run_dir
from .invoker import CopilotInvoker, Invocation, Invoker, MockInvoker
from .models import (
DryRunCheck,
DryRunReport,
Experiment,
ExperimentRun,
Metrics,
Task,
TaskResult,
TrialResult,
TrialStatus,
Variant,
VariantResult,
)
from .report import build_summary, summary_markdown
from .sessionlog import copy_events, load_events, parse_metrics
from .storage import Layout
from .workspace import capture_diff, provision, run_shell
def _git_head(root: Path) -> str | None:
proc = subprocess.run(
["git", "rev-parse", "HEAD"], cwd=str(root), capture_output=True, text=True
)
return proc.stdout.strip() if proc.returncode == 0 else None
def _report(progress: Callable[[str], None] | None, msg: str) -> None:
"""Forward a human-readable progress line to ``progress`` if one is set."""
if progress is not None:
progress(msg)
def run_experiment(
experiment: Experiment,
*,
root: Path | None = None,
invoker: Invoker | None = None,
results_root: Path | None = None,
session_state_root: Path | None = None,
copilot_binary: str = "copilot",
github_token: str | None = None,
progress: Callable[[str], None] | None = None,
copilot_stream: Callable[[str], None] | None = None,
) -> ExperimentRun:
"""Run every variant x trial of ``experiment`` and write result artifacts.
Parameters
----------
root:
Experiment repository root (defaults to the current directory). Fixtures
and experiment definitions are read from here.
invoker:
Strategy used to invoke Copilot. Defaults to :class:`CopilotInvoker`.
Tests pass a :class:`MockInvoker`; :func:`dry_run_experiment` uses one too.
results_root:
Where run artifacts are written. Defaults to ``root/results``. Pointed at a
throwaway temp dir by :func:`dry_run_experiment` so nothing is persisted.
session_state_root:
Where Copilot session state lives. Defaults to ``~/.copilot/session-state``.
github_token:
Token injected into every trial's environment so Copilot is authenticated
without relying on ambient login. Resolved and preflighted by the CLI (see
:mod:`copilot_experiments.auth`). It is never persisted or logged, and the
variable carrying it is added to ``copilot --secret-env-vars``.
progress:
Optional sink for high-level per-trial phase messages (``--verbose``).
copilot_stream:
Optional sink for Copilot's live output, one rendered line at a time
(``--verbose``). Only used when the default :class:`CopilotInvoker` is built.
"""
root = Path(root or Path.cwd()).resolve()
layout = Layout(root, results_root=results_root)
if invoker is None:
invoker = CopilotInvoker(binary=copilot_binary, stream=copilot_stream)
run_id = new_run_id()
run_dir = layout.run_dir(experiment.slug, run_id)
run_dir.mkdir(parents=True, exist_ok=True)
run = ExperimentRun(
run_id=run_id,
experiment_slug=experiment.slug,
experiment_name=experiment.name,
experiment_description=experiment.description,
started_at=iso(utcnow()),
git_base=_git_head(root),
)
for variant in experiment.variants:
_report(progress, f"variant {variant.slug}: {variant.trials} trial(s)")
vr = _run_variant(
experiment,
variant,
layout,
run_id,
invoker,
session_state_root,
github_token,
progress,
)
run.variants.append(vr)
write_json(
layout.variant_dir(experiment.slug, run_id, variant.slug) / "variant.json",
variant.stored(),
)
run.finished_at = iso(utcnow())
run.status = run.rollup_status()
# Write run manifest, summary, and report.
write_json(run_dir / "run.json", run.model_dump(mode="json"))
summary = build_summary(run)
write_json(run_dir / "summary.json", summary)
write_text(run_dir / "summary.md", summary_markdown(summary, experiment.description))
# Update the SQLite index.
conn = connect(layout.index_db)
try:
index_run_dir(conn, run_dir)
finally:
conn.close()
return run
def _run_variant(
experiment: Experiment,
variant: Variant,
layout: Layout,
run_id: str,
invoker: Invoker,
session_state_root: Path | None,
github_token: str | None = None,
progress: Callable[[str], None] | None = None,
) -> VariantResult:
vr = VariantResult(variant=variant)
for task_slug, task in experiment.iter_tasks():
vr.tasks.append(
_run_task(
experiment,
variant,
task_slug,
task,
layout,
run_id,
invoker,
session_state_root,
github_token,
progress,
)
)
return vr
def _run_task(
experiment: Experiment,
variant: Variant,
task_slug: str,
task: Task,
layout: Layout,
run_id: str,
invoker: Invoker,
session_state_root: Path | None,
github_token: str | None = None,
progress: Callable[[str], None] | None = None,
) -> TaskResult:
_report(progress, f"variant {variant.slug} / task {task_slug}: {variant.trials} trial(s)")
task_dir = layout.task_dir(experiment.slug, run_id, variant.slug, task_slug)
task_dir.mkdir(parents=True, exist_ok=True)
write_json(task_dir / "task.json", task.model_dump(mode="json", exclude_none=True))
tr = TaskResult(
task_slug=task_slug,
task_name=task.name,
prompt=task.prompt,
)
for trial_no in range(1, variant.trials + 1):
tr.trials.append(
_run_trial(
experiment,
variant,
task_slug,
task,
trial_no,
layout,
run_id,
invoker,
session_state_root,
github_token,
progress,
)
)
return tr
def _run_trial(
experiment: Experiment,
variant: Variant,
task_slug: str,
task: Task,
trial_no: int,
layout: Layout,
run_id: str,
invoker: Invoker,
session_state_root: Path | None,
github_token: str | None = None,
progress: Callable[[str], None] | None = None,
) -> TrialResult:
tag = f"{variant.slug}/{task_slug}/{trial_no:03d}"
trial_dir = layout.trial_dir(experiment.slug, run_id, variant.slug, task_slug, trial_no)
trial_dir.mkdir(parents=True, exist_ok=True)
workspace = trial_dir / "workspace"
# ``stdout.txt``: the raw combined stdout/stderr of the copilot process (plain text,
# which is what an auth/usage error actually is). ``session.md``: Copilot's own
# markdown transcript (``--share``). ``events.jsonl`` (copied below) stays the
# structured data source.
stdout_path = trial_dir / "stdout.txt"
share_path = trial_dir / "session.md"
# Copilot's own --log-dir debug log is large (megabytes) and echoes masked auth
# material; keep it in an ephemeral temp dir so it never lands under results/.
# The session events.jsonl (copied below) is our real data source -- see ADR-0010.
log_dir = Path(tempfile.mkdtemp(prefix="copilot-log-"))
session_id = new_session_id()
metrics = Metrics()
success: bool | None = None
exit_code = -1
duration_s = 0.0
status: TrialStatus = "ok"
error: str | None = None
error_artifact: str | None = None
try:
write_text(trial_dir / "prompt.md", task.prompt)
provision(task, workspace, layout.root)
_report(progress, f"[{tag}] workspace provisioned -> {workspace}")
env_overrides: dict[str, str] = {}
if github_token:
env_overrides[INJECTED_TOKEN_ENV_VAR] = github_token
otel_path = trial_dir / "copilot-otel.jsonl"
_configure_otel_env(
env_overrides,
variant,
session_id=session_id,
otel_path=otel_path,
experiment_slug=experiment.slug,
task_slug=task_slug,
trial_no=trial_no,
)
inv = Invocation(
prompt=task.prompt,
workspace=workspace,
session_id=session_id,
variant=variant,
log_dir=log_dir,
stdout_path=stdout_path,
session_state_root=session_state_root or _default_session_state_root(),
env_overrides=env_overrides,
share_path=share_path,
secret_env_names=secret_env_names(
variant.env, byok_secrets=variant.provider is not None
),
)
_report(progress, f"[{tag}] invoking copilot (session {session_id})")
result = invoker.run(inv)
exit_code = result.exit_code
duration_s = result.duration_s
_report(
progress,
f"[{tag}] copilot exited {exit_code} in {duration_s:.1f}s",
)
# Collect the session events and parse metrics.
copy_events(session_id, trial_dir / "events.jsonl", inv.session_state_root)
events = load_events(trial_dir / "events.jsonl")
metrics = parse_metrics(events)
if metrics.duration_s is None:
metrics.duration_s = round(duration_s, 3)
_report(
progress,
f"[{tag}] session log: {len(events)} events -> {metrics.n_turns} turns, "
f"{metrics.n_tool_calls} tool calls, {metrics.total_tokens or 0} tokens",
)
# Build and persist the richer session analysis (timeline, tool histogram).
otel_records = load_events(otel_path) if otel_path.exists() else None
analysis = analyze_events(events, otel_records)
write_json(trial_dir / "analysis.json", analysis.model_dump(mode="json"))
# Capture what changed in the workspace.
write_text(trial_dir / "workspace.diff", capture_diff(workspace))
# Run the verification command, if any.
if task.verify:
code, output = run_shell(task.verify, workspace)
success = code == 0
write_json(
trial_dir / "verify.json",
{"command": task.verify, "exit_code": code, "success": success, "output": output},
)
_report(progress, f"[{tag}] verify: {'pass' if success else 'fail'} (exit {code})")
# Copilot ran, but did it actually do anything? A non-zero exit or an empty
# session log (0 turns) means it never really started -- an infra/harness
# problem (bad auth, bad working dir), not the experiment failing on merit.
no_session_log = len(events) == 0 and metrics.n_turns == 0
if exit_code != 0 or no_session_log:
status = "copilot_failed"
reasons = []
if exit_code != 0:
reasons.append(f"copilot exited {exit_code}")
if no_session_log:
reasons.append("no session log captured (0 turns)")
error = "; ".join(reasons)
error_artifact = stdout_path.name
_report(progress, f"[{tag}] copilot did not run cleanly: {error}")
except Exception as exc: # noqa: BLE001 - any pipeline failure is a harness error
status = "harness_error"
error = f"{type(exc).__name__}: {exc}"
error_artifact = stdout_path.name if stdout_path.exists() else None
_report(progress, f"[{tag}] harness error: {error}")
finally:
force_rmtree(log_dir)
trial = TrialResult(
trial_no=trial_no,
session_id=session_id,
exit_code=exit_code,
duration_s=round(duration_s, 3),
success=success,
metrics=metrics,
status=status,
error=error,
error_artifact=error_artifact,
)
write_json(
trial_dir / "meta.json",
{
"trial_no": trial_no,
"session_id": session_id,
"exit_code": exit_code,
"duration_s": trial.duration_s,
"success": success,
"status": status,
"error": error,
"error_artifact": error_artifact,
"workspace": str(workspace),
},
)
write_json(trial_dir / "metrics.json", metrics.model_dump(mode="json"))
return trial
def _default_session_state_root() -> Path:
from .sessionlog import session_state_root
return session_state_root()
def _configure_otel_env(
env_overrides: dict[str, str],
variant: Variant,
*,
session_id: str,
otel_path: Path,
experiment_slug: str,
task_slug: str,
trial_no: int,
) -> None:
if not _otel_destination_configured(env_overrides, variant):
env_overrides["COPILOT_OTEL_FILE_EXPORTER_PATH"] = str(otel_path.resolve())
if not _otel_env_active(env_overrides, variant):
return
_setdefault_child_env(env_overrides, variant, "COPILOT_OTEL_SOURCE_NAME", "copilot-experiments")
_setdefault_child_env(env_overrides, variant, "OTEL_SERVICE_NAME", "copilot-experiments")
_append_otel_resource_attributes(
env_overrides,
variant,
{
"copilot.session_id": session_id,
"copilot.experiment": experiment_slug,
"copilot.variant": variant.slug,
"copilot.task": task_slug,
"copilot.trial": f"{trial_no:03d}",
},
)
def _env_value(env_overrides: dict[str, str], variant: Variant, name: str) -> str | None:
return env_overrides.get(name) or variant.env.get(name) or os.environ.get(name)
def _setdefault_child_env(
env_overrides: dict[str, str], variant: Variant, name: str, value: str
) -> None:
if _env_value(env_overrides, variant, name) is None:
env_overrides[name] = value
def _otel_destination_configured(env_overrides: dict[str, str], variant: Variant) -> bool:
return bool(
_env_value(env_overrides, variant, "COPILOT_OTEL_FILE_EXPORTER_PATH")
or _env_value(env_overrides, variant, "OTEL_EXPORTER_OTLP_ENDPOINT")
)
def _otel_env_active(env_overrides: dict[str, str], variant: Variant) -> bool:
return bool(
_env_value(env_overrides, variant, "COPILOT_OTEL_ENABLED")
or _env_value(env_overrides, variant, "COPILOT_OTEL_FILE_EXPORTER_PATH")
or _env_value(env_overrides, variant, "OTEL_EXPORTER_OTLP_ENDPOINT")
)
def _append_otel_resource_attributes(
env_overrides: dict[str, str], variant: Variant, attributes: dict[str, str]
) -> None:
existing = _env_value(env_overrides, variant, "OTEL_RESOURCE_ATTRIBUTES") or ""
existing_keys = {
part.split("=", 1)[0].strip()
for part in existing.split(",")
if "=" in part and part.split("=", 1)[0].strip()
}
additions = [f"{key}={value}" for key, value in attributes.items() if key not in existing_keys]
if additions:
env_overrides["OTEL_RESOURCE_ATTRIBUTES"] = ",".join(
[part for part in (existing, *additions) if part]
)
# --------------------------------------------------------------------------- #
# Dry-run: validate the whole pipeline, persist nothing
# --------------------------------------------------------------------------- #
def dry_run_experiment(
experiment: Experiment,
*,
root: Path | None = None,
invoker: Invoker | None = None,
) -> DryRunReport:
"""Validate the full run pipeline without leaving anything behind.
Runs every stage with a mock invoker inside a throwaway temp directory,
asserts that each stage produced its artifact, then deletes the temp dir.
Fixtures are still read from ``root``; only the *outputs* are redirected.
Returns a :class:`DryRunReport` -- nothing is persisted under ``root``.
"""
root = Path(root or Path.cwd())
tmp = Path(tempfile.mkdtemp(prefix="copilot-exp-dryrun-"))
try:
run = run_experiment(
experiment,
root=root,
invoker=invoker or MockInvoker(),
results_root=tmp,
session_state_root=tmp / ".session-state",
)
layout = Layout(root, results_root=tmp)
checks = _validate_plumbing(layout, experiment, run)
return DryRunReport(experiment=experiment.name, checks=checks)
finally:
force_rmtree(tmp)
def _check(name: str, ok: bool, detail: str = "") -> DryRunCheck:
return DryRunCheck(name=name, ok=ok, detail=detail)
def _validate_plumbing(
layout: Layout, experiment: Experiment, run: ExperimentRun
) -> list[DryRunCheck]:
"""Inspect the on-disk artifacts of the first trial (and the run) and report
whether each pipeline stage actually did its job."""
checks: list[DryRunCheck] = []
variant = experiment.variants[0]
task_slug, task = experiment.iter_tasks()[0]
run_dir = layout.run_dir(experiment.slug, run.run_id)
trial_dir = layout.trial_dir(experiment.slug, run.run_id, variant.slug, task_slug, 1)
workspace = trial_dir / "workspace"
# 1. Workspace provisioned with a git baseline.
head = _git_head(workspace) if workspace.exists() else None
checks.append(
_check(
"workspace provisioned",
workspace.exists() and head is not None,
f"git baseline {head[:10]}" if head else "no workspace / git HEAD",
)
)
# 2. Session log captured and parseable.
events_path = trial_dir / "events.jsonl"
n_events = 0
if events_path.exists():
try:
n_events = len(load_events(events_path))
except Exception: # pragma: no cover - defensive
n_events = 0
checks.append(
_check("session log captured", events_path.exists() and n_events >= 1, f"{n_events} events")
)
# 3. OTel file captured for per-call economics.
otel_path = trial_dir / "copilot-otel.jsonl"
n_otel = 0
if otel_path.exists():
try:
n_otel = len(load_events(otel_path))
except Exception: # pragma: no cover - defensive
n_otel = 0
checks.append(_check("otel captured", otel_path.exists() and n_otel >= 1, f"{n_otel} records"))
# 4. Metrics parsed from the session log.
metrics_path = trial_dir / "metrics.json"
n_turns = int(read_json(metrics_path).get("n_turns") or 0) if metrics_path.exists() else 0
checks.append(
_check("metrics parsed", metrics_path.exists() and n_turns >= 1, f"{n_turns} turns")
)
# 5. Session analysis written.
checks.append(_check("analysis written", (trial_dir / "analysis.json").exists()))
# 6. Workspace diff captured and non-empty -- this is what caught the MAX_PATH bug.
diff_path = trial_dir / "workspace.diff"
diff = diff_path.read_text(encoding="utf-8") if diff_path.exists() else ""
checks.append(
_check(
"workspace diff captured",
diff.strip() != "",
f"{len(diff)} bytes" if diff.strip() else "empty diff (invoker changed nothing?)",
)
)
# 6. Verification ran (we only assert it ran, not that it passed).
if task.verify:
checks.append(_check("verify ran", (trial_dir / "verify.json").exists()))
# 7. Run-level summary written.
checks.append(
_check(
"run summary written",
(run_dir / "summary.json").exists() and (run_dir / "summary.md").exists(),
)
)
# 7b. Task axis present on disk (variants/<v>/tasks/<task>/...).
checks.append(
_check(
"task dir present",
layout.task_dir(experiment.slug, run.run_id, variant.slug, task_slug).is_dir(),
f"tasks/{task_slug}",
)
)
# 8. Run recorded in the SQLite index.
indexed = False
if layout.index_db.exists():
conn = connect(layout.index_db)
try:
row = conn.execute("SELECT 1 FROM runs WHERE run_id = ?", (run.run_id,)).fetchone()
indexed = row is not None
finally:
conn.close()
checks.append(_check("indexed", indexed))
return checks