
Fresh Postgres DB: _initialize_schema() runs migrations before creating base tables → v5 UndefinedTableError #520

Description

@yasha-dev1

Summary

On a fresh PostgreSQL database, PostgresStorageBackend._initialize_schema() runs the schema migrations before creating the base tables. As a result, the very first connection aborts with:

asyncpg.exceptions.UndefinedTableError: relation "stream_subscriptions" does not exist

raised from migration v5 (ALTER TABLE stream_subscriptions ADD COLUMN IF NOT EXISTS stream_run_id ...). The schema is left half-created (only schema_versions, signals, and signal_acknowledgments exist), and the worker can never run a workflow because workflow_runs, events, steps, and stream_subscriptions were never created.

  • Version: pyworkflow-engine 0.3.6 (reproduced at v0.3.6-1-g45fbe4c)
  • Backend: storage=postgres (PYWORKFLOW_STORAGE_TYPE=postgres)
  • Trigger: any first start against an empty DB — e.g. the Celery worker startup hook _run_startup_migrations() (celery/app.py), or the lazy init on first task. Both go through _initialize_schema().

Root cause

pyworkflow/storage/postgres.py, _initialize_schema() (line 394):

async def _initialize_schema(self) -> None:
    ...
    runner = PostgresMigrationRunner(pool)
    await runner.run_migrations()          # <-- line 403: migrations run FIRST

    async with pool.acquire() as conn:
        await conn.execute("""CREATE TABLE IF NOT EXISTS workflow_runs ( ... )""")  # line 408
        ...
        await conn.execute("""CREATE TABLE IF NOT EXISTS events ( ... )""")          # line 458
        ...                                                                          # base tables created AFTER

run_migrations() (storage/migrations/base.py:228) only records a baseline when it detects a pre-existing schema:

if current_version == 0:                               # base.py:257
    has_existing_schema = await self.detect_existing_schema()
    if has_existing_schema:
        await self.record_baseline_version(1, ...)     # baseline -> skip v1, apply v2..v7
        current_version = 1
pending = self.registry.get_pending(current_version)   # else: applies ALL of v2..v7

and detect_existing_schema() (storage/postgres.py:80) probes for the events table:

SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'events')

On a truly fresh DB, events does not exist yet (it's created later at line 458), so detect_existing_schema() returns False, no baseline is recorded, and the runner applies every migration v2→v7. Migration v5 (storage/postgres.py:188) then does:

await conn.execute(
    "ALTER TABLE stream_subscriptions ADD COLUMN IF NOT EXISTS stream_run_id TEXT NULL"
)

against a stream_subscriptions table that doesn't exist yet (it's part of the base block that runs after migrations) → UndefinedTableError.

In other words: the migrations are written to upgrade an existing base schema (they ALTER base tables, and detect_existing_schema() baselines them away on first run), but _initialize_schema() invokes them before that base schema exists.
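The ordering problem can be reproduced without a database. The following is a toy, synchronous model, not pyworkflow code: tables and recorded versions are plain sets, UndefinedTableError is a local stand-in for the asyncpg exception, recording each applied version individually is an assumption, and only v5's dependency on stream_subscriptions is modelled (the other migrations in v2 to v7 are hypothetical no-op placeholders).

```python
class UndefinedTableError(Exception):
    """Local stand-in for asyncpg.exceptions.UndefinedTableError."""


# Subset of the base block's tables, named as in the issue.
BASE_TABLES = {"workflow_runs", "events", "steps", "stream_subscriptions"}


def migration_v5(tables):
    # Models v5's ALTER TABLE: stream_subscriptions must already exist.
    if "stream_subscriptions" not in tables:
        raise UndefinedTableError('relation "stream_subscriptions" does not exist')


# v2..v7; only v5's dependency is modelled, the rest are no-op placeholders.
REGISTRY = {v: (lambda tables: None) for v in (2, 3, 4, 6, 7)}
REGISTRY[5] = migration_v5


def run_migrations(tables, versions):
    current = max(versions, default=0)
    if current == 0 and "events" in tables:  # detect_existing_schema()
        versions.add(1)                      # record_baseline_version(1, ...)
        current = 1
    for version in sorted(REGISTRY):         # get_pending(current_version)
        if version > current:
            REGISTRY[version](tables)
            versions.add(version)


def initialize_schema_as_shipped(tables, versions):
    run_migrations(tables, versions)  # migrations FIRST ...
    tables.update(BASE_TABLES)        # ... base CREATE TABLE block AFTER


tables, versions = set(), set()       # fresh, empty database
try:
    initialize_schema_as_shipped(tables, versions)
except UndefinedTableError as exc:
    print(exc)                        # relation "stream_subscriptions" does not exist
print(sorted(tables))                 # []: the base block never ran
```

Starting from set(BASE_TABLES) instead (a pre-existing schema), the same function records the v1 baseline and applies v2 to v7 without error, which is the only path the migrations were written for.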

Why CI doesn't catch it

The integration tests use InMemoryStorageBackend (no migration runner), so the Postgres fresh-install path is never exercised.

Reproduction

import asyncio
from pyworkflow.config import _load_env_storage_config
from pyworkflow.storage.config import _create_storage_backend

# PYWORKFLOW_STORAGE_TYPE=postgres + PYWORKFLOW_POSTGRES_* pointing at an EMPTY db
async def main():
    backend = _create_storage_backend(_load_env_storage_config())
    await backend.connect()   # -> UndefinedTableError: relation "stream_subscriptions" does not exist

asyncio.run(main())

Workaround (what we did downstream)

We can't reorder library code from the outside, so before the worker starts we run _initialize_schema() once with run_migrations stubbed to a no-op (creating only the base tables), then run the real migrations — at which point detect_existing_schema() sees events, baselines v1, and applies v2→v7 cleanly. Result: 13 tables, schema_versions at the latest version, no error.

import pyworkflow.storage.postgres as pgmod

async def _noop(self):
    return []

async def bootstrap_fresh_schema(backend):
    # backend: the storage backend created as in the reproduction above
    orig = pgmod.PostgresMigrationRunner.run_migrations
    # phase 1: base CREATE TABLE block only
    pgmod.PostgresMigrationRunner.run_migrations = _noop
    try:
        await backend.connect()
    finally:
        # restore the real method even if connect() fails
        pgmod.PostgresMigrationRunner.run_migrations = orig
    # phase 2: real migrations now that the base tables exist (v1 baseline -> v7)
    await pgmod.PostgresMigrationRunner(await backend._get_pool()).run_migrations()

This is obviously a hack (it monkeypatches PostgresMigrationRunner.run_migrations and relies on the private _get_pool()) and shouldn't be necessary.

Proposed fix

Reorder _initialize_schema() so the base CREATE TABLE IF NOT EXISTS block runs first, then run_migrations(). On a fresh DB this makes detect_existing_schema() find events, record the v1 baseline, and apply v2→v7 against tables that now exist; on an already-migrated DB the CREATE TABLE IF NOT EXISTS statements are harmless no-ops and run_migrations() behaves exactly as today.

async def _initialize_schema(self) -> None:
    pool = await self._get_pool()
    # 1) base tables first (CREATE TABLE IF NOT EXISTS ...), so a fresh DB has the
    #    tables the upgrade migrations ALTER, and detect_existing_schema() baselines.
    async with pool.acquire() as conn:
        await conn.execute("CREATE TABLE IF NOT EXISTS workflow_runs ( ... )")
        ...  # the existing base block, moved up
    # 2) then migrations
    await PostgresMigrationRunner(pool).run_migrations()
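In the same toy model as before (a sketch, not pyworkflow code; table and version sets, the UndefinedTableError stand-in, and the no-op placeholder migrations are all assumptions), swapping the two steps is enough. On an empty database the base block creates events, so the runner records the v1 baseline and v5 finds stream_subscriptions. A second start is a no-op because every version is already recorded.

```python
class UndefinedTableError(Exception):
    """Local stand-in for asyncpg.exceptions.UndefinedTableError."""


BASE_TABLES = {"workflow_runs", "events", "steps", "stream_subscriptions"}


def migration_v5(tables):
    # Models v5's ALTER TABLE: stream_subscriptions must already exist.
    if "stream_subscriptions" not in tables:
        raise UndefinedTableError('relation "stream_subscriptions" does not exist')


# v2..v7; only v5's dependency is modelled, the rest are no-op placeholders.
REGISTRY = {v: (lambda tables: None) for v in (2, 3, 4, 6, 7)}
REGISTRY[5] = migration_v5


def run_migrations(tables, versions):
    current = max(versions, default=0)
    if current == 0 and "events" in tables:  # detect_existing_schema()
        versions.add(1)                      # record_baseline_version(1, ...)
        current = 1
    applied = []
    for version in sorted(REGISTRY):         # get_pending(current_version)
        if version > current:
            REGISTRY[version](tables)
            versions.add(version)
            applied.append(version)
    return applied


def initialize_schema_reordered(tables, versions):
    tables.update(BASE_TABLES)               # 1) CREATE TABLE IF NOT EXISTS block first
    return run_migrations(tables, versions)  # 2) then migrations


tables, versions = set(), set()              # fresh, empty database
print(initialize_schema_reordered(tables, versions))  # [2, 3, 4, 5, 6, 7]
print(initialize_schema_reordered(tables, versions))  # [] on the second start
```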

(Alternatively: in run_migrations(), when current_version == 0 and detect_existing_schema() is False, record a baseline at the latest registry version and skip the upgrade migrations — i.e. add the missing "fresh install" branch. The reorder above is simpler and keeps the base block as the single source of truth for the latest shape.)
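The alternative can be sketched in the same toy model for comparison (hypothetical names, not pyworkflow's API; LATEST stands in for the registry's highest version). With no recorded version and no events table, the runner records a baseline at the latest version and applies nothing, so even the original "migrations first" order succeeds. This only stays correct if the base block really creates the latest shape of every table.

```python
class UndefinedTableError(Exception):
    """Local stand-in for asyncpg.exceptions.UndefinedTableError."""


BASE_TABLES = {"workflow_runs", "events", "steps", "stream_subscriptions"}


def migration_v5(tables):
    # Models v5's ALTER TABLE: stream_subscriptions must already exist.
    if "stream_subscriptions" not in tables:
        raise UndefinedTableError('relation "stream_subscriptions" does not exist')


REGISTRY = {v: (lambda tables: None) for v in (2, 3, 4, 6, 7)}  # placeholders
REGISTRY[5] = migration_v5
LATEST = max(REGISTRY)  # hypothetical stand-in for the registry's latest version


def run_migrations_with_fresh_branch(tables, versions):
    current = max(versions, default=0)
    if current == 0:
        if "events" in tables:  # detect_existing_schema(): upgrade from v1
            versions.add(1)
            current = 1
        else:                   # fresh install: baseline at latest, skip upgrades
            versions.add(LATEST)
            current = LATEST
    for version in sorted(REGISTRY):
        if version > current:
            REGISTRY[version](tables)
            versions.add(version)


def initialize_schema_original_order(tables, versions):
    run_migrations_with_fresh_branch(tables, versions)  # migrations still first
    tables.update(BASE_TABLES)                          # base block: latest shape


tables, versions = set(), set()  # fresh, empty database
initialize_schema_original_order(tables, versions)
print(sorted(versions))  # [7]
```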

Happy to send a PR for the reorder if that's the preferred direction.
