Skip to content

Conversation

@ericallam
Copy link
Member

@ericallam ericallam commented Jan 19, 2026

Summary

Fixes a concurrency leak in the batch queue where visibility timeout reclaims do not release concurrency slots.

The bug: When a message visibility timeout expires (60s), reclaimTimedOut puts the message back in the queue but does NOT release the concurrency slot. The messageId stays in the concurrency set (engine:batch:concurrency:tenant:{envId}), counting against the tenant limit even though the message is no longer in-flight.

This causes:

  1. Tenant appears at capacity when checking SCARD >= limit
  2. New messages get released back to queue instead of being processed
  3. Messages stuck in infinite loop, master queue grows indefinitely

The fix:

  • Modified reclaimTimedOut to capture message data (including tenantId) BEFORE releasing from in-flight
  • Returns ReclaimedMessageInfo[] with messageId, queueId, tenantId, and metadata
  • #reclaimTimedOutMessages now iterates over reclaimed messages and calls concurrencyManager.release() for each

Test plan

  • Added test: should return reclaimed message info with tenantId for concurrency release
  • Added test: should return empty array when no messages have timed out
  • Added test: should reclaim multiple timed-out messages and return all their info
  • Updated raceConditions.test.ts for new return type
  • All tests passing
  • Monitor production after deploy for concurrency leak recurrence

refs TRI-7049

…s visibility timeout to prevent concurrency leaks
@changeset-bot
Copy link

changeset-bot bot commented Jan 19, 2026

⚠️ No Changeset found

Latest commit: d184737

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 19, 2026

Walkthrough

The pull request refactors the message reclaim mechanism in the fair queue system. The reclaimTimedOut method signature changes from returning a numeric count to returning an array of ReclaimedMessageInfo objects containing messageId, queueId, tenantId, and metadata. The visibility manager now fetches in-flight message data during reclamation and includes tenant and metadata information in the returned array. The calling code in the fair queue index processes each reclaimed message individually to release concurrency slots with proper tenant awareness. A new type definition supports this structural change, and test coverage validates the new behavior across multiple scenarios.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Description check ⚠️ Warning The pull request description is entirely empty, missing all required template sections including checklist, testing steps, and changelog details. Complete the pull request description by filling out all required sections: checklist verification, testing steps, changelog summary, and any relevant screenshots.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main change: fixing concurrency release when messages reach visibility timeout to prevent leaks.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/redis-worker/src/fair-queue/visibility.ts (1)

414-447: Silent fallback when stored message data is unavailable.

When storedMessage is null (either because dataJson is missing or JSON parsing fails), the message is still reclaimed back to the queue, but no ReclaimedMessageInfo is added to the result array. This means the calling code in index.ts won't release the concurrency slot for that message.

This could lead to a concurrency leak if the in-flight data hash gets corrupted or out of sync with the in-flight sorted set. Consider logging a warning when this happens to aid debugging:

💡 Suggested improvement
         // Track reclaimed message for concurrency release
         if (storedMessage) {
           reclaimedMessages.push({
             messageId,
             queueId,
             tenantId: storedMessage.tenantId,
             metadata: storedMessage.metadata,
           });
+        } else {
+          this.logger.error("Cannot release concurrency for reclaimed message: stored data unavailable", {
+            messageId,
+            queueId,
+          });
         }
🧹 Nitpick comments (2)
packages/redis-worker/src/fair-queue/types.ts (1)

144-153: Consider using type instead of interface per coding guidelines.

The coding guidelines specify "Use types over interfaces for TypeScript". While this is a minor stylistic point, consider converting to a type alias for consistency:

♻️ Suggested change
-export interface ReclaimedMessageInfo {
+export type ReclaimedMessageInfo = {
   /** Message ID */
   messageId: string;
   /** Queue ID */
   queueId: string;
   /** Tenant ID for concurrency release */
   tenantId: string;
   /** Additional metadata for concurrency group extraction */
   metadata?: Record<string, unknown>;
-}
+};
packages/redis-worker/src/fair-queue/tests/visibility.test.ts (1)

720-777: Consider adding a test for the edge case when stored message data is unavailable.

The current tests cover happy paths, but there's no test verifying behavior when the in-flight data hash doesn't contain the stored message (e.g., data corruption scenario). This would help ensure the silent fallback behavior is intentional and documented.

Would you like me to help draft a test case for this edge case?

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 72594a4 and d184737.

📒 Files selected for processing (5)
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

**/*.{ts,tsx}: Always import tasks from @trigger.dev/sdk, never use @trigger.dev/sdk/v3 or deprecated client.defineJob pattern
Every Trigger.dev task must be exported and have a unique id property with no timeouts in the run function

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Import from @trigger.dev/core using subpaths only, never import from root

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier before committing

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
{packages,integrations}/**/*

📄 CodeRabbit inference engine (CLAUDE.md)

Add a changeset when modifying any public package in packages/* or integrations/* using pnpm run changeset:add

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.{test,spec}.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use vitest for all tests in the Trigger.dev repository

Files:

  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.test.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive describe and it blocks
Tests should avoid mocks or stubs and use the helpers from @internal/testcontainers when Redis or Postgres are needed
Use vitest for running unit tests

**/*.test.{ts,tsx,js,jsx}: Use vitest exclusively for testing and never mock anything - use testcontainers instead
Place test files next to source files with naming pattern: source file (e.g., MyService.ts) → MyService.test.ts

Files:

  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
**/*.test.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

Use testcontainers helpers (redisTest, postgresTest, containerTest) from @internal/testcontainers for Redis/PostgreSQL testing instead of mocks

Files:

  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
🧠 Learnings (4)
📓 Common learnings
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.

Applied to files:

  • packages/redis-worker/src/fair-queue/visibility.ts
📚 Learning: 2026-01-15T11:50:06.044Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-15T11:50:06.044Z
Learning: Applies to **/*.test.{ts,tsx} : Use testcontainers helpers (`redisTest`, `postgresTest`, `containerTest`) from `internal/testcontainers` for Redis/PostgreSQL testing instead of mocks

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
📚 Learning: 2026-01-15T10:48:02.673Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-15T10:48:02.673Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Tests should avoid mocks or stubs and use the helpers from `internal/testcontainers` when Redis or Postgres are needed

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
🧬 Code graph analysis (3)
packages/redis-worker/src/fair-queue/visibility.ts (2)
packages/redis-worker/src/fair-queue/types.ts (2)
  • ReclaimedMessageInfo (144-153)
  • StoredMessage (62-79)
packages/redis-worker/src/fair-queue/keyProducer.ts (1)
  • inflightDataKey (61-63)
packages/redis-worker/src/fair-queue/index.ts (2)
packages/redis-worker/src/fair-queue/visibility.ts (2)
  • shardId (548-557)
  • queueId (529-531)
packages/redis-worker/src/fair-queue/masterQueue.ts (1)
  • queueId (195-197)
packages/redis-worker/src/fair-queue/tests/visibility.test.ts (1)
packages/redis-worker/src/fair-queue/types.ts (1)
  • ReclaimedMessageInfo (144-153)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (22)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (6)
packages/redis-worker/src/fair-queue/visibility.ts (2)

3-9: LGTM!

The imports are correctly updated to include the new ReclaimedMessageInfo and StoredMessage types needed for the enhanced reclaim functionality.


377-386: LGTM!

The method signature and documentation are properly updated to reflect the new return type Promise<ReclaimedMessageInfo[]>, clearly communicating the purpose of returning per-message info for concurrency release.

packages/redis-worker/src/fair-queue/index.ts (1)

1346-1376: Well-implemented fix for concurrency leak.

The implementation correctly addresses the core issue: when a message times out, its concurrency slot is now properly released. Key strengths:

  1. Per-message error handling ensures one failure doesn't prevent releasing other slots
  2. Clear comments explain the critical nature of this code path
  3. The QueueDescriptor is properly constructed from the reclaimed message info

This is a solid fix for preventing concurrency leaks on visibility timeout.

packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts (1)

627-632: LGTM!

The test correctly adapts to the new reclaimTimedOut return type by using .length to get the count of reclaimed messages. The variable rename from reclaimed to reclaimedMessages improves clarity.

packages/redis-worker/src/fair-queue/tests/visibility.test.ts (2)

600-778: Comprehensive test coverage for the new reclaimTimedOut behavior.

The tests thoroughly validate the key scenarios:

  1. Successful reclaim returns complete message info including tenantId and metadata
  2. No messages returned when nothing has timed out
  3. Multiple messages from different tenants are correctly reclaimed

The tests properly use redisTest from @internal/testcontainers as per coding guidelines and correctly clean up resources.


5-5: LGTM!

The import of ReclaimedMessageInfo type is correctly added to support type-safe assertions in the new tests.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@vibe-kanban-cloud
Copy link

Review Complete

Your review story is ready!

View Story

Comment !reviewfast on this PR to re-generate the story.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants