diff --git a/.server-changes/task-identifier-registry.md b/.server-changes/task-identifier-registry.md
new file mode 100644
index 00000000000..327e188de21
--- /dev/null
+++ b/.server-changes/task-identifier-registry.md
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: improvement
+---
+
+Replace the expensive DISTINCT query for task filter dropdowns with a dedicated TaskIdentifier registry table backed by Redis. Environments migrate automatically on their next deploy, with a transparent fallback to the legacy query for unmigrated environments. Also fixes duplicate dropdown entries when a task changes trigger source, and adds active/archived grouping for removed tasks. Moves BackgroundWorkerTask reads in the trigger hot path to the read replica.
diff --git a/apps/webapp/app/components/logs/LogsTaskFilter.tsx b/apps/webapp/app/components/logs/LogsTaskFilter.tsx
index fa64eff7bd3..3e2afdcf798 100644
--- a/apps/webapp/app/components/logs/LogsTaskFilter.tsx
+++ b/apps/webapp/app/components/logs/LogsTaskFilter.tsx
@@ -4,6 +4,8 @@ import { useMemo } from "react";
import * as Ariakit from "@ariakit/react";
import {
ComboBox,
+ SelectGroup,
+ SelectGroupLabel,
SelectItem,
SelectList,
SelectPopover,
@@ -21,6 +23,7 @@ const shortcut = { key: "t" };
type TaskOption = {
slug: string;
triggerSource: TaskTriggerSource;
+ isInLatestDeployment: boolean;
};
interface LogsTaskFilterProps {
@@ -126,17 +129,42 @@ function TasksDropdown({
>
- {filtered.map((item, index) => (
-
- }
- >
- {item.slug}
-
- ))}
+ {filtered
+ .filter((item) => item.isInLatestDeployment)
+ .map((item) => (
+
+ }
+ >
+ {item.slug}
+
+ ))}
+ {filtered.some((item) => !item.isInLatestDeployment) && (
+
+ Archived
+ {filtered
+ .filter((item) => !item.isInLatestDeployment)
+ .map((item) => (
+
+
+
+ }
+ >
+ {item.slug}
+
+ ))}
+
+ )}
diff --git a/apps/webapp/app/components/runs/v3/RunFilters.tsx b/apps/webapp/app/components/runs/v3/RunFilters.tsx
index dc3657b42a9..83ebaa0d51b 100644
--- a/apps/webapp/app/components/runs/v3/RunFilters.tsx
+++ b/apps/webapp/app/components/runs/v3/RunFilters.tsx
@@ -36,6 +36,8 @@ import { Paragraph } from "~/components/primitives/Paragraph";
import {
ComboBox,
SelectButtonItem,
+ SelectGroup,
+ SelectGroupLabel,
SelectItem,
SelectList,
SelectPopover,
@@ -322,7 +324,7 @@ export function getRunFiltersFromSearchParams(
}
type RunFiltersProps = {
- possibleTasks: { slug: string; triggerSource: TaskTriggerSource }[];
+ possibleTasks: { slug: string; triggerSource: TaskTriggerSource; isInLatestDeployment: boolean }[];
bulkActions: {
id: string;
type: BulkActionType;
@@ -627,7 +629,7 @@ function TasksDropdown({
clearSearchValue: () => void;
searchValue: string;
onClose?: () => void;
- possibleTasks: { slug: string; triggerSource: TaskTriggerSource }[];
+ possibleTasks: { slug: string; triggerSource: TaskTriggerSource; isInLatestDeployment: boolean }[];
}) {
const { values, replace } = useSearchParams();
@@ -658,17 +660,42 @@ function TasksDropdown({
>
- {filtered.map((item, index) => (
-
- }
- >
-
-
- ))}
+ {filtered
+ .filter((item) => item.isInLatestDeployment)
+ .map((item) => (
+
+ }
+ >
+
+
+ ))}
+ {filtered.some((item) => !item.isInLatestDeployment) && (
+
+ Archived
+ {filtered
+ .filter((item) => !item.isInLatestDeployment)
+ .map((item) => (
+
+
+
+ }
+ >
+
+
+ ))}
+
+ )}
diff --git a/apps/webapp/app/models/task.server.ts b/apps/webapp/app/models/task.server.ts
index b696bac6039..aab3b3bcfc1 100644
--- a/apps/webapp/app/models/task.server.ts
+++ b/apps/webapp/app/models/task.server.ts
@@ -1,6 +1,9 @@
import type { TaskTriggerSource } from "@trigger.dev/database";
import { PrismaClientOrTransaction, sqlDatabaseSchema } from "~/db.server";
+export { getTaskIdentifiers } from "~/services/taskIdentifierRegistry.server";
+export type { TaskIdentifierEntry } from "~/services/taskIdentifierCache.server";
+
/**
*
* @param prisma An efficient query to get all task identifiers for a project.
diff --git a/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts b/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts
index 13da4ff91f8..ea6e522dbd5 100644
--- a/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts
+++ b/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts
@@ -13,7 +13,7 @@ import { type ErrorGroupStatus, type PrismaClientOrTransaction } from "@trigger.
import { type Direction } from "~/components/ListPagination";
import { timeFilterFromTo } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
-import { getAllTaskIdentifiers } from "~/models/task.server";
+import { getTaskIdentifiers } from "~/models/task.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { BasePresenter } from "~/presenters/v3/basePresenter.server";
@@ -170,7 +170,7 @@ export class ErrorsListPresenter extends BasePresenter {
(search !== undefined && search !== "") ||
(statuses !== undefined && statuses.length > 0);
- const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId);
+ const possibleTasksAsync = getTaskIdentifiers(environmentId);
// Pre-filter by status: since status lives in Postgres (ErrorGroupState) and the error
// list comes from ClickHouse, we resolve inclusion/exclusion sets upfront so that
diff --git a/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts b/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts
index 8a3bf692b5b..517c586e4e7 100644
--- a/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts
+++ b/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts
@@ -7,7 +7,7 @@ import parseDuration from "parse-duration";
import { type Direction } from "~/components/ListPagination";
import { timeFilterFromTo, timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
-import { getAllTaskIdentifiers } from "~/models/task.server";
+import { getTaskIdentifiers } from "~/models/task.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { kindToLevel, type LogLevel, LogLevelSchema } from "~/utils/logUtils";
import { BasePresenter } from "~/presenters/v3/basePresenter.server";
@@ -176,7 +176,7 @@ export class LogsListPresenter extends BasePresenter {
(search !== undefined && search !== "") ||
!time.isDefault;
- const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId);
+ const possibleTasksAsync = getTaskIdentifiers(environmentId);
const bulkActionsAsync = this.replica.bulkActionGroup.findMany({
select: {
@@ -386,12 +386,7 @@ export class LogsListPresenter extends BasePresenter {
next: nextCursor,
previous: undefined, // For now, only support forward pagination
},
- possibleTasks: possibleTasks
- .map((task) => ({
- slug: task.slug,
- triggerSource: task.triggerSource,
- }))
- .sort((a, b) => a.slug.localeCompare(b.slug)),
+ possibleTasks,
bulkActions: bulkActions.map((bulkAction) => ({
id: bulkAction.friendlyId,
type: bulkAction.type,
diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
index f22c7ccf340..de111abd279 100644
--- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
+++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
@@ -8,7 +8,7 @@ import {
import { type Direction } from "~/components/ListPagination";
import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
-import { getAllTaskIdentifiers } from "~/models/task.server";
+import { getTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
@@ -105,7 +105,7 @@ export class NextRunListPresenter {
!time.isDefault;
//get all possible tasks
- const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId);
+ const possibleTasksAsync = getTaskIdentifiers(environmentId);
//get possible bulk actions
const bulkActionsAsync = this.replica.bulkActionGroup.findMany({
@@ -256,11 +256,7 @@ export class NextRunListPresenter {
next: pagination.nextCursor ?? undefined,
previous: pagination.previousCursor ?? undefined,
},
- possibleTasks: possibleTasks
- .map((task) => ({ slug: task.slug, triggerSource: task.triggerSource }))
- .sort((a, b) => {
- return a.slug.localeCompare(b.slug);
- }),
+ possibleTasks,
bulkActions: bulkActions.map((bulkAction) => ({
id: bulkAction.friendlyId,
type: bulkAction.type,
diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts
index 053414dcfc7..22c151d720b 100644
--- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts
+++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts
@@ -1,6 +1,7 @@
import { type RuntimeEnvironmentType, type ScheduleType } from "@trigger.dev/database";
import { type ScheduleListFilters } from "~/components/runs/v3/ScheduleFilters";
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
+import { getTaskIdentifiers } from "~/models/task.server";
import { getLimit } from "~/services/platform.v3.server";
import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
@@ -123,14 +124,10 @@ export class ScheduleListPresenter extends BasePresenter {
}
//get all possible scheduled tasks
- const possibleTasks = await this._replica.backgroundWorkerTask.findMany({
- where: {
- workerId: latestWorker.id,
- projectId: project.id,
- runtimeEnvironmentId: environmentId,
- triggerSource: "SCHEDULED",
- },
- });
+ const allIdentifiers = await getTaskIdentifiers(environmentId);
+ const possibleTasks = allIdentifiers
+ .filter((t) => t.triggerSource === "SCHEDULED" && t.isInLatestDeployment)
+ .map((t) => ({ slug: t.slug }));
//do this here to protect against SQL injection
search = search && search !== "" ? `%${search}%` : undefined;
@@ -285,7 +282,7 @@ export class ScheduleListPresenter extends BasePresenter {
totalPages: Math.ceil(totalCount / pageSize),
totalCount: totalCount,
schedules,
- possibleTasks: possibleTasks.map((task) => task.slug).sort((a, b) => a.localeCompare(b)),
+ possibleTasks: possibleTasks.map((task) => task.slug),
hasFilters,
limits: {
used: schedulesCount,
diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx
index 57b6b71db6f..cd358b7e67d 100644
--- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx
+++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx
@@ -17,14 +17,13 @@ import { TitleWidget } from "~/components/metrics/TitleWidget";
import { CreateDashboardPageButton } from "~/components/navigation/DashboardDialogs";
import { NavBar, PageAccessories, PageTitle } from "~/components/primitives/PageHeader";
import { TimeFilter } from "~/components/runs/v3/SharedFilters";
-import { $replica } from "~/db.server";
import { useEnvironment } from "~/hooks/useEnvironment";
import { useOrganization } from "~/hooks/useOrganizations";
import { useProject } from "~/hooks/useProject";
import { useSearchParams } from "~/hooks/useSearchParam";
import { findProjectBySlug } from "~/models/project.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
-import { getAllTaskIdentifiers } from "~/models/task.server";
+import { getTaskIdentifiers } from "~/models/task.server";
import {
type BuiltInDashboardFilter,
type LayoutItem,
@@ -70,7 +69,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
organizationId: project.organizationId,
key: dashboardKey,
}),
- getAllTaskIdentifiers($replica, environment.id),
+ getTaskIdentifiers(environment.id),
]);
const filters = dashboard.filters ?? ["tasks", "queues"];
@@ -114,9 +113,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
return typedjson({
...dashboard,
filters,
- possibleTasks: possibleTasks
- .map((task) => ({ slug: task.slug, triggerSource: task.triggerSource }))
- .sort((a, b) => a.slug.localeCompare(b.slug)),
+ possibleTasks,
possibleModels,
possiblePrompts,
possibleOperations,
@@ -201,7 +198,7 @@ export function MetricDashboard({
/** Which filters to show. Defaults to ["tasks", "queues"]. */
filters?: BuiltInDashboardFilter[];
/** Possible tasks for filtering */
- possibleTasks?: { slug: string; triggerSource: TaskTriggerSource }[];
+ possibleTasks?: { slug: string; triggerSource: TaskTriggerSource; isInLatestDeployment: boolean }[];
/** Possible models for filtering */
possibleModels?: ModelOption[];
/** Possible prompt slugs for filtering */
diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx
index 051ea7a8a28..418078760cd 100644
--- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx
+++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx
@@ -35,7 +35,7 @@ import { Sheet, SheetContent } from "~/components/primitives/SheetV3";
import { useToast } from "~/components/primitives/Toast";
import { SimpleTooltip } from "~/components/primitives/Tooltip";
import { QueryEditor, type QueryEditorSaveData } from "~/components/query/QueryEditor";
-import { $replica, prisma } from "~/db.server";
+import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { useDashboardEditor } from "~/hooks/useDashboardEditor";
import { useEnvironment } from "~/hooks/useEnvironment";
@@ -44,7 +44,7 @@ import { useProject } from "~/hooks/useProject";
import { redirectWithSuccessMessage } from "~/models/message.server";
import { findProjectBySlug } from "~/models/project.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
-import { getAllTaskIdentifiers } from "~/models/task.server";
+import { getTaskIdentifiers } from "~/models/task.server";
import { MetricDashboardPresenter } from "~/presenters/v3/MetricDashboardPresenter.server";
import { QueryPresenter } from "~/presenters/v3/QueryPresenter.server";
import { requireUser, requireUserId } from "~/services/session.server";
@@ -93,7 +93,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
queryPresenter.call({
organizationId: project.organizationId,
}),
- getAllTaskIdentifiers($replica, environment.id),
+ getTaskIdentifiers(environment.id),
]);
// Admins and impersonating users can use EXPLAIN
@@ -109,9 +109,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
queryHistory: history,
isAdmin,
maxRows: env.QUERY_CLICKHOUSE_MAX_RETURNED_ROWS,
- possibleTasks: possibleTasks
- .map((task) => ({ slug: task.slug, triggerSource: task.triggerSource }))
- .sort((a, b) => a.slug.localeCompare(b.slug)),
+ possibleTasks,
widgetCount,
});
};
diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx
index 9ae306c98a6..ff289d241be 100644
--- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx
+++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx
@@ -2,11 +2,10 @@ import { openai } from "@ai-sdk/openai";
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { tryCatch } from "@trigger.dev/core";
import { z } from "zod";
-import { $replica } from "~/db.server";
import { env } from "~/env.server";
import { findProjectBySlug } from "~/models/project.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
-import { getAllTaskIdentifiers } from "~/models/task.server";
+import { getTaskIdentifiers } from "~/models/task.server";
import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server";
import { RunTagListPresenter } from "~/presenters/v3/RunTagListPresenter.server";
import { VersionListPresenter } from "~/presenters/v3/VersionListPresenter.server";
@@ -126,7 +125,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
const queryTasks: QueryTasks = {
query: async () => {
- const tasks = await getAllTaskIdentifiers($replica, environment.id);
+ const tasks = await getTaskIdentifiers(environment.id);
return {
tasks,
};
diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts
index eb00bf1c586..136c3da3b9c 100644
--- a/apps/webapp/app/runEngine/concerns/queues.server.ts
+++ b/apps/webapp/app/runEngine/concerns/queues.server.ts
@@ -62,10 +62,15 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef
}
export class DefaultQueueManager implements QueueManager {
+ private readonly replicaPrisma: PrismaClientOrTransaction;
+
constructor(
private readonly prisma: PrismaClientOrTransaction,
- private readonly engine: RunEngine
- ) { }
+ private readonly engine: RunEngine,
+ replicaPrisma?: PrismaClientOrTransaction
+ ) {
+ this.replicaPrisma = replicaPrisma ?? prisma;
+ }
async resolveQueueProperties(
request: TriggerTaskRequest,
@@ -103,7 +108,7 @@ export class DefaultQueueManager implements QueueManager {
// Only fetch task for TTL if caller didn't provide a per-trigger TTL
if (request.body.options?.ttl === undefined) {
- const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({
+ const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: lockedBackgroundWorker.id,
runtimeEnvironmentId: request.environment.id,
@@ -116,7 +121,7 @@ export class DefaultQueueManager implements QueueManager {
}
} else {
// No queue override - fetch task with queue to get both default queue and TTL
- const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({
+ const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: lockedBackgroundWorker.id,
runtimeEnvironmentId: request.environment.id,
@@ -217,7 +222,7 @@ export class DefaultQueueManager implements QueueManager {
// When queue is overridden, we only need TTL from the task (no queue join needed)
if (overriddenQueueName) {
- const task = await this.prisma.backgroundWorkerTask.findFirst({
+ const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: worker.id,
runtimeEnvironmentId: environment.id,
@@ -229,7 +234,7 @@ export class DefaultQueueManager implements QueueManager {
return { queueName: overriddenQueueName, taskTtl: task?.ttl };
}
- const task = await this.prisma.backgroundWorkerTask.findFirst({
+ const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: worker.id,
runtimeEnvironmentId: environment.id,
diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts
index 4e504163fec..3df2dfb00f9 100644
--- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts
+++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts
@@ -531,6 +531,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
const triggerFailedTaskService = new TriggerFailedTaskService({
prisma: this._prisma,
engine: this._engine,
+ replicaPrisma: this._replica,
});
for (const item of itemsToProcess) {
diff --git a/apps/webapp/app/runEngine/services/createBatch.server.ts b/apps/webapp/app/runEngine/services/createBatch.server.ts
index 309a7700f1a..0653e1ef1c2 100644
--- a/apps/webapp/app/runEngine/services/createBatch.server.ts
+++ b/apps/webapp/app/runEngine/services/createBatch.server.ts
@@ -40,7 +40,7 @@ export class CreateBatchService extends WithRunEngine {
constructor(protected readonly _prisma: PrismaClientOrTransaction = prisma) {
super({ prisma: _prisma });
- this.queueConcern = new DefaultQueueManager(this._prisma, this._engine);
+ this.queueConcern = new DefaultQueueManager(this._prisma, this._engine, this._replica);
this.validator = new DefaultTriggerTaskValidator();
}
diff --git a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts
index cdcfa63ff0b..0b59a523a6e 100644
--- a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts
+++ b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts
@@ -51,10 +51,16 @@ export type TriggerFailedTaskRequest = {
*/
export class TriggerFailedTaskService {
private readonly prisma: PrismaClientOrTransaction;
+ private readonly replicaPrisma: PrismaClientOrTransaction;
private readonly engine: RunEngine;
- constructor(opts: { prisma: PrismaClientOrTransaction; engine: RunEngine }) {
+ constructor(opts: {
+ prisma: PrismaClientOrTransaction;
+ engine: RunEngine;
+ replicaPrisma?: PrismaClientOrTransaction;
+ }) {
this.prisma = opts.prisma;
+ this.replicaPrisma = opts.replicaPrisma ?? opts.prisma;
this.engine = opts.engine;
}
@@ -91,7 +97,7 @@ export class TriggerFailedTaskService {
let queueName: string | undefined;
let lockedQueueId: string | undefined;
try {
- const queueConcern = new DefaultQueueManager(this.prisma, this.engine);
+ const queueConcern = new DefaultQueueManager(this.prisma, this.engine, this.replicaPrisma);
const bodyOptions = request.options as TriggerTaskRequest["body"]["options"];
const triggerRequest: TriggerTaskRequest = {
taskId: request.taskId,
diff --git a/apps/webapp/app/services/taskIdentifierCache.server.ts b/apps/webapp/app/services/taskIdentifierCache.server.ts
new file mode 100644
index 00000000000..9d243b5f740
--- /dev/null
+++ b/apps/webapp/app/services/taskIdentifierCache.server.ts
@@ -0,0 +1,115 @@
+import { Redis } from "ioredis";
+import type { TaskTriggerSource } from "@trigger.dev/database";
+import { env } from "~/env.server";
+import { singleton } from "~/utils/singleton";
+import { logger } from "./logger.server";
+
+const KEY_PREFIX = "tids:";
+
+type CachedTaskIdentifier = {
+ s: string;
+ ts: TaskTriggerSource;
+ live: boolean;
+};
+
+export type TaskIdentifierEntry = {
+ slug: string;
+ triggerSource: TaskTriggerSource;
+ isInLatestDeployment: boolean;
+};
+
+function buildKey(environmentId: string): string {
+ return `${KEY_PREFIX}${environmentId}`;
+}
+
+function encode(entry: TaskIdentifierEntry): string {
+ return JSON.stringify({
+ s: entry.slug,
+ ts: entry.triggerSource,
+ live: entry.isInLatestDeployment,
+ } satisfies CachedTaskIdentifier);
+}
+
+function decode(raw: string): TaskIdentifierEntry {
+ const parsed = JSON.parse(raw) as CachedTaskIdentifier;
+ return {
+ slug: parsed.s,
+ triggerSource: parsed.ts,
+ isInLatestDeployment: parsed.live,
+ };
+}
+
+function initializeRedis(): Redis | undefined {
+ const host = env.CACHE_REDIS_HOST;
+ if (!host) {
+ return undefined;
+ }
+
+ return new Redis({
+ connectionName: "taskIdentifierCache",
+ host,
+ port: env.CACHE_REDIS_PORT,
+ username: env.CACHE_REDIS_USERNAME,
+ password: env.CACHE_REDIS_PASSWORD,
+ keyPrefix: "tr:",
+ enableAutoPipelining: true,
+ ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+ });
+}
+
+const redis = singleton("taskIdentifierCache", initializeRedis);
+
+export async function populateTaskIdentifierCache(
+ environmentId: string,
+ identifiers: TaskIdentifierEntry[]
+): Promise {
+ if (!redis) return;
+
+ try {
+ const key = buildKey(environmentId);
+ const pipeline = redis.pipeline();
+ pipeline.del(key);
+ if (identifiers.length > 0) {
+ pipeline.sadd(key, ...identifiers.map(encode));
+ }
+ await pipeline.exec();
+ } catch (error) {
+ logger.error("Failed to populate task identifier cache", {
+ environmentId,
+ error,
+ });
+ }
+}
+
+export async function invalidateTaskIdentifierCache(environmentId: string): Promise {
+ if (!redis) return;
+
+ try {
+ const key = buildKey(environmentId);
+ await redis.del(key);
+ } catch (error) {
+ logger.error("Failed to invalidate task identifier cache", {
+ environmentId,
+ error,
+ });
+ }
+}
+
+export async function getTaskIdentifiersFromCache(
+ environmentId: string
+): Promise {
+ if (!redis) return null;
+
+ try {
+ const key = buildKey(environmentId);
+ const members = await redis.smembers(key);
+ if (members.length === 0) return null;
+ return members.map(decode);
+ } catch (error) {
+ logger.error("Failed to get task identifiers from cache", {
+ environmentId,
+ error,
+ });
+ return null;
+ }
+}
diff --git a/apps/webapp/app/services/taskIdentifierRegistry.server.ts b/apps/webapp/app/services/taskIdentifierRegistry.server.ts
new file mode 100644
index 00000000000..3d1430518c7
--- /dev/null
+++ b/apps/webapp/app/services/taskIdentifierRegistry.server.ts
@@ -0,0 +1,138 @@
+import { TaskTriggerSource } from "@trigger.dev/database";
+import { $replica, prisma } from "~/db.server";
+import { getAllTaskIdentifiers } from "~/models/task.server";
+import { logger } from "./logger.server";
+import {
+ getTaskIdentifiersFromCache,
+ populateTaskIdentifierCache,
+ type TaskIdentifierEntry,
+} from "./taskIdentifierCache.server";
+
+function toTriggerSource(source: string | undefined): TaskTriggerSource {
+ if (source === "SCHEDULED" || source === "schedule") return "SCHEDULED";
+ return "STANDARD";
+}
+
+export async function syncTaskIdentifiers(
+ environmentId: string,
+ projectId: string,
+ workerId: string,
+ tasks: { id: string; triggerSource?: string }[]
+): Promise {
+ const slugs = tasks.map((t) => t.id);
+
+ for (const task of tasks) {
+ await prisma.taskIdentifier.upsert({
+ where: {
+ runtimeEnvironmentId_slug: {
+ runtimeEnvironmentId: environmentId,
+ slug: task.id,
+ },
+ },
+ create: {
+ runtimeEnvironmentId: environmentId,
+ projectId,
+ slug: task.id,
+ currentTriggerSource: toTriggerSource(task.triggerSource),
+ currentWorkerId: workerId,
+ isInLatestDeployment: true,
+ },
+ update: {
+ currentTriggerSource: toTriggerSource(task.triggerSource),
+ currentWorkerId: workerId,
+ lastSeenAt: new Date(),
+ isInLatestDeployment: true,
+ },
+ });
+ }
+
+ if (slugs.length > 0) {
+ await prisma.taskIdentifier.updateMany({
+ where: {
+ runtimeEnvironmentId: environmentId,
+ slug: { notIn: slugs },
+ isInLatestDeployment: true,
+ },
+ data: { isInLatestDeployment: false },
+ });
+ }
+
+ const allIdentifiers = await prisma.taskIdentifier.findMany({
+ where: { runtimeEnvironmentId: environmentId },
+ select: {
+ slug: true,
+ currentTriggerSource: true,
+ isInLatestDeployment: true,
+ },
+ });
+
+ populateTaskIdentifierCache(
+ environmentId,
+ allIdentifiers.map((t) => ({
+ slug: t.slug,
+ triggerSource: t.currentTriggerSource,
+ isInLatestDeployment: t.isInLatestDeployment,
+ }))
+ ).catch((error) => {
+ logger.error("Failed to populate task identifier cache after sync", { environmentId, error });
+ });
+}
+
+function sortEntries(entries: TaskIdentifierEntry[]): TaskIdentifierEntry[] {
+ return entries.sort((a, b) => {
+ if (a.isInLatestDeployment !== b.isInLatestDeployment)
+ return a.isInLatestDeployment ? -1 : 1;
+ return a.slug.localeCompare(b.slug);
+ });
+}
+
+export async function getTaskIdentifiers(
+ environmentId: string
+): Promise {
+ const cached = await getTaskIdentifiersFromCache(environmentId);
+ if (cached) return sortEntries(cached);
+
+ const dbRows = await $replica.taskIdentifier.findMany({
+ where: { runtimeEnvironmentId: environmentId },
+ select: {
+ slug: true,
+ currentTriggerSource: true,
+ isInLatestDeployment: true,
+ },
+ });
+
+ if (dbRows.length > 0) {
+ const entries: TaskIdentifierEntry[] = dbRows.map((t) => ({
+ slug: t.slug,
+ triggerSource: t.currentTriggerSource,
+ isInLatestDeployment: t.isInLatestDeployment,
+ }));
+
+ populateTaskIdentifierCache(environmentId, entries).catch((error) => {
+ logger.error("Failed to populate task identifier cache after DB read", {
+ environmentId,
+ error,
+ });
+ });
+
+ return sortEntries(entries);
+ }
+
+ const legacyRows = await getAllTaskIdentifiers($replica, environmentId);
+ const entries: TaskIdentifierEntry[] = legacyRows.map((t) => ({
+ slug: t.slug,
+ triggerSource: t.triggerSource,
+ isInLatestDeployment: true,
+ }));
+
+ if (entries.length > 0) {
+ populateTaskIdentifierCache(environmentId, entries).catch((error) => {
+ logger.error("Failed to populate task identifier cache after legacy fallback", {
+ environmentId,
+ error,
+ });
+ });
+ }
+
+ return sortEntries(entries);
+}
diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts
index 411f91ff75d..fa2afeeca68 100644
--- a/apps/webapp/app/v3/runEngineHandlers.server.ts
+++ b/apps/webapp/app/v3/runEngineHandlers.server.ts
@@ -667,6 +667,7 @@ export function setupBatchQueueCallbacks() {
const triggerFailedTaskService = new TriggerFailedTaskService({
prisma,
engine,
+ replicaPrisma: $replica,
});
// Check for pre-marked error items (e.g. oversized payloads)
diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts
index 00360df946c..ee788397a08 100644
--- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts
+++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts
@@ -1,8 +1,11 @@
+import { tryCatch } from "@trigger.dev/core/v3";
+import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";
import { WorkerDeployment } from "@trigger.dev/database";
+import { logger } from "~/services/logger.server";
+import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server";
import { BaseService, ServiceValidationError } from "./baseService.server";
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";
import { compareDeploymentVersions } from "../utils/deploymentVersions";
-import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";
export type ChangeCurrentDeploymentDirection = "promote" | "rollback";
@@ -96,6 +99,25 @@ export class ChangeCurrentDeploymentService extends BaseService {
},
});
+ const [syncError] = await tryCatch(
+ (async () => {
+ const tasks = await this._prisma.backgroundWorkerTask.findMany({
+ where: { workerId: deployment.workerId! },
+ select: { slug: true, triggerSource: true },
+ });
+ await syncTaskIdentifiers(
+ deployment.environmentId,
+ deployment.projectId,
+ deployment.workerId!,
+ tasks.map((t) => ({ id: t.slug, triggerSource: t.triggerSource }))
+ );
+ })()
+ );
+
+ if (syncError) {
+ logger.error("Error syncing task identifiers on deployment change", { error: syncError });
+ }
+
await ExecuteTasksWaitingForDeployService.enqueue(deployment.workerId);
}
}
diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts
index 1b51ec04aee..c8381327249 100644
--- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts
+++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts
@@ -13,6 +13,7 @@ import { $transaction, Prisma, PrismaClientOrTransaction } from "~/db.server";
import { sanitizeQueueName } from "~/models/taskQueue.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
+import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import {
removeQueueConcurrencyLimits,
@@ -158,6 +159,23 @@ export class CreateBackgroundWorkerService extends BaseService {
throw new ServiceValidationError("Error syncing declarative schedules");
}
+ const [syncIdentifiersError] = await tryCatch(
+ syncTaskIdentifiers(
+ environment.id,
+ project.id,
+ backgroundWorker.id,
+ body.metadata.tasks.map((t) => ({ id: t.id, triggerSource: t.triggerSource }))
+ )
+ );
+
+ if (syncIdentifiersError) {
+ logger.error("Error syncing task identifiers", {
+ error: syncIdentifiersError,
+ backgroundWorker,
+ environment,
+ });
+ }
+
const [updateConcurrencyLimitsError] = await tryCatch(
updateEnvConcurrencyLimits(environment)
);
diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts
index e093f2c2006..9743cffcdbe 100644
--- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts
+++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts
@@ -1,7 +1,8 @@
-import { CreateBackgroundWorkerRequestBody } from "@trigger.dev/core/v3";
+import { CreateBackgroundWorkerRequestBody, tryCatch } from "@trigger.dev/core/v3";
import type { BackgroundWorker, Prisma } from "@trigger.dev/database";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
+import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server";
import { socketIo } from "../handleSocketIo.server";
import { updateEnvConcurrencyLimits } from "../runQueue.server";
import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server";
@@ -130,6 +131,19 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService {
},
});
+ const [syncIdError] = await tryCatch(
+ syncTaskIdentifiers(
+ environment.id,
+ environment.projectId,
+ backgroundWorker.id,
+ body.metadata.tasks.map((t) => ({ id: t.id, triggerSource: t.triggerSource }))
+ )
+ );
+
+ if (syncIdError) {
+ logger.error("Error syncing task identifiers", { error: syncIdError });
+ }
+
try {
//send a notification that a new worker has been created
await projectPubSub.publish(
diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts
index 000633fb73f..96712c36cc4 100644
--- a/apps/webapp/app/v3/services/triggerTask.server.ts
+++ b/apps/webapp/app/v3/services/triggerTask.server.ts
@@ -99,7 +99,7 @@ export class TriggerTaskService extends WithRunEngine {
const service = new RunEngineTriggerTaskService({
prisma: this._prisma,
engine: this._engine,
- queueConcern: new DefaultQueueManager(this._prisma, this._engine),
+ queueConcern: new DefaultQueueManager(this._prisma, this._engine, this._replica),
validator: new DefaultTriggerTaskValidator(),
payloadProcessor: new DefaultPayloadProcessor(),
idempotencyKeyConcern: new IdempotencyKeyConcern(
diff --git a/internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql b/internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql
new file mode 100644
index 00000000000..6e95c900b34
--- /dev/null
+++ b/internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql
@@ -0,0 +1,2 @@
+CREATE INDEX CONCURRENTLY IF NOT EXISTS "BackgroundWorkerTask_runtimeEnvironmentId_slug_triggerSource_idx"
+ ON "BackgroundWorkerTask"("runtimeEnvironmentId", slug, "triggerSource");
diff --git a/internal-packages/database/prisma/migrations/20260413000001_add_task_identifier_table/migration.sql b/internal-packages/database/prisma/migrations/20260413000001_add_task_identifier_table/migration.sql
new file mode 100644
index 00000000000..43a31e905d0
--- /dev/null
+++ b/internal-packages/database/prisma/migrations/20260413000001_add_task_identifier_table/migration.sql
@@ -0,0 +1,40 @@
+-- CreateTable
+CREATE TABLE "TaskIdentifier" (
+ "id" TEXT NOT NULL,
+ "runtimeEnvironmentId" TEXT NOT NULL,
+ "projectId" TEXT NOT NULL,
+ "slug" TEXT NOT NULL,
+ "currentTriggerSource" "TaskTriggerSource" NOT NULL DEFAULT 'STANDARD',
+ "currentWorkerId" TEXT NOT NULL,
+ "firstSeenAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "lastSeenAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "isInLatestDeployment" BOOLEAN NOT NULL DEFAULT true,
+
+ CONSTRAINT "TaskIdentifier_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex
+CREATE UNIQUE INDEX "TaskIdentifier_runtimeEnvironmentId_slug_key"
+ ON "TaskIdentifier"("runtimeEnvironmentId", "slug");
+
+-- CreateIndex
+CREATE INDEX "TaskIdentifier_runtimeEnvironmentId_isInLatestDeployment_idx"
+ ON "TaskIdentifier"("runtimeEnvironmentId", "isInLatestDeployment");
+
+-- AddForeignKey
+ALTER TABLE "TaskIdentifier"
+ ADD CONSTRAINT "TaskIdentifier_runtimeEnvironmentId_fkey"
+ FOREIGN KEY ("runtimeEnvironmentId") REFERENCES "RuntimeEnvironment"("id")
+ ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "TaskIdentifier"
+ ADD CONSTRAINT "TaskIdentifier_projectId_fkey"
+ FOREIGN KEY ("projectId") REFERENCES "Project"("id")
+ ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "TaskIdentifier"
+ ADD CONSTRAINT "TaskIdentifier_currentWorkerId_fkey"
+ FOREIGN KEY ("currentWorkerId") REFERENCES "BackgroundWorker"("id")
+ ON DELETE CASCADE ON UPDATE CASCADE;
diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma
index 7138aeaab0d..02bedf2045e 100644
--- a/internal-packages/database/prisma/schema.prisma
+++ b/internal-packages/database/prisma/schema.prisma
@@ -354,6 +354,7 @@ model RuntimeEnvironment {
customerQueries CustomerQuery[]
prompts Prompt[]
errorGroupStates ErrorGroupState[]
+ taskIdentifiers TaskIdentifier[]
@@unique([projectId, slug, orgMemberId])
@@unique([projectId, shortcode])
@@ -429,6 +430,7 @@ model Project {
prompts Prompt[]
platformNotifications PlatformNotification[]
errorGroupStates ErrorGroupState[]
+ taskIdentifiers TaskIdentifier[]
}
enum ProjectVersion {
@@ -517,6 +519,8 @@ model BackgroundWorker {
supportsLazyAttempts Boolean @default(false)
+ taskIdentifiers TaskIdentifier[]
+
@@unique([projectId, runtimeEnvironmentId, version])
@@index([runtimeEnvironmentId])
// Get the latest worker for a given environment
@@ -659,6 +663,7 @@ model BackgroundWorkerTask {
// Quick lookup of task identifiers
@@index([projectId, slug])
@@index([runtimeEnvironmentId, projectId])
+ @@index([runtimeEnvironmentId, slug, triggerSource])
}
enum TaskTriggerSource {
@@ -2917,3 +2922,27 @@ model ErrorGroupState {
@@unique([environmentId, taskIdentifier, errorFingerprint])
@@index([environmentId, status])
}
+
+model TaskIdentifier {
+ id String @id @default(cuid())
+
+ runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade)
+ runtimeEnvironmentId String
+
+ project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade)
+ projectId String
+
+ slug String
+
+ currentTriggerSource TaskTriggerSource @default(STANDARD)
+
+ currentWorker BackgroundWorker @relation(fields: [currentWorkerId], references: [id], onDelete: Cascade, onUpdate: Cascade)
+ currentWorkerId String
+
+ firstSeenAt DateTime @default(now())
+ lastSeenAt DateTime @default(now())
+ isInLatestDeployment Boolean @default(true)
+
+ @@unique([runtimeEnvironmentId, slug])
+ @@index([runtimeEnvironmentId, isInLatestDeployment])
+}
\ No newline at end of file