Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/task-execution/concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Postgres Conductor uses a slot-based system to enforce concurrency limits:

This happens entirely in Postgres - no external coordination needed.

> [!WARNING]
> Setting concurrency on any task in a queue reduces throughput by up to 50% for the entire queue, regardless of how many tasks have concurrency limits.

## Concurrency vs Worker Concurrency

**Task-level concurrency** (this page):
Expand Down
6 changes: 3 additions & 3 deletions packages/pgconductor-js/src/select-columns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ type ValidatePart<Part extends string, Obj> =
: // unquoted identifier
IsValidUnquoted<P> extends false
? `[Column Selection Error]: Invalid identifier "${P}". Must start with a letter or underscore.`
: Lowercase<P> extends keyof Obj
? Lowercase<P>
: `[Column Selection Error]: Column "${Lowercase<P>}" does not exist.`
: P extends keyof Obj
? P
: `[Column Selection Error]: Column "${P}" does not exist.`
: never;

type ValidateParts<Parts extends readonly string[], Obj> = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ describe("Cron Scheduling", () => {

const runsBeforeUnschedule = targetExecutions.mock.calls.length;
await conductor.invoke({ name: "dynamic-unscheduler" }, {});
await waitFor(200);
await waitFor(2000);

const futureSchedules = await db.sql<Array<{ id: string }>>`
SELECT id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import { z } from "zod";
import { test, expect, describe, beforeAll, afterAll, afterEach, mock } from "bun:test";
import { Conductor } from "../../src/conductor";
import { Orchestrator } from "../../src/orchestrator";
import { defineTask } from "../../src/task-definition";
import { defineEvent } from "../../src/event-definition";
import { TaskSchemas, EventSchemas } from "../../src/schemas";
import { TestDatabasePool } from "../fixtures/test-database";
import type { TestDatabase } from "../fixtures/test-database";

describe("Field Selection - Type Safety & Runtime", () => {
let pool: TestDatabasePool;
const databases: TestDatabase[] = [];

beforeAll(async () => {
pool = await TestDatabasePool.create();
}, 60000);

afterEach(async () => {
await Promise.all(databases.map((db) => db.destroy()));
databases.length = 0;
});

afterAll(async () => {
await pool?.destroy();
});

test("field selection preserves JSON types (numbers, booleans, objects, arrays)", async () => {
const db = await pool.child();
databases.push(db);

const dataEvent = defineEvent({
name: "data.test",
payload: z.object({
stringField: z.string(),
numberField: z.number(),
booleanField: z.boolean(),
objectField: z.object({ nested: z.string() }),
arrayField: z.array(z.string()),
// Extra fields that should be filtered out
extraString: z.string(),
extraNumber: z.number(),
}),
});

const taskDef = defineTask({
name: "on-data-test",
payload: z.object({}),
});

const taskFn = mock(async (event) => {
// Type-level: TypeScript should know these fields exist with correct types
const str: string = event.payload.stringField;
const num: number = event.payload.numberField;
const bool: boolean = event.payload.booleanField;
const obj: { nested: string } = event.payload.objectField;
const arr: string[] = event.payload.arrayField;

// Runtime: Verify correct values and types
expect(event.payload.stringField).toBe("test-string");
expect(event.payload.numberField).toBe(42);
expect(typeof event.payload.numberField).toBe("number");
expect(event.payload.booleanField).toBe(true);
expect(typeof event.payload.booleanField).toBe("boolean");
expect(event.payload.objectField).toEqual({ nested: "value" });
expect(typeof event.payload.objectField).toBe("object");
expect(event.payload.arrayField).toEqual(["a", "b", "c"]);
expect(Array.isArray(event.payload.arrayField)).toBe(true);

// Non-selected fields should not exist at runtime
expect(event.payload.extraString).toBeUndefined();
expect(event.payload.extraNumber).toBeUndefined();
});

const conductor = Conductor.create({
sql: db.sql,
tasks: TaskSchemas.fromSchema([taskDef]),
events: EventSchemas.fromSchema([dataEvent]),
context: {},
});

const orchestrator = Orchestrator.create({
conductor,
tasks: [
conductor.createTask(
{ name: "on-data-test" },
{
event: "data.test",
fields: "stringField,numberField,booleanField,objectField,arrayField",
},
taskFn,
),
],
defaultWorker: { pollIntervalMs: 50, flushIntervalMs: 50 },
});

await orchestrator.start();

// Emit event with all fields
await conductor.emit("data.test", {
stringField: "test-string",
numberField: 42,
booleanField: true,
objectField: { nested: "value" },
arrayField: ["a", "b", "c"],
extraString: "should-be-filtered",
extraNumber: 999,
});

// Wait for task to execute
await new Promise((r) => setTimeout(r, 300));

expect(taskFn).toHaveBeenCalledTimes(1);

await orchestrator.stop();
}, 30000);

test("field selection with camelCase field names", async () => {
const db = await pool.child();
databases.push(db);

const userEvent = defineEvent({
name: "user.signup",
payload: z.object({
userId: z.string(),
emailAddress: z.string(),
firstName: z.string(),
lastName: z.string(),
accountType: z.string(),
}),
});

const taskDef = defineTask({
name: "on-user-signup",
payload: z.object({}),
});

const taskFn = mock(async (event) => {
// Type-level: Selected fields with correct camelCase
const userId: string = event.payload.userId;
const emailAddress: string = event.payload.emailAddress;

// Runtime: Verify camelCase preserved
expect(event.payload.userId).toBe("user-123");
expect(event.payload.emailAddress).toBe("test@example.com");
expect(event.payload.firstName).toBeUndefined();
expect(event.payload.lastName).toBeUndefined();
expect(event.payload.accountType).toBeUndefined();
});

const conductor = Conductor.create({
sql: db.sql,
tasks: TaskSchemas.fromSchema([taskDef]),
events: EventSchemas.fromSchema([userEvent]),
context: {},
});

const orchestrator = Orchestrator.create({
conductor,
tasks: [
conductor.createTask(
{ name: "on-user-signup" },
{ event: "user.signup", fields: "userId,emailAddress" },
taskFn,
),
],
defaultWorker: { pollIntervalMs: 50, flushIntervalMs: 50 },
});

await orchestrator.start();

await conductor.emit("user.signup", {
userId: "user-123",
emailAddress: "test@example.com",
firstName: "John",
lastName: "Doe",
accountType: "premium",
});

await new Promise((r) => setTimeout(r, 300));

expect(taskFn).toHaveBeenCalledTimes(1);

await orchestrator.stop();
}, 30000);

test("no field selection - all fields available", async () => {
const db = await pool.child();
databases.push(db);

const fullEvent = defineEvent({
name: "full.event",
payload: z.object({
field1: z.string(),
field2: z.number(),
field3: z.boolean(),
}),
});

const taskDef = defineTask({
name: "on-full-event",
payload: z.object({}),
});

const taskFn = mock(async (event) => {
// Type-level: All fields should be available
const f1: string = event.payload.field1;
const f2: number = event.payload.field2;
const f3: boolean = event.payload.field3;

// Runtime: All fields should exist
expect(event.payload.field1).toBe("value1");
expect(event.payload.field2).toBe(123);
expect(event.payload.field3).toBe(true);
});

const conductor = Conductor.create({
sql: db.sql,
tasks: TaskSchemas.fromSchema([taskDef]),
events: EventSchemas.fromSchema([fullEvent]),
context: {},
});

const orchestrator = Orchestrator.create({
conductor,
tasks: [
// No fields parameter - should get all fields
conductor.createTask({ name: "on-full-event" }, { event: "full.event" }, taskFn),
],
defaultWorker: { pollIntervalMs: 50, flushIntervalMs: 50 },
});

await orchestrator.start();

await conductor.emit("full.event", {
field1: "value1",
field2: 123,
field3: true,
});

await new Promise((r) => setTimeout(r, 300));

expect(taskFn).toHaveBeenCalledTimes(1);

await orchestrator.stop();
}, 30000);
});
67 changes: 52 additions & 15 deletions packages/pgconductor-js/tests/unit/event-trigger-types.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,21 @@ describe("event triggers", () => {
// Task triggered by database insert - not invocable
conductor.createTask(
{ name: "on-contact-insert" },
{ schema: "public", table: "contact", operation: "insert", columns: "id,email,first_name" },
{
schema: "public",
table: "contact",
operation: "insert",
columns: "id,email,first_name",
},
async (event) => {
// Event should have database event payload with schema.table.op format
expectTypeOf(event.name).toEqualTypeOf<"public.contact.insert">();
expectTypeOf(event.payload.tg_op).toEqualTypeOf<"INSERT">();
expectTypeOf(event.payload.old).toEqualTypeOf<null>();
// new should have selected contact columns
if (event.payload.new) {
expectTypeOf(event.payload.new.id).toEqualTypeOf<string>();
expectTypeOf(event.payload.new.email).toEqualTypeOf<string | null>();
expectTypeOf(event.payload.new.first_name).toEqualTypeOf<string>();
}
expectTypeOf(event.payload.new.id).toEqualTypeOf<string>();
expectTypeOf(event.payload.new.email).toEqualTypeOf<string | null>();
expectTypeOf(event.payload.new.first_name).toEqualTypeOf<string>();
},
);
});
Expand Down Expand Up @@ -119,7 +122,12 @@ describe("event triggers", () => {
{ name: "cron-and-event" },
[
{ cron: "0 * * * *", name: "hourly" },
{ schema: "public", table: "contact", operation: "update", columns: "id,email" },
{
schema: "public",
table: "contact",
operation: "update",
columns: "id,email",
},
],
async (event) => {
// Event should be union of cron and database event
Expand Down Expand Up @@ -158,15 +166,29 @@ describe("event triggers", () => {
});

// Task with field selection - should only receive selected fields
// Note: Field selection type inference is not yet fully implemented at compile time
// At runtime, only selected fields will be present in the payload
conductor.createTask(
{ name: "on-user-created-fields" },
{ event: "user.created", fields: "userId,email" },
async (event) => {
// At runtime, payload will only have selected fields (userId, email)
// Type-level inference for field selection is not yet implemented
// so event.payload type will show all fields from the event definition
// Event name should be correct
expectTypeOf(event.name).toEqualTypeOf<"user.created">();

// Payload should only have selected fields
expectTypeOf(event.payload).toEqualTypeOf<{
userId: string;
email: string;
}>();

// Selected fields should be accessible
expectTypeOf(event.payload.userId).toEqualTypeOf<string>();
expectTypeOf(event.payload.email).toEqualTypeOf<string>();

// Non-selected fields should cause type errors
// @ts-expect-error - name was not selected
event.payload.name;

// @ts-expect-error - plan was not selected
event.payload.plan;
},
);
});
Expand Down Expand Up @@ -218,7 +240,12 @@ describe("event triggers", () => {
// Task with column selection - should only receive selected columns
conductor.createTask(
{ name: "on-contact-update-columns" },
{ schema: "public", table: "contact", operation: "update", columns: "id,email" },
{
schema: "public",
table: "contact",
operation: "update",
columns: "id,email",
},
async (event) => {
expectTypeOf(event.name).toEqualTypeOf<"public.contact.update">();
expectTypeOf(event.payload.tg_op).toEqualTypeOf<"UPDATE">();
Expand Down Expand Up @@ -295,7 +322,12 @@ describe("event triggers", () => {

conductor.createTask(
{ name: "on-contact-delete" },
{ schema: "public", table: "contact", operation: "delete", columns: "id" },
{
schema: "public",
table: "contact",
operation: "delete",
columns: "id",
},
async (event) => {
expectTypeOf(event.payload.tg_op).toEqualTypeOf<"DELETE">();
// DELETE has OLD but not NEW
Expand All @@ -322,7 +354,12 @@ describe("event triggers", () => {

conductor.createTask(
{ name: "on-contact-update" },
{ schema: "public", table: "contact", operation: "update", columns: "id" },
{
schema: "public",
table: "contact",
operation: "update",
columns: "id",
},
async (event) => {
expectTypeOf(event.payload.tg_op).toEqualTypeOf<"UPDATE">();
// UPDATE has both OLD and NEW
Expand Down
Loading