Summary
On a fresh PostgreSQL database, PostgresStorageBackend._initialize_schema() runs the schema migrations before creating the base tables. As a result, the very first connection aborts with:
```
asyncpg.exceptions.UndefinedTableError: relation "stream_subscriptions" does not exist
```
raised from migration v5 (ALTER TABLE stream_subscriptions ADD COLUMN IF NOT EXISTS stream_run_id ...). The schema is left half-created (only schema_versions, signals, signal_acknowledgments exist) and the worker can never run a workflow, because workflow_runs / events / steps / stream_subscriptions were never created.
- Version: pyworkflow-engine 0.3.6 (reproduced at v0.3.6-1-g45fbe4c)
- Backend: storage=postgres (PYWORKFLOW_STORAGE_TYPE=postgres)
- Trigger: any first start against an empty DB, e.g. the Celery worker startup hook _run_startup_migrations() (celery/app.py) or the lazy init on the first task. Both go through _initialize_schema().
Root cause
pyworkflow/storage/postgres.py, _initialize_schema() (line 394):
```python
async def _initialize_schema(self) -> None:
    ...
    runner = PostgresMigrationRunner(pool)
    await runner.run_migrations()  # <-- line 403: migrations run FIRST
    async with pool.acquire() as conn:
        await conn.execute("""CREATE TABLE IF NOT EXISTS workflow_runs ( ... )""")  # line 408
        ...
        await conn.execute("""CREATE TABLE IF NOT EXISTS events ( ... )""")  # line 458
        ...  # base tables created AFTER
```
run_migrations() (storage/migrations/base.py:228) only records a baseline when it detects a pre-existing schema:
```python
if current_version == 0:  # base.py:257
    has_existing_schema = await self.detect_existing_schema()
    if has_existing_schema:
        await self.record_baseline_version(1, ...)  # baseline -> skip v1, apply v2..v7
        current_version = 1
pending = self.registry.get_pending(current_version)  # else: applies ALL of v2..v7
```
and detect_existing_schema() (storage/postgres.py:80) probes for the events table:
```sql
SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'events')
```
On a truly fresh DB, events does not exist yet (it's created later at line 458), so detect_existing_schema() returns False, no baseline is recorded, and the runner applies every migration v2→v7. Migration v5 (storage/postgres.py:188) then does:
```python
await conn.execute(
    "ALTER TABLE stream_subscriptions ADD COLUMN IF NOT EXISTS stream_run_id TEXT NULL"
)
```
against a stream_subscriptions table that doesn't exist yet (it's part of the base block that runs after migrations) → UndefinedTableError.
In other words: the migrations are written to upgrade an existing base schema (they ALTER base tables, and detect_existing_schema() baselines them away on first run), but _initialize_schema() invokes them before that base schema exists.
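To make the ordering problem concrete without a Postgres server, here is a minimal sketch that models it with Python's built-in sqlite3. Everything in it is a hypothetical stand-in, not pyworkflow code: a two-table base block instead of the real one, a single v5 migration mirroring the failing ALTER, and a detect_existing_schema() that probes for events as the report describes. Running migrations first fails on a fresh database just like the Postgres trace, only with SQLite's wording, and leaves schema_versions behind without events.

```python
import sqlite3

# Hypothetical stand-ins, not pyworkflow code. The "base block" is already at
# the latest shape (it includes stream_run_id); MIGRATIONS holds one upgrade
# step that mirrors v5 from the report.
BASE_BLOCK = [
    "CREATE TABLE IF NOT EXISTS events (id TEXT PRIMARY KEY)",
    "CREATE TABLE IF NOT EXISTS stream_subscriptions "
    "(id TEXT PRIMARY KEY, stream_run_id TEXT NULL)",
]


def migration_v5(conn: sqlite3.Connection) -> None:
    # SQLite has no ADD COLUMN IF NOT EXISTS, so emulate it. PRAGMA table_info
    # returns no rows for a missing table, so the ALTER still runs and fails.
    cols = {row[1] for row in conn.execute("PRAGMA table_info(stream_subscriptions)")}
    if "stream_run_id" not in cols:
        conn.execute("ALTER TABLE stream_subscriptions ADD COLUMN stream_run_id TEXT NULL")


MIGRATIONS = {5: migration_v5}


def detect_existing_schema(conn: sqlite3.Connection) -> bool:
    # Same idea as the Postgres probe: does the events table exist yet?
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'events'"
    ).fetchone()
    return row is not None


def run_migrations(conn: sqlite3.Connection) -> list[int]:
    conn.execute("CREATE TABLE IF NOT EXISTS schema_versions (version INTEGER PRIMARY KEY)")
    current = conn.execute("SELECT COALESCE(MAX(version), 0) FROM schema_versions").fetchone()[0]
    if current == 0 and detect_existing_schema(conn):
        conn.execute("INSERT INTO schema_versions (version) VALUES (1)")  # v1 baseline
        current = 1
    applied = []
    for version in sorted(v for v in MIGRATIONS if v > current):
        MIGRATIONS[version](conn)
        conn.execute("INSERT INTO schema_versions (version) VALUES (?)", (version,))
        applied.append(version)
    return applied


def initialize_schema_current(conn: sqlite3.Connection) -> None:
    run_migrations(conn)    # migrations FIRST, as in the current code
    for ddl in BASE_BLOCK:  # base tables AFTER, never reached on a fresh DB
        conn.execute(ddl)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    try:
        initialize_schema_current(conn)
    except sqlite3.OperationalError as exc:
        # SQLite's analogue of asyncpg's UndefinedTableError
        print(f"fresh DB failed: {exc}")
    print("events exists:", detect_existing_schema(conn))
```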
Why CI doesn't catch it
The integration tests use InMemoryStorageBackend (no migration runner), so the Postgres fresh-install path is never exercised.
Reproduction
```python
import asyncio

from pyworkflow.config import _load_env_storage_config
from pyworkflow.storage.config import _create_storage_backend

# PYWORKFLOW_STORAGE_TYPE=postgres + PYWORKFLOW_POSTGRES_* pointing at an EMPTY db


async def main():
    backend = _create_storage_backend(_load_env_storage_config())
    await backend.connect()  # -> UndefinedTableError: relation "stream_subscriptions" does not exist


asyncio.run(main())
```
Workaround (what we did downstream)
We can't reorder library code from the outside, so before the worker starts we run _initialize_schema() once with run_migrations stubbed to a no-op (creating only the base tables), then run the real migrations — at which point detect_existing_schema() sees events, baselines v1, and applies v2→v7 cleanly. Result: 13 tables, schema_versions at the latest version, no error.
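```python
import asyncio

import pyworkflow.storage.postgres as pgmod
from pyworkflow.config import _load_env_storage_config
from pyworkflow.storage.config import _create_storage_backend

async def _noop(self): return []

async def bootstrap_schema():
    backend = _create_storage_backend(_load_env_storage_config())
    orig = pgmod.PostgresMigrationRunner.run_migrations
    # phase 1: base CREATE TABLE block only (migrations stubbed out)
    pgmod.PostgresMigrationRunner.run_migrations = _noop
    try:
        await backend.connect()
    finally:
        pgmod.PostgresMigrationRunner.run_migrations = orig  # restore even if connect() raises
    # phase 2: real migrations now that the base tables exist (v1 baseline -> v7)
    await pgmod.PostgresMigrationRunner(await backend._get_pool()).run_migrations()

asyncio.run(bootstrap_schema())
```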
This is obviously a hack (monkeypatching a private method) and shouldn't be necessary.
Proposed fix
Reorder _initialize_schema() so the base CREATE TABLE IF NOT EXISTS block runs first, then run_migrations(). On a fresh DB this makes detect_existing_schema() find events, record the v1 baseline, and apply v2→v7 against tables that now exist; on an already-migrated DB the CREATE TABLE IF NOT EXISTS statements are harmless no-ops and run_migrations() behaves exactly as today.
```python
async def _initialize_schema(self) -> None:
    pool = await self._get_pool()

    # 1) base tables first (CREATE TABLE IF NOT EXISTS ...), so a fresh DB has the
    #    tables the upgrade migrations ALTER, and detect_existing_schema() baselines.
    async with pool.acquire() as conn:
        await conn.execute("CREATE TABLE IF NOT EXISTS workflow_runs ( ... )")
        ...  # the existing base block, moved up

    # 2) then migrations
    await PostgresMigrationRunner(pool).run_migrations()
```
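A minimal sketch, using Python's built-in sqlite3 as a stand-in for Postgres, can check both claims in this proposal: on a fresh database the reordered init records the v1 baseline and then applies the upgrades, and on an already-migrated database a second run changes nothing. The two-table base block, the single v5 migration, and all function names are hypothetical simplifications, not pyworkflow's code.

```python
import sqlite3

# Hypothetical stand-ins, not pyworkflow code: a base block already at the
# latest shape, and one idempotent upgrade migration that mirrors v5.
BASE_BLOCK = [
    "CREATE TABLE IF NOT EXISTS events (id TEXT PRIMARY KEY)",
    "CREATE TABLE IF NOT EXISTS stream_subscriptions "
    "(id TEXT PRIMARY KEY, stream_run_id TEXT NULL)",
]


def migration_v5(conn: sqlite3.Connection) -> None:
    # Emulates ADD COLUMN IF NOT EXISTS, which SQLite lacks.
    cols = {row[1] for row in conn.execute("PRAGMA table_info(stream_subscriptions)")}
    if "stream_run_id" not in cols:
        conn.execute("ALTER TABLE stream_subscriptions ADD COLUMN stream_run_id TEXT NULL")


MIGRATIONS = {5: migration_v5}


def detect_existing_schema(conn: sqlite3.Connection) -> bool:
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'events'"
    ).fetchone()
    return row is not None


def run_migrations(conn: sqlite3.Connection) -> list[int]:
    conn.execute("CREATE TABLE IF NOT EXISTS schema_versions (version INTEGER PRIMARY KEY)")
    current = conn.execute("SELECT COALESCE(MAX(version), 0) FROM schema_versions").fetchone()[0]
    if current == 0 and detect_existing_schema(conn):
        conn.execute("INSERT INTO schema_versions (version) VALUES (1)")  # v1 baseline
        current = 1
    applied = []
    for version in sorted(v for v in MIGRATIONS if v > current):
        MIGRATIONS[version](conn)
        conn.execute("INSERT INTO schema_versions (version) VALUES (?)", (version,))
        applied.append(version)
    return applied


def initialize_schema_fixed(conn: sqlite3.Connection) -> list[int]:
    # 1) base tables first: the upgrades now have tables to ALTER, and
    #    detect_existing_schema() finds events, so v1 is baselined.
    for ddl in BASE_BLOCK:
        conn.execute(ddl)
    # 2) then migrations; v5 is a no-op here because the column already exists
    return run_migrations(conn)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    print(initialize_schema_fixed(conn))  # [5]  (fresh DB: baseline v1, then v5)
    print(initialize_schema_fixed(conn))  # []   (second start: nothing pending)
    print([v for (v,) in conn.execute("SELECT version FROM schema_versions ORDER BY version")])  # [1, 5]
```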
(Alternatively: in run_migrations(), when current_version == 0 and detect_existing_schema() is False, record a baseline at the latest registry version and skip the upgrade migrations — i.e. add the missing "fresh install" branch. The reorder above is simpler and keeps the base block as the single source of truth for the latest shape.)
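A minimal sqlite3 sketch of that alternative keeps the current order (migrations first) and adds the fresh-install branch to run_migrations(): with no version recorded and no events table, it records the latest registry version and returns without applying anything, leaving the base block to create the latest shape. A database that already holds the old v1 tables is still baselined at v1 and upgraded. All names are hypothetical stand-ins for pyworkflow's code, and max(MIGRATIONS) plays the role of the "latest registry version".

```python
import sqlite3

# Hypothetical stand-ins, not pyworkflow code.
BASE_BLOCK = [
    "CREATE TABLE IF NOT EXISTS events (id TEXT PRIMARY KEY)",
    "CREATE TABLE IF NOT EXISTS stream_subscriptions "
    "(id TEXT PRIMARY KEY, stream_run_id TEXT NULL)",
]


def migration_v5(conn: sqlite3.Connection) -> None:
    # Emulates ADD COLUMN IF NOT EXISTS, which SQLite lacks.
    cols = {row[1] for row in conn.execute("PRAGMA table_info(stream_subscriptions)")}
    if "stream_run_id" not in cols:
        conn.execute("ALTER TABLE stream_subscriptions ADD COLUMN stream_run_id TEXT NULL")


MIGRATIONS = {5: migration_v5}


def detect_existing_schema(conn: sqlite3.Connection) -> bool:
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'events'"
    ).fetchone()
    return row is not None


def run_migrations(conn: sqlite3.Connection) -> list[int]:
    conn.execute("CREATE TABLE IF NOT EXISTS schema_versions (version INTEGER PRIMARY KEY)")
    current = conn.execute("SELECT COALESCE(MAX(version), 0) FROM schema_versions").fetchone()[0]
    if current == 0:
        if not detect_existing_schema(conn):
            # Fresh install: the base block will create the latest shape, so
            # record the latest version and skip every upgrade migration.
            conn.execute("INSERT INTO schema_versions (version) VALUES (?)", (max(MIGRATIONS),))
            return []
        conn.execute("INSERT INTO schema_versions (version) VALUES (1)")  # old v1 schema: baseline
        current = 1
    applied = []
    for version in sorted(v for v in MIGRATIONS if v > current):
        MIGRATIONS[version](conn)
        conn.execute("INSERT INTO schema_versions (version) VALUES (?)", (version,))
        applied.append(version)
    return applied


def initialize_schema_current_order(conn: sqlite3.Connection) -> list[int]:
    applied = run_migrations(conn)  # order unchanged: migrations first
    for ddl in BASE_BLOCK:          # base block then creates the latest shape
        conn.execute(ddl)
    return applied


if __name__ == "__main__":
    fresh = sqlite3.connect(":memory:")
    print(initialize_schema_current_order(fresh))  # []  (baseline recorded at v5)

    legacy = sqlite3.connect(":memory:")  # pre-existing v1 shape, no stream_run_id
    legacy.execute("CREATE TABLE events (id TEXT PRIMARY KEY)")
    legacy.execute("CREATE TABLE stream_subscriptions (id TEXT PRIMARY KEY)")
    print(initialize_schema_current_order(legacy))  # [5]  (baseline v1, then upgrade)
```

One consequence of this branch: a fresh install never runs any upgrade migration, so it stays correct only while the base block really matches the latest migration. With the reorder, the migrations still run after the base block on a fresh DB and, as long as they stay idempotent like v5's ADD COLUMN IF NOT EXISTS, would fill any gap.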
Happy to send a PR for the reorder if that's the preferred direction.