Per-step blocking Langfuse shutdown() + hard-coded sleep(0.5) starves the suspend-recording task → workflow freezes after a step completes #526

@LivinTribunal

Summary

When step-span tracing is enabled, execute_step_task does a blocking Langfuse
flush plus a hard-coded asyncio.sleep(0.5) on the Celery worker for every
step, before it records STEP_COMPLETED. On a small ForkPool this starves the
task that records the WORKFLOW_SUSPENDED event, so the completion handler's 500 ms
wait for that event times out. The follow-up resume then finds the run isn't
SUSPENDED (CAS fails) and skips, leaving the workflow permanently suspended —
the next step is never dispatched and the flow appears to freeze.

Observed on pyworkflow-engine==0.3.6.

Symptom

A step's work succeeds (LLM calls return, Step completed is logged), but the
workflow never advances to the next step. Intermittent, and strongly correlated
with:

  • tracing enabled (each step pays the blocking flush), and
  • multi-round-trip steps (e.g. an AI-agent step doing tool-call + structured
    output) running concurrently across chat turns, which saturates the worker
    pool with blocking flushes.

Root cause (two layers)

1. Latency source — per-step blocking tracing shutdown (the amplifier).

execute_step_task creates a fresh tracing provider and tears it down on the worker
for every step, synchronously, before recording completion:

pyworkflow/celery/tasks.py:292-307

if tracing and result is not None:
    _tp = create_tracing_provider(tracing)
    if _tp:
        _tp.record_step_span(...)
        run_async(_tp.shutdown())          # ← blocks the worker
# only then:
run_async(_record_step_completion_and_resume(...))

LangfuseTracingProvider.shutdown() does a full blocking flush and an
unconditional 0.5 s sleep:

pyworkflow/tracing/langfuse.py:177-184

async def shutdown(self) -> None:
    if not self._langfuse:
        return
    try:
        self._langfuse.shutdown()   # drains span queue → HTTP to Langfuse host (network-bound)
        await asyncio.sleep(0.5)    # + guaranteed 0.5s worker hold, every step
    except Exception as e:
        logger.debug(...)

So every step, with tracing on, holds a worker for flush + 0.5 s. Creating and
destroying a Langfuse client per step also defeats the SDK's own background
exporter (it already ships spans asynchronously).
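
To make the starvation concrete, here is a rough queueing model, not a measurement.
It assumes a FIFO pool with every task queued at t=0, and a per-step hold of 0.8 s:
the hard-coded 0.5 s sleep plus an assumed ~0.3 s flush, a figure picked only for
illustration. The helper start_times and all numbers are hypothetical and are not
part of pyworkflow or Celery.

```python
import heapq


def start_times(pool_size: int, hold_seconds: list[float]) -> list[float]:
    """Return when each task starts if all are queued at t=0 on a FIFO pool.

    Simplified model: each task goes to whichever worker frees up first and
    holds it for the given number of seconds.
    """
    free_at = [0.0] * pool_size  # min-heap of times at which each worker is free
    heapq.heapify(free_at)
    starts = []
    for hold in hold_seconds:
        t = heapq.heappop(free_at)       # earliest available worker
        starts.append(t)
        heapq.heappush(free_at, t + hold)
    return starts


# Four traced steps (assumed 0.8 s hold each) queued ahead of one short
# suspend-recording task, on a pool of two workers.
traced = start_times(2, [0.8] * 4 + [0.01])
untraced = start_times(2, [0.0] * 4 + [0.01])

suspend_start_traced = traced[-1]      # 1.6 s in this model
suspend_start_untraced = untraced[-1]  # 0.0 s in this model
```

In this model the suspend-recording task starts at 1.6 s with the per-step hold,
well past a 500 ms wait window, and immediately without it.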

2. The race it exposes — completion waits, resume gives up.

_record_step_completion_and_resume waits only 500 ms for WORKFLOW_SUSPENDED
(pyworkflow/celery/tasks.py:559), then proceeds anyway
(tasks.py:595, "Timeout waiting for WORKFLOW_SUSPENDED event, proceeding with completion").
The resume task then does try_claim_run(SUSPENDED → RUNNING); if the suspend event
has not landed yet, the run isn't SUSPENDED, the CAS fails, and the task returns
without rescheduling (tasks.py:2378-2385,
"Workflow status is not SUSPENDED (already claimed) - skipping duplicate resume").
Nothing ever resumes the run.

When the worker pool is starved by (1), the WORKFLOW_SUSPENDED-recording task for a
concurrent run is delayed past the 500 ms window in (2), and the workflow freezes.
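
The race can be reproduced in isolation with a small asyncio model. This is a
sketch under assumptions, not pyworkflow code: the Run class, its try_claim method,
and the function names are hypothetical; it assumes the run is still RUNNING until
the suspend is recorded; and the timings are scaled down (50 ms wait instead of
500 ms) so it runs quickly.

```python
import asyncio


class Run:
    """Toy run record with a compare-and-set status (hypothetical)."""

    def __init__(self) -> None:
        self.status = "RUNNING"  # assumed status before the suspend is recorded
        self.suspended = asyncio.Event()

    def try_claim(self, expected: str, new: str) -> bool:
        if self.status != expected:
            return False
        self.status = new
        return True


async def record_suspend(run: Run, delay: float) -> None:
    await asyncio.sleep(delay)  # models queueing delay on a starved pool
    run.status = "SUSPENDED"
    run.suspended.set()


async def complete_and_resume(run: Run, wait: float) -> bool:
    try:
        await asyncio.wait_for(run.suspended.wait(), timeout=wait)
    except asyncio.TimeoutError:
        pass  # "proceeding with completion" without the suspend event
    # Resume path: a single CAS, with no retry if it fails.
    return run.try_claim("SUSPENDED", "RUNNING")


async def simulate(suspend_delay: float, wait: float = 0.05) -> tuple[bool, str]:
    run = Run()
    recorder = asyncio.create_task(record_suspend(run, suspend_delay))
    resumed = await complete_and_resume(run, wait)
    await recorder  # let the late suspend land, as it would in production
    return resumed, run.status


on_time = asyncio.run(simulate(suspend_delay=0.0))  # (True, "RUNNING")
late = asyncio.run(simulate(suspend_delay=0.2))     # (False, "SUSPENDED")
```

In the late case the CAS fails first and the suspend lands afterwards, so the
run ends up SUSPENDED with nothing left to resume it.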

Evidence (sanitized)

Two agent runs, same code/config, one worked and one froze — the signature of a
timing race, not a deterministic bug:

# run A — completes fine: WORKFLOW_SUSPENDED recorded ~1.1s after dispatch, resumes, next step runs
... Workflow suspended on worker: step_dispatch:step_AIAgent_...
... Step completed: AIAgent
... SCHEDULE_RESUME: step_completed        → next step executes, flow completes

# run B — freezes: "Workflow suspended on worker" never logged before completion
... Step completed: AIAgent                (T)
... Timeout waiting for WORKFLOW_SUSPENDED event, proceeding with completion   (T + ~1.3s)
... Workflow status is not SUSPENDED (already claimed) - skipping duplicate resume
    → workflow stays suspended, next step never dispatched

The ~1.3 s gap between Step completed and the timeout in run B matches
blocking flush + asyncio.sleep(0.5) + the 500 ms suspend-wait — the completion
handler is delayed by the tracing shutdown while the suspend-recording task is
starved on the same pool.

Suggested fixes

These are independent; either one helps, and both together are better:

  • Tracing (biggest, safest win): stop the per-step blocking shutdown() +
    sleep(0.5). Reuse a single long-lived tracing client and either call the
    lighter flush() or rely on the SDK's async exporter; drop the hard-coded
    asyncio.sleep(0.5) entirely. This removes a guaranteed per-step worker hold.
  • Resume race (correctness): when try_claim_run fails in the resume task
    because the run isn't SUSPENDED yet (as opposed to already RUNNING/terminal),
    reschedule instead of returning, or have _record_step_completion_and_resume
    coordinate with the suspend-recording task rather than racing a 500 ms timeout.
    (Overlaps with the CAS/singleton rework in #372, "Drop singleton pattern from
    resume_workflow_task (use try_claim_run CAS as the only guard)".)
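
A minimal sketch of what the two fixes could look like, under stated assumptions.
FakeTracingClient stands in for a real tracing SDK client, and get_tracing_client,
trace_step, RunState, and resume_decision are hypothetical names, not pyworkflow
APIs. The suspend_recorded flag is also an assumption: the resume task needs some
way to tell "suspend not recorded yet" apart from "already claimed", and how
pyworkflow would expose that is not specified here.

```python
import functools
from dataclasses import dataclass


class FakeTracingClient:
    """Stand-in for a tracing SDK client; counts instances and flushes."""

    created = 0

    def __init__(self) -> None:
        type(self).created += 1
        self.flushes = 0

    def flush(self) -> None:
        self.flushes += 1


@functools.lru_cache(maxsize=1)
def get_tracing_client() -> FakeTracingClient:
    """Build one long-lived client per worker process, on first use."""
    return FakeTracingClient()


def trace_step(step_name: str) -> None:
    client = get_tracing_client()
    # A real implementation would record the step span for step_name here.
    client.flush()  # lighter than shutdown(); no per-step teardown, no sleep


@dataclass
class RunState:
    status: str             # e.g. "RUNNING", "SUSPENDED", "COMPLETED"
    suspend_recorded: bool  # assumed signal: was WORKFLOW_SUSPENDED written?


def resume_decision(run: RunState, attempt: int, max_attempts: int = 5) -> str:
    """Return "resumed", "retry", or "skip" for one resume attempt."""
    if run.status == "SUSPENDED":
        run.status = "RUNNING"  # stands in for a successful CAS
        return "resumed"
    if not run.suspend_recorded and attempt < max_attempts:
        return "retry"  # suspend has not landed yet: reschedule, don't give up
    return "skip"       # genuinely claimed elsewhere, terminal, or out of retries


for name in ("step_a", "step_b", "step_c"):
    trace_step(name)
```

With this shape, three traced steps share one client, and a resume that arrives
before the suspend is recorded is retried (bounded by max_attempts) instead of
being dropped as a duplicate.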

Environment

  • pyworkflow-engine==0.3.6, Python 3.13, Celery ForkPool, Langfuse tracing enabled.
  • Reproduces provider-agnostically (seen with both OpenAI gpt-5 and Anthropic Opus 4.8
    agent steps); not tied to any model/provider code.
