Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions packages/redis-worker/src/fair-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1343,13 +1343,37 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
let totalReclaimed = 0;

for (let shardId = 0; shardId < this.shardCount; shardId++) {
const reclaimed = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
queueKey: this.keys.queueKey(queueId),
queueItemsKey: this.keys.queueItemsKey(queueId),
masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
}));

totalReclaimed += reclaimed;
// Release concurrency for each reclaimed message
// This is critical: when a message times out, its concurrency slot must be freed
// so the message can be processed again when it's re-claimed from the queue
if (this.concurrencyManager && reclaimedMessages.length > 0) {
for (const msg of reclaimedMessages) {
try {
await this.concurrencyManager.release(
{
id: msg.queueId,
tenantId: msg.tenantId,
metadata: msg.metadata ?? {},
},
msg.messageId
);
} catch (error) {
this.logger.error("Failed to release concurrency for reclaimed message", {
messageId: msg.messageId,
queueId: msg.queueId,
error: error instanceof Error ? error.message : String(error),
});
}
}
}

totalReclaimed += reclaimedMessages.length;
}

if (totalReclaimed > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,12 @@ describe("Race Condition Tests", () => {
await new Promise((resolve) => setTimeout(resolve, 300));

// Try to reclaim (should find nothing because heartbeat extended the deadline)
const reclaimed = await manager.reclaimTimedOut(0, (queueId) => ({
const reclaimedMessages = await manager.reclaimTimedOut(0, (queueId) => ({
queueKey: keys.queueKey(queueId),
queueItemsKey: keys.queueItemsKey(queueId),
masterQueueKey: keys.masterQueueKey(0),
}));
reclaimResults.push(reclaimed);
reclaimResults.push(reclaimedMessages.length);
}

// Heartbeats should have kept the message alive
Expand Down
181 changes: 180 additions & 1 deletion packages/redis-worker/src/fair-queue/tests/visibility.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { describe, expect } from "vitest";
import { redisTest } from "@internal/testcontainers";
import { createRedisClient } from "@internal/redis";
import { VisibilityManager, DefaultFairQueueKeyProducer } from "../index.js";
import type { FairQueueKeyProducer } from "../types.js";
import type { FairQueueKeyProducer, ReclaimedMessageInfo } from "../types.js";

describe("VisibilityManager", () => {
let keys: FairQueueKeyProducer;
Expand Down Expand Up @@ -597,5 +597,184 @@ describe("VisibilityManager", () => {
}
);
});

describe("reclaimTimedOut", () => {
redisTest(
"should return reclaimed message info with tenantId for concurrency release",
{ timeout: 10000 },
async ({ redisOptions }) => {
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

const manager = new VisibilityManager({
redis: redisOptions,
keys,
shardCount: 1,
defaultTimeoutMs: 100, // Very short timeout
});

const redis = createRedisClient(redisOptions);
const queueId = "tenant:t1:queue:reclaim-test";
const queueKey = keys.queueKey(queueId);
const queueItemsKey = keys.queueItemsKey(queueId);
const masterQueueKey = keys.masterQueueKey(0);

// Add and claim a message
const messageId = "reclaim-msg";
const storedMessage = {
id: messageId,
queueId,
tenantId: "t1",
payload: { id: 1, value: "test" },
timestamp: Date.now() - 1000,
attempt: 1,
metadata: { orgId: "org-123" },
};

await redis.zadd(queueKey, storedMessage.timestamp, messageId);
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

// Claim with very short timeout
const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
expect(claimResult.claimed).toBe(true);

// Wait for timeout to expire
await new Promise((resolve) => setTimeout(resolve, 150));

// Reclaim should return the message info
const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
queueKey: keys.queueKey(qId),
queueItemsKey: keys.queueItemsKey(qId),
masterQueueKey,
}));

expect(reclaimedMessages).toHaveLength(1);
expect(reclaimedMessages[0]).toEqual({
messageId,
queueId,
tenantId: "t1",
metadata: { orgId: "org-123" },
});

// Verify message is back in queue
const queueCount = await redis.zcard(queueKey);
expect(queueCount).toBe(1);

// Verify message is no longer in-flight
const inflightCount = await manager.getTotalInflightCount();
expect(inflightCount).toBe(0);

await manager.close();
await redis.quit();
}
);

redisTest(
"should return empty array when no messages have timed out",
{ timeout: 10000 },
async ({ redisOptions }) => {
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

const manager = new VisibilityManager({
redis: redisOptions,
keys,
shardCount: 1,
defaultTimeoutMs: 60000, // Long timeout
});

const redis = createRedisClient(redisOptions);
const queueId = "tenant:t1:queue:no-timeout";
const queueKey = keys.queueKey(queueId);
const queueItemsKey = keys.queueItemsKey(queueId);
const masterQueueKey = keys.masterQueueKey(0);

// Add and claim a message with long timeout
const messageId = "long-timeout-msg";
const storedMessage = {
id: messageId,
queueId,
tenantId: "t1",
payload: { id: 1 },
timestamp: Date.now() - 1000,
attempt: 1,
};

await redis.zadd(queueKey, storedMessage.timestamp, messageId);
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1");

// Reclaim should return empty array (message hasn't timed out)
const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
queueKey: keys.queueKey(qId),
queueItemsKey: keys.queueItemsKey(qId),
masterQueueKey,
}));

expect(reclaimedMessages).toHaveLength(0);

await manager.close();
await redis.quit();
}
);

redisTest(
"should reclaim multiple timed-out messages and return all their info",
{ timeout: 10000 },
async ({ redisOptions }) => {
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

const manager = new VisibilityManager({
redis: redisOptions,
keys,
shardCount: 1,
defaultTimeoutMs: 100,
});

const redis = createRedisClient(redisOptions);
const masterQueueKey = keys.masterQueueKey(0);

// Add and claim messages for two different tenants
for (const tenant of ["t1", "t2"]) {
const queueId = `tenant:${tenant}:queue:multi-reclaim`;
const queueKey = keys.queueKey(queueId);
const queueItemsKey = keys.queueItemsKey(queueId);

const messageId = `msg-${tenant}`;
const storedMessage = {
id: messageId,
queueId,
tenantId: tenant,
payload: { id: 1 },
timestamp: Date.now() - 1000,
attempt: 1,
};

await redis.zadd(queueKey, storedMessage.timestamp, messageId);
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
}

// Wait for timeout
await new Promise((resolve) => setTimeout(resolve, 150));

// Reclaim should return both messages
const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
queueKey: keys.queueKey(qId),
queueItemsKey: keys.queueItemsKey(qId),
masterQueueKey,
}));

expect(reclaimedMessages).toHaveLength(2);

// Verify both tenants are represented
const tenantIds = reclaimedMessages.map((m: ReclaimedMessageInfo) => m.tenantId).sort();
expect(tenantIds).toEqual(["t1", "t2"]);

await manager.close();
await redis.quit();
}
);
});
});

19 changes: 19 additions & 0 deletions packages/redis-worker/src/fair-queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,25 @@ export interface ConcurrencyCheckResult {
blockedBy?: ConcurrencyState;
}

// ============================================================================
// Visibility Types
// ============================================================================

/**
* Information about a reclaimed message from visibility timeout.
* Used to release concurrency after a message is returned to the queue.
*/
export interface ReclaimedMessageInfo {
/** Message ID */
messageId: string;
/** Queue ID */
queueId: string;
/** Tenant ID for concurrency release */
tenantId: string;
/** Additional metadata for concurrency group extraction */
metadata?: Record<string, unknown>;
}

// ============================================================================
// Scheduler Types
// ============================================================================
Expand Down
37 changes: 31 additions & 6 deletions packages/redis-worker/src/fair-queue/visibility.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis";
import { jumpHash } from "@trigger.dev/core/v3/serverOnly";
import type { ClaimResult, FairQueueKeyProducer, InFlightMessage } from "./types.js";
import type {
ClaimResult,
FairQueueKeyProducer,
InFlightMessage,
ReclaimedMessageInfo,
StoredMessage,
} from "./types.js";

export interface VisibilityManagerOptions {
redis: RedisOptions;
Expand Down Expand Up @@ -368,7 +374,7 @@ export class VisibilityManager {
*
* @param shardId - The shard to check
* @param getQueueKeys - Function to get queue keys for a queue ID
* @returns Number of messages reclaimed
* @returns Array of reclaimed message info for concurrency release
*/
async reclaimTimedOut(
shardId: number,
Expand All @@ -377,7 +383,7 @@ export class VisibilityManager {
queueItemsKey: string;
masterQueueKey: string;
}
): Promise<number> {
): Promise<ReclaimedMessageInfo[]> {
const inflightKey = this.keys.inflightKey(shardId);
const inflightDataKey = this.keys.inflightDataKey(shardId);
const now = Date.now();
Expand All @@ -393,7 +399,7 @@ export class VisibilityManager {
100 // Process in batches
);

let reclaimed = 0;
const reclaimedMessages: ReclaimedMessageInfo[] = [];

for (let i = 0; i < timedOut.length; i += 2) {
const member = timedOut[i];
Expand All @@ -405,6 +411,17 @@ export class VisibilityManager {
const { queueKey, queueItemsKey, masterQueueKey } = getQueueKeys(queueId);

try {
// Get message data BEFORE releasing so we can extract tenantId for concurrency release
const dataJson = await this.redis.hget(inflightDataKey, messageId);
let storedMessage: StoredMessage | null = null;
if (dataJson) {
try {
storedMessage = JSON.parse(dataJson);
} catch {
// Ignore parse error, proceed with reclaim
}
}

// Re-add to queue with original score (or now if not available)
const score = parseFloat(originalScore) || now;
await this.redis.releaseMessage(
Expand All @@ -419,7 +436,15 @@ export class VisibilityManager {
queueId
);

reclaimed++;
// Track reclaimed message for concurrency release
if (storedMessage) {
reclaimedMessages.push({
messageId,
queueId,
tenantId: storedMessage.tenantId,
metadata: storedMessage.metadata,
});
}

this.logger.debug("Reclaimed timed-out message", {
messageId,
Expand All @@ -435,7 +460,7 @@ export class VisibilityManager {
}
}

return reclaimed;
return reclaimedMessages;
}

/**
Expand Down