Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
run: bun install

- name: Build migrations
run: sh ./scripts/build-migrations.sh
run: bash ./scripts/build-migrations.sh

- name: Format
run: bun run oxfmt
Expand Down
2 changes: 2 additions & 0 deletions docs/zensical.toml
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,13 @@ features = [
# - https://zensical.org/docs/setup/colors/
# ----------------------------------------------------------------------------
[[project.theme.palette]]
media = "(prefers-color-scheme: light)"
scheme = "default"
toggle.icon = "lucide/sun"
toggle.name = "Switch to dark mode"

[[project.theme.palette]]
media = "(prefers-color-scheme: dark)"
scheme = "slate"
toggle.icon = "lucide/moon"
toggle.name = "Switch to light mode"
Expand Down
11 changes: 11 additions & 0 deletions packages/pgconductor-js/src/database-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,18 @@ export class DatabaseClient {
return result[0]?.cancel_execution || false;
}

/**
* Get current time.
* In production: returns system time directly (no DB call).
* In tests: queries database to respect fake_now setting.
*/
async getCurrentTime(options?: QueryMethodOptions): Promise<Date> {
// In production, use system time for performance
if (process.env.NODE_ENV !== "test") {
return new Date();
}

// In tests, query DB to respect fake_now
const result = await this.query(
(sql) =>
sql<{ now: Date }[]>`
Expand Down
9 changes: 3 additions & 6 deletions packages/pgconductor-js/src/task-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,11 @@ export class TaskContext<
}

/**
* Get current time. In tests, use database time (respects fake_now).
* In production, use system time for performance.
* Get current time.
* DatabaseClient handles test vs production: returns fake time in tests, system time in production.
*/
private async getNow(): Promise<Date> {
if (process.env.NODE_ENV === "test") {
return this.opts.db.getCurrentTime({ signal: this.signal });
}
return new Date();
return this.opts.db.getCurrentTime({ signal: this.signal });
}

/**
Expand Down
40 changes: 26 additions & 14 deletions packages/pgconductor-js/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,26 +396,38 @@ export class Worker<
.filter((task) => task.concurrency != null)
.map((task) => task.name);

// Check if any tasks have windows - only then do we need time-based filtering
const tasksWithWindows = allTasks.filter((task) => task.window);

while (!this.signal?.aborted) {
try {
// Filter tasks based on time windows (if any)
const now = new Date();
let disallowedTaskKeys: string[] = [];

function isWithinWindow(task: AnyTask, now: Date): boolean {
if (!task.window) return true;
const [start, end] = task.window;
const currentTime = now.toTimeString().slice(0, 8);
return currentTime >= start && currentTime < end;
}
if (tasksWithWindows.length > 0) {
// db.getCurrentTime() handles test vs production: returns fake time in tests, system time in production
const now = await this.db.getCurrentTime({ signal: this.signal });

const disallowedTaskKeys = allTasks
.filter((task) => !isWithinWindow(task, now))
.map((t) => t.name);
const isWithinWindow = (task: AnyTask, now: Date): boolean => {
if (!task.window) return true;
const [start, end] = task.window;
const currentTime = now.toISOString().slice(11, 19);
return currentTime >= start && currentTime < end;
};

if (disallowedTaskKeys.length === this.tasks.size) {
// skip fetch if no tasks are allowed to run now
await waitFor(this.pollIntervalMs, { signal: this.signal });
continue;
disallowedTaskKeys = tasksWithWindows
.filter((task) => !isWithinWindow(task, now))
.map((t) => t.name);

if (disallowedTaskKeys.length === tasksWithWindows.length) {
// All windowed tasks are outside their windows, skip fetch for those
// But non-windowed tasks can still run
if (tasksWithWindows.length === this.tasks.size) {
// All tasks have windows and all are outside - skip entirely
await waitFor(this.pollIntervalMs, { signal: this.signal });
continue;
}
}
}

const executions = await this.db.getExecutions(
Expand Down
29 changes: 19 additions & 10 deletions packages/pgconductor-js/tests/integration/window-execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ describe("Window Execution", () => {
const db = await pool.child();
databases.push(db);

// Set time outside window (18:00, window is 09:00-17:00)
const outsideTime = new Date("2024-01-01T18:00:00Z");
await db.client.setFakeTime({ date: outsideTime });
// Start inside window so task gets fetched
const insideTime = new Date("2024-01-01T10:00:00Z");
await db.client.setFakeTime({ date: insideTime });

const taskDefinitions = defineTask({
name: "window-task",
Expand All @@ -43,15 +43,20 @@ describe("Window Execution", () => {
const conductor = Conductor.create({
sql: db.sql,
tasks: TaskSchemas.fromSchema([taskDefinitions]),
context: {},
// Pass db client so task can advance time
context: { dbClient: db.client },
});

const windowTask = conductor.createTask(
{ name: "window-task", window: ["09:00", "17:00"] },
{ invocable: true },
async (event, ctx) => {
if (event.name === "pgconductor.invoke") {
// This step will trigger release (we're outside window)
// Advance time to outside window BEFORE calling ctx.step()
// This simulates the scenario where time passes during execution
await ctx.dbClient.setFakeTime({ date: new Date("2024-01-01T18:00:00Z") });

// This step will trigger release (we're now outside window)
const result = await ctx.step("step1", () => {
return stepFn(event.payload.value);
});
Expand Down Expand Up @@ -99,9 +104,9 @@ describe("Window Execution", () => {
const db = await pool.child();
databases.push(db);

// Set time outside window
const outsideTime = new Date("2024-01-01T17:30:00Z");
await db.client.setFakeTime({ date: outsideTime });
// Start inside window so task gets fetched
const insideTime = new Date("2024-01-01T10:00:00Z");
await db.client.setFakeTime({ date: insideTime });

const taskDefinitions = defineTask({
name: "checkpoint-task",
Expand All @@ -115,7 +120,8 @@ describe("Window Execution", () => {
const conductor = Conductor.create({
sql: db.sql,
tasks: TaskSchemas.fromSchema([taskDefinitions]),
context: {},
// Pass db client so task can advance time
context: { dbClient: db.client },
});

const checkpointTask = conductor.createTask(
Expand All @@ -125,7 +131,10 @@ describe("Window Execution", () => {
if (event.name === "pgconductor.invoke") {
beforeCheckpoint();

// Checkpoint should trigger release (we're outside window)
// Advance time to outside window BEFORE calling ctx.checkpoint()
await ctx.dbClient.setFakeTime({ date: new Date("2024-01-01T17:30:00Z") });

// Checkpoint should trigger release (we're now outside window)
await ctx.checkpoint();

// This should not execute
Expand Down