Skip to content

feat: WorkerRuntime — execution context SPI for orchestrating workers that need to spawn sub-cases or signal the case #485

@mdproctor

Description

@mdproctor

Context

SequenceWorker (#484) needs to spawn sub-cases as steps in a sequence and wait for them to complete before proceeding to the next step. The engine already has SubCaseTarget (declarative sub-case spawning via Binding) and PlanItemCompletionApplier (@transactional — atomic write-back when a sub-case completes). WorkItem spawning via the work-adapter is already atomic.

What is missing is the imperative API surface — a way for a Worker function (or SequenceWorker internally) to trigger a sub-case spawn and await its completion programmatically, leveraging the existing atomic machinery rather than duplicating it.

What already exists

  • SubCaseTarget — declarative sub-case binding target in casehub-engine-api
  • SubCaseCompletionService — tracks sub-case completion
  • PlanItemCompletionApplier@Transactional, atomically applies WorkItem completion to PlanItem and fires CONTEXT_CHANGED
  • WorkItemLifecycleAdapter — bridges quarkus-work WorkItem lifecycle to engine PlanItem transitions

The spawning and atomic write-back is already correct. What's missing is a caller-facing handle.

What is needed

A WorkerRuntime handle injected into orchestrating workers, backed by the existing Sub-case and WorkItem machinery:

// Orchestrating worker that needs to spawn a sub-case step
Worker expansionOrchestrator = Worker.builder()
    .function((CaseContext ctx, WorkerRuntime runtime) -> {
        // Uses existing SubCaseTarget + SubCaseCompletionService under the hood
        CaseContext subResult = runtime.spawnAndAwait("economy-expansion", ctx.getData(), Duration.ofMinutes(5));
        return Map.of("expansion.result", subResult.get("expansion.status"));
    })
    .build();

spawnAndAwait() should delegate to SubCaseTarget + SubCaseCompletionService — not reimplement spawning. The transaction boundary stays with PlanItemCompletionApplier.

Proposed API (imperative surface only — no new spawning logic)

public interface WorkerRuntime {
    /** Spawn a sub-case using existing SubCaseTarget machinery; block until terminal. */
    CaseContext spawnAndAwait(String caseType, Map<String, Object> input, Duration timeout);

    /** Non-blocking spawn — use awaitSubCase() separately. */
    UUID spawn(String caseType, Map<String, Object> input);

    /** Block until the given sub-case reaches a terminal state. */
    CaseContext awaitSubCase(UUID subCaseId, Duration timeout);

    /** Signal a context change on the current case. */
    void signal(String path, Object value);

    /** Event log snapshot for the current case. */
    List<CaseEventLogRecord> eventLog();

    UUID caseId();
}

Acceptance criteria

  • WorkerRuntime interface in casehub-engine-api
  • Engine injects WorkerRuntime when worker function declares the two-arg (CaseContext, WorkerRuntime) form
  • spawnAndAwait() delegates to existing SubCaseTarget + SubCaseCompletionService — no new spawning logic
  • Transaction boundary remains in PlanItemCompletionApplier — not duplicated here
  • Leaf workers (single-arg Function<CaseContext, Map>) unaffected — zero overhead
  • Integration test: orchestrating worker spawns sub-case, reads result, returns merged output

Related

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions