From 94fce542e6f9e480635381f44e50d2fddfbbf30f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 13 Apr 2026 14:15:44 +0100 Subject: [PATCH 1/2] feat: add TaskIdentifier registry to eliminate 7.28% DB runtime from getAllTaskIdentifiers refs TRI-8441 --- .server-changes/task-identifier-registry.md | 6 + .../app/components/logs/LogsTaskFilter.tsx | 50 +++++-- .../app/components/runs/v3/RunFilters.tsx | 53 +++++-- apps/webapp/app/models/task.server.ts | 3 + .../v3/ErrorsListPresenter.server.ts | 4 +- .../presenters/v3/LogsListPresenter.server.ts | 11 +- .../v3/NextRunListPresenter.server.ts | 10 +- .../v3/ScheduleListPresenter.server.ts | 15 +- .../route.tsx | 11 +- .../route.tsx | 10 +- ...jectParam.env.$envParam.runs.ai-filter.tsx | 5 +- .../app/runEngine/concerns/queues.server.ts | 17 ++- .../runEngine/services/batchTrigger.server.ts | 1 + .../runEngine/services/createBatch.server.ts | 2 +- .../services/triggerFailedTask.server.ts | 10 +- .../services/taskIdentifierCache.server.ts | 115 +++++++++++++++ .../services/taskIdentifierRegistry.server.ts | 139 ++++++++++++++++++ .../webapp/app/v3/runEngineHandlers.server.ts | 1 + .../changeCurrentDeployment.server.ts | 24 ++- .../services/createBackgroundWorker.server.ts | 18 +++ ...eateDeploymentBackgroundWorkerV3.server.ts | 16 +- .../app/v3/services/triggerTask.server.ts | 2 +- .../migration.sql | 2 + .../migration.sql | 40 +++++ .../database/prisma/schema.prisma | 29 ++++ 25 files changed, 515 insertions(+), 79 deletions(-) create mode 100644 .server-changes/task-identifier-registry.md create mode 100644 apps/webapp/app/services/taskIdentifierCache.server.ts create mode 100644 apps/webapp/app/services/taskIdentifierRegistry.server.ts create mode 100644 internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql create mode 100644 internal-packages/database/prisma/migrations/20260413000001_add_task_identifier_table/migration.sql diff --git a/.server-changes/task-identifier-registry.md b/.server-changes/task-identifier-registry.md new file mode 100644 index 00000000000..327e188de21 --- /dev/null +++ b/.server-changes/task-identifier-registry.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Replace the expensive DISTINCT query for task filter dropdowns with a dedicated TaskIdentifier registry table backed by Redis. Environments migrate automatically on their next deploy, with a transparent fallback to the legacy query for unmigrated environments. Also fixes duplicate dropdown entries when a task changes trigger source, and adds active/archived grouping for removed tasks. Moves BackgroundWorkerTask reads in the trigger hot path to the read replica. diff --git a/apps/webapp/app/components/logs/LogsTaskFilter.tsx b/apps/webapp/app/components/logs/LogsTaskFilter.tsx index fa64eff7bd3..3e2afdcf798 100644 --- a/apps/webapp/app/components/logs/LogsTaskFilter.tsx +++ b/apps/webapp/app/components/logs/LogsTaskFilter.tsx @@ -4,6 +4,8 @@ import { useMemo } from "react"; import * as Ariakit from "@ariakit/react"; import { ComboBox, + SelectGroup, + SelectGroupLabel, SelectItem, SelectList, SelectPopover, @@ -21,6 +23,7 @@ const shortcut = { key: "t" }; type TaskOption = { slug: string; triggerSource: TaskTriggerSource; + isInLatestDeployment: boolean; }; interface LogsTaskFilterProps { @@ -126,17 +129,42 @@ function TasksDropdown({ > - {filtered.map((item, index) => ( - - } - > - {item.slug} - - ))} + {filtered + .filter((item) => item.isInLatestDeployment) + .map((item) => ( + + } + > + {item.slug} + + ))} + {filtered.some((item) => !item.isInLatestDeployment) && ( + + Archived + {filtered + .filter((item) => !item.isInLatestDeployment) + .map((item) => ( + + + + } + > + {item.slug} + + ))} + + )} diff --git a/apps/webapp/app/components/runs/v3/RunFilters.tsx b/apps/webapp/app/components/runs/v3/RunFilters.tsx index dc3657b42a9..83ebaa0d51b 100644 --- a/apps/webapp/app/components/runs/v3/RunFilters.tsx +++ b/apps/webapp/app/components/runs/v3/RunFilters.tsx @@ -36,6 +36,8 @@ import { Paragraph } from "~/components/primitives/Paragraph"; import { ComboBox, SelectButtonItem, + SelectGroup, + SelectGroupLabel, SelectItem, SelectList, SelectPopover, @@ -322,7 +324,7 @@ export function getRunFiltersFromSearchParams( } type RunFiltersProps = { - possibleTasks: { slug: string; triggerSource: TaskTriggerSource }[]; + possibleTasks: { slug: string; triggerSource: TaskTriggerSource; isInLatestDeployment: boolean }[]; bulkActions: { id: string; type: BulkActionType; @@ -627,7 +629,7 @@ function TasksDropdown({ clearSearchValue: () => void; searchValue: string; onClose?: () => void; - possibleTasks: { slug: string; triggerSource: TaskTriggerSource }[]; + possibleTasks: { slug: string; triggerSource: TaskTriggerSource; isInLatestDeployment: boolean }[]; }) { const { values, replace } = useSearchParams(); @@ -658,17 +660,42 @@ function TasksDropdown({ > - {filtered.map((item, index) => ( - - } - > - - - ))} + {filtered + .filter((item) => item.isInLatestDeployment) + .map((item) => ( + + } + > + + + ))} + {filtered.some((item) => !item.isInLatestDeployment) && ( + + Archived + {filtered + .filter((item) => !item.isInLatestDeployment) + .map((item) => ( + + + + } + > + + + ))} + + )} diff --git a/apps/webapp/app/models/task.server.ts b/apps/webapp/app/models/task.server.ts index b696bac6039..aab3b3bcfc1 100644 --- a/apps/webapp/app/models/task.server.ts +++ b/apps/webapp/app/models/task.server.ts @@ -1,6 +1,9 @@ import type { TaskTriggerSource } from "@trigger.dev/database"; import { PrismaClientOrTransaction, sqlDatabaseSchema } from "~/db.server"; +export { getTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +export type { TaskIdentifierEntry } from "~/services/taskIdentifierCache.server"; + /** * * @param prisma An efficient query to get all task identifiers for a project. diff --git a/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts b/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts index 13da4ff91f8..ea6e522dbd5 100644 --- a/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts @@ -13,7 +13,7 @@ import { type ErrorGroupStatus, type PrismaClientOrTransaction } from "@trigger. import { type Direction } from "~/components/ListPagination"; import { timeFilterFromTo } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; -import { getAllTaskIdentifiers } from "~/models/task.server"; +import { getTaskIdentifiers } from "~/models/task.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BasePresenter } from "~/presenters/v3/basePresenter.server"; @@ -170,7 +170,7 @@ export class ErrorsListPresenter extends BasePresenter { (search !== undefined && search !== "") || (statuses !== undefined && statuses.length > 0); - const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId); + const possibleTasksAsync = getTaskIdentifiers(environmentId); // Pre-filter by status: since status lives in Postgres (ErrorGroupState) and the error // list comes from ClickHouse, we resolve inclusion/exclusion sets upfront so that diff --git a/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts b/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts index 8a3bf692b5b..517c586e4e7 100644 --- a/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts @@ -7,7 +7,7 @@ import parseDuration from "parse-duration"; import { type Direction } from "~/components/ListPagination"; import { timeFilterFromTo, timeFilters } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; -import { getAllTaskIdentifiers } from "~/models/task.server"; +import { getTaskIdentifiers } from "~/models/task.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { kindToLevel, type LogLevel, LogLevelSchema } from "~/utils/logUtils"; import { BasePresenter } from "~/presenters/v3/basePresenter.server"; @@ -176,7 +176,7 @@ export class LogsListPresenter extends BasePresenter { (search !== undefined && search !== "") || !time.isDefault; - const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId); + const possibleTasksAsync = getTaskIdentifiers(environmentId); const bulkActionsAsync = this.replica.bulkActionGroup.findMany({ select: { @@ -386,12 +386,7 @@ export class LogsListPresenter extends BasePresenter { next: nextCursor, previous: undefined, // For now, only support forward pagination }, - possibleTasks: possibleTasks - .map((task) => ({ - slug: task.slug, - triggerSource: task.triggerSource, - })) - .sort((a, b) => a.slug.localeCompare(b.slug)), + possibleTasks, bulkActions: bulkActions.map((bulkAction) => ({ id: bulkAction.friendlyId, type: bulkAction.type, diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index f22c7ccf340..de111abd279 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -8,7 +8,7 @@ import { import { type Direction } from "~/components/ListPagination"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; -import { getAllTaskIdentifiers } from "~/models/task.server"; +import { getTaskIdentifiers } from "~/models/task.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; @@ -105,7 +105,7 @@ export class NextRunListPresenter { !time.isDefault; //get all possible tasks - const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId); + const possibleTasksAsync = getTaskIdentifiers(environmentId); //get possible bulk actions const bulkActionsAsync = this.replica.bulkActionGroup.findMany({ @@ -256,11 +256,7 @@ export class NextRunListPresenter { next: pagination.nextCursor ?? undefined, previous: pagination.previousCursor ?? undefined, }, - possibleTasks: possibleTasks - .map((task) => ({ slug: task.slug, triggerSource: task.triggerSource })) - .sort((a, b) => { - return a.slug.localeCompare(b.slug); - }), + possibleTasks, bulkActions: bulkActions.map((bulkAction) => ({ id: bulkAction.friendlyId, type: bulkAction.type, diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts index 053414dcfc7..22c151d720b 100644 --- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts @@ -1,6 +1,7 @@ import { type RuntimeEnvironmentType, type ScheduleType } from "@trigger.dev/database"; import { type ScheduleListFilters } from "~/components/runs/v3/ScheduleFilters"; import { displayableEnvironment } from "~/models/runtimeEnvironment.server"; +import { getTaskIdentifiers } from "~/models/task.server"; import { getLimit } from "~/services/platform.v3.server"; import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; @@ -123,14 +124,10 @@ export class ScheduleListPresenter extends BasePresenter { } //get all possible scheduled tasks - const possibleTasks = await this._replica.backgroundWorkerTask.findMany({ - where: { - workerId: latestWorker.id, - projectId: project.id, - runtimeEnvironmentId: environmentId, - triggerSource: "SCHEDULED", - }, - }); + const allIdentifiers = await getTaskIdentifiers(environmentId); + const possibleTasks = allIdentifiers + .filter((t) => t.triggerSource === "SCHEDULED" && t.isInLatestDeployment) + .map((t) => ({ slug: t.slug })); //do this here to protect against SQL injection search = search && search !== "" ? `%${search}%` : undefined; @@ -285,7 +282,7 @@ export class ScheduleListPresenter extends BasePresenter { totalPages: Math.ceil(totalCount / pageSize), totalCount: totalCount, schedules, - possibleTasks: possibleTasks.map((task) => task.slug).sort((a, b) => a.localeCompare(b)), + possibleTasks: possibleTasks.map((task) => task.slug), hasFilters, limits: { used: schedulesCount, diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx index 57b6b71db6f..cd358b7e67d 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx @@ -17,14 +17,13 @@ import { TitleWidget } from "~/components/metrics/TitleWidget"; import { CreateDashboardPageButton } from "~/components/navigation/DashboardDialogs"; import { NavBar, PageAccessories, PageTitle } from "~/components/primitives/PageHeader"; import { TimeFilter } from "~/components/runs/v3/SharedFilters"; -import { $replica } from "~/db.server"; import { useEnvironment } from "~/hooks/useEnvironment"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; import { useSearchParams } from "~/hooks/useSearchParam"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; -import { getAllTaskIdentifiers } from "~/models/task.server"; +import { getTaskIdentifiers } from "~/models/task.server"; import { type BuiltInDashboardFilter, type LayoutItem, @@ -70,7 +69,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { organizationId: project.organizationId, key: dashboardKey, }), - getAllTaskIdentifiers($replica, environment.id), + getTaskIdentifiers(environment.id), ]); const filters = dashboard.filters ?? ["tasks", "queues"]; @@ -114,9 +113,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { return typedjson({ ...dashboard, filters, - possibleTasks: possibleTasks - .map((task) => ({ slug: task.slug, triggerSource: task.triggerSource })) - .sort((a, b) => a.slug.localeCompare(b.slug)), + possibleTasks, possibleModels, possiblePrompts, possibleOperations, @@ -201,7 +198,7 @@ export function MetricDashboard({ /** Which filters to show. Defaults to ["tasks", "queues"]. */ filters?: BuiltInDashboardFilter[]; /** Possible tasks for filtering */ - possibleTasks?: { slug: string; triggerSource: TaskTriggerSource }[]; + possibleTasks?: { slug: string; triggerSource: TaskTriggerSource; isInLatestDeployment: boolean }[]; /** Possible models for filtering */ possibleModels?: ModelOption[]; /** Possible prompt slugs for filtering */ diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx index 051ea7a8a28..418078760cd 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.custom.$dashboardId/route.tsx @@ -35,7 +35,7 @@ import { Sheet, SheetContent } from "~/components/primitives/SheetV3"; import { useToast } from "~/components/primitives/Toast"; import { SimpleTooltip } from "~/components/primitives/Tooltip"; import { QueryEditor, type QueryEditorSaveData } from "~/components/query/QueryEditor"; -import { $replica, prisma } from "~/db.server"; +import { prisma } from "~/db.server"; import { env } from "~/env.server"; import { useDashboardEditor } from "~/hooks/useDashboardEditor"; import { useEnvironment } from "~/hooks/useEnvironment"; @@ -44,7 +44,7 @@ import { useProject } from "~/hooks/useProject"; import { redirectWithSuccessMessage } from "~/models/message.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; -import { getAllTaskIdentifiers } from "~/models/task.server"; +import { getTaskIdentifiers } from "~/models/task.server"; import { MetricDashboardPresenter } from "~/presenters/v3/MetricDashboardPresenter.server"; import { QueryPresenter } from "~/presenters/v3/QueryPresenter.server"; import { requireUser, requireUserId } from "~/services/session.server"; @@ -93,7 +93,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { queryPresenter.call({ organizationId: project.organizationId, }), - getAllTaskIdentifiers($replica, environment.id), + getTaskIdentifiers(environment.id), ]); // Admins and impersonating users can use EXPLAIN @@ -109,9 +109,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { queryHistory: history, isAdmin, maxRows: env.QUERY_CLICKHOUSE_MAX_RETURNED_ROWS, - possibleTasks: possibleTasks - .map((task) => ({ slug: task.slug, triggerSource: task.triggerSource })) - .sort((a, b) => a.slug.localeCompare(b.slug)), + possibleTasks, widgetCount, }); }; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx index 9ae306c98a6..ff289d241be 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.ai-filter.tsx @@ -2,11 +2,10 @@ import { openai } from "@ai-sdk/openai"; import { type ActionFunctionArgs, json } from "@remix-run/server-runtime"; import { tryCatch } from "@trigger.dev/core"; import { z } from "zod"; -import { $replica } from "~/db.server"; import { env } from "~/env.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; -import { getAllTaskIdentifiers } from "~/models/task.server"; +import { getTaskIdentifiers } from "~/models/task.server"; import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server"; import { RunTagListPresenter } from "~/presenters/v3/RunTagListPresenter.server"; import { VersionListPresenter } from "~/presenters/v3/VersionListPresenter.server"; @@ -126,7 +125,7 @@ export async function action({ request, params }: ActionFunctionArgs) { const queryTasks: QueryTasks = { query: async () => { - const tasks = await getAllTaskIdentifiers($replica, environment.id); + const tasks = await getTaskIdentifiers(environment.id); return { tasks, }; diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index eb00bf1c586..136c3da3b9c 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -62,10 +62,15 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef } export class DefaultQueueManager implements QueueManager { + private readonly replicaPrisma: PrismaClientOrTransaction; + constructor( private readonly prisma: PrismaClientOrTransaction, - private readonly engine: RunEngine - ) { } + private readonly engine: RunEngine, + replicaPrisma?: PrismaClientOrTransaction + ) { + this.replicaPrisma = replicaPrisma ?? prisma; + } async resolveQueueProperties( request: TriggerTaskRequest, @@ -103,7 +108,7 @@ export class DefaultQueueManager implements QueueManager { // Only fetch task for TTL if caller didn't provide a per-trigger TTL if (request.body.options?.ttl === undefined) { - const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({ + const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ where: { workerId: lockedBackgroundWorker.id, runtimeEnvironmentId: request.environment.id, @@ -116,7 +121,7 @@ export class DefaultQueueManager implements QueueManager { } } else { // No queue override - fetch task with queue to get both default queue and TTL - const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({ + const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ where: { workerId: lockedBackgroundWorker.id, runtimeEnvironmentId: request.environment.id, @@ -217,7 +222,7 @@ export class DefaultQueueManager implements QueueManager { // When queue is overridden, we only need TTL from the task (no queue join needed) if (overriddenQueueName) { - const task = await this.prisma.backgroundWorkerTask.findFirst({ + const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ where: { workerId: worker.id, runtimeEnvironmentId: environment.id, @@ -229,7 +234,7 @@ export class DefaultQueueManager implements QueueManager { return { queueName: overriddenQueueName, taskTtl: task?.ttl }; } - const task = await this.prisma.backgroundWorkerTask.findFirst({ + const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ where: { workerId: worker.id, runtimeEnvironmentId: environment.id, diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 4e504163fec..3df2dfb00f9 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -531,6 +531,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { const triggerFailedTaskService = new TriggerFailedTaskService({ prisma: this._prisma, engine: this._engine, + replicaPrisma: this._replica, }); for (const item of itemsToProcess) { diff --git a/apps/webapp/app/runEngine/services/createBatch.server.ts b/apps/webapp/app/runEngine/services/createBatch.server.ts index 309a7700f1a..0653e1ef1c2 100644 --- a/apps/webapp/app/runEngine/services/createBatch.server.ts +++ b/apps/webapp/app/runEngine/services/createBatch.server.ts @@ -40,7 +40,7 @@ export class CreateBatchService extends WithRunEngine { constructor(protected readonly _prisma: PrismaClientOrTransaction = prisma) { super({ prisma: _prisma }); - this.queueConcern = new DefaultQueueManager(this._prisma, this._engine); + this.queueConcern = new DefaultQueueManager(this._prisma, this._engine, this._replica); this.validator = new DefaultTriggerTaskValidator(); } diff --git a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts index cdcfa63ff0b..0b59a523a6e 100644 --- a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts @@ -51,10 +51,16 @@ export type TriggerFailedTaskRequest = { */ export class TriggerFailedTaskService { private readonly prisma: PrismaClientOrTransaction; + private readonly replicaPrisma: PrismaClientOrTransaction; private readonly engine: RunEngine; - constructor(opts: { prisma: PrismaClientOrTransaction; engine: RunEngine }) { + constructor(opts: { + prisma: PrismaClientOrTransaction; + engine: RunEngine; + replicaPrisma?: PrismaClientOrTransaction; + }) { this.prisma = opts.prisma; + this.replicaPrisma = opts.replicaPrisma ?? opts.prisma; this.engine = opts.engine; } @@ -91,7 +97,7 @@ export class TriggerFailedTaskService { let queueName: string | undefined; let lockedQueueId: string | undefined; try { - const queueConcern = new DefaultQueueManager(this.prisma, this.engine); + const queueConcern = new DefaultQueueManager(this.prisma, this.engine, this.replicaPrisma); const bodyOptions = request.options as TriggerTaskRequest["body"]["options"]; const triggerRequest: TriggerTaskRequest = { taskId: request.taskId, diff --git a/apps/webapp/app/services/taskIdentifierCache.server.ts b/apps/webapp/app/services/taskIdentifierCache.server.ts new file mode 100644 index 00000000000..9d243b5f740 --- /dev/null +++ b/apps/webapp/app/services/taskIdentifierCache.server.ts @@ -0,0 +1,115 @@ +import { Redis } from "ioredis"; +import type { TaskTriggerSource } from "@trigger.dev/database"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; +import { logger } from "./logger.server"; + +const KEY_PREFIX = "tids:"; + +type CachedTaskIdentifier = { + s: string; + ts: TaskTriggerSource; + live: boolean; +}; + +export type TaskIdentifierEntry = { + slug: string; + triggerSource: TaskTriggerSource; + isInLatestDeployment: boolean; +}; + +function buildKey(environmentId: string): string { + return `${KEY_PREFIX}${environmentId}`; +} + +function encode(entry: TaskIdentifierEntry): string { + return JSON.stringify({ + s: entry.slug, + ts: entry.triggerSource, + live: entry.isInLatestDeployment, + } satisfies CachedTaskIdentifier); +} + +function decode(raw: string): TaskIdentifierEntry { + const parsed = JSON.parse(raw) as CachedTaskIdentifier; + return { + slug: parsed.s, + triggerSource: parsed.ts, + isInLatestDeployment: parsed.live, + }; +} + +function initializeRedis(): Redis | undefined { + const host = env.CACHE_REDIS_HOST; + if (!host) { + return undefined; + } + + return new Redis({ + connectionName: "taskIdentifierCache", + host, + port: env.CACHE_REDIS_PORT, + username: env.CACHE_REDIS_USERNAME, + password: env.CACHE_REDIS_PASSWORD, + keyPrefix: "tr:", + enableAutoPipelining: true, + ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }); +} + +const redis = singleton("taskIdentifierCache", initializeRedis); + +export async function populateTaskIdentifierCache( + environmentId: string, + identifiers: TaskIdentifierEntry[] +): Promise { + if (!redis) return; + + try { + const key = buildKey(environmentId); + const pipeline = redis.pipeline(); + pipeline.del(key); + if (identifiers.length > 0) { + pipeline.sadd(key, ...identifiers.map(encode)); + } + await pipeline.exec(); + } catch (error) { + logger.error("Failed to populate task identifier cache", { + environmentId, + error, + }); + } +} + +export async function invalidateTaskIdentifierCache(environmentId: string): Promise { + if (!redis) return; + + try { + const key = buildKey(environmentId); + await redis.del(key); + } catch (error) { + logger.error("Failed to invalidate task identifier cache", { + environmentId, + error, + }); + } +} + +export async function getTaskIdentifiersFromCache( + environmentId: string +): Promise { + if (!redis) return null; + + try { + const key = buildKey(environmentId); + const members = await redis.smembers(key); + if (members.length === 0) return null; + return members.map(decode); + } catch (error) { + logger.error("Failed to get task identifiers from cache", { + environmentId, + error, + }); + return null; + } +} diff --git a/apps/webapp/app/services/taskIdentifierRegistry.server.ts b/apps/webapp/app/services/taskIdentifierRegistry.server.ts new file mode 100644 index 00000000000..23ffe41e9bd --- /dev/null +++ b/apps/webapp/app/services/taskIdentifierRegistry.server.ts @@ -0,0 +1,139 @@ +import { TaskTriggerSource } from "@trigger.dev/database"; +import { $replica, prisma } from "~/db.server"; +import { getAllTaskIdentifiers } from "~/models/task.server"; +import { logger } from "./logger.server"; +import { + getTaskIdentifiersFromCache, + populateTaskIdentifierCache, + type TaskIdentifierEntry, +} from "./taskIdentifierCache.server"; + +function toTriggerSource(source: string | undefined): TaskTriggerSource { + if (source === "SCHEDULED" || source === "schedule") return "SCHEDULED"; + if (source === "AGENT" || source === "agent") return "AGENT"; + return "STANDARD"; +} + +export async function syncTaskIdentifiers( + environmentId: string, + projectId: string, + workerId: string, + tasks: { id: string; triggerSource?: string }[] +): Promise { + const slugs = tasks.map((t) => t.id); + + for (const task of tasks) { + await prisma.taskIdentifier.upsert({ + where: { + runtimeEnvironmentId_slug: { + runtimeEnvironmentId: environmentId, + slug: task.id, + }, + }, + create: { + runtimeEnvironmentId: environmentId, + projectId, + slug: task.id, + currentTriggerSource: toTriggerSource(task.triggerSource), + currentWorkerId: workerId, + isInLatestDeployment: true, + }, + update: { + currentTriggerSource: toTriggerSource(task.triggerSource), + currentWorkerId: workerId, + lastSeenAt: new Date(), + isInLatestDeployment: true, + }, + }); + } + + if (slugs.length > 0) { + await prisma.taskIdentifier.updateMany({ + where: { + runtimeEnvironmentId: environmentId, + slug: { notIn: slugs }, + isInLatestDeployment: true, + }, + data: { isInLatestDeployment: false }, + }); + } + + const allIdentifiers = await prisma.taskIdentifier.findMany({ + where: { runtimeEnvironmentId: environmentId }, + select: { + slug: true, + currentTriggerSource: true, + isInLatestDeployment: true, + }, + }); + + populateTaskIdentifierCache( + environmentId, + allIdentifiers.map((t) => ({ + slug: t.slug, + triggerSource: t.currentTriggerSource, + isInLatestDeployment: t.isInLatestDeployment, + })) + ).catch((error) => { + logger.error("Failed to populate task identifier cache after sync", { environmentId, error }); + }); +} + +function sortEntries(entries: TaskIdentifierEntry[]): TaskIdentifierEntry[] { + return entries.sort((a, b) => { + if (a.isInLatestDeployment !== b.isInLatestDeployment) + return a.isInLatestDeployment ? -1 : 1; + return a.slug.localeCompare(b.slug); + }); +} + +export async function getTaskIdentifiers( + environmentId: string +): Promise { + const cached = await getTaskIdentifiersFromCache(environmentId); + if (cached) return sortEntries(cached); + + const dbRows = await $replica.taskIdentifier.findMany({ + where: { runtimeEnvironmentId: environmentId }, + select: { + slug: true, + currentTriggerSource: true, + isInLatestDeployment: true, + }, + }); + + if (dbRows.length > 0) { + const entries: TaskIdentifierEntry[] = dbRows.map((t) => ({ + slug: t.slug, + triggerSource: t.currentTriggerSource, + isInLatestDeployment: t.isInLatestDeployment, + })); + + populateTaskIdentifierCache(environmentId, entries).catch((error) => { + logger.error("Failed to populate task identifier cache after DB read", { + environmentId, + error, + }); + }); + + return sortEntries(entries); + } + + const legacyRows = await getAllTaskIdentifiers($replica, environmentId); + const entries: TaskIdentifierEntry[] = legacyRows.map((t) => ({ + slug: t.slug, + triggerSource: t.triggerSource, + isInLatestDeployment: true, + })); + + if (entries.length > 0) { + populateTaskIdentifierCache(environmentId, entries).catch((error) => { + logger.error("Failed to populate task identifier cache after legacy fallback", { + environmentId, + error, + }); + }); + } + + return sortEntries(entries); +} diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index 411f91ff75d..fa2afeeca68 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -667,6 +667,7 @@ export function setupBatchQueueCallbacks() { const triggerFailedTaskService = new TriggerFailedTaskService({ prisma, engine, + replicaPrisma: $replica, }); // Check for pre-marked error items (e.g. oversized payloads) diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index 00360df946c..fc841199eb7 100644 --- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -1,8 +1,11 @@ +import { tryCatch } from "@trigger.dev/core/v3"; +import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; import { WorkerDeployment } from "@trigger.dev/database"; +import { logger } from "~/services/logger.server"; +import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy"; import { compareDeploymentVersions } from "../utils/deploymentVersions"; -import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; export type ChangeCurrentDeploymentDirection = "promote" | "rollback"; @@ -96,6 +99,23 @@ export class ChangeCurrentDeploymentService extends BaseService { }, }); - await ExecuteTasksWaitingForDeployService.enqueue(deployment.workerId); + const [syncError] = await tryCatch( + (async () => { + const tasks = await this._prisma.backgroundWorkerTask.findMany({ + where: { workerId: deployment.workerId! }, + select: { slug: true, triggerSource: true }, + }); + await syncTaskIdentifiers( + deployment.environmentId, + deployment.projectId, + deployment.workerId!, + tasks.map((t) => ({ id: t.slug, triggerSource: t.triggerSource })) + ); + })() + ); + + if (syncError) { + logger.error("Error syncing task identifiers on deployment change", { error: syncError }); + } } } diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 1b51ec04aee..c8381327249 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -13,6 +13,7 @@ import { $transaction, Prisma, PrismaClientOrTransaction } from "~/db.server"; import { sanitizeQueueName } from "~/models/taskQueue.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { removeQueueConcurrencyLimits, @@ -158,6 +159,23 @@ export class CreateBackgroundWorkerService extends BaseService { throw new ServiceValidationError("Error syncing declarative schedules"); } + const [syncIdentifiersError] = await tryCatch( + syncTaskIdentifiers( + environment.id, + project.id, + backgroundWorker.id, + body.metadata.tasks.map((t) => ({ id: t.id, triggerSource: t.triggerSource })) + ) + ); + + if (syncIdentifiersError) { + logger.error("Error syncing task identifiers", { + error: syncIdentifiersError, + backgroundWorker, + environment, + }); + } + const [updateConcurrencyLimitsError] = await tryCatch( updateEnvConcurrencyLimits(environment) ); diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index e093f2c2006..9743cffcdbe 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -1,7 +1,8 @@ -import { CreateBackgroundWorkerRequestBody } from "@trigger.dev/core/v3"; +import { CreateBackgroundWorkerRequestBody, tryCatch } from "@trigger.dev/core/v3"; import type { BackgroundWorker, Prisma } from "@trigger.dev/database"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; import { socketIo } from "../handleSocketIo.server"; import { updateEnvConcurrencyLimits } from "../runQueue.server"; import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server"; @@ -130,6 +131,19 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { }, }); + const [syncIdError] = await tryCatch( + syncTaskIdentifiers( + environment.id, + environment.projectId, + backgroundWorker.id, + body.metadata.tasks.map((t) => ({ id: t.id, triggerSource: t.triggerSource })) + ) + ); + + if (syncIdError) { + logger.error("Error syncing task identifiers", { error: syncIdError }); + } + try { //send a notification that a new worker has been created await projectPubSub.publish( diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 000633fb73f..96712c36cc4 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -99,7 +99,7 @@ export class TriggerTaskService extends WithRunEngine { const service = new RunEngineTriggerTaskService({ prisma: this._prisma, engine: this._engine, - queueConcern: new DefaultQueueManager(this._prisma, this._engine), + queueConcern: new DefaultQueueManager(this._prisma, this._engine, this._replica), validator: new DefaultTriggerTaskValidator(), payloadProcessor: new DefaultPayloadProcessor(), idempotencyKeyConcern: new IdempotencyKeyConcern( diff --git a/internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql b/internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql new file mode 100644 index 00000000000..6e95c900b34 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql @@ -0,0 +1,2 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS "BackgroundWorkerTask_runtimeEnvironmentId_slug_triggerSource_idx" + ON "BackgroundWorkerTask"("runtimeEnvironmentId", slug, "triggerSource"); diff --git a/internal-packages/database/prisma/migrations/20260413000001_add_task_identifier_table/migration.sql b/internal-packages/database/prisma/migrations/20260413000001_add_task_identifier_table/migration.sql new file mode 100644 index 00000000000..43a31e905d0 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260413000001_add_task_identifier_table/migration.sql @@ -0,0 +1,40 @@ +-- CreateTable +CREATE TABLE "TaskIdentifier" ( + "id" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "slug" TEXT NOT NULL, + "currentTriggerSource" "TaskTriggerSource" NOT NULL DEFAULT 'STANDARD', + "currentWorkerId" TEXT NOT NULL, + "firstSeenAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "lastSeenAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "isInLatestDeployment" BOOLEAN NOT NULL DEFAULT true, + + CONSTRAINT "TaskIdentifier_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskIdentifier_runtimeEnvironmentId_slug_key" + ON "TaskIdentifier"("runtimeEnvironmentId", "slug"); + +-- CreateIndex +CREATE INDEX "TaskIdentifier_runtimeEnvironmentId_isInLatestDeployment_idx" + ON "TaskIdentifier"("runtimeEnvironmentId", "isInLatestDeployment"); + +-- AddForeignKey +ALTER TABLE "TaskIdentifier" + ADD CONSTRAINT "TaskIdentifier_runtimeEnvironmentId_fkey" + FOREIGN KEY ("runtimeEnvironmentId") REFERENCES "RuntimeEnvironment"("id") + ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "TaskIdentifier" + ADD CONSTRAINT "TaskIdentifier_projectId_fkey" + FOREIGN KEY ("projectId") REFERENCES "Project"("id") + ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "TaskIdentifier" + ADD CONSTRAINT "TaskIdentifier_currentWorkerId_fkey" + FOREIGN KEY ("currentWorkerId") REFERENCES "BackgroundWorker"("id") + ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 7138aeaab0d..02bedf2045e 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -354,6 +354,7 @@ model RuntimeEnvironment { customerQueries CustomerQuery[] prompts Prompt[] errorGroupStates ErrorGroupState[] + taskIdentifiers TaskIdentifier[] @@unique([projectId, slug, orgMemberId]) @@unique([projectId, shortcode]) @@ -429,6 +430,7 @@ model Project { prompts Prompt[] platformNotifications PlatformNotification[] errorGroupStates ErrorGroupState[] + taskIdentifiers TaskIdentifier[] } enum ProjectVersion { @@ -517,6 +519,8 @@ model BackgroundWorker { supportsLazyAttempts Boolean @default(false) + taskIdentifiers TaskIdentifier[] + @@unique([projectId, runtimeEnvironmentId, version]) @@index([runtimeEnvironmentId]) // Get the latest worker for a given environment @@ -659,6 +663,7 @@ model BackgroundWorkerTask { // Quick lookup of task identifiers @@index([projectId, slug]) @@index([runtimeEnvironmentId, projectId]) + @@index([runtimeEnvironmentId, slug, triggerSource]) } enum TaskTriggerSource { @@ -2917,3 +2922,27 @@ model ErrorGroupState { @@unique([environmentId, taskIdentifier, errorFingerprint]) @@index([environmentId, status]) } + +model TaskIdentifier { + id String @id @default(cuid()) + + runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade) + runtimeEnvironmentId String + + project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) + projectId String + + slug String + + currentTriggerSource TaskTriggerSource @default(STANDARD) + + currentWorker BackgroundWorker @relation(fields: [currentWorkerId], references: [id], onDelete: Cascade, onUpdate: Cascade) + currentWorkerId String + + firstSeenAt DateTime @default(now()) + lastSeenAt DateTime @default(now()) + isInLatestDeployment Boolean @default(true) + + @@unique([runtimeEnvironmentId, slug]) + @@index([runtimeEnvironmentId, isInLatestDeployment]) +} \ No newline at end of file From c0ceb9002ca6d5316970d89d2b094235eed1452c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 13 Apr 2026 22:43:25 +0100 Subject: [PATCH 2/2] fix: remove AGENT trigger source ref, restore ExecuteTasksWaitingForDeploy call - Remove AGENT case from toTriggerSource (not yet on this branch) - Restore accidentally removed ExecuteTasksWaitingForDeployService.enqueue call in ChangeCurrentDeploymentService --- apps/webapp/app/services/taskIdentifierRegistry.server.ts | 1 - apps/webapp/app/v3/services/changeCurrentDeployment.server.ts | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/services/taskIdentifierRegistry.server.ts b/apps/webapp/app/services/taskIdentifierRegistry.server.ts index 23ffe41e9bd..3d1430518c7 100644 --- a/apps/webapp/app/services/taskIdentifierRegistry.server.ts +++ b/apps/webapp/app/services/taskIdentifierRegistry.server.ts @@ -10,7 +10,6 @@ import { function toTriggerSource(source: string | undefined): TaskTriggerSource { if (source === "SCHEDULED" || source === "schedule") return "SCHEDULED"; - if (source === "AGENT" || source === "agent") return "AGENT"; return "STANDARD"; } diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index fc841199eb7..ee788397a08 100644 --- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -117,5 +117,7 @@ export class ChangeCurrentDeploymentService extends BaseService { if (syncError) { logger.error("Error syncing task identifiers on deployment change", { error: syncError }); } + + await ExecuteTasksWaitingForDeployService.enqueue(deployment.workerId); } }