
feat(workflow): enable parallel branch fanout with streamed resume #11

Merged
matheusgalvao1 merged 3 commits into main from codex/parallel-agent-fanout
Feb 25, 2026

Conversation

@matheusgalvao1
Collaborator

Summary

  • execute branch fanout concurrently in workflow engine, including deferred branch handling around pause/resume
  • add streamed resume path server/client side so resumed runs emit live updates and keep run state consistent
  • fix parallel chat loading behavior and guard collapsed multi-condition outputs from ambiguous wiring

Validation

  • npm test
  • npm --workspace apps/web run typecheck

@matheusgalvao1
Collaborator Author

Hey @maria-codesignal, can you give the parallel workflows a try and see if you spot any bugs?

We should be able to launch multiple concurrent agents from any node now.

@coderabbitai

coderabbitai bot commented Feb 25, 2026

📝 Walkthrough

Walkthrough

Adds streaming resume endpoints and per-run resume concurrency control on the server; wires optional engine log handlers and centralizes log wiring/cleanup. Introduces resumeWorkflowStream and a shared requestWorkflowStream on the client, refactors runWorkflowStream to use it. Updates the web editor to per-node spinner tracking, makes ports optionally non-connectable with disabled styling, and deduplicates connections. Migrates and expands the workflow engine implementation to TypeScript with approval contexts, deferred node queues, previousOutput propagation, and a setOnLog setter. Removes types package declarations and updates tsconfig path mappings.
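The shared requestWorkflowStream helper mentioned above is not quoted in this thread; at its core it needs to split server-sent event frames out of the response stream. A minimal sketch of that parsing step (the event shape matches the server code quoted later in this thread; the parser function itself is an illustration, not the actual client implementation):

```typescript
// Minimal SSE frame parser that a shared streaming helper could use for
// both run and resume streams. Frames are separated by blank lines; each
// "data: " line carries one JSON-encoded event (illustrative sketch).
type StreamEvent = { type: string; [key: string]: unknown };

function parseSseChunk(buffer: string): { events: StreamEvent[]; rest: string } {
  const events: StreamEvent[] = [];
  const frames = buffer.split('\n\n');
  const rest = frames.pop() ?? ''; // the last piece may be an incomplete frame
  for (const frame of frames) {
    for (const line of frame.split('\n')) {
      if (line.startsWith('data: ')) {
        events.push(JSON.parse(line.slice(6)) as StreamEvent);
      }
    }
  }
  return { events, rest };
}
```

The caller keeps `rest` and prepends it to the next chunk, so events split across network reads are still parsed whole.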

Possibly related PRs

  • PR #5: Overlaps server run-stream/resume endpoints, client streaming API changes, and WorkflowEngine.setOnLog log-wiring adjustments.
  • PR #9: Modifies the workflow editor (createPort signature, spinner APIs, approval handling) with direct overlap in editor changes.
  • PR #10: Related edits to server workflow routes and workflow run/result shaping that intersect with centralized log handling.
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Title check — ✅ Passed: The title accurately summarizes the main changes: enabling parallel branch fanout and adding streamed resume capability to workflows.
  • Description check — ✅ Passed: The description is directly related to the changeset, covering the four main objectives: parallel branch fanout, streamed resume, chat loading fixes, and multi-condition output guards.
  • Docstring Coverage — ✅ Passed: No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
packages/workflow-engine/src/index.ts (1)

667-675: Consider adding the running check explicitly for clarity.

shouldSkipPostNodePropagation implicitly allows propagation when status === 'running' or when status === 'paused' && waitingForInput. The latter case relies on processConnections to defer work. A short comment explaining why paused && waitingForInput is intentionally allowed through would improve maintainability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/workflow-engine/src/index.ts` around lines 667 - 675, Update
shouldSkipPostNodePropagation to explicitly handle the 'running' status (e.g.,
check for this.status === 'running' rather than relying on implicit
fall-through) and add a short inline comment explaining why the case 'paused &&
waitingForInput' is intentionally allowed through — note that processConnections
will defer work in that scenario; reference the shouldSkipPostNodePropagation
function and processConnections to locate where to add the explicit check and
comment.
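An explicit variant along the lines suggested might look like this (status values follow the diff in this PR; the standalone function form is an illustrative sketch, not the engine's actual class method):

```typescript
type EngineStatus = 'running' | 'paused' | 'completed' | 'failed';

// Explicit version of the propagation gate: propagation is allowed while
// running, or while paused waiting for input — in the latter case
// processConnections defers the work instead of executing it immediately.
function shouldSkipPostNodePropagation(status: EngineStatus, waitingForInput: boolean): boolean {
  if (status === 'running') return false;
  if (status === 'paused' && waitingForInput) return false; // deferred, not dropped
  return true;
}
```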
apps/server/src/routes/workflows.ts (1)

220-222: Clearing the log handler in the non-stream /resume path is correct but worth noting.

setEngineOnLog(engine) (no handler) clears any previously-wired streaming listener. This is fine for the non-stream path but means a race between /resume and an active /resume-stream on the same runId would cause the stream to silently lose log events. If this is a concern, consider guarding or documenting the mutual exclusion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/routes/workflows.ts` around lines 220 - 222,
setEngineOnLog(engine) without a handler clears any streaming listener, which
can drop logs if a concurrent /resume-stream is active; update the /resume flow
(around setEngineOnLog(engine) and engine.resume) to detect or guard against an
active stream for the same runId before clearing the handler—either by checking
an "active stream" flag on the engine or run record, acquiring a short
mutex/lock for that runId, or refusing/queuing the non-stream resume when a
stream is active (and ensure the check references setEngineOnLog(engine),
engine.resume and the runId used by /resume-stream).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/web/src/app/workflow-editor.ts`:
- Around line 2558-2564: The call to handleRunResult in submitApprovalDecision
is missing the fromStream flag so post-streaming it re-renders the chat and
duplicates messages; update the call in submitApprovalDecision to pass
fromStream = true when calling handleRunResult(result) (same approach used by
runWorkflow), so that handleRunResult knows logs were streamed via onLogEntry
and skips re-rendering via renderChatFromLogs; locate resumeWorkflowStream,
onLogEntry, and handleRunResult in the diff and add the fromStream argument
accordingly.
- Around line 2180-2188: Delete the dead helper method getApprovalNextNodes from
the workflow editor class: remove its entire definition (the function that
computes targetNodeIds from getRunConnections and filters getRunNodes) since it
has no call sites; ensure you don't alter submitApprovalDecision or
resumeWorkflowStream (which already handle approval flow) and run tests/TS
compile to confirm no references remain to getApprovalNextNodes.

In `@packages/workflow-engine/src/index.ts`:
- Around line 355-361: The parallel branch race is caused by processNode writing
to the shared this.state.previous_output while nextNodes are awaited in
Promise.all; change processNode usage so concurrent branches do not mutate
shared state: have processNode return its output (instead of writing directly to
this.state.previous_output) and in the caller (where nextNodes is iterated)
collect the returned outputs from Promise.all, then decide deterministically
how/when to set this.state.previous_output (e.g., aggregate, pick a
deterministic index, or set only when there is a single nextNode); update or
remove the write to this.state.previous_output inside processNode and adjust any
callers that rely on the previous_output default parameter to read the returned
value or the decided aggregated value instead.
- Around line 207-210: The current parallel execution of
this.processConnections(...) and this.drainDeferredNodes() can orphan nodes
deferred by processConnections; change the call order to run sequentially: first
await this.processConnections(currentNode.id, connections, previousOutput) and
then await this.drainDeferredNodes() so any nodes deferred during
processConnections are immediately drained; mirror the approach used in run()
where processNode is handled before drainDeferredNodes() to avoid leaving
deferred nodes until the next resume().
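The sequential ordering the last prompt asks for can be illustrated minimally (the wrapper function is hypothetical; only the await ordering matters):

```typescript
// Sequencing sketch: drain deferred nodes only after connection processing
// has finished, so nothing deferred mid-flight is left for the next resume().
async function settleBranch(
  processConnections: () => Promise<void>, // may enqueue deferred nodes
  drainDeferredNodes: () => Promise<void>  // picks them up immediately after
): Promise<void> {
  await processConnections();
  await drainDeferredNodes();
}
```

Running the two steps under Promise.all instead would let drainDeferredNodes finish before processConnections has deferred anything, which is the orphaning scenario described above.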

---

Nitpick comments:
In `@apps/server/src/routes/workflows.ts`:
- Around line 220-222: setEngineOnLog(engine) without a handler clears any
streaming listener, which can drop logs if a concurrent /resume-stream is
active; update the /resume flow (around setEngineOnLog(engine) and
engine.resume) to detect or guard against an active stream for the same runId
before clearing the handler—either by checking an "active stream" flag on the
engine or run record, acquiring a short mutex/lock for that runId, or
refusing/queuing the non-stream resume when a stream is active (and ensure the
check references setEngineOnLog(engine), engine.resume and the runId used by
/resume-stream).

In `@packages/workflow-engine/src/index.ts`:
- Around line 667-675: Update shouldSkipPostNodePropagation to explicitly handle
the 'running' status (e.g., check for this.status === 'running' rather than
relying on implicit fall-through) and add a short inline comment explaining why
the case 'paused && waitingForInput' is intentionally allowed through — note
that processConnections will defer work in that scenario; reference the
shouldSkipPostNodePropagation function and processConnections to locate where to
add the explicit check and comment.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9a3f41e and d93f931.

📒 Files selected for processing (9)
  • apps/server/src/routes/workflows.ts
  • apps/web/src/app/workflow-editor.ts
  • apps/web/src/services/api.ts
  • apps/web/src/workflow-editor.css
  • packages/types/src/index.d.ts
  • packages/types/src/index.js
  • packages/workflow-engine/src/index.js
  • packages/workflow-engine/src/index.ts
  • tsconfig.base.json
💤 Files with no reviewable changes (3)
  • packages/types/src/index.js
  • packages/workflow-engine/src/index.js
  • packages/types/src/index.d.ts


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
packages/workflow-engine/src/index.ts (2)

207-222: Sequential processConnections → drainDeferredNodes correctly addresses the previously reported orphaned-deferred-node risk.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/workflow-engine/src/index.ts` around lines 207 - 222, The review
comment contains a duplicate/contradictory meta tag—remove the redundant
"[duplicate_comment]" (or other duplicate review markers) from the PR comment
while leaving the code as-is; no functional changes are needed to the sequential
flow that calls processConnections, drainDeferredNodes, dequeuePendingApproval,
sets currentNodeId/waitingForInput/status, or to the log(...) invocation.

255-331: writeSharedPreviousOutput=false in parallel paths correctly addresses the previous_output mutation race reported in the prior review.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/workflow-engine/src/index.ts` around lines 255 - 331, Remove the
duplicated review/approval marker in the PR comment and verify the processNode
logic around parallel branches: ensure the 'if' case returns early (so it
doesn't mutate state.previous_output) and that processConnections is invoked
with the writeSharedPreviousOutput flag passed through (see processNode,
evaluateIfNodeConnections, and processConnections) so that callers can set
writeSharedPreviousOutput=false for parallel paths; delete the duplicate
"[approve_code_changes][duplicate_comment]" line and confirm no additional
mutations of state.previous_output occur outside the controlled
writeSharedPreviousOutput flow.
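The controlled previous_output flow described above — branches return their outputs and only an unambiguous single branch writes shared state — can be sketched as a small fan-out helper (the helper name and signatures are illustrative assumptions, not the engine's actual API):

```typescript
// Each branch returns its output; the caller decides when the shared
// previous_output may be written (only for a single, unambiguous branch),
// so concurrent branches never race on shared state.
async function fanOut<T>(
  branches: Array<() => Promise<T>>,
  setSharedPreviousOutput: (value: T) => void
): Promise<T[]> {
  const outputs = await Promise.all(branches.map((branch) => branch()));
  if (outputs.length === 1) {
    setSharedPreviousOutput(outputs[0]); // deterministic: exactly one writer
  }
  return outputs;
}
```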
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/server/src/routes/workflows.ts`:
- Around line 185-224: After successfully acquiring the resume lock with
tryAcquireResumeLock(runId, 'stream'), ensure releaseResumeLock always runs
by moving the try/finally to start immediately after lock acquisition (i.e.,
wrap everything from res.setHeader/.../res.flushHeaders() through the
engine.resume/persistResult/sendEvent flow) so that any throw from
res.flushHeaders() still executes releaseResumeLock(runId, 'stream'); also keep
setEngineOnLog(engine) reset in the finally block and preserve the existing
error handling that sends events and logs via logger.error.

In `@packages/workflow-engine/src/index.ts`:
- Around line 333-370: processConnections can route the same downstream node
from two parallel branches into deferConnections while the engine is paused, and
enqueueDeferredNode currently has no duplicate guard so the node can be queued
twice; fix this by making enqueueDeferredNode idempotent — change its
implementation (and the backing storage used by
deferConnections/drainDeferredNodes) to track deferred node ids (e.g., a Set or
a map keyed by WorkflowNode.id) and only add a node if its id is not already
present, ensuring deferConnections/processConnections keep calling
enqueueDeferredNode but duplicates are prevented and drainDeferredNodes
processes each deferred node exactly once.

---

Duplicate comments:
In `@packages/workflow-engine/src/index.ts`:
- Around line 207-222: The review comment contains a duplicate/contradictory
meta tag—remove the redundant "[duplicate_comment]" (or other duplicate review
markers) from the PR comment while leaving the code as-is; no functional changes
are needed to the sequential flow that calls processConnections,
drainDeferredNodes, dequeuePendingApproval, sets
currentNodeId/waitingForInput/status, or to the log(...) invocation.
- Around line 255-331: Remove the duplicated review/approval marker in the PR
comment and verify the processNode logic around parallel branches: ensure the
'if' case returns early (so it doesn't mutate state.previous_output) and that
processConnections is invoked with the writeSharedPreviousOutput flag passed
through (see processNode, evaluateIfNodeConnections, and processConnections) so
that callers can set writeSharedPreviousOutput=false for parallel paths; delete
the duplicate "[approve_code_changes][duplicate_comment]" line and confirm no
additional mutations of state.previous_output occur outside the controlled
writeSharedPreviousOutput flow.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d93f931 and 908cf13.

📒 Files selected for processing (3)
  • apps/server/src/routes/workflows.ts
  • apps/web/src/app/workflow-editor.ts
  • packages/workflow-engine/src/index.ts

Comment on lines +185 to 224
if (!tryAcquireResumeLock(runId, 'stream')) {
  res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' });
  return;
}

res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();

let clientDisconnected = false;
res.on('close', () => { clientDisconnected = true; });

const sendEvent = (data: object) => {
  if (clientDisconnected) return;
  res.write(`data: ${JSON.stringify(data)}\n\n`);
};

try {
  setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry }));
  sendEvent({ type: 'start', runId });

  const result = await engine.resume(input as Record<string, unknown>);
  await persistResult(engine, result);

  if (result.status !== 'paused') {
    removeWorkflow(runId);
  }

  const workflow = getEngineWorkflow(engine);
  sendEvent({ type: 'done', result: workflow ? { ...result, workflow } : result });
} catch (error) {
  const message = error instanceof Error ? error.message : String(error);
  logger.error('Failed to resume workflow stream', message);
  sendEvent({ type: 'error', message });
} finally {
  releaseResumeLock(runId, 'stream');
  setEngineOnLog(engine);
  res.end();
}


⚠️ Potential issue | 🟠 Major

Potential resume lock leak if res.flushHeaders() throws.

The lock is acquired at Line 185, but the try/finally that guarantees releaseResumeLock starts at Line 203. If res.flushHeaders() (Line 193) throws before the try block is entered, the lock is never released, permanently blocking any future resume of this runId.

🔒 Proposed fix — wrap the full post-lock body in the try/finally
     if (!tryAcquireResumeLock(runId, 'stream')) {
       res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' });
       return;
     }

-    res.setHeader('Content-Type', 'text/event-stream');
-    res.setHeader('Cache-Control', 'no-cache');
-    res.setHeader('Connection', 'keep-alive');
-    res.flushHeaders();
-
-    let clientDisconnected = false;
-    res.on('close', () => { clientDisconnected = true; });
-
-    const sendEvent = (data: object) => {
-      if (clientDisconnected) return;
-      res.write(`data: ${JSON.stringify(data)}\n\n`);
-    };
-
     try {
+      res.setHeader('Content-Type', 'text/event-stream');
+      res.setHeader('Cache-Control', 'no-cache');
+      res.setHeader('Connection', 'keep-alive');
+      res.flushHeaders();
+
+      let clientDisconnected = false;
+      res.on('close', () => { clientDisconnected = true; });
+
+      const sendEvent = (data: object) => {
+        if (clientDisconnected) return;
+        res.write(`data: ${JSON.stringify(data)}\n\n`);
+      };
+
       setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry }));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/routes/workflows.ts` around lines 185 - 224, After
successfully acquiring the resume lock with tryAcquireResumeLock(runId,
'stream'), ensure releaseResumeLock always runs by moving the try/finally
to start immediately after lock acquisition (i.e., wrap everything from
res.setHeader/.../res.flushHeaders() through the
engine.resume/persistResult/sendEvent flow) so that any throw from
res.flushHeaders() still executes releaseResumeLock(runId, 'stream'); also keep
setEngineOnLog(engine) reset in the finally block and preserve the existing
error handling that sends events and logs via logger.error.

Comment on lines +333 to +370
private async processConnections(
  sourceNodeId: string,
  connections: WorkflowConnection[],
  previousOutput: unknown,
  writeSharedPreviousOutput = true
): Promise<void> {
  if (connections.length === 0) {
    return;
  }

  if (this.status !== 'running') {
    if (this.status === 'paused' && this.waitingForInput) {
      this.deferConnections(sourceNodeId, connections, previousOutput);
    }
    return;
  }

  const nextNodes: WorkflowNode[] = [];
  for (const connection of connections) {
    const nextNode = this.graph.nodes.find((n) => n.id === connection.target);
    if (!nextNode) {
      this.log(sourceNodeId, 'warn', `Connection target "${connection.target}" not found`);
      continue;
    }
    nextNodes.push(nextNode);
  }

  if (nextNodes.length === 0) {
    return;
  }

  if (nextNodes.length === 1) {
    await this.processNode(nextNodes[0], previousOutput, writeSharedPreviousOutput);
    return;
  }

  await Promise.all(nextNodes.map((nextNode) => this.processNode(nextNode, previousOutput, false)));
}


⚠️ Potential issue | 🟡 Minor

Potential duplicate deferred-node enqueue in diamond-shaped parallel topologies.

enqueueDeferredNode has no duplicate guard. If two concurrent parallel branches both converge on the same downstream node while the engine is in paused + waitingForInput state, deferConnections will enqueue that node twice, causing it to execute twice when drainDeferredNodes runs.

🛡️ Proposed fix in `enqueueDeferredNode`
  private enqueueDeferredNode(nodeId: string, previousOutput: unknown): void {
    const queue = this.getDeferredNodeQueue();
+   if (queue.some((entry) => entry.nodeId === nodeId)) return;
    queue.push({ nodeId, previousOutput });
  }
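A Set-backed variant of that duplicate guard, keeping membership checks O(1) rather than scanning the queue, might look like this (the queue class is an illustrative sketch, not the engine's actual structure):

```typescript
// Deferred-node queue that enqueues each node id at most once, so a
// diamond-shaped fan-in cannot schedule the same node twice across a pause.
class DeferredNodeQueue {
  private entries: Array<{ nodeId: string; previousOutput: unknown }> = [];
  private seen = new Set<string>();

  enqueue(nodeId: string, previousOutput: unknown): void {
    if (this.seen.has(nodeId)) return; // already queued by another branch
    this.seen.add(nodeId);
    this.entries.push({ nodeId, previousOutput });
  }

  drain(): Array<{ nodeId: string; previousOutput: unknown }> {
    const drained = this.entries;
    this.entries = [];
    this.seen.clear(); // allow re-deferral in a later pause cycle
    return drained;
  }
}
```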
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/workflow-engine/src/index.ts` around lines 333 - 370,
processConnections can route the same downstream node from two parallel branches
into deferConnections while the engine is paused, and enqueueDeferredNode
currently has no duplicate guard so the node can be queued twice; fix this by
making enqueueDeferredNode idempotent — change its implementation (and the
backing storage used by deferConnections/drainDeferredNodes) to track deferred
node ids (e.g., a Set or a map keyed by WorkflowNode.id) and only add a node if
its id is not already present, ensuring deferConnections/processConnections keep
calling enqueueDeferredNode but duplicates are prevented and drainDeferredNodes
processes each deferred node exactly once.

@matheusgalvao1 matheusgalvao1 merged commit 51b1681 into main Feb 25, 2026
2 checks passed