feat(workflow): enable parallel branch fanout with streamed resume #11
matheusgalvao1 merged 3 commits into main from
Conversation
Hey @maria-codesignal, can you give the parallel workflows a try and see if you spot any bugs? We should be able to launch multiple concurrent agents from any node now.
📝 Walkthrough
Adds streaming resume endpoints and per-run resume concurrency control on the server; wires optional engine log handlers and centralizes log wiring/cleanup. Introduces resumeWorkflowStream and a shared requestWorkflowStream on the client, and refactors runWorkflowStream to use it. Updates the web editor to per-node spinner tracking, makes ports optionally non-connectable with disabled styling, and deduplicates connections. Migrates and expands the workflow engine implementation to TypeScript with approval contexts, deferred node queues, previousOutput propagation, and a setOnLog setter. Removes types package declarations and updates tsconfig path mappings.
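The streamed endpoints emit server-sent-event frames of the form `data: {...}` terminated by a blank line (visible in the route code quoted later in this review). As a hedged sketch, a shared client helper in the spirit of requestWorkflowStream might split raw network chunks into events like this; the StreamEvent union and the parseSseChunk name are assumptions based on the walkthrough, not the actual client code:

```typescript
// Hypothetical SSE chunk parser; StreamEvent mirrors the event types seen in
// this review ('start' | 'log' | 'done' | 'error'), everything else is assumed.
type StreamEvent =
  | { type: 'start'; runId: string }
  | { type: 'log'; entry: unknown }
  | { type: 'done'; result: unknown }
  | { type: 'error'; message: string };

function parseSseChunk(buffer: string): { events: StreamEvent[]; rest: string } {
  const events: StreamEvent[] = [];
  const frames = buffer.split('\n\n');
  // The final element may be an incomplete frame; return it for the next chunk.
  const rest = frames.pop() ?? '';
  for (const frame of frames) {
    const dataLine = frame.split('\n').find((line) => line.startsWith('data: '));
    if (dataLine) {
      events.push(JSON.parse(dataLine.slice('data: '.length)) as StreamEvent);
    }
  }
  return { events, rest };
}
```

A caller would keep `rest` as the carry-over buffer and prepend it to the next chunk before parsing again.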
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 4
🧹 Nitpick comments (2)
packages/workflow-engine/src/index.ts (1)
667-675: Consider adding the running check explicitly for clarity.
shouldSkipPostNodePropagation implicitly allows propagation when status === 'running' or when status === 'paused' && waitingForInput. The latter case relies on processConnections to defer work. A short comment explaining why paused && waitingForInput is intentionally allowed through would improve maintainability.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/workflow-engine/src/index.ts` around lines 667 - 675, Update shouldSkipPostNodePropagation to explicitly handle the 'running' status (e.g., check for this.status === 'running' rather than relying on implicit fall-through) and add a short inline comment explaining why the case 'paused && waitingForInput' is intentionally allowed through — note that processConnections will defer work in that scenario; reference the shouldSkipPostNodePropagation function and processConnections to locate where to add the explicit check and comment.
apps/server/src/routes/workflows.ts (1)
220-222: Clearing the log handler in the non-stream /resume path is correct but worth noting.
setEngineOnLog(engine) (no handler) clears any previously wired streaming listener. This is fine for the non-stream path, but it means a race between /resume and an active /resume-stream on the same runId would cause the stream to silently lose log events. If this is a concern, consider guarding or documenting the mutual exclusion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/server/src/routes/workflows.ts` around lines 220 - 222, setEngineOnLog(engine) without a handler clears any streaming listener, which can drop logs if a concurrent /resume-stream is active; update the /resume flow (around setEngineOnLog(engine) and engine.resume) to detect or guard against an active stream for the same runId before clearing the handler—either by checking an "active stream" flag on the engine or run record, acquiring a short mutex/lock for that runId, or refusing/queuing the non-stream resume when a stream is active (and ensure the check references setEngineOnLog(engine), engine.resume and the runId used by /resume-stream).
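One way to realize the suggested mutual exclusion is a small in-memory lock keyed by runId. The names tryAcquireResumeLock and releaseResumeLock match the route code quoted later in this review, but the body below and the ResumeMode values are an illustrative sketch, not the server's actual implementation:

```typescript
// Illustrative per-run resume lock; the ResumeMode values are assumptions.
type ResumeMode = 'stream' | 'plain';

const activeResumes = new Map<string, ResumeMode>();

function tryAcquireResumeLock(runId: string, mode: ResumeMode): boolean {
  if (activeResumes.has(runId)) return false; // another resume is already in flight
  activeResumes.set(runId, mode);
  return true;
}

function releaseResumeLock(runId: string, mode: ResumeMode): void {
  // Only release if the lock is held in the same mode that acquired it.
  if (activeResumes.get(runId) === mode) {
    activeResumes.delete(runId);
  }
}
```

Releasing only when the mode matches prevents a losing caller from freeing a lock it never held.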
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/web/src/app/workflow-editor.ts`:
- Around line 2558-2564: The call to handleRunResult in submitApprovalDecision
is missing the fromStream flag so post-streaming it re-renders the chat and
duplicates messages; update the call in submitApprovalDecision to pass
fromStream = true when calling handleRunResult(result) (same approach used by
runWorkflow), so that handleRunResult knows logs were streamed via onLogEntry
and skips re-rendering via renderChatFromLogs; locate resumeWorkflowStream,
onLogEntry, and handleRunResult in the diff and add the fromStream argument
accordingly.
- Around line 2180-2188: Delete the dead helper method getApprovalNextNodes from
the workflow editor class: remove its entire definition (the function that
computes targetNodeIds from getRunConnections and filters getRunNodes) since it
has no call sites; ensure you don't alter submitApprovalDecision or
resumeWorkflowStream (which already handle approval flow) and run tests/TS
compile to confirm no references remain to getApprovalNextNodes.
In `@packages/workflow-engine/src/index.ts`:
- Around line 355-361: The parallel branch race is caused by processNode writing
to the shared this.state.previous_output while nextNodes are awaited in
Promise.all; change processNode usage so concurrent branches do not mutate
shared state: have processNode return its output (instead of writing directly to
this.state.previous_output) and in the caller (where nextNodes is iterated)
collect the returned outputs from Promise.all, then decide deterministically
how/when to set this.state.previous_output (e.g., aggregate, pick a
deterministic index, or set only when there is a single nextNode); update or
remove the write to this.state.previous_output inside processNode and adjust any
callers that rely on the previous_output default parameter to read the returned
value or the decided aggregated value instead.
- Around line 207-210: The current parallel execution of
this.processConnections(...) and this.drainDeferredNodes() can orphan nodes
deferred by processConnections; change the call order to run sequentially: first
await this.processConnections(currentNode.id, connections, previousOutput) and
then await this.drainDeferredNodes() so any nodes deferred during
processConnections are immediately drained; mirror the approach used in run()
where processNode is handled before drainDeferredNodes() to avoid leaving
deferred nodes until the next resume().
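The previous_output refactor described in the inline comments above, where branches return their outputs instead of mutating shared state, can be sketched as follows; processNode here is a stand-in signature and runBranches a hypothetical helper, not the engine's actual code:

```typescript
// Sketch: collect branch outputs from Promise.all instead of letting each
// branch write to a shared previous_output field. All names are illustrative.
type ProcessNode = (nodeId: string, previousOutput: unknown) => Promise<unknown>;

async function runBranches(
  nextNodeIds: string[],
  previousOutput: unknown,
  processNode: ProcessNode
): Promise<unknown | undefined> {
  const outputs = await Promise.all(nextNodeIds.map((id) => processNode(id, previousOutput)));
  // Deterministic policy: only a single branch updates the shared previous_output;
  // a parallel fanout leaves it unset rather than racing on the last writer.
  return nextNodeIds.length === 1 ? outputs[0] : undefined;
}
```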
---
Nitpick comments:
In `@apps/server/src/routes/workflows.ts`:
- Around line 220-222: setEngineOnLog(engine) without a handler clears any
streaming listener, which can drop logs if a concurrent /resume-stream is
active; update the /resume flow (around setEngineOnLog(engine) and
engine.resume) to detect or guard against an active stream for the same runId
before clearing the handler—either by checking an "active stream" flag on the
engine or run record, acquiring a short mutex/lock for that runId, or
refusing/queuing the non-stream resume when a stream is active (and ensure the
check references setEngineOnLog(engine), engine.resume and the runId used by
/resume-stream).
In `@packages/workflow-engine/src/index.ts`:
- Around line 667-675: Update shouldSkipPostNodePropagation to explicitly handle
the 'running' status (e.g., check for this.status === 'running' rather than
relying on implicit fall-through) and add a short inline comment explaining why
the case 'paused && waitingForInput' is intentionally allowed through — note
that processConnections will defer work in that scenario; reference the
shouldSkipPostNodePropagation function and processConnections to locate where to
add the explicit check and comment.
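The explicit status handling requested in the last nitpick can be sketched as a standalone function; the status values and the waitingForInput flag are taken from the review text, while everything else is assumed:

```typescript
// Sketch of an explicit shouldSkipPostNodePropagation check; statuses assumed.
type EngineStatus = 'running' | 'paused' | 'completed' | 'failed';

function shouldSkipPostNodePropagation(status: EngineStatus, waitingForInput: boolean): boolean {
  if (status === 'running') return false; // run is active: propagate normally
  // paused + waitingForInput is deliberately let through: processConnections
  // defers the downstream work instead of executing it while paused.
  if (status === 'paused' && waitingForInput) return false;
  return true;
}
```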
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- apps/server/src/routes/workflows.ts
- apps/web/src/app/workflow-editor.ts
- apps/web/src/services/api.ts
- apps/web/src/workflow-editor.css
- packages/types/src/index.d.ts
- packages/types/src/index.js
- packages/workflow-engine/src/index.js
- packages/workflow-engine/src/index.ts
- tsconfig.base.json
💤 Files with no reviewable changes (3)
- packages/types/src/index.js
- packages/workflow-engine/src/index.js
- packages/types/src/index.d.ts
Actionable comments posted: 2
♻️ Duplicate comments (2)
packages/workflow-engine/src/index.ts (2)
207-222: Sequential processConnections → drainDeferredNodes correctly addresses the previously reported orphaned-deferred-node risk.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/workflow-engine/src/index.ts` around lines 207 - 222, The review comment contains a duplicate/contradictory meta tag—remove the redundant "[duplicate_comment]" (or other duplicate review markers) from the PR comment while leaving the code as-is; no functional changes are needed to the sequential flow that calls processConnections, drainDeferredNodes, dequeuePendingApproval, sets currentNodeId/waitingForInput/status, or to the log(...) invocation.
255-331: writeSharedPreviousOutput = false in parallel paths correctly addresses the previous_output mutation race reported in the prior review.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/workflow-engine/src/index.ts` around lines 255 - 331, Remove the duplicated review/approval marker in the PR comment and verify the processNode logic around parallel branches: ensure the 'if' case returns early (so it doesn't mutate state.previous_output) and that processConnections is invoked with the writeSharedPreviousOutput flag passed through (see processNode, evaluateIfNodeConnections, and processConnections) so that callers can set writeSharedPreviousOutput=false for parallel paths; delete the duplicate "[approve_code_changes][duplicate_comment]" line and confirm no additional mutations of state.previous_output occur outside the controlled writeSharedPreviousOutput flow.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/server/src/routes/workflows.ts`:
- Around line 185-224: After successfully acquiring the resume lock with
tryAcquireResumeLock(runId, 'stream'), ensure releaseResumeLock always runs
by moving the try/finally to start immediately after lock acquisition (i.e.,
wrap everything from res.setHeader/.../res.flushHeaders() through the
engine.resume/persistResult/sendEvent flow) so that any throw from
res.flushHeaders() still executes releaseResumeLock(runId, 'stream'); also keep
setEngineOnLog(engine) reset in the finally block and preserve the existing
error handling that sends events and logs via logger.error.
In `@packages/workflow-engine/src/index.ts`:
- Around line 333-370: processConnections can route the same downstream node
from two parallel branches into deferConnections while the engine is paused, and
enqueueDeferredNode currently has no duplicate guard so the node can be queued
twice; fix this by making enqueueDeferredNode idempotent — change its
implementation (and the backing storage used by
deferConnections/drainDeferredNodes) to track deferred node ids (e.g., a Set or
a map keyed by WorkflowNode.id) and only add a node if its id is not already
present, ensuring deferConnections/processConnections keep calling
enqueueDeferredNode but duplicates are prevented and drainDeferredNodes
processes each deferred node exactly once.
---
Duplicate comments:
In `@packages/workflow-engine/src/index.ts`:
- Around line 207-222: The review comment contains a duplicate/contradictory
meta tag—remove the redundant "[duplicate_comment]" (or other duplicate review
markers) from the PR comment while leaving the code as-is; no functional changes
are needed to the sequential flow that calls processConnections,
drainDeferredNodes, dequeuePendingApproval, sets
currentNodeId/waitingForInput/status, or to the log(...) invocation.
- Around line 255-331: Remove the duplicated review/approval marker in the PR
comment and verify the processNode logic around parallel branches: ensure the
'if' case returns early (so it doesn't mutate state.previous_output) and that
processConnections is invoked with the writeSharedPreviousOutput flag passed
through (see processNode, evaluateIfNodeConnections, and processConnections) so
that callers can set writeSharedPreviousOutput=false for parallel paths; delete
the duplicate "[approve_code_changes][duplicate_comment]" line and confirm no
additional mutations of state.previous_output occur outside the controlled
writeSharedPreviousOutput flow.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- apps/server/src/routes/workflows.ts
- apps/web/src/app/workflow-editor.ts
- packages/workflow-engine/src/index.ts
if (!tryAcquireResumeLock(runId, 'stream')) {
  res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' });
  return;
}

res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();

let clientDisconnected = false;
res.on('close', () => { clientDisconnected = true; });

const sendEvent = (data: object) => {
  if (clientDisconnected) return;
  res.write(`data: ${JSON.stringify(data)}\n\n`);
};

try {
  setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry }));
  sendEvent({ type: 'start', runId });

  const result = await engine.resume(input as Record<string, unknown>);
  await persistResult(engine, result);

  if (result.status !== 'paused') {
    removeWorkflow(runId);
  }

  const workflow = getEngineWorkflow(engine);
  sendEvent({ type: 'done', result: workflow ? { ...result, workflow } : result });
} catch (error) {
  const message = error instanceof Error ? error.message : String(error);
  logger.error('Failed to resume workflow stream', message);
  sendEvent({ type: 'error', message });
} finally {
  releaseResumeLock(runId, 'stream');
  setEngineOnLog(engine);
  res.end();
}
Potential resume lock leak if res.flushHeaders() throws.
The lock is acquired at Line 185, but the try/finally that guarantees releaseResumeLock starts at Line 203. If res.flushHeaders() (Line 193) throws before the try block is entered, the lock is never released, permanently blocking any future resume of this runId.
🔒 Proposed fix — wrap the full post-lock body in the try/finally
if (!tryAcquireResumeLock(runId, 'stream')) {
res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' });
return;
}
- res.setHeader('Content-Type', 'text/event-stream');
- res.setHeader('Cache-Control', 'no-cache');
- res.setHeader('Connection', 'keep-alive');
- res.flushHeaders();
-
- let clientDisconnected = false;
- res.on('close', () => { clientDisconnected = true; });
-
- const sendEvent = (data: object) => {
- if (clientDisconnected) return;
- res.write(`data: ${JSON.stringify(data)}\n\n`);
- };
-
try {
+ res.setHeader('Content-Type', 'text/event-stream');
+ res.setHeader('Cache-Control', 'no-cache');
+ res.setHeader('Connection', 'keep-alive');
+ res.flushHeaders();
+
+ let clientDisconnected = false;
+ res.on('close', () => { clientDisconnected = true; });
+
+ const sendEvent = (data: object) => {
+ if (clientDisconnected) return;
+ res.write(`data: ${JSON.stringify(data)}\n\n`);
+ };
+
    setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry }));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/server/src/routes/workflows.ts` around lines 185 - 224, After
successfully acquiring the resume lock with tryAcquireResumeLock(runId,
'stream'), ensure releaseResumeLock always runs by moving the try/finally
to start immediately after lock acquisition (i.e., wrap everything from
res.setHeader/.../res.flushHeaders() through the
engine.resume/persistResult/sendEvent flow) so that any throw from
res.flushHeaders() still executes releaseResumeLock(runId, 'stream'); also keep
setEngineOnLog(engine) reset in the finally block and preserve the existing
error handling that sends events and logs via logger.error.
private async processConnections(
  sourceNodeId: string,
  connections: WorkflowConnection[],
  previousOutput: unknown,
  writeSharedPreviousOutput = true
): Promise<void> {
  if (connections.length === 0) {
    return;
  }

  if (this.status !== 'running') {
    if (this.status === 'paused' && this.waitingForInput) {
      this.deferConnections(sourceNodeId, connections, previousOutput);
    }
    return;
  }

  const nextNodes: WorkflowNode[] = [];
  for (const connection of connections) {
    const nextNode = this.graph.nodes.find((n) => n.id === connection.target);
    if (!nextNode) {
      this.log(sourceNodeId, 'warn', `Connection target "${connection.target}" not found`);
      continue;
    }
    nextNodes.push(nextNode);
  }

  if (nextNodes.length === 0) {
    return;
  }

  if (nextNodes.length === 1) {
    await this.processNode(nextNodes[0], previousOutput, writeSharedPreviousOutput);
    return;
  }

  await Promise.all(nextNodes.map((nextNode) => this.processNode(nextNode, previousOutput, false)));
}
Potential duplicate deferred-node enqueue in diamond-shaped parallel topologies.
enqueueDeferredNode has no duplicate guard. If two concurrent parallel branches both converge on the same downstream node while the engine is in paused + waitingForInput state, deferConnections will enqueue that node twice, causing it to execute twice when drainDeferredNodes runs.
🛡️ Proposed fix in `enqueueDeferredNode`
private enqueueDeferredNode(nodeId: string, previousOutput: unknown): void {
const queue = this.getDeferredNodeQueue();
+ if (queue.some((entry) => entry.nodeId === nodeId)) return;
queue.push({ nodeId, previousOutput });
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/workflow-engine/src/index.ts` around lines 333 - 370,
processConnections can route the same downstream node from two parallel branches
into deferConnections while the engine is paused, and enqueueDeferredNode
currently has no duplicate guard so the node can be queued twice; fix this by
making enqueueDeferredNode idempotent — change its implementation (and the
backing storage used by deferConnections/drainDeferredNodes) to track deferred
node ids (e.g., a Set or a map keyed by WorkflowNode.id) and only add a node if
its id is not already present, ensuring deferConnections/processConnections keep
calling enqueueDeferredNode but duplicates are prevented and drainDeferredNodes
processes each deferred node exactly once.
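The idempotent enqueue this prompt asks for can be sketched with a Set of deferred node ids kept next to the queue; enqueue and drain mirror the enqueueDeferredNode/drainDeferredNodes names from the review, and the rest is assumed:

```typescript
// Illustrative sketch: a deferred-node queue with a duplicate guard.
// Entry shape and method roles are taken from the review; everything else is assumed.
interface DeferredEntry {
  nodeId: string;
  previousOutput: unknown;
}

class DeferredNodeQueue {
  private entries: DeferredEntry[] = [];
  private ids = new Set<string>(); // duplicate guard keyed by node id

  enqueue(nodeId: string, previousOutput: unknown): void {
    if (this.ids.has(nodeId)) return; // already deferred: drop the duplicate
    this.ids.add(nodeId);
    this.entries.push({ nodeId, previousOutput });
  }

  drain(): DeferredEntry[] {
    const drained = this.entries;
    this.entries = [];
    this.ids.clear();
    return drained; // each deferred node appears exactly once
  }
}
```

With this shape, converging parallel branches can keep calling enqueue freely; only the first call for a given node id has any effect until the queue is drained.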
Summary
Validation