From a132a08fc083f87de4e9ccdc0f03b0962c3b5ac9 Mon Sep 17 00:00:00 2001 From: Thodoris Greasidis Date: Fri, 23 Jan 2026 10:54:37 +0200 Subject: [PATCH] Add the addCronTask helper to make registering cron tasks easier Change-type: minor See: https://balena.fibery.io/Work/Project/961 --- src/tasks/index.ts | 84 +++++++++++++ src/tasks/tasks.sbvr | 4 +- test/08-tasks.test.ts | 151 ++++++++++++++++++++++-- test/fixtures/08-tasks/example.sbvr | 9 ++ test/fixtures/08-tasks/task-handlers.ts | 93 ++++++++++++++- test/lib/pine-in-process.ts | 2 +- 6 files changed, 332 insertions(+), 11 deletions(-) diff --git a/src/tasks/index.ts b/src/tasks/index.ts index 3fdfde2ec..8641cdc34 100644 --- a/src/tasks/index.ts +++ b/src/tasks/index.ts @@ -2,6 +2,7 @@ import type { Schema } from 'ajv'; import { type CronExpression, CronExpressionParser } from 'cron-parser'; import { tasks as tasksEnv } from '../config-loader/env.js'; import { addPureHook } from '../sbvr-api/hooks.js'; +import * as permissions from '../sbvr-api/permissions.js'; import * as sbvrUtils from '../sbvr-api/sbvr-utils.js'; import type { ConfigLoader } from '../server-glue/module.js'; import { ajv, apiRoot } from './common.js'; @@ -11,6 +12,7 @@ import type TasksModel from './tasks.js'; import type { Task } from './tasks.js'; import type { FromSchema } from 'json-schema-to-ts'; import { importSBVR } from '../server-glue/sbvr-loader.js'; +import { BadRequestError, ConflictError } from '../sbvr-api/errors.js'; export type * from './tasks.js'; @@ -23,6 +25,13 @@ CREATE INDEX IF NOT EXISTS idx_task_poll ON task USING btree ( "is scheduled to execute on-time" ASC, "id" ASC ) WHERE status = 'queued'; + +-- TODO: Remove this once pinejs is able to auto generate partial unique indexes from rules. +-- It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued". +CREATE UNIQUE INDEX IF NOT EXISTS "task$/Mt7Ad3mHEm0JFpuaX1BioDwNSWTgsEFOG1igq8EIrk=" +ON "task" ("is executed by-handler") +WHERE ("is scheduled with-cron expression" IS NOT NULL +AND 'queued' = "status"); `; declare module '../sbvr-api/sbvr-utils.js' { @@ -164,6 +173,19 @@ export const config: ConfigLoader.Config = { break; } }, + '23.4.0-unique-cron-tasks': async (tx, { db }) => { + switch (db.engine) { + // No need to migrate anything other than postgres + case 'postgres': + await tx.executeSql(`\ + -- It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued". + CREATE UNIQUE INDEX IF NOT EXISTS "task$/Mt7Ad3mHEm0JFpuaX1BioDwNSWTgsEFOG1igq8EIrk=" + ON "task" ("is executed by-handler") + WHERE ("is scheduled with-cron expression" IS NOT NULL + AND 'queued' = "status");`); + break; + } + }, }, }, ], @@ -202,3 +224,65 @@ export function addTaskHandler( validate: schema != null ? ajv.compile(schema) : undefined, }); } + +// Register a cron task and its handler +export async function addCronTask( + name: string, + cron: string, + fn: TaskHandler< + NonNullable + >['fn'], +): Promise { + addTaskHandler(name, fn); + + const client = sbvrUtils.api[apiRoot]; + try { + await client.post({ + resource: 'task', + passthrough: { + req: permissions.root, + }, + options: { + returnResource: false, + }, + body: { + is_executed_by__handler: name, + is_scheduled_with__cron_expression: cron, + status: 'queued', + }, + }); + } catch (err) { + if ( + // TODO: Remove the ConflictError one we bump to https://github.com/balena-io-modules/abstract-sql-compiler/pull/316 + // since then the rule will throw a proper BadRequestError instead of the generic ConflictError that's atm thrown + // from the partial unique index. + (err instanceof ConflictError && + err.message === 'Unique key constraint violated') || + (err instanceof BadRequestError && + err.message === + 'It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued".') + ) { + await client.patch({ + resource: 'task', + passthrough: { + req: permissions.root, + }, + options: { + $filter: { + is_executed_by__handler: name, + status: 'queued', + $and: [ + { is_scheduled_with__cron_expression: { $ne: null } }, + { is_scheduled_with__cron_expression: { $ne: cron } }, + ], + }, + }, + body: { + is_scheduled_with__cron_expression: cron, + }, + }); + } else { + throw err; + } + } +} diff --git a/src/tasks/tasks.sbvr b/src/tasks/tasks.sbvr index 743e31911..ab7623271 100644 --- a/src/tasks/tasks.sbvr +++ b/src/tasks/tasks.sbvr @@ -1,7 +1,7 @@ Vocabulary: tasks Term: id - Concept Type: Big Serial (Type) + Concept Type: Big Serial (Type) Term: actor Concept Type: Integer (Type) Term: attempt count @@ -31,6 +31,7 @@ Fact type: task has key Fact type: task is created by actor Necessity: each task is created by exactly one actor Fact type: task is executed by handler + Synonymous Form: handler executes task Necessity: each task is executed by exactly one handler Fact type: task is executed with parameter set Necessity: each task is executed with at most one parameter set @@ -53,3 +54,4 @@ Fact type: task has attempt limit Necessity: each task has exactly one attempt limit Necessity: each task has an attempt limit that is greater than or equal to 1 +Rule: It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to "queued", executes at most one task that is scheduled with a cron expression and has a status that is equal to "queued". diff --git a/test/08-tasks.test.ts b/test/08-tasks.test.ts index 8ee74c3f7..9e088ea4e 100644 --- a/test/08-tasks.test.ts +++ b/test/08-tasks.test.ts @@ -7,7 +7,7 @@ import { testInit, testDeInit, testLocalServer } from './lib/test-init.js'; import { env } from '@balena/pinejs'; import type Model from '@balena/pinejs/out/tasks/tasks.js'; import { CronExpressionParser } from 'cron-parser'; -import { PINE_TEST_SIGNALS, waitFor } from './lib/common.js'; +import { assertExists, PINE_TEST_SIGNALS, waitFor } from './lib/common.js'; const actorId = 1; const fixturesBasePath = import.meta.dirname + '/fixtures/08-tasks/'; @@ -284,6 +284,28 @@ describe('08 task tests', function () { ).to.be.oneOf(potentialScheduleTimestamps); }); + it('should update the cron expression of pre-existing cron tasks on init, when a different one is provided in tasks.addCronTask()', async () => { + const { + body: [queuedTask], + } = await pineTest + .get({ + resource: 'task', + options: { + $select: 'is_scheduled_with__cron_expression', + $filter: { + status: 'queued', + is_executed_by__handler: 'set_device_last_heartbeat', + is_scheduled_with__cron_expression: { $ne: null }, + }, + }, + }) + .expect(200); + assertExists(queuedTask); + expect(queuedTask).to.deep.equal({ + is_scheduled_with__cron_expression: '0 0 0,3,6,12 * * *', + }); + }); + it('should not immediately execute tasks scheduled to execute in the future', async () => { // Create a task to create a new device record in 3s const name = randomUUID(); @@ -363,11 +385,126 @@ describe('08 task tests', function () { expect(finalizedTask?.attempt_count).to.equal(attemptLimit); }); - it('should create new tasks for completed tasks with a cron string', async () => { - const cron = '0 0 10 1 1 *'; + it('should not be able to create a second cron task with the same handler [using a POST to task]', async () => { + const cron = '0 0 2,4,6,8,12,14 * * *'; + await pineTest + .post({ + resource: 'task', + body: { + apikey, + key: randomUUID(), + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name: randomUUID(), + type: randomUUID(), + }, + is_scheduled_with__cron_expression: cron, + }, + }) + .expect((res) => { + expect(res.statusCode).to.be.oneOf([400, 409]); + if (res.statusCode === 409) { + expect([res.statusCode, res.text]).to.deep.equal([ + 409, + '"Unique key constraint violated"', + ]); + } else { + expect([res.statusCode, res.text]).to.deep.equal([ + 400, + '"It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to \\"queued\\", executes at most one task that is scheduled with a cron expression and has a status that is equal to \\"queued\\"."', + ]); + } + }); + // TODO: This should become v once we bump to https://github.com/balena-io-modules/abstract-sql-compiler/pull/316 + // .expect( + // 400, + // '"It is necessary that each handler that executes a task that is scheduled with a cron expression and has a status that is equal to \\"queued\\", executes at most one task that is scheduled with a cron expression and has a status that is equal to \\"queued\\"."', + // ); + }); + + it(`should be able to stop recurrent tasks by marking it as cancelled `, async () => { + await pineTest + .patch({ + resource: 'task', + options: { + $filter: { + status: 'queued', + is_executed_by__handler: 'create_device', + is_scheduled_with__cron_expression: { $ne: null }, + }, + }, + body: { + status: 'cancelled', + }, + }) + .expect(200); + + const { + body: [cancelledTask], + } = await pineTest + .get({ + resource: 'task', + options: { + $select: ['id', 'is_scheduled_to_execute_on__time'], + $filter: { + status: 'cancelled', + is_executed_by__handler: 'create_device', + is_scheduled_with__cron_expression: { $ne: null }, + }, + $orderby: { id: 'desc' }, + }, + }) + .expect(200); + assertExists(cancelledTask); + + // Wait unil when the task was supposed to get executed + const scheduledExecutionTime = new Date( + cancelledTask.is_scheduled_to_execute_on__time, + ); + await setTimeout( + Math.max(scheduledExecutionTime.getTime() - Date.now(), 0) + + env.tasks.queueIntervalMS, + ); + + // check that the cancelled task was not exexuted + const { body: refetchedCancelledTask } = await pineTest + .get({ + resource: 'task', + id: cancelledTask.id, + options: { + $select: ['status', 'started_on__time'], + }, + }) + .expect(200); + assertExists(refetchedCancelledTask); + expect(refetchedCancelledTask).to.deep.equal({ + status: 'cancelled', + started_on__time: null, + }); + // check that no new task was added + const { + body: [newQueuedTask], + } = await pineTest + .get({ + resource: 'task', + options: { + $select: ['id', 'is_scheduled_to_execute_on__time'], + $filter: { + id: { $ge: cancelledTask.id }, + status: 'queued', + is_executed_by__handler: 'create_device', + is_scheduled_with__cron_expression: { $ne: null }, + }, + }, + }) + .expect(200); + expect(newQueuedTask).to.be.undefined; + }); - // Test both succeeded and failure cases - for (const status of ['succeeded', 'failed']) { + // Test both succeeded and failure cases + for (const status of ['succeeded', 'failed']) { + it(`should be able to create new tasks with a cron string where there are pre-existing tasks completed with status ${status}`, async () => { + const cron = '0 0 10 1 1 *'; const handler = status === 'succeeded' ? 'create_device' : 'will_fail'; const name = randomUUID(); await createTask(pineTest, apikey, { @@ -404,8 +541,8 @@ describe('08 task tests', function () { queuedTask?.is_scheduled_to_execute_on__time === nextExecutionDate ); }, env.tasks.queueIntervalMS); - } - }); + }); + } it('should not allow tasks with invalid handler params', async () => { // Handler requires 'name' and 'type' params, but passing 'foo' param diff --git a/test/fixtures/08-tasks/example.sbvr b/test/fixtures/08-tasks/example.sbvr index 0029afd5c..838b3a77d 100644 --- a/test/fixtures/08-tasks/example.sbvr +++ b/test/fixtures/08-tasks/example.sbvr @@ -6,6 +6,9 @@ Term: name Term: note Concept Type: Text (Type) +Term: last heartbeat + Concept Type: Date Time (Type) + Term: type Concept Type: Short Text (Type) @@ -20,6 +23,12 @@ Fact Type: device has name Fact Type: device has note Necessity: each device has at most one note. +Fact Type: device has note + Necessity: each device has at most one note. + +Fact type: device has last heartbeat + Necessity: each device has at most one last heartbeat. + Fact Type: device has type Necessity: each device has exactly one type. diff --git a/test/fixtures/08-tasks/task-handlers.ts b/test/fixtures/08-tasks/task-handlers.ts index a3104be51..1a752f8e2 100644 --- a/test/fixtures/08-tasks/task-handlers.ts +++ b/test/fixtures/08-tasks/task-handlers.ts @@ -1,5 +1,5 @@ import type { FromSchema } from 'json-schema-to-ts'; -import { sbvrUtils, tasks } from '@balena/pinejs'; +import { sbvrUtils, tasks, dbModule } from '@balena/pinejs'; // Define JSON schema for accepted parameters const createDeviceParamsSchema = { @@ -33,7 +33,7 @@ export type IncrementDeviceCountParams = FromSchema< typeof incrementDeviceCountParamsSchema >; -export const initTaskHandlers = () => { +export const initTaskHandlers = async () => { tasks.addTaskHandler( 'create_device', async (options) => { @@ -84,5 +84,94 @@ export const initTaskHandlers = () => { incrementDeviceCountParamsSchema, ); + await tasks.addCronTask( + 'set_device_note', + '0 0 1,3,5,7 * * *', + async (options) => { + const newNote = 'provisioning done'; + try { + await options.api.patch({ + apiPrefix: '/example/', + resource: 'device', + options: { + $filter: { + note: { $ne: newNote }, + }, + }, + body: { + note: newNote, + }, + }); + return { + status: 'succeeded', + }; + } catch (err: any) { + return { + status: 'failed', + error: err.message, + }; + } + }, + ); + + // manually add a scheduled task ahead of time, to confirm that during pine init, addCronTask() will update its cron expression. + const oldHeartbeatTaskCronExpression = '0 0 2,4,6,8 * * *'; + try { + await sbvrUtils.db.executeSql( + ` + INSERT INTO task ("key", "is created by-actor", "is executed by-handler", "is executed with-parameter set", "is scheduled with-cron expression", "is scheduled to execute on-time", status, "attempt count", "attempt limit") + VALUES (NULL, 0, 'set_device_last_heartbeat', NULL, '${oldHeartbeatTaskCronExpression}', '2100-02-03 03:00:00+02', 'queued', 0, 1); + `, + [], + ); + } catch (err) { + if ( + err instanceof dbModule.UniqueConstraintError && + err.message === + 'duplicate key value violates unique constraint "task$/Mt7Ad3mHEm0JFpuaX1BioDwNSWTgsEFOG1igq8EIrk="' + ) { + // that's fine, a different pine instance has already created this, just make sure the expected cron expression is set + await sbvrUtils.db.executeSql( + ` + UPDATE task + SET "is scheduled with-cron expression" = '${oldHeartbeatTaskCronExpression}' + WHERE "is executed by-handler" = 'set_device_last_heartbeat' + AND "status" = 'queued' + `, + [], + ); + } else { + throw err; + } + } + await tasks.addCronTask( + 'set_device_last_heartbeat', + '0 0 0,3,6,12 * * *', + async (options) => { + try { + // Fake heartbeat to the device with the oldest one + await options.api.patch({ + apiPrefix: '/example/', + resource: 'device', + options: { + $top: 1, + $orderby: { last_heartbeat: 'desc' }, + }, + body: { + last_heartbeat: new Date(), + }, + }); + return { + status: 'succeeded', + }; + } catch (err: any) { + return { + status: 'failed', + error: err.message, + }; + } + }, + ); + tasks.worker?.start(); }; diff --git a/test/lib/pine-in-process.ts b/test/lib/pine-in-process.ts index e0a9555d1..54cf1caa6 100644 --- a/test/lib/pine-in-process.ts +++ b/test/lib/pine-in-process.ts @@ -85,7 +85,7 @@ async function runApp(processArgs: PineTestOptions) { // load task handlers if (processArgs.taskHandlersPath) { const { initTaskHandlers } = await import(processArgs.taskHandlersPath); - initTaskHandlers(); + await initTaskHandlers(); } if (processArgs.routesPath) {