-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.py
More file actions
333 lines (309 loc) · 11.3 KB
/
Copy pathindex.py
File metadata and controls
333 lines (309 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
"""SQLite index over the ``results/`` filesystem for cross-run queries.
The filesystem is the source of truth; the database is a derived, rebuildable
index. :func:`reindex` drops and rebuilds it by scanning ``results/``.
"""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from ._util import read_json
from .pier_results import build_pier_summary, iter_pier_trial_summaries
from .storage import Layout
# DDL for the derived index; ``connect`` feeds it to ``executescript`` every time a
# connection is opened. Every statement is ``CREATE TABLE IF NOT EXISTS``, so running
# it against an already-populated database creates nothing new.
#
# Tables written by ``index_run_dir``: experiments, runs, variants, tasks, trials.
# Tables written by ``index_pier_job_dir``: pier_jobs, pier_trials.
# No foreign keys are declared; child rows are tied to their parent only by the
# ``run_id`` / ``job_name`` text columns, which is what the indexers delete by.
# ``trials.status`` and ``trials.error`` also appear in ``_TRIAL_MIGRATIONS`` so that
# databases created before those columns existed get them added in place.
SCHEMA = """
CREATE TABLE IF NOT EXISTS experiments (
slug TEXT PRIMARY KEY,
name TEXT,
description TEXT,
first_seen TEXT
);
CREATE TABLE IF NOT EXISTS runs (
run_id TEXT PRIMARY KEY,
experiment_slug TEXT,
started_at TEXT,
finished_at TEXT,
git_base TEXT,
n_variants INTEGER,
status TEXT
);
CREATE TABLE IF NOT EXISTS variants (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT,
variant_slug TEXT,
model TEXT,
reasoning_effort TEXT,
agent TEXT,
mode TEXT,
byok INTEGER,
params_json TEXT
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT,
variant_slug TEXT,
task_slug TEXT,
task_name TEXT,
n_trials INTEGER,
success_rate REAL,
resolved INTEGER
);
CREATE TABLE IF NOT EXISTS trials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT,
variant_slug TEXT,
task_slug TEXT,
trial_no INTEGER,
session_id TEXT,
exit_code INTEGER,
duration_s REAL,
success INTEGER,
n_turns INTEGER,
n_tool_calls INTEGER,
n_tool_failures INTEGER,
input_tokens INTEGER,
output_tokens INTEGER,
total_tokens INTEGER,
cache_read_tokens INTEGER,
cache_write_tokens INTEGER,
input_tokens_noncached INTEGER,
reasoning_tokens INTEGER,
aiu REAL,
api_duration_ms INTEGER,
n_requests INTEGER,
peak_context_tokens INTEGER,
n_compactions INTEGER,
n_truncations INTEGER,
files_modified INTEGER,
lines_added INTEGER,
lines_removed INTEGER,
model TEXT,
status TEXT,
error TEXT
);
CREATE TABLE IF NOT EXISTS pier_jobs (
job_name TEXT PRIMARY KEY,
job_dir TEXT,
started_at TEXT,
finished_at TEXT,
n_trials INTEGER,
success_rate REAL,
status TEXT
);
CREATE TABLE IF NOT EXISTS pier_trials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_name TEXT,
variant_slug TEXT,
task_slug TEXT,
trial_name TEXT,
success INTEGER,
status TEXT,
n_turns INTEGER,
n_tool_calls INTEGER,
total_tokens INTEGER,
aiu REAL,
model TEXT,
error TEXT
);
"""
# Columns added after the initial schema. ``connect`` ALTERs any that a pre-existing
# index.db is missing (the index is a derived cache, but this avoids a forced reindex).
_TRIAL_MIGRATIONS = {
"status": "ALTER TABLE trials ADD COLUMN status TEXT",
"error": "ALTER TABLE trials ADD COLUMN error TEXT",
}
def _migrate(conn: sqlite3.Connection) -> None:
    """Add any ``trials`` columns listed in ``_TRIAL_MIGRATIONS`` that are absent.

    Rows from ``PRAGMA table_info`` are read by column name, so *conn* must use a
    row factory that supports string keys (``connect`` sets ``sqlite3.Row``).
    """
    present = {info["name"] for info in conn.execute("PRAGMA table_info(trials)")}
    pending = [
        statement
        for name, statement in _TRIAL_MIGRATIONS.items()
        if name not in present
    ]
    for statement in pending:
        conn.execute(statement)
def connect(db_path: Path) -> sqlite3.Connection:
    """Open the index database at *db_path*, creating and upgrading it as needed.

    The parent directory is created if missing, ``SCHEMA`` is executed (all of its
    statements are ``IF NOT EXISTS``), and ``_migrate`` adds any columns an older
    database lacks. Rows are returned as ``sqlite3.Row`` so callers can read them by
    column name.

    Returns:
        An open connection; the caller is responsible for closing it.

    Raises:
        Whatever ``sqlite3`` raises while opening or initialising the database. The
        connection is closed before the error propagates.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        conn.executescript(SCHEMA)
        _migrate(conn)
    except BaseException:
        # The caller never receives the handle on failure, so nobody else can close it.
        conn.close()
        raise
    return conn
# Keys copied verbatim from a trial's ``metrics`` dict into the same-named ``trials``
# columns, in column order.
_TRIAL_METRIC_COLUMNS = (
    "n_turns",
    "n_tool_calls",
    "n_tool_failures",
    "input_tokens",
    "output_tokens",
    "total_tokens",
    "cache_read_tokens",
    "cache_write_tokens",
    "input_tokens_noncached",
    "reasoning_tokens",
    "aiu",
    "api_duration_ms",
    "n_requests",
    "peak_context_tokens",
    "n_compactions",
    "n_truncations",
    "files_modified",
    "lines_added",
    "lines_removed",
)

# Full column list for a ``trials`` insert; ``_trial_row`` yields values in this order.
_TRIAL_COLUMNS = (
    "run_id",
    "variant_slug",
    "task_slug",
    "trial_no",
    "session_id",
    "exit_code",
    "duration_s",
    "success",
    *_TRIAL_METRIC_COLUMNS,
    "model",
    "status",
    "error",
)

# Placeholders are generated from the column list so the two can never drift apart.
_INSERT_TRIAL_SQL = "INSERT INTO trials({}) VALUES ({})".format(
    ", ".join(_TRIAL_COLUMNS),
    ",".join("?" * len(_TRIAL_COLUMNS)),
)


def _trial_row(run_id, variant_slug, task_slug, trial, fallback_model):
    """Build the parameter tuple for one ``trials`` row, in ``_TRIAL_COLUMNS`` order."""
    # ``or {}`` also covers a ``"metrics": null`` entry, not just a missing key.
    metrics = trial.get("metrics") or {}
    models = metrics.get("models") or []
    success = trial.get("success")
    return (
        run_id,
        variant_slug,
        task_slug,
        trial.get("trial_no"),
        trial.get("session_id"),
        trial.get("exit_code"),
        trial.get("duration_s"),
        # Tri-state: NULL for an ungraded trial, otherwise 1/0.
        None if success is None else int(bool(success)),
        *(metrics.get(column) for column in _TRIAL_METRIC_COLUMNS),
        # Last entry of the trial's recorded models, else the variant's model.
        models[-1] if models else fallback_model,
        trial.get("status"),
        trial.get("error"),
    )


def index_run_dir(conn: sqlite3.Connection, run_dir: Path) -> None:
    """Insert (or replace) one stored run into the index.

    Reads ``run.json`` from *run_dir*, registers its experiment if not yet known
    (``INSERT OR IGNORE``, so an existing experiment row is left untouched), then
    deletes every ``runs``/``variants``/``tasks``/``trials`` row for the run id and
    re-inserts them from the file.

    All database writes happen in a single transaction: it is committed on success
    and rolled back if anything raises, so a failure cannot leave the run
    half-replaced on the connection.

    Per task, ``success_rate`` and ``resolved`` are computed over graded trials only
    (those whose ``success`` is not ``None``); both are NULL when no trial is graded.
    ``n_trials`` counts every trial, graded or not.

    Raises:
        KeyError: if ``run.json`` lacks ``run_id`` or ``experiment_slug``, or a
            variant entry lacks ``variant``.
    """
    run = read_json(run_dir / "run.json")
    run_id = run["run_id"]
    slug = run["experiment_slug"]
    variants = run.get("variants") or []

    # As a context manager the connection commits on success and rolls back on error.
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO experiments(slug, name, description, first_seen) "
            "VALUES (?,?,?,?)",
            (
                slug,
                run.get("experiment_name"),
                run.get("experiment_description"),
                run.get("started_at"),
            ),
        )

        # Drop whatever was indexed for this run before so re-indexing replaces it.
        conn.execute("DELETE FROM runs WHERE run_id=?", (run_id,))
        conn.execute("DELETE FROM variants WHERE run_id=?", (run_id,))
        conn.execute("DELETE FROM tasks WHERE run_id=?", (run_id,))
        conn.execute("DELETE FROM trials WHERE run_id=?", (run_id,))

        conn.execute(
            "INSERT INTO runs(run_id, experiment_slug, started_at, finished_at, git_base, "
            "n_variants, status) VALUES (?,?,?,?,?,?,?)",
            (
                run_id,
                slug,
                run.get("started_at"),
                run.get("finished_at"),
                run.get("git_base"),
                len(variants),
                run.get("status"),
            ),
        )

        for vr in variants:
            v = vr["variant"]
            vslug = v.get("slug") or v.get("name")
            conn.execute(
                "INSERT INTO variants(run_id, variant_slug, model, reasoning_effort, "
                "agent, mode, byok, params_json) VALUES (?,?,?,?,?,?,?,?)",
                (
                    run_id,
                    vslug,
                    v.get("model"),
                    v.get("reasoning_effort"),
                    v.get("agent"),
                    v.get("mode"),
                    # ``byok`` is set exactly when the variant has a truthy ``provider``.
                    1 if v.get("provider") else 0,
                    json.dumps(v),
                ),
            )

            for tr in vr.get("tasks") or []:
                task_slug = tr.get("task_slug")
                trials = tr.get("trials") or []
                graded = [t for t in trials if t.get("success") is not None]
                n_solved = sum(1 for t in graded if t.get("success"))
                conn.execute(
                    "INSERT INTO tasks(run_id, variant_slug, task_slug, task_name, "
                    "n_trials, success_rate, resolved) VALUES (?,?,?,?,?,?,?)",
                    (
                        run_id,
                        vslug,
                        task_slug,
                        tr.get("task_name"),
                        len(trials),
                        (n_solved / len(graded)) if graded else None,
                        # Resolved means at least one graded trial succeeded.
                        int(n_solved > 0) if graded else None,
                    ),
                )

                for trial in trials:
                    conn.execute(
                        _INSERT_TRIAL_SQL,
                        _trial_row(run_id, vslug, task_slug, trial, v.get("model")),
                    )
def index_pier_job_dir(conn: sqlite3.Connection, job_dir: Path) -> None:
    """Insert (or replace) one Pier job into the derived index.

    The job is keyed by the directory name of *job_dir*. Its existing ``pier_jobs``
    and ``pier_trials`` rows are deleted and rebuilt from ``build_pier_summary`` and
    ``iter_pier_trial_summaries``.

    All database writes happen in a single transaction: it is committed on success
    and rolled back if anything raises (including while iterating the trial
    summaries), so a failure cannot leave the job half-replaced on the connection.
    """
    # Build the summary first so a failure here happens before any row is touched.
    summary = build_pier_summary(job_dir)
    job_name = job_dir.name

    # As a context manager the connection commits on success and rolls back on error.
    with conn:
        conn.execute("DELETE FROM pier_jobs WHERE job_name=?", (job_name,))
        conn.execute("DELETE FROM pier_trials WHERE job_name=?", (job_name,))
        conn.execute(
            "INSERT INTO pier_jobs(job_name, job_dir, started_at, finished_at, n_trials, "
            "success_rate, status) VALUES (?,?,?,?,?,?,?)",
            (
                job_name,
                str(job_dir),
                summary.get("started_at"),
                summary.get("finished_at"),
                summary.get("n_trials"),
                summary.get("overall_success_rate"),
                summary.get("status"),
            ),
        )

        for trial in iter_pier_trial_summaries(job_dir):
            metrics = trial.get("metrics") or {}
            success = trial.get("success")
            conn.execute(
                "INSERT INTO pier_trials(job_name, variant_slug, task_slug, trial_name, "
                "success, status, n_turns, n_tool_calls, total_tokens, aiu, model, error) "
                "VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
                (
                    job_name,
                    trial.get("variant"),
                    trial.get("task"),
                    trial.get("trial_name"),
                    # Tri-state: NULL for an ungraded trial, otherwise 1/0.
                    None if success is None else int(bool(success)),
                    trial.get("status"),
                    metrics.get("n_turns"),
                    metrics.get("n_tool_calls"),
                    metrics.get("total_tokens"),
                    metrics.get("aiu"),
                    trial.get("model"),
                    trial.get("error"),
                ),
            )
def reindex(layout: Layout) -> int:
    """Rebuild the index from scratch by scanning legacy runs and Pier jobs.

    Any existing database file is deleted, then every directory yielded by
    ``layout.iter_runs()`` and ``layout.iter_pier_jobs()`` is indexed into a fresh one.

    If indexing fails part-way, the half-built database file is removed before the
    error propagates. ``list_runs`` only rebuilds when the file is missing, so a
    partial file left on disk would otherwise be served as if it were complete.

    Returns:
        The number of runs plus Pier jobs indexed.
    """
    db_path = layout.index_db
    if db_path.exists():
        db_path.unlink()
    conn = connect(db_path)
    count = 0
    try:
        for _slug, _run_id, run_dir in layout.iter_runs():
            index_run_dir(conn, run_dir)
            count += 1
        for job_dir in layout.iter_pier_jobs():
            index_pier_job_dir(conn, job_dir)
            count += 1
    except BaseException:
        # Close before unlinking: an open handle can block deletion on some platforms.
        conn.close()
        if db_path.exists():
            db_path.unlink()
        raise
    conn.close()
    return count
def list_runs(layout: Layout) -> list[dict]:
    """Return every indexed run as a plain dict, ordered by ``started_at``.

    Each dict holds the ``runs`` columns plus two values aggregated from ``trials``:
    ``n_trials`` (count of the run's trial rows) and ``success_rate`` (average of
    ``success`` over trials where it is not NULL). The index is built first via
    ``reindex`` when the database file does not exist yet.
    """
    if not layout.index_db.exists():
        reindex(layout)

    query = (
        "SELECT r.*, "
        "(SELECT COUNT(*) FROM trials t WHERE t.run_id=r.run_id) AS n_trials, "
        "(SELECT AVG(success) FROM trials t WHERE t.run_id=r.run_id AND t.success IS NOT NULL)"
        " AS success_rate "
        "FROM runs r ORDER BY r.started_at"
    )
    conn = connect(layout.index_db)
    try:
        records = conn.execute(query).fetchall()
        return [dict(record) for record in records]
    finally:
        conn.close()