Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/stop-creating-taskruntag-records.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Stop creating TaskRunTag records and _TaskRunToTaskRunTag join table entries during task triggering. The denormalized runTags string array on TaskRun already stores tag names, making the M2M relation redundant write overhead.
107 changes: 0 additions & 107 deletions apps/webapp/app/models/taskRunTag.server.ts
Comment thread
ericallam marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,108 +1 @@
import { Prisma } from "@trigger.dev/database";
import { prisma } from "~/db.server";
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
import { PrismaClientOrTransaction } from "@trigger.dev/database";

export const MAX_TAGS_PER_RUN = 10;
const MAX_RETRIES = 3;

export async function createTag(
{ tag, projectId }: { tag: string; projectId: string },
prismaClient: PrismaClientOrTransaction = prisma
) {
if (tag.trim().length === 0) return;

let attempts = 0;
const friendlyId = generateFriendlyId("runtag");

while (attempts < MAX_RETRIES) {
try {
return await prisma.taskRunTag.upsert({
where: {
projectId_name: {
projectId,
name: tag,
},
},
create: {
friendlyId,
name: tag,
projectId,
},
update: {},
});
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") {
// Handle unique constraint violation (conflict)
attempts++;
if (attempts >= MAX_RETRIES) {
throw new Error(`Failed to create tag after ${MAX_RETRIES} attempts due to conflicts.`);
}
} else {
throw error; // Re-throw other errors
}
}
}
}

export type TagRecord = {
id: string;
name: string;
};

export async function createTags(
{
tags,
projectId,
}: {
tags: string | string[] | undefined;
projectId: string;
},
prismaClient: PrismaClientOrTransaction = prisma
): Promise<TagRecord[]> {
if (!tags) {
return [];
}

const tagsArray = typeof tags === "string" ? [tags] : tags;

if (tagsArray.length === 0) {
return [];
}

const tagRecords: TagRecord[] = [];
for (const tag of tagsArray) {
const tagRecord = await createTag(
{
tag,
projectId,
},
prismaClient
);
if (tagRecord) {
tagRecords.push({ id: tagRecord.id, name: tagRecord.name });
}
}

return tagRecords;
}

export async function getTagsForRunId({
friendlyId,
environmentId,
}: {
friendlyId: string;
environmentId: string;
}) {
const run = await prisma.taskRun.findFirst({
where: {
friendlyId,
runtimeEnvironmentId: environmentId,
},
select: {
tags: true,
},
});

return run?.tags ?? undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const commonRunSelect = {
metadata: true,
metadataType: true,
ttl: true,
tags: true,
runTags: true,
Comment thread
ericallam marked this conversation as resolved.
Outdated
costInCents: true,
baseCostInCents: true,
usageDurationMs: true,
Expand Down Expand Up @@ -459,9 +459,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
durationMs: run.usageDurationMs,
isTest: run.isTest,
depth: run.depth,
tags: run.tags
.map((t: { name: string }) => t.name)
.sort((a: string, b: string) => a.localeCompare(b)),
tags: [...(run.runTags ?? [])].sort((a: string, b: string) => a.localeCompare(b)),
...ApiRetrieveRunPresenter.apiBooleanHelpersFromTaskRunStatus(run.status, apiVersion),
triggerFunction: resolveTriggerFunction(run),
batchId: run.batch?.friendlyId,
Expand Down
35 changes: 12 additions & 23 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { AddTagsRequestBody } from "@trigger.dev/core/v3";
import { z } from "zod";
import { prisma } from "~/db.server";
import { createTag, getTagsForRunId, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -37,17 +37,23 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
}

const existingTags =
(await getTagsForRunId({
const run = await prisma.taskRun.findFirst({
where: {
friendlyId: parsedParams.data.runId,
environmentId: authenticationResult.environment.id,
})) ?? [];
runtimeEnvironmentId: authenticationResult.environment.id,
},
select: {
runTags: true,
},
});

const existingTags = run?.runTags ?? [];

//remove duplicate tags from the new tags
const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags;
const newTags = bodyTags.filter((tag) => {
if (tag.trim().length === 0) return false;
return !existingTags.map((t) => t.name).includes(tag);
return !existingTags.includes(tag);
});

if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) {
Expand All @@ -65,29 +71,12 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ message: "No new tags to add" }, { status: 200 });
}

//create tags
let tagIds: string[] = existingTags.map((t) => t.id);
if (newTags.length > 0) {
for (const tag of newTags) {
const tagRecord = await createTag({
tag,
projectId: authenticationResult.environment.projectId,
});
if (tagRecord) {
tagIds.push(tagRecord.id);
}
}
}

await prisma.taskRun.update({
where: {
friendlyId: parsedParams.data.runId,
runtimeEnvironmentId: authenticationResult.environment.id,
},
data: {
tags: {
connect: tagIds.map((id) => ({ id })),
},
runTags: {
push: newTags,
},
Comment thread
ericallam marked this conversation as resolved.
Expand Down
10 changes: 3 additions & 7 deletions apps/webapp/app/routes/resources.runs.$runParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
taskIdentifier: true,
friendlyId: true,
isTest: true,
tags: {
select: {
name: true,
},
},
runTags: true,
machinePreset: true,
lockedToVersion: {
select: {
Expand Down Expand Up @@ -178,7 +174,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
run: {
id: run.friendlyId,
createdAt: run.createdAt,
tags: run.tags.map((tag) => tag.name),
tags: run.runTags ?? [],
isTest: run.isTest,
idempotencyKey: run.idempotencyKey ?? undefined,
startedAt: run.startedAt ?? run.createdAt,
Expand Down Expand Up @@ -244,7 +240,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
isCustomQueue: !run.queue.startsWith("task/"),
concurrencyKey: run.concurrencyKey,
},
tags: run.tags.map((tag) => tag.name),
tags: run.runTags ?? [],
baseCostInCents: run.baseCostInCents,
costInCents: run.costInCents,
totalCostInCents: run.costInCents + run.baseCostInCents,
Expand Down
14 changes: 5 additions & 9 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
stringifyDuration,
} from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import { createTags } from "~/models/taskRunTag.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { parseDelay } from "~/utils/delays";
Expand Down Expand Up @@ -287,14 +286,11 @@ export class RunEngineTriggerTaskService {
)
: undefined;

//upsert tags
const tags = await createTags(
{
tags: body.options?.tags,
projectId: environment.projectId,
},
this.prisma
);
const tags = body.options?.tags
? typeof body.options.tags === "string"
? [body.options.tags]
: body.options.tags
: [];
Comment thread
ericallam marked this conversation as resolved.
Outdated

const depth = parentRun ? parentRun.depth + 1 : 0;

Expand Down
5 changes: 2 additions & 3 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,6 @@ export class SharedQueueConsumer {
take: 1,
orderBy: { number: "desc" },
},
tags: true,
checkpoints: {
take: 1,
orderBy: {
Expand Down Expand Up @@ -1648,7 +1647,7 @@ export const AttemptForExecutionGetPayload = {
costInCents: true,
baseCostInCents: true,
maxDurationInSeconds: true,
tags: true,
runTags: true,
taskEventStore: true,
},
},
Expand Down Expand Up @@ -1725,7 +1724,7 @@ class SharedQueueTasks {
context: taskRun.context,
createdAt: taskRun.createdAt,
startedAt: taskRun.startedAt ?? taskRun.createdAt,
tags: taskRun.tags.map((tag) => tag.name),
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
durationMs: taskRun.usageDurationMs,
Expand Down
3 changes: 1 addition & 2 deletions apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ export class CreateTaskRunAttemptService extends BaseService {
runtimeEnvironmentId: environment.id,
},
include: {
tags: true,
attempts: {
take: 1,
orderBy: {
Expand Down Expand Up @@ -209,7 +208,7 @@ export class CreateTaskRunAttemptService extends BaseService {
payloadType: taskRun.payloadType,
context: taskRun.context,
createdAt: taskRun.createdAt,
tags: taskRun.tags.map((tag) => tag.name),
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
startedAt: taskRun.startedAt ?? taskRun.createdAt,
Expand Down
21 changes: 1 addition & 20 deletions apps/webapp/app/v3/services/triggerTaskV1.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
import { Prisma } from "@trigger.dev/database";
import { z } from "zod";
import { env } from "~/env.server";
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
import { logger } from "~/services/logger.server";
Expand Down Expand Up @@ -345,21 +345,8 @@ export class TriggerTaskServiceV1 extends BaseService {

span.setAttribute("queueName", queueName);

//upsert tags
let tagIds: string[] = [];
const bodyTags =
typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags;
if (bodyTags && bodyTags.length > 0) {
for (const tag of bodyTags) {
const tagRecord = await createTag({
tag,
projectId: environment.projectId,
});
if (tagRecord) {
tagIds.push(tagRecord.id);
}
}
}

const depth = dependentAttempt
? dependentAttempt.taskRun.depth + 1
Expand Down Expand Up @@ -409,12 +396,6 @@ export class TriggerTaskServiceV1 extends BaseService {
maxAttempts: body.options?.maxAttempts,
taskEventStore: store,
ttl,
tags:
tagIds.length === 0
? undefined
: {
connect: tagIds.map((id) => ({ id })),
},
parentTaskRunId:
dependentAttempt?.taskRun.id ??
parentAttempt?.taskRun.id ??
Expand Down
8 changes: 1 addition & 7 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -637,13 +637,7 @@ export class RunEngine {
priorityMs,
queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(),
ttl: resolvedTtl,
tags:
tags.length === 0
? undefined
: {
connect: tags,
},
runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name),
runTags: tags.length === 0 ? undefined : tags,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
oneTimeUseToken,
parentTaskRunId,
rootTaskRunId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export type DebounceOptions = {
payloadType: string;
metadata?: string;
metadataType?: string;
tags?: { id: string; name: string }[];
tags?: string[];
maxAttempts?: number;
maxDurationInSeconds?: number;
machine?: string;
Expand Down Expand Up @@ -876,10 +876,7 @@ return 0

// Handle tags update - replace existing tags
if (updateData.tags !== undefined) {
updatePayload.runTags = updateData.tags.map((t) => t.name);
updatePayload.tags = {
set: updateData.tags.map((t) => ({ id: t.id })),
};
updatePayload.runTags = updateData.tags;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

const updatedRun = await prisma.taskRun.update({
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export type TriggerParams = {
priorityMs?: number;
queueTimestamp?: Date;
ttl?: string;
tags: { id: string; name: string }[];
tags: string[];
parentTaskRunId?: string;
rootTaskRunId?: string;
replayedFromTaskRunFriendlyId?: string;
Expand Down
Loading