From fa54d9989db85961d1763000433f7b4a4de2d222 Mon Sep 17 00:00:00 2001 From: waleed Date: Sun, 18 Jan 2026 19:35:01 -0800 Subject: [PATCH 1/2] improvement(executor): upgraded abort controller to handle aborts for loops and parallels --- apps/sim/executor/execution/engine.test.ts | 599 +++++++++++++++++++++ apps/sim/executor/execution/engine.ts | 68 ++- 2 files changed, 656 insertions(+), 11 deletions(-) create mode 100644 apps/sim/executor/execution/engine.test.ts diff --git a/apps/sim/executor/execution/engine.test.ts b/apps/sim/executor/execution/engine.test.ts new file mode 100644 index 0000000000..f93ebc2068 --- /dev/null +++ b/apps/sim/executor/execution/engine.test.ts @@ -0,0 +1,599 @@ +/** + * @vitest-environment node + */ +import { loggerMock } from '@sim/testing' +import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest' + +vi.mock('@sim/logger', () => loggerMock) + +vi.mock('@/lib/execution/cancellation', () => ({ + isExecutionCancelled: vi.fn(), + isRedisCancellationEnabled: vi.fn(), +})) + +import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation' +import type { DAG, DAGNode } from '@/executor/dag/builder' +import type { EdgeManager } from '@/executor/execution/edge-manager' +import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node' +import type { ExecutionContext } from '@/executor/types' +import type { SerializedBlock } from '@/serializer/types' +import { ExecutionEngine } from './engine' + +function createMockBlock(id: string): SerializedBlock { + return { + id, + metadata: { id: 'test', name: 'Test Block' }, + position: { x: 0, y: 0 }, + config: { tool: '', params: {} }, + inputs: {}, + outputs: {}, + enabled: true, + } +} + +function createMockNode(id: string, blockType = 'test'): DAGNode { + return { + id, + block: { + ...createMockBlock(id), + metadata: { id: blockType, name: `Block ${id}` }, + }, + outgoingEdges: new Map(), + incomingEdges: new Set(), + metadata: {}, + } +} + +function createMockContext(overrides: Partial = {}): ExecutionContext { + return { + workflowId: 'test-workflow', + workspaceId: 'test-workspace', + executionId: 'test-execution', + userId: 'test-user', + blockStates: new Map(), + executedBlocks: new Set(), + blockLogs: [], + loopExecutions: new Map(), + parallelExecutions: new Map(), + completedLoops: new Set(), + activeExecutionPath: new Set(), + metadata: { + executionId: 'test-execution', + startTime: new Date().toISOString(), + pendingBlocks: [], + }, + envVars: {}, + ...overrides, + } +} + +function createMockDAG(nodes: DAGNode[]): DAG { + const nodeMap = new Map() + nodes.forEach((node) => nodeMap.set(node.id, node)) + return { + nodes: nodeMap, + loopConfigs: new Map(), + parallelConfigs: new Map(), + } +} + +interface MockEdgeManager extends EdgeManager { + processOutgoingEdges: ReturnType +} + +function createMockEdgeManager( + processOutgoingEdgesImpl?: (node: DAGNode) => string[] +): MockEdgeManager { + const mockFn = vi.fn().mockImplementation(processOutgoingEdgesImpl || (() => [])) + return { + processOutgoingEdges: mockFn, + isNodeReady: vi.fn().mockReturnValue(true), + deactivateEdgeAndDescendants: vi.fn(), + restoreIncomingEdge: vi.fn(), + clearDeactivatedEdges: vi.fn(), + clearDeactivatedEdgesForNodes: vi.fn(), + } as unknown as MockEdgeManager +} + +interface MockNodeOrchestrator extends NodeExecutionOrchestrator { + executionCount: number +} + +function createMockNodeOrchestrator(executeDelay = 0): MockNodeOrchestrator { + const mock = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async () => { + mock.executionCount++ + if (executeDelay > 0) { + await new Promise((resolve) => setTimeout(resolve, executeDelay)) + } + return { nodeId: 'test', output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } + return mock as unknown as MockNodeOrchestrator +} + +describe('ExecutionEngine', () => { + beforeEach(() => { + vi.clearAllMocks() + ;(isExecutionCancelled as Mock).mockResolvedValue(false) + ;(isRedisCancellationEnabled as Mock).mockReturnValue(false) + }) + + afterEach(() => { + vi.useRealTimers() + }) + + describe('Normal execution', () => { + it('should execute a simple linear workflow', async () => { + const startNode = createMockNode('start', 'starter') + const endNode = createMockNode('end', 'function') + startNode.outgoingEdges.set('edge1', { target: 'end' }) + endNode.incomingEdges.add('start') + + const dag = createMockDAG([startNode, endNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['end'] + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.success).toBe(true) + expect(nodeOrchestrator.executionCount).toBe(2) + }) + + it('should mark execution as successful when completed without cancellation', async () => { + const startNode = createMockNode('start', 'starter') + const dag = createMockDAG([startNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.success).toBe(true) + expect(result.status).toBeUndefined() + }) + + it('should execute all nodes in a multi-node workflow', async () => { + const nodes = [ + createMockNode('start', 'starter'), + createMockNode('middle1', 'function'), + createMockNode('middle2', 'function'), + createMockNode('end', 'function'), + ] + + nodes[0].outgoingEdges.set('e1', { target: 'middle1' }) + nodes[1].outgoingEdges.set('e2', { target: 'middle2' }) + nodes[2].outgoingEdges.set('e3', { target: 'end' }) + + const dag = createMockDAG(nodes) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['middle1'] + if (node.id === 'middle1') return ['middle2'] + if (node.id === 'middle2') return ['end'] + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.success).toBe(true) + expect(nodeOrchestrator.executionCount).toBe(4) + }) + }) + + describe('Cancellation via AbortSignal', () => { + it('should stop execution immediately when aborted before start', async () => { + const abortController = new AbortController() + abortController.abort() + + const startNode = createMockNode('start', 'starter') + const dag = createMockDAG([startNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.status).toBe('cancelled') + expect(nodeOrchestrator.executionCount).toBe(0) + }) + + it('should stop execution when aborted mid-workflow', async () => { + const abortController = new AbortController() + + const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function')) + for (let i = 0; i < nodes.length - 1; i++) { + nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` }) + } + + const dag = createMockDAG(nodes) + const context = createMockContext({ abortSignal: abortController.signal }) + + let callCount = 0 + const edgeManager = createMockEdgeManager((node) => { + callCount++ + if (callCount === 2) abortController.abort() + const idx = Number.parseInt(node.id.replace('node', '')) + if (idx < 4) return [`node${idx + 1}`] + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('node0') + + expect(result.success).toBe(false) + expect(result.status).toBe('cancelled') + expect(nodeOrchestrator.executionCount).toBeLessThan(5) + }) + + it('should not wait for slow executions when cancelled', async () => { + const abortController = new AbortController() + + const startNode = createMockNode('start', 'starter') + const slowNode = createMockNode('slow', 'function') + startNode.outgoingEdges.set('edge1', { target: 'slow' }) + + const dag = createMockDAG([startNode, slowNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['slow'] + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator(500) + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + const executionPromise = engine.run('start') + setTimeout(() => abortController.abort(), 50) + + const startTime = Date.now() + const result = await executionPromise + const duration = Date.now() - startTime + + expect(result.status).toBe('cancelled') + expect(duration).toBeLessThan(400) + }) + + it('should return cancelled status even if error thrown during cancellation', async () => { + const abortController = new AbortController() + abortController.abort() + + const startNode = createMockNode('start', 'starter') + const dag = createMockDAG([startNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.status).toBe('cancelled') + expect(result.success).toBe(false) + }) + }) + + describe('Cancellation via Redis', () => { + it('should check Redis for cancellation when enabled', async () => { + ;(isRedisCancellationEnabled as Mock).mockReturnValue(true) + ;(isExecutionCancelled as Mock).mockResolvedValue(false) + + const startNode = createMockNode('start', 'starter') + const dag = createMockDAG([startNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + await engine.run('start') + + expect(isExecutionCancelled as Mock).toHaveBeenCalled() + }) + + it('should stop execution when Redis reports cancellation', async () => { + ;(isRedisCancellationEnabled as Mock).mockReturnValue(true) + + let checkCount = 0 + ;(isExecutionCancelled as Mock).mockImplementation(async () => { + checkCount++ + return checkCount > 1 + }) + + const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function')) + for (let i = 0; i < nodes.length - 1; i++) { + nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` }) + } + + const dag = createMockDAG(nodes) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + const idx = Number.parseInt(node.id.replace('node', '')) + if (idx < 4) return [`node${idx + 1}`] + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator(150) + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('node0') + + expect(result.success).toBe(false) + expect(result.status).toBe('cancelled') + }) + + it('should respect cancellation check interval', async () => { + ;(isRedisCancellationEnabled as Mock).mockReturnValue(true) + ;(isExecutionCancelled as Mock).mockResolvedValue(false) + + const startNode = createMockNode('start', 'starter') + const dag = createMockDAG([startNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + await engine.run('start') + + expect((isExecutionCancelled as Mock).mock.calls.length).toBeGreaterThanOrEqual(1) + }) + }) + + describe('Loop execution with cancellation', () => { + it('should break out of loop when cancelled mid-iteration', async () => { + const abortController = new AbortController() + + const loopStartNode = createMockNode('loop-start', 'loop_sentinel') + loopStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: 'loop1' } + + const loopBodyNode = createMockNode('loop-body', 'function') + loopBodyNode.metadata = { isLoopNode: true, loopId: 'loop1' } + + const loopEndNode = createMockNode('loop-end', 'loop_sentinel') + loopEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: 'loop1' } + + loopStartNode.outgoingEdges.set('edge1', { target: 'loop-body' }) + loopBodyNode.outgoingEdges.set('edge2', { target: 'loop-end' }) + loopEndNode.outgoingEdges.set('loop_continue', { + target: 'loop-start', + sourceHandle: 'loop_continue', + }) + + const dag = createMockDAG([loopStartNode, loopBodyNode, loopEndNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + + let iterationCount = 0 + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'loop-start') return ['loop-body'] + if (node.id === 'loop-body') return ['loop-end'] + if (node.id === 'loop-end') { + iterationCount++ + if (iterationCount === 3) abortController.abort() + return ['loop-start'] + } + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator(5) + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('loop-start') + + expect(result.status).toBe('cancelled') + expect(iterationCount).toBeLessThan(100) + }) + }) + + describe('Parallel execution with cancellation', () => { + it('should stop queueing parallel branches when cancelled', async () => { + const abortController = new AbortController() + + const startNode = createMockNode('start', 'starter') + const parallelNodes = Array.from({ length: 10 }, (_, i) => + createMockNode(`parallel${i}`, 'function') + ) + + parallelNodes.forEach((_, i) => { + startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` }) + }) + + const dag = createMockDAG([startNode, ...parallelNodes]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') { + return parallelNodes.map((_, i) => `parallel${i}`) + } + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator(50) + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + const executionPromise = engine.run('start') + setTimeout(() => abortController.abort(), 30) + + const result = await executionPromise + + expect(result.status).toBe('cancelled') + expect(nodeOrchestrator.executionCount).toBeLessThan(11) + }) + + it('should not wait for all parallel branches when cancelled', async () => { + const abortController = new AbortController() + + const startNode = createMockNode('start', 'starter') + const slowNodes = Array.from({ length: 5 }, (_, i) => createMockNode(`slow${i}`, 'function')) + + slowNodes.forEach((_, i) => { + startNode.outgoingEdges.set(`edge${i}`, { target: `slow${i}` }) + }) + + const dag = createMockDAG([startNode, ...slowNodes]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return slowNodes.map((_, i) => `slow${i}`) + return [] + }) + const nodeOrchestrator = createMockNodeOrchestrator(200) + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + const executionPromise = engine.run('start') + setTimeout(() => abortController.abort(), 50) + + const startTime = Date.now() + const result = await executionPromise + const duration = Date.now() - startTime + + expect(result.status).toBe('cancelled') + expect(duration).toBeLessThan(500) + }) + }) + + describe('Edge cases', () => { + it('should handle empty DAG gracefully', async () => { + const dag = createMockDAG([]) + const context = createMockContext() + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run() + + expect(result.success).toBe(true) + expect(nodeOrchestrator.executionCount).toBe(0) + }) + + it('should preserve partial output when cancelled', async () => { + const abortController = new AbortController() + + const startNode = createMockNode('start', 'starter') + const endNode = createMockNode('end', 'function') + endNode.outgoingEdges = new Map() + + startNode.outgoingEdges.set('edge1', { target: 'end' }) + + const dag = createMockDAG([startNode, endNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['end'] + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'start') { + return { nodeId: 'start', output: { startData: 'value' }, isFinalOutput: false } + } + abortController.abort() + return { nodeId: 'end', output: { endData: 'value' }, isFinalOutput: true } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.status).toBe('cancelled') + expect(result.output).toBeDefined() + }) + + it('should populate metadata on cancellation', async () => { + const abortController = new AbortController() + abortController.abort() + + const startNode = createMockNode('start', 'starter') + const dag = createMockDAG([startNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.metadata).toBeDefined() + expect(result.metadata.endTime).toBeDefined() + expect(result.metadata.duration).toBeDefined() + }) + + it('should return logs even when cancelled', async () => { + const abortController = new AbortController() + + const startNode = createMockNode('start', 'starter') + const dag = createMockDAG([startNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + context.blockLogs.push({ + blockId: 'test', + blockName: 'Test', + blockType: 'test', + startedAt: '', + endedAt: '', + durationMs: 0, + success: true, + }) + + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + abortController.abort() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.logs).toBeDefined() + expect(result.logs.length).toBeGreaterThan(0) + }) + }) + + describe('Cancellation flag behavior', () => { + it('should set cancelledFlag when abort signal fires', async () => { + const abortController = new AbortController() + + const nodes = Array.from({ length: 3 }, (_, i) => createMockNode(`node${i}`, 'function')) + for (let i = 0; i < nodes.length - 1; i++) { + nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` }) + } + + const dag = createMockDAG(nodes) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'node0') { + abortController.abort() + return ['node1'] + } + return node.id === 'node1' ? ['node2'] : [] + }) + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('node0') + + expect(result.status).toBe('cancelled') + }) + + it('should cache Redis cancellation result', async () => { + ;(isRedisCancellationEnabled as Mock).mockReturnValue(true) + ;(isExecutionCancelled as Mock).mockResolvedValue(true) + + const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function')) + const dag = createMockDAG(nodes) + const context = createMockContext() + const edgeManager = createMockEdgeManager() + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + await engine.run('node0') + + expect((isExecutionCancelled as Mock).mock.calls.length).toBeLessThanOrEqual(3) + }) + }) +}) diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 3ddea0ddcc..53efaa25c4 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -28,6 +28,8 @@ export class ExecutionEngine { private lastCancellationCheck = 0 private readonly useRedisCancellation: boolean private readonly CANCELLATION_CHECK_INTERVAL_MS = 500 + private abortPromise: Promise | null = null + private abortResolve: (() => void) | null = null constructor( private context: ExecutionContext, @@ -37,6 +39,34 @@ export class ExecutionEngine { ) { this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId + this.initializeAbortHandler() + } + + /** + * Sets up a single abort promise that can be reused throughout execution. + * This avoids creating multiple event listeners and potential memory leaks. + */ + private initializeAbortHandler(): void { + if (!this.context.abortSignal) return + + if (this.context.abortSignal.aborted) { + this.cancelledFlag = true + this.abortPromise = Promise.resolve() + return + } + + this.abortPromise = new Promise((resolve) => { + this.abortResolve = resolve + }) + + this.context.abortSignal.addEventListener( + 'abort', + () => { + this.cancelledFlag = true + this.abortResolve?.() + }, + { once: true } + ) } private async checkCancellation(): Promise { @@ -73,12 +103,15 @@ export class ExecutionEngine { this.initializeQueue(triggerBlockId) while (this.hasWork()) { - if ((await this.checkCancellation()) && this.executing.size === 0) { + if (await this.checkCancellation()) { break } await this.processQueue() } - await this.waitForAllExecutions() + + if (!this.cancelledFlag) { + await this.waitForAllExecutions() + } if (this.pausedBlocks.size > 0) { return this.buildPausedResult(startTime) @@ -164,11 +197,7 @@ export class ExecutionEngine { private trackExecution(promise: Promise): void { this.executing.add(promise) - // Attach error handler to prevent unhandled rejection warnings - // The actual error handling happens in waitForAllExecutions/waitForAnyExecution - promise.catch(() => { - // Error will be properly handled by Promise.all/Promise.race in wait methods - }) + promise.catch(() => {}) promise.finally(() => { this.executing.delete(promise) }) @@ -176,12 +205,30 @@ export class ExecutionEngine { private async waitForAnyExecution(): Promise { if (this.executing.size > 0) { - await Promise.race(this.executing) + const abortPromise = this.getAbortPromise() + if (abortPromise) { + await Promise.race([Promise.race(this.executing), abortPromise]) + } else { + await Promise.race(this.executing) + } } } private async waitForAllExecutions(): Promise { - await Promise.all(Array.from(this.executing)) + const abortPromise = this.getAbortPromise() + if (abortPromise) { + await Promise.race([Promise.all(Array.from(this.executing)), abortPromise]) + } else { + await Promise.all(Array.from(this.executing)) + } + } + + /** + * Returns the cached abort promise. This is safe to call multiple times + * as it reuses the same promise instance created during initialization. + */ + private getAbortPromise(): Promise | null { + return this.abortPromise } private async withQueueLock(fn: () => Promise | T): Promise { @@ -277,7 +324,7 @@ export class ExecutionEngine { this.trackExecution(promise) } - if (this.executing.size > 0) { + if (this.executing.size > 0 && !this.cancelledFlag) { await this.waitForAnyExecution() } } @@ -336,7 +383,6 @@ export class ExecutionEngine { this.addMultipleToQueue(readyNodes) - // Check for dynamically added nodes (e.g., from parallel expansion) if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) { const dynamicNodes = this.context.pendingDynamicNodes this.context.pendingDynamicNodes = [] From 8ae55c6bd912845d02db3f41fb70c0e339d000d5 Mon Sep 17 00:00:00 2001 From: waleed Date: Sun, 18 Jan 2026 19:39:19 -0800 Subject: [PATCH 2/2] comments --- apps/sim/executor/execution/engine.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 53efaa25c4..7c2317b047 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -207,7 +207,7 @@ export class ExecutionEngine { if (this.executing.size > 0) { const abortPromise = this.getAbortPromise() if (abortPromise) { - await Promise.race([Promise.race(this.executing), abortPromise]) + await Promise.race([...this.executing, abortPromise]) } else { await Promise.race(this.executing) } @@ -217,9 +217,9 @@ export class ExecutionEngine { private async waitForAllExecutions(): Promise { const abortPromise = this.getAbortPromise() if (abortPromise) { - await Promise.race([Promise.all(Array.from(this.executing)), abortPromise]) + await Promise.race([Promise.all(this.executing), abortPromise]) } else { - await Promise.all(Array.from(this.executing)) + await Promise.all(this.executing) } }