Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/tasks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Schema } from 'ajv';
import { type CronExpression, CronExpressionParser } from 'cron-parser';
import { tasks as tasksEnv } from '../config-loader/env.js';
import { addPureHook } from '../sbvr-api/hooks.js';
import * as permissions from '../sbvr-api/permissions.js';
import * as sbvrUtils from '../sbvr-api/sbvr-utils.js';
import type { ConfigLoader } from '../server-glue/module.js';
import { ajv, apiRoot } from './common.js';
Expand All @@ -11,6 +12,7 @@ import type TasksModel from './tasks.js';
import type { Task } from './tasks.js';
import type { FromSchema } from 'json-schema-to-ts';
import { importSBVR } from '../server-glue/sbvr-loader.js';
import { BadRequestError, ConflictError } from '../sbvr-api/errors.js';

export type * from './tasks.js';

Expand All @@ -23,6 +25,13 @@ CREATE INDEX IF NOT EXISTS idx_task_poll ON task USING btree (
"is scheduled to execute on-time" ASC,
"id" ASC
) WHERE status = 'queued';

-- TODO: Remove this once pinejs is able to auto generate partial unique indexes from rules.
-- It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued".
CREATE UNIQUE INDEX IF NOT EXISTS "task$/Mt7Ad3mHEm0JFpuaX1BioDwNSWTgsEFOG1igq8EIrk="
Comment thread
thgreasi marked this conversation as resolved.
ON "task" ("is executed by-handler")
WHERE ("is scheduled with-cron expression" IS NOT NULL
AND 'queued' = "status");
`;

declare module '../sbvr-api/sbvr-utils.js' {
Expand Down Expand Up @@ -164,6 +173,19 @@ export const config: ConfigLoader.Config = {
break;
}
},
'23.4.0-unique-cron-tasks': async (tx, { db }) => {
switch (db.engine) {
// No need to migrate anything other than postgres
case 'postgres':
await tx.executeSql(`\
-- It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued".
Comment thread
thgreasi marked this conversation as resolved.
CREATE UNIQUE INDEX IF NOT EXISTS "task$/Mt7Ad3mHEm0JFpuaX1BioDwNSWTgsEFOG1igq8EIrk="
ON "task" ("is executed by-handler")
WHERE ("is scheduled with-cron expression" IS NOT NULL
AND 'queued' = "status");`);
break;
}
},
},
},
],
Expand Down Expand Up @@ -202,3 +224,65 @@ export function addTaskHandler<T extends Schema>(
validate: schema != null ? ajv.compile(schema) : undefined,
});
}

// Register a cron task and its handler
export async function addCronTask(
name: string,
cron: string,
fn: TaskHandler<
NonNullable<Task['Read']['is_executed_with__parameter_set']>
>['fn'],
): Promise<void> {
addTaskHandler(name, fn);

const client = sbvrUtils.api[apiRoot];
try {
await client.post({
resource: 'task',
passthrough: {
req: permissions.root,
},
options: {
returnResource: false,
},
body: {
is_executed_by__handler: name,
is_scheduled_with__cron_expression: cron,
status: 'queued',
},
});
} catch (err) {
if (
// TODO: Remove the ConflictError one we bump to https://github.com/balena-io-modules/abstract-sql-compiler/pull/316
// since then the rule will throw a proper BadRequestError instead of the generic ConflictError that's atm thrown
// from the partial unique index.
(err instanceof ConflictError &&
err.message === 'Unique key constraint violated') ||
(err instanceof BadRequestError &&
err.message ===
'It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued".')
) {
await client.patch({
resource: 'task',
passthrough: {
req: permissions.root,
},
options: {
$filter: {
is_executed_by__handler: name,
status: 'queued',
$and: [
{ is_scheduled_with__cron_expression: { $ne: null } },
{ is_scheduled_with__cron_expression: { $ne: cron } },
],
},
},
body: {
is_scheduled_with__cron_expression: cron,
},
});
} else {
throw err;
}
}
}
4 changes: 3 additions & 1 deletion src/tasks/tasks.sbvr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Vocabulary: tasks

Term: id
Concept Type: Big Serial (Type)
Concept Type: Big Serial (Type)
Term: actor
Concept Type: Integer (Type)
Term: attempt count
Expand Down Expand Up @@ -31,6 +31,7 @@ Fact type: task has key
Fact type: task is created by actor
Necessity: each task is created by exactly one actor
Fact type: task is executed by handler
Synonymous Form: handler executes task
Necessity: each task is executed by exactly one handler
Fact type: task is executed with parameter set
Necessity: each task is executed with at most one parameter set
Expand All @@ -53,3 +54,4 @@ Fact type: task has attempt limit
Necessity: each task has exactly one attempt limit
Necessity: each task has an attempt limit that is greater than or equal to 1

Rule: It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued".
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be simplified to:

Suggested change
Rule: It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued".
Rule: It is necessary that each handler executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued".

Copy link
Copy Markdown
Member Author

@thgreasi thgreasi Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Page- It seems that the affected IDs optimization doesn't like it that rule (I also tried adding a comma, but that lead to "no match" errors):

Error: Only queries in the form of:
	SELECT (SELECT COUNT(*) ...) = 0
	SELECT NOT EXISTS (SELECT ...)
are supported for affected IDs insertion.

I also realized that this should also work (the sentence makes sense) but
a) Afaict filtering on both sides generates much faster queries, since allows the db to use indexes & cut down rows much sooner.
If we fake the generated sql (by filtering on the fields and then commenting them out), the generated query is prohibitively slow:

SELECT NOT EXISTS (
	SELECT "task.1"."is executed by-handler"
	FROM "task" AS "task.1"
	WHERE TRUE
-- 	AND "task.1"."is scheduled with-cron expression" IS NOT NULL
-- 	AND 'queued' = "task.1"."status"
-- 	AND "task.1"."status" IS NOT NULL
	AND (
		SELECT COUNT(*)
		FROM "task" AS "task.4"
		WHERE "task.4"."is scheduled with-cron expression" IS NOT NULL
		AND 'queued' = "task.4"."status"
		AND "task.4"."status" IS NOT NULL
		AND "task.4"."is executed by-handler" = "task.1"."is executed by-handler"
	) >= 2
) AS "result";
QUERY PLAN
Result  (cost=4.90..4.91 rows=1 width=1) (actual time=25883.233..25883.235 rows=1 loops=1)
  Output: (NOT (InitPlan 2).col1)
  InitPlan 2
    ->  Seq Scan on public.task task_1  (cost=0.00..6711290.02 rows=1368708 width=0) (actual time=25883.231..25883.231 rows=0 loops=1)
          Filter: ((SubPlan 1) >= 2)
          Rows Removed by Filter: 4112852
          SubPlan 1
            ->  Aggregate  (cost=1.59..1.60 rows=1 width=8) (actual time=0.006..0.006 rows=1 loops=4112852)
                  Output: count(*)
                  ->  Index Scan using idx_task_poll on public.task  (cost=0.12..1.58 rows=1 width=0) (actual time=0.005..0.005 rows=0 loops=4112852)
                        Index Cond: ((task."is executed by-handler")::text = (task_1."is executed by-handler")::text)
                        Filter: (task."is scheduled with-cron expression" IS NOT NULL)
Planning Time: 0.326 ms
Execution Time: 25883.270 ms

on the other hand w/ the filters present on both sides the query is much faster:

QUERY PLAN
Result  (cost=3.18..3.19 rows=1 width=1) (actual time=5.364..5.365 rows=1 loops=1)
  Output: (NOT (InitPlan 2).col1)
  InitPlan 2
    ->  Index Scan using idx_task_poll on public.task task_1  (cost=0.12..3.18 rows=1 width=0) (actual time=5.362..5.363 rows=0 loops=1)
          Filter: ((task_1."is scheduled with-cron expression" IS NOT NULL) AND ((SubPlan 1) >= 2))
          SubPlan 1
            ->  Aggregate  (cost=1.59..1.60 rows=1 width=8) (never executed)
                  Output: count(*)
                  ->  Index Scan using idx_task_poll on public.task  (cost=0.12..1.58 rows=1 width=0) (never executed)
                        Index Cond: ((task."is executed by-handler")::text = (task_1."is executed by-handler")::text)
                        Filter: (task."is scheduled with-cron expression" IS NOT NULL)
Planning Time: 0.305 ms
Execution Time: 5.393 ms

b) I followed the pattern that we use in all other cases where we mirror the where clauses in both sides.

Necessity: each name (Auth) that is of an application membership role that belongs to an organization, is of at most one application membership role that belongs to the organization.
Rule: It is necessary that each application that owns a release1 that has a status that is equal to "success" and is not invalidated and has a release version, owns at most one release2 that has a status that is equal to "success" and is not invalidated and has a release version that is of the release1.
Rule: It is necessary that each image that is produced by a delta1 that has a status that is equal to "success" and originates from an image1, is produced by at most one delta that originates from the image1 and has a version that is of the delta1 and has a status that is equal to "success".

fwiw, given the benefits of the pattern of having the "where clauses" on both sides & that we use it in all such rules atm, it's the pattern that my partial unique index auto generation PR v is attempting to detect:
balena-io-modules/abstract-sql-compiler#316

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I realized that it's specifically the pattern of each $column that is of ... $table ..., is of ... $table ... that needs to happen, since in this case handler isn't really something that exists in its own right and so not having the that predicate means things get particularly weird with the sql generation

151 changes: 144 additions & 7 deletions test/08-tasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { testInit, testDeInit, testLocalServer } from './lib/test-init.js';
import { env } from '@balena/pinejs';
import type Model from '@balena/pinejs/out/tasks/tasks.js';
import { CronExpressionParser } from 'cron-parser';
import { PINE_TEST_SIGNALS, waitFor } from './lib/common.js';
import { assertExists, PINE_TEST_SIGNALS, waitFor } from './lib/common.js';

const actorId = 1;
const fixturesBasePath = import.meta.dirname + '/fixtures/08-tasks/';
Expand Down Expand Up @@ -284,6 +284,28 @@ describe('08 task tests', function () {
).to.be.oneOf(potentialScheduleTimestamps);
});

it('should update the cron expression of pre-existing cron tasks on init, when a different one is provided in tasks.addCronTask()', async () => {
const {
body: [queuedTask],
} = await pineTest
.get({
resource: 'task',
options: {
$select: 'is_scheduled_with__cron_expression',
$filter: {
status: 'queued',
is_executed_by__handler: 'set_device_last_heartbeat',
is_scheduled_with__cron_expression: { $ne: null },
},
},
})
.expect(200);
assertExists(queuedTask);
expect(queuedTask).to.deep.equal({
is_scheduled_with__cron_expression: '0 0 0,3,6,12 * * *',
});
});

it('should not immediately execute tasks scheduled to execute in the future', async () => {
// Create a task to create a new device record in 3s
const name = randomUUID();
Expand Down Expand Up @@ -363,11 +385,126 @@ describe('08 task tests', function () {
expect(finalizedTask?.attempt_count).to.equal(attemptLimit);
});

it('should create new tasks for completed tasks with a cron string', async () => {
const cron = '0 0 10 1 1 *';
it('should not be able to create a second cron task with the same handler [using a POST to task]', async () => {
const cron = '0 0 2,4,6,8,12,14 * * *';
await pineTest
.post({
resource: 'task',
body: {
apikey,
key: randomUUID(),
is_executed_by__handler: 'create_device',
is_executed_with__parameter_set: {
name: randomUUID(),
type: randomUUID(),
},
is_scheduled_with__cron_expression: cron,
},
})
.expect((res) => {
expect(res.statusCode).to.be.oneOf([400, 409]);
if (res.statusCode === 409) {
expect([res.statusCode, res.text]).to.deep.equal([
409,
'"Unique key constraint violated"',
]);
} else {
expect([res.statusCode, res.text]).to.deep.equal([
400,
'"It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to \\"queued\\", executes at most one task that is scheduled with a cron expression and has a status that is equal to \\"queued\\"."',
]);
}
});
// TODO: This should become v once we bump to https://github.com/balena-io-modules/abstract-sql-compiler/pull/316
// .expect(
// 400,
// '"It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to \\"queued\\", executes at most one task that is scheduled with a cron expression and has a status that is equal to \\"queued\\"."',
// );
});

it(`should be able to stop recurrent tasks by marking it as cancelled `, async () => {
await pineTest
.patch({
resource: 'task',
options: {
$filter: {
status: 'queued',
is_executed_by__handler: 'create_device',
is_scheduled_with__cron_expression: { $ne: null },
},
},
body: {
status: 'cancelled',
},
})
.expect(200);

const {
body: [cancelledTask],
} = await pineTest
.get({
resource: 'task',
options: {
$select: ['id', 'is_scheduled_to_execute_on__time'],
$filter: {
status: 'cancelled',
is_executed_by__handler: 'create_device',
is_scheduled_with__cron_expression: { $ne: null },
},
$orderby: { id: 'desc' },
},
})
.expect(200);
assertExists(cancelledTask);

// Wait unil when the task was supposed to get executed
const scheduledExecutionTime = new Date(
cancelledTask.is_scheduled_to_execute_on__time,
);
await setTimeout(
Math.max(scheduledExecutionTime.getTime() - Date.now(), 0) +
env.tasks.queueIntervalMS,
);

// check that the cancelled task was not exexuted
const { body: refetchedCancelledTask } = await pineTest
.get({
resource: 'task',
id: cancelledTask.id,
options: {
$select: ['status', 'started_on__time'],
},
})
.expect(200);
assertExists(refetchedCancelledTask);
expect(refetchedCancelledTask).to.deep.equal({
status: 'cancelled',
started_on__time: null,
});
// check that no new task was added
const {
body: [newQueuedTask],
} = await pineTest
.get({
resource: 'task',
options: {
$select: ['id', 'is_scheduled_to_execute_on__time'],
$filter: {
id: { $ge: cancelledTask.id },
status: 'queued',
is_executed_by__handler: 'create_device',
is_scheduled_with__cron_expression: { $ne: null },
},
},
})
.expect(200);
expect(newQueuedTask).to.be.undefined;
});

// Test both succeeded and failure cases
for (const status of ['succeeded', 'failed']) {
// Test both succeeded and failure cases
for (const status of ['succeeded', 'failed']) {
it(`should be able to create new tasks with a cron string where there are pre-existing tasks completed with status ${status}`, async () => {
const cron = '0 0 10 1 1 *';
const handler = status === 'succeeded' ? 'create_device' : 'will_fail';
const name = randomUUID();
await createTask(pineTest, apikey, {
Expand Down Expand Up @@ -404,8 +541,8 @@ describe('08 task tests', function () {
queuedTask?.is_scheduled_to_execute_on__time === nextExecutionDate
);
}, env.tasks.queueIntervalMS);
}
});
});
}

it('should not allow tasks with invalid handler params', async () => {
// Handler requires 'name' and 'type' params, but passing 'foo' param
Expand Down
9 changes: 9 additions & 0 deletions test/fixtures/08-tasks/example.sbvr
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ Term: name
Term: note
Concept Type: Text (Type)

Term: last heartbeat
Concept Type: Date Time (Type)

Term: type
Concept Type: Short Text (Type)

Expand All @@ -20,6 +23,12 @@ Fact Type: device has name
Fact Type: device has note
Necessity: each device has at most one note.

Fact Type: device has note
Necessity: each device has at most one note.

Fact type: device has last heartbeat
Necessity: each device has at most one last heartbeat.

Fact Type: device has type
Necessity: each device has exactly one type.

Expand Down
Loading