From d184737a23c44c15363eb8a59fe944cea0f9a918 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 19 Jan 2026 11:13:04 +0000 Subject: [PATCH] fix(fair-queue): ensure concurrency is released when a message reaches visibility timeout to prevent concurrency leaks --- packages/redis-worker/src/fair-queue/index.ts | 28 ++- .../fair-queue/tests/raceConditions.test.ts | 4 +- .../src/fair-queue/tests/visibility.test.ts | 181 +++++++++++++++++- packages/redis-worker/src/fair-queue/types.ts | 19 ++ .../redis-worker/src/fair-queue/visibility.ts | 37 +++- 5 files changed, 258 insertions(+), 11 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index b2281cfa59..7ef461375c 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -1343,13 +1343,37 @@ export class FairQueue { let totalReclaimed = 0; for (let shardId = 0; shardId < this.shardCount; shardId++) { - const reclaimed = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({ + const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({ queueKey: this.keys.queueKey(queueId), queueItemsKey: this.keys.queueItemsKey(queueId), masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)), })); - totalReclaimed += reclaimed; + // Release concurrency for each reclaimed message + // This is critical: when a message times out, its concurrency slot must be freed + // so the message can be processed again when it's re-claimed from the queue + if (this.concurrencyManager && reclaimedMessages.length > 0) { + for (const msg of reclaimedMessages) { + try { + await this.concurrencyManager.release( + { + id: msg.queueId, + tenantId: msg.tenantId, + metadata: msg.metadata ?? {}, + }, + msg.messageId + ); + } catch (error) { + this.logger.error("Failed to release concurrency for reclaimed message", { + messageId: msg.messageId, + queueId: msg.queueId, + error: error instanceof Error ? error.message : String(error), + }); + } + } + } + + totalReclaimed += reclaimedMessages.length; } if (totalReclaimed > 0) { diff --git a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts index d6ee70a450..1222bd9e4f 100644 --- a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts @@ -624,12 +624,12 @@ describe("Race Condition Tests", () => { await new Promise((resolve) => setTimeout(resolve, 300)); // Try to reclaim (should find nothing because heartbeat extended the deadline) - const reclaimed = await manager.reclaimTimedOut(0, (queueId) => ({ + const reclaimedMessages = await manager.reclaimTimedOut(0, (queueId) => ({ queueKey: keys.queueKey(queueId), queueItemsKey: keys.queueItemsKey(queueId), masterQueueKey: keys.masterQueueKey(0), })); - reclaimResults.push(reclaimed); + reclaimResults.push(reclaimedMessages.length); } // Heartbeats should have kept the message alive diff --git a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts index b35eb874fc..8a60138ee1 100644 --- a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts @@ -2,7 +2,7 @@ import { describe, expect } from "vitest"; import { redisTest } from "@internal/testcontainers"; import { createRedisClient } from "@internal/redis"; import { VisibilityManager, DefaultFairQueueKeyProducer } from "../index.js"; -import type { FairQueueKeyProducer } from "../types.js"; +import type { FairQueueKeyProducer, ReclaimedMessageInfo } from "../types.js"; describe("VisibilityManager", () => { let keys: FairQueueKeyProducer; @@ -597,5 +597,184 @@ describe("VisibilityManager", () => { } ); }); + + describe("reclaimTimedOut", () => { + redisTest( + "should return reclaimed message info with tenantId for concurrency release", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 100, // Very short timeout + }); + + const redis = createRedisClient(redisOptions); + const queueId = "tenant:t1:queue:reclaim-test"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + // Add and claim a message + const messageId = "reclaim-msg"; + const storedMessage = { + id: messageId, + queueId, + tenantId: "t1", + payload: { id: 1, value: "test" }, + timestamp: Date.now() - 1000, + attempt: 1, + metadata: { orgId: "org-123" }, + }; + + await redis.zadd(queueKey, storedMessage.timestamp, messageId); + await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage)); + + // Claim with very short timeout + const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100); + expect(claimResult.claimed).toBe(true); + + // Wait for timeout to expire + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Reclaim should return the message info + const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ + queueKey: keys.queueKey(qId), + queueItemsKey: keys.queueItemsKey(qId), + masterQueueKey, + })); + + expect(reclaimedMessages).toHaveLength(1); + expect(reclaimedMessages[0]).toEqual({ + messageId, + queueId, + tenantId: "t1", + metadata: { orgId: "org-123" }, + }); + + // Verify message is back in queue + const queueCount = await redis.zcard(queueKey); + expect(queueCount).toBe(1); + + // Verify message is no longer in-flight + const inflightCount = await manager.getTotalInflightCount(); + expect(inflightCount).toBe(0); + + await manager.close(); + await redis.quit(); + } + ); + + redisTest( + "should return empty array when no messages have timed out", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 60000, // Long timeout + }); + + const redis = createRedisClient(redisOptions); + const queueId = "tenant:t1:queue:no-timeout"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + // Add and claim a message with long timeout + const messageId = "long-timeout-msg"; + const storedMessage = { + id: messageId, + queueId, + tenantId: "t1", + payload: { id: 1 }, + timestamp: Date.now() - 1000, + attempt: 1, + }; + + await redis.zadd(queueKey, storedMessage.timestamp, messageId); + await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage)); + + await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1"); + + // Reclaim should return empty array (message hasn't timed out) + const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ + queueKey: keys.queueKey(qId), + queueItemsKey: keys.queueItemsKey(qId), + masterQueueKey, + })); + + expect(reclaimedMessages).toHaveLength(0); + + await manager.close(); + await redis.quit(); + } + ); + + redisTest( + "should reclaim multiple timed-out messages and return all their info", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 100, + }); + + const redis = createRedisClient(redisOptions); + const masterQueueKey = keys.masterQueueKey(0); + + // Add and claim messages for two different tenants + for (const tenant of ["t1", "t2"]) { + const queueId = `tenant:${tenant}:queue:multi-reclaim`; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + + const messageId = `msg-${tenant}`; + const storedMessage = { + id: messageId, + queueId, + tenantId: tenant, + payload: { id: 1 }, + timestamp: Date.now() - 1000, + attempt: 1, + }; + + await redis.zadd(queueKey, storedMessage.timestamp, messageId); + await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage)); + + await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100); + } + + // Wait for timeout + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Reclaim should return both messages + const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ + queueKey: keys.queueKey(qId), + queueItemsKey: keys.queueItemsKey(qId), + masterQueueKey, + })); + + expect(reclaimedMessages).toHaveLength(2); + + // Verify both tenants are represented + const tenantIds = reclaimedMessages.map((m: ReclaimedMessageInfo) => m.tenantId).sort(); + expect(tenantIds).toEqual(["t1", "t2"]); + + await manager.close(); + await redis.quit(); + } + ); + }); }); diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index 002a2089a7..6451df1bea 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -133,6 +133,25 @@ export interface ConcurrencyCheckResult { blockedBy?: ConcurrencyState; } +// ============================================================================ +// Visibility Types +// ============================================================================ + +/** + * Information about a reclaimed message from visibility timeout. + * Used to release concurrency after a message is returned to the queue. + */ +export interface ReclaimedMessageInfo { + /** Message ID */ + messageId: string; + /** Queue ID */ + queueId: string; + /** Tenant ID for concurrency release */ + tenantId: string; + /** Additional metadata for concurrency group extraction */ + metadata?: Record; +} + // ============================================================================ // Scheduler Types // ============================================================================ diff --git a/packages/redis-worker/src/fair-queue/visibility.ts b/packages/redis-worker/src/fair-queue/visibility.ts index 114fdbb7c6..0cb348ec5e 100644 --- a/packages/redis-worker/src/fair-queue/visibility.ts +++ b/packages/redis-worker/src/fair-queue/visibility.ts @@ -1,6 +1,12 @@ import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis"; import { jumpHash } from "@trigger.dev/core/v3/serverOnly"; -import type { ClaimResult, FairQueueKeyProducer, InFlightMessage } from "./types.js"; +import type { + ClaimResult, + FairQueueKeyProducer, + InFlightMessage, + ReclaimedMessageInfo, + StoredMessage, +} from "./types.js"; export interface VisibilityManagerOptions { redis: RedisOptions; @@ -368,7 +374,7 @@ export class VisibilityManager { * * @param shardId - The shard to check * @param getQueueKeys - Function to get queue keys for a queue ID - * @returns Number of messages reclaimed + * @returns Array of reclaimed message info for concurrency release */ async reclaimTimedOut( shardId: number, @@ -377,7 +383,7 @@ export class VisibilityManager { queueItemsKey: string; masterQueueKey: string; } - ): Promise { + ): Promise { const inflightKey = this.keys.inflightKey(shardId); const inflightDataKey = this.keys.inflightDataKey(shardId); const now = Date.now(); @@ -393,7 +399,7 @@ export class VisibilityManager { 100 // Process in batches ); - let reclaimed = 0; + const reclaimedMessages: ReclaimedMessageInfo[] = []; for (let i = 0; i < timedOut.length; i += 2) { const member = timedOut[i]; @@ -405,6 +411,17 @@ export class VisibilityManager { const { queueKey, queueItemsKey, masterQueueKey } = getQueueKeys(queueId); try { + // Get message data BEFORE releasing so we can extract tenantId for concurrency release + const dataJson = await this.redis.hget(inflightDataKey, messageId); + let storedMessage: StoredMessage | null = null; + if (dataJson) { + try { + storedMessage = JSON.parse(dataJson); + } catch { + // Ignore parse error, proceed with reclaim + } + } + // Re-add to queue with original score (or now if not available) const score = parseFloat(originalScore) || now; await this.redis.releaseMessage( @@ -419,7 +436,15 @@ export class VisibilityManager { queueId ); - reclaimed++; + // Track reclaimed message for concurrency release + if (storedMessage) { + reclaimedMessages.push({ + messageId, + queueId, + tenantId: storedMessage.tenantId, + metadata: storedMessage.metadata, + }); + } this.logger.debug("Reclaimed timed-out message", { messageId, @@ -435,7 +460,7 @@ export class VisibilityManager { } } - return reclaimed; + return reclaimedMessages; } /**