diff --git a/.gitignore b/.gitignore index 7919197..ccdf54b 100644 --- a/.gitignore +++ b/.gitignore @@ -175,3 +175,4 @@ target docs/site/ docs/.venv/ +.review/ diff --git a/CLAUDE.md b/CLAUDE.md index 295ee63..d395cdb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -113,11 +113,16 @@ test("example", async () => { ### Modifying Migrations -1. Edit `migrations/0000000001_setup.sql` directly (in-place) +**IMPORTANT**: Never edit existing migrations that may have been applied to production databases. Always create new migration files for schema changes. + +1. Create a new migration file with the next sequence number (e.g., `0000000002_feature_name.sql`) 2. Run `just build-migrations` to regenerate types 3. Run tests to verify changes -**Note**: This project is under active development. All SQL changes should be made **in-place** by editing the existing migration file, not by creating new migration files. +Migration files: +- `0000000001_setup.sql` - Core schema, tables, SQL functions (DO NOT EDIT) +- `0000000002_events.sql` - Event subscriptions and triggers (DO NOT EDIT) +- `0000000003_trace_context.sql` - Adds trace_context for OpenTelemetry The migration file contains: - Table schemas (`tasks`, `executions`, `steps`, `failed_executions`, `test_config`) diff --git a/bun.lock b/bun.lock index 9215bed..3c44ad3 100644 --- a/bun.lock +++ b/bun.lock @@ -32,6 +32,30 @@ "typescript": "catalog:", }, }, + "packages/pgconductor-js-instrumentation": { + "name": "@pgconductor/js-instrumentation", + "version": "0.1.0", + "dependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/instrumentation": "^0.57.0", + "@opentelemetry/semantic-conventions": "^1.30.0", + }, + "devDependencies": { + "@opentelemetry/sdk-trace-base": "^1.30.0", + "@opentelemetry/sdk-trace-node": "^1.30.0", + "@testcontainers/postgresql": "catalog:", + "@types/bun": "catalog:", + "pgconductor-js": "workspace:*", + "postgres": "catalog:", + "zod": "catalog:", + }, + "peerDependencies": { + 
"@opentelemetry/api": "^1.9.0", + "@opentelemetry/instrumentation": "^0.57.0", + "pgconductor-js": ">0.1.0", + "typescript": "catalog:", + }, + }, }, "catalog": { "@standard-schema/spec": "1.0.0", @@ -58,6 +82,28 @@ "@js-sdsl/ordered-map": ["@js-sdsl/ordered-map@4.4.2", "", {}, "sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw=="], + "@opentelemetry/api": ["@opentelemetry/api@1.9.0", "", {}, "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg=="], + + "@opentelemetry/api-logs": ["@opentelemetry/api-logs@0.57.2", "", { "dependencies": { "@opentelemetry/api": "^1.3.0" } }, "sha512-uIX52NnTM0iBh84MShlpouI7UKqkZ7MrUszTmaypHBu4r7NofznSnQRfJ+uUeDtQDj6w8eFGg5KBLDAwAPz1+A=="], + + "@opentelemetry/context-async-hooks": ["@opentelemetry/context-async-hooks@1.30.1", "", { "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-s5vvxXPVdjqS3kTLKMeBMvop9hbWkwzBpu+mUO2M7sZtlkyDJGwFe33wRKnbaYDo8ExRVBIIdwIGrqpxHuKttA=="], + + "@opentelemetry/core": ["@opentelemetry/core@1.30.1", "", { "dependencies": { "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-OOCM2C/QIURhJMuKaekP3TRBxBKxG/TWWA0TL2J6nXUtDnuCtccy49LUJF8xPFXMX+0LMcxFpCo8M9cGY1W6rQ=="], + + "@opentelemetry/instrumentation": ["@opentelemetry/instrumentation@0.57.2", "", { "dependencies": { "@opentelemetry/api-logs": "0.57.2", "@types/shimmer": "^1.2.0", "import-in-the-middle": "^1.8.1", "require-in-the-middle": "^7.1.1", "semver": "^7.5.2", "shimmer": "^1.2.1" }, "peerDependencies": { "@opentelemetry/api": "^1.3.0" } }, "sha512-BdBGhQBh8IjZ2oIIX6F2/Q3LKm/FDDKi6ccYKcBTeilh6SNdNKveDOLk73BkSJjQLJk6qe4Yh+hHw1UPhCDdrg=="], + + "@opentelemetry/propagator-b3": ["@opentelemetry/propagator-b3@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, 
"sha512-oATwWWDIJzybAZ4pO76ATN5N6FFbOA1otibAVlS8v90B4S1wClnhRUk7K+2CHAwN1JKYuj4jh/lpCEG5BAqFuQ=="], + + "@opentelemetry/propagator-jaeger": ["@opentelemetry/propagator-jaeger@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-Pj/BfnYEKIOImirH76M4hDaBSx6HyZ2CXUqk+Kj02m6BB80c/yo4BdWkn/1gDFfU+YPY+bPR2U0DKBfdxCKwmg=="], + + "@opentelemetry/resources": ["@opentelemetry/resources@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1", "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-5UxZqiAgLYGFjS4s9qm5mBVo433u+dSPUFWVWXmLAD4wB65oMCoXaJP1KJa9DIYYMeHu3z4BZcStG3LC593cWA=="], + + "@opentelemetry/sdk-trace-base": ["@opentelemetry/sdk-trace-base@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1", "@opentelemetry/resources": "1.30.1", "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-jVPgBbH1gCy2Lb7X0AVQ8XAfgg0pJ4nvl8/IiQA6nxOsPvS+0zMJaFSs2ltXe0J6C8dqjcnpyqINDJmU30+uOg=="], + + "@opentelemetry/sdk-trace-node": ["@opentelemetry/sdk-trace-node@1.30.1", "", { "dependencies": { "@opentelemetry/context-async-hooks": "1.30.1", "@opentelemetry/core": "1.30.1", "@opentelemetry/propagator-b3": "1.30.1", "@opentelemetry/propagator-jaeger": "1.30.1", "@opentelemetry/sdk-trace-base": "1.30.1", "semver": "^7.5.2" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-cBjYOINt1JxXdpw1e5MlHmFRc5fgj4GW/86vsKFxJCJ8AL4PdVtYH41gWwl4qd4uQjqEL1oJVrXkSy5cnduAnQ=="], + + "@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.39.0", "", {}, "sha512-R5R9tb2AXs2IRLNKLBJDynhkfmx7mX0vi8NkhZb3gUkPWHn6HXk5J8iQ/dql0U3ApfWym4kXXmBDRGO+oeOfjg=="], + "@oxfmt/darwin-arm64": ["@oxfmt/darwin-arm64@0.15.0", "", { "os": "darwin", "cpu": "arm64" }, 
"sha512-M5xiXkqtwG/1yVlNZJaXeFJGs1jVTT7Q6gfdIU3nO8wXIj0lsfcirc55XPObWohA8T73oLh3Cp1oSQluxVXQrQ=="], "@oxfmt/darwin-x64": ["@oxfmt/darwin-x64@0.15.0", "", { "os": "darwin", "cpu": "x64" }, "sha512-HXBZBV1oqmZWcmXXQE+zXNpe8nOXyeY9oLeW6KfflR1HCicR75Tmer6Y5/euVOb/mDfegkkjReRoQnYxJ6CmtQ=="], @@ -102,6 +148,8 @@ "@oxlint/win32-x64": ["@oxlint/win32-x64@1.30.0", "", { "os": "win32", "cpu": "x64" }, "sha512-tbPnJIBUKke9KpceV+DpGyfN3LdhGaEPJHSuD4/mUEwP9Kk6IKSoDNih681RVGhgvaEZg3uHmQr6n9Uh0P3Yrg=="], + "@pgconductor/js-instrumentation": ["@pgconductor/js-instrumentation@workspace:packages/pgconductor-js-instrumentation"], + "@pkgjs/parseargs": ["@pkgjs/parseargs@0.11.0", "", {}, "sha512-+1VkjdD0QBLPodGrJUeqarH8VAIvQODIbwh9XpP5Syisf7YoQgsJKPNFoqqLQlu+VQ/tVSshMR6loPMn8U+dPg=="], "@protobufjs/aspromise": ["@protobufjs/aspromise@1.1.2", "", {}, "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ=="], @@ -128,7 +176,7 @@ "@testcontainers/postgresql": ["@testcontainers/postgresql@11.8.1", "", { "dependencies": { "testcontainers": "^11.8.1" } }, "sha512-uPTFk9IY5v9Dm6HcVTOSvZQQ+HrdGiwDk+/LDG+v67yD81kgBlYpH730JJhZoO72d1pJHKwTAJ+5WPOvecl1pw=="], - "@types/bun": ["@types/bun@1.3.4", "", { "dependencies": { "bun-types": "1.3.4" } }, "sha512-EEPTKXHP+zKGPkhRLv+HI0UEX8/o+65hqARxLy8Ov5rIxMBPNTjeZww00CIihrIQGEQBYg+0roO5qOnS/7boGA=="], + "@types/bun": ["@types/bun@1.3.7", "", { "dependencies": { "bun-types": "1.3.7" } }, "sha512-lmNuMda+Z9b7tmhA0tohwy8ZWFSnmQm1UDWXtH5r9F7wZCfkeO3Jx7wKQ1EOiKq43yHts7ky6r8SDJQWRNupkA=="], "@types/docker-modem": ["@types/docker-modem@3.0.6", "", { "dependencies": { "@types/node": "*", "@types/ssh2": "*" } }, "sha512-yKpAGEuKRSS8wwx0joknWxsmLha78wNMe9R2S3UNsVOkZded8UqOrV8KoeDXoXsjndxwyF3eIhyClGbO1SEhEg=="], @@ -136,12 +184,18 @@ "@types/node": ["@types/node@24.7.0", "", { "dependencies": { "undici-types": "~7.14.0" } }, "sha512-IbKooQVqUBrlzWTi79E8Fw78l8k1RNtlDDNWsFZs7XonuQSJ8oNYfEeclhprUldXISRMLzBpILuKgPlIxm+/Yw=="], + 
"@types/shimmer": ["@types/shimmer@1.2.0", "", {}, "sha512-UE7oxhQLLd9gub6JKIAhDq06T0F6FnztwMNRvYgjeQSBeMc1ZG/tA47EwfduvkuQS8apbkM/lpLpWsaCeYsXVg=="], + "@types/ssh2": ["@types/ssh2@0.5.52", "", { "dependencies": { "@types/node": "*", "@types/ssh2-streams": "*" } }, "sha512-lbLLlXxdCZOSJMCInKH2+9V/77ET2J6NPQHpFI0kda61Dd1KglJs+fPQBchizmzYSOJBgdTajhPqBO1xxLywvg=="], "@types/ssh2-streams": ["@types/ssh2-streams@0.1.13", "", { "dependencies": { "@types/node": "*" } }, "sha512-faHyY3brO9oLEA0QlcO8N2wT7R0+1sHWZvQ+y3rMLwdY1ZyS1z0W3t65j9PqT4HmQ6ALzNe7RZlNuCNE0wBSWA=="], "abort-controller": ["abort-controller@3.0.0", "", { "dependencies": { "event-target-shim": "^5.0.0" } }, "sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg=="], + "acorn": ["acorn@8.15.0", "", { "bin": { "acorn": "bin/acorn" } }, "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg=="], + + "acorn-import-attributes": ["acorn-import-attributes@1.9.5", "", { "peerDependencies": { "acorn": "^8" } }, "sha512-n02Vykv5uA3eHGM/Z2dQrcD56kL8TyDb2p1+0P83PClMnC/nc+anbQRhIOWnSq4Ke/KvDPrY3C9hDtC/A3eHnQ=="], + "ansi-regex": ["ansi-regex@5.0.1", "", {}, "sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ=="], "ansi-styles": ["ansi-styles@4.3.0", "", { "dependencies": { "color-convert": "^2.0.1" } }, "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg=="], @@ -186,12 +240,14 @@ "buildcheck": ["buildcheck@0.0.6", "", {}, "sha512-8f9ZJCUXyT1M35Jx7MkBgmBMo3oHTTBIPLiY9xyL0pl3T5RwcPEY8cUHr5LBNfu/fk6c2T4DJZuVM/8ZZT2D2A=="], - "bun-types": ["bun-types@1.3.4", "", { "dependencies": { "@types/node": "*" } }, "sha512-5ua817+BZPZOlNaRgGBpZJOSAQ9RQ17pkwPD0yR7CfJg+r8DgIILByFifDTa+IPDDxzf5VNhtNlcKqFzDgJvlQ=="], + "bun-types": ["bun-types@1.3.7", "", { "dependencies": { "@types/node": "*" } }, 
"sha512-qyschsA03Qz+gou+apt6HNl6HnI+sJJLL4wLDke4iugsE6584CMupOtTY1n+2YC9nGVrEKUlTs99jjRLKgWnjQ=="], "byline": ["byline@5.0.0", "", {}, "sha512-s6webAy+R4SR8XVuJWt2V2rGvhnrhxN+9S15GNuTK3wKPOXFF6RNc+8ug2XhH+2s4f+uudG4kUVYmYOQWL2g0Q=="], "chownr": ["chownr@1.1.4", "", {}, "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg=="], + "cjs-module-lexer": ["cjs-module-lexer@1.4.3", "", {}, "sha512-9z8TZaGM1pfswYeXrUpzPrkx8UnWYdhJclsiYMm6x/w5+nN+8Tf/LnAgfLGQCm59qAOxU8WwHEq2vNwF6i4j+Q=="], + "cliui": ["cliui@8.0.1", "", { "dependencies": { "string-width": "^4.2.0", "strip-ansi": "^6.0.1", "wrap-ansi": "^7.0.0" } }, "sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ=="], "color-convert": ["color-convert@2.0.1", "", { "dependencies": { "color-name": "~1.1.4" } }, "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ=="], @@ -242,6 +298,8 @@ "fs-constants": ["fs-constants@1.0.0", "", {}, "sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow=="], + "function-bind": ["function-bind@1.1.2", "", {}, "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA=="], + "get-caller-file": ["get-caller-file@2.0.5", "", {}, "sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg=="], "get-port": ["get-port@7.1.0", "", {}, "sha512-QB9NKEeDg3xxVwCCwJQ9+xycaz6pBB6iQ76wiWMl1927n0Kir6alPiP+yuiICLLU4jpMe08dXfpebuQppFA2zw=="], @@ -250,10 +308,16 @@ "graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="], + "hasown": ["hasown@2.0.2", "", { "dependencies": { "function-bind": "^1.1.2" } }, "sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ=="], + "ieee754": ["ieee754@1.2.1", "", {}, 
"sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA=="], + "import-in-the-middle": ["import-in-the-middle@1.15.0", "", { "dependencies": { "acorn": "^8.14.0", "acorn-import-attributes": "^1.9.5", "cjs-module-lexer": "^1.2.2", "module-details-from-path": "^1.0.3" } }, "sha512-bpQy+CrsRmYmoPMAE/0G33iwRqwW4ouqdRg8jgbH3aKuCtOc8lxgmYXg2dMM92CRiGP660EtBcymH/eVUpCSaA=="], + "inherits": ["inherits@2.0.4", "", {}, "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ=="], + "is-core-module": ["is-core-module@2.16.1", "", { "dependencies": { "hasown": "^2.0.2" } }, "sha512-UfoeMA6fIJ8wTYFEUjelnaGI67v6+N7qXJEvQuIGa99l4xsCruSYOVSQ0uPANn4dAzm8lkYPaKLrrijLq7x23w=="], + "is-fullwidth-code-point": ["is-fullwidth-code-point@3.0.0", "", {}, "sha512-zymm5+u+sCsSWyD9qNaejV3DFvhCKclKdizYaJUuHA83RLjb7nSuGnddCHGv0hk+KY7BMAlsWeK4Ueg6EV6XQg=="], "is-stream": ["is-stream@2.0.1", "", {}, "sha512-hFoiJiTl63nn+kstHGBtewWSKnQLpyb155KHheA1l39uvtO9nWIop1p3udqPcUd/xbF1VLMO4n7OI6p7RbngDg=="], @@ -284,6 +348,8 @@ "mkdirp-classic": ["mkdirp-classic@0.5.3", "", {}, "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A=="], + "module-details-from-path": ["module-details-from-path@1.0.4", "", {}, "sha512-EGWKgxALGMgzvxYF1UyGTy0HXX/2vHLkw6+NvDKW2jypWbHpjQuj4UMcqQWXHERJhVGKikolT06G3bcKe4fi7w=="], + "ms": ["ms@2.1.3", "", {}, "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="], "nan": ["nan@2.23.1", "", {}, "sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw=="], @@ -302,6 +368,8 @@ "path-key": ["path-key@3.1.1", "", {}, "sha512-ojmeN0qd+y0jszEtoY48r0Peq5dwMEkIlCOu6Q5f41lfkswXuKtYrhgoTpLnyIcHm24Uhqx+5Tqm2InSwLhE6Q=="], + "path-parse": ["path-parse@1.0.7", "", {}, "sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw=="], + "path-scurry": 
["path-scurry@1.11.1", "", { "dependencies": { "lru-cache": "^10.2.0", "minipass": "^5.0.0 || ^6.0.2 || ^7.0.0" } }, "sha512-Xa4Nw17FS9ApQFJ9umLiJS4orGjm7ZzwUrwamcGQuHSzDyth9boKDaycYdDcZDuqYATXw4HFXgaqWTctW/v1HA=="], "pgconductor-js": ["pgconductor-js@workspace:packages/pgconductor-js"], @@ -328,16 +396,24 @@ "require-directory": ["require-directory@2.1.1", "", {}, "sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q=="], + "require-in-the-middle": ["require-in-the-middle@7.5.2", "", { "dependencies": { "debug": "^4.3.5", "module-details-from-path": "^1.0.3", "resolve": "^1.22.8" } }, "sha512-gAZ+kLqBdHarXB64XpAe2VCjB7rIRv+mU8tfRWziHRJ5umKsIHN2tLLv6EtMw7WCdP19S0ERVMldNvxYCHnhSQ=="], + + "resolve": ["resolve@1.22.11", "", { "dependencies": { "is-core-module": "^2.16.1", "path-parse": "^1.0.7", "supports-preserve-symlinks-flag": "^1.0.0" }, "bin": { "resolve": "bin/resolve" } }, "sha512-RfqAvLnMl313r7c9oclB1HhUEAezcpLjz95wFH4LVuhk9JF/r22qmVP9AMmOU4vMX7Q8pN8jwNg/CSpdFnMjTQ=="], + "retry": ["retry@0.12.0", "", {}, "sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow=="], "safe-buffer": ["safe-buffer@5.2.1", "", {}, "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ=="], "safer-buffer": ["safer-buffer@2.1.2", "", {}, "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg=="], + "semver": ["semver@7.7.3", "", { "bin": { "semver": "bin/semver.js" } }, "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q=="], + "shebang-command": ["shebang-command@2.0.0", "", { "dependencies": { "shebang-regex": "^3.0.0" } }, "sha512-kHxr2zZpYtdmrN1qDjrrX/Z1rR1kG8Dx+gkpK1G4eXmvXswmcE1hTWBWYUzlraYw1/yZp6YuDY77YtvbN0dmDA=="], "shebang-regex": ["shebang-regex@3.0.0", "", {}, "sha512-7++dFhtcx3353uBaq8DDR4NuxBetBzC7ZQOhmTQInHEd6bSrXdiEyzCvG07Z44UYdLShWUyXt5M/yhz8ekcb1A=="], + "shimmer": 
["shimmer@1.2.1", "", {}, "sha512-sQTKC1Re/rM6XyFM6fIAGHRPVGvyXfgzIDvzoq608vM+jeyVD0Tu1E6Np0Kc2zAIFWIj963V2800iF/9LPieQw=="], + "signal-exit": ["signal-exit@3.0.7", "", {}, "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ=="], "split-ca": ["split-ca@1.0.1", "", {}, "sha512-Q5thBSxp5t8WPTTJQS59LrGqOZqOsrhDGDVm8azCqIBjSBd7nd9o2PM+mDulQQkh8h//4U6hFZnc/mul8t5pWQ=="], @@ -358,6 +434,8 @@ "strip-ansi-cjs": ["strip-ansi@6.0.1", "", { "dependencies": { "ansi-regex": "^5.0.1" } }, "sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A=="], + "supports-preserve-symlinks-flag": ["supports-preserve-symlinks-flag@1.0.0", "", {}, "sha512-ot0WnXS9fgdkgIcePe6RHNk1WA8+muPa6cSjeR3V8K27q9BB1rTE3R1p7Hv0z1ZyAc8s6Vvv8DIyWf681MAt0w=="], + "tar-fs": ["tar-fs@3.1.1", "", { "dependencies": { "pump": "^3.0.0", "tar-stream": "^3.1.5" }, "optionalDependencies": { "bare-fs": "^4.0.1", "bare-path": "^3.0.0" } }, "sha512-LZA0oaPOc2fVo82Txf3gw+AkEd38szODlptMYejQUhndHMLQ9M059uXR+AfS7DNo0NpINvSqDsvyaCrBVkptWg=="], "tar-stream": ["tar-stream@3.1.7", "", { "dependencies": { "b4a": "^1.6.4", "fast-fifo": "^1.2.0", "streamx": "^2.15.0" } }, "sha512-qJj60CXt7IU1Ffyc3NJMjh6EkuCFej46zUqJ4J7pqYlThyd9bO0XBTmcOIhSzZJVWfsLks0+nle/j538YAW9RQ=="], @@ -408,6 +486,12 @@ "@isaacs/cliui/wrap-ansi": ["wrap-ansi@8.1.0", "", { "dependencies": { "ansi-styles": "^6.1.0", "string-width": "^5.0.1", "strip-ansi": "^7.0.1" } }, "sha512-si7QWI6zUMq56bESFvagtmzMdGOtoxfR+Sez11Mobfc7tm+VkUckk9bW2UeffTGVUbOksxmSw0AA2gs8g71NCQ=="], + "@opentelemetry/core/@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.28.0", "", {}, "sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA=="], + + "@opentelemetry/resources/@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.28.0", "", {}, 
"sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA=="], + + "@opentelemetry/sdk-trace-base/@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.28.0", "", {}, "sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA=="], + "bl/buffer": ["buffer@5.7.1", "", { "dependencies": { "base64-js": "^1.3.1", "ieee754": "^1.1.13" } }, "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ=="], "bl/readable-stream": ["readable-stream@3.6.2", "", { "dependencies": { "inherits": "^2.0.3", "string_decoder": "^1.1.1", "util-deprecate": "^1.0.1" } }, "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA=="], diff --git a/migrations/0000000003_trace_context.sql b/migrations/0000000003_trace_context.sql new file mode 100644 index 0000000..b8baeb0 --- /dev/null +++ b/migrations/0000000003_trace_context.sql @@ -0,0 +1,261 @@ +-- Add trace_context column for OpenTelemetry context propagation +-- This stores W3C traceparent/tracestate for distributed tracing + +-- Add column to executions table +alter table pgconductor._private_executions +add column if not exists trace_context jsonb; + +-- Add trace_context to execution_spec type +alter type pgconductor.execution_spec add attribute trace_context jsonb; + +drop function pgconductor.invoke; + +-- Update invoke_batch function to pass trace_context through +-- Preserves all existing logic, just adds trace_context column +create or replace function pgconductor.invoke_batch( + specs pgconductor.execution_spec[] +) + returns table(id uuid) + language plpgsql + volatile + set search_path to '' +as $function$ +declare + v_now timestamptz; +begin + v_now := pgconductor._private_current_time(); + + -- clear locked dedupe keys before batch insert + update pgconductor._private_executions as e + set + dedupe_key = null, + locked_by = null, + locked_at = null, + 
failed_at = v_now,
+    last_error = 'superseded by reinvoke'
+  from unnest(specs) as spec
+  where e.dedupe_key = spec.dedupe_key
+    and e.task_key = spec.task_key
+    and e.queue = coalesce(spec.queue, 'default')
+    and e.locked_at is not null
+    and spec.dedupe_key is not null;
+
+  -- batch insert all executions
+  -- note: duplicate dedupe_keys within same batch will cause error
+  -- users should deduplicate client-side if needed
+  -- each spec's trace_context is copied unchanged into the new row
+  return query
+  insert into pgconductor._private_executions (
+    id,
+    task_key,
+    queue,
+    payload,
+    run_at,
+    dedupe_key,
+    singleton_on,
+    cron_expression,
+    priority,
+    trace_context
+  )
+  select
+    pgconductor._private_portable_uuidv7(),
+    spec.task_key,
+    coalesce(spec.queue, 'default'),
+    spec.payload,
+    coalesce(spec.run_at, v_now),
+    spec.dedupe_key,
+    -- singleton_on: start of the dedupe_seconds-wide time slot containing v_now
+    case
+      when spec.dedupe_seconds is not null then
+        'epoch'::timestamptz + '1 second'::interval * (
+          spec.dedupe_seconds * floor(
+            extract(epoch from v_now) / spec.dedupe_seconds
+          )
+        )
+      else null
+    end,
+    spec.cron_expression,
+    coalesce(spec.priority, 0),
+    spec.trace_context
+  from unnest(specs) as spec
+  -- on a (task_key, dedupe_key, queue) conflict the existing row is overwritten,
+  -- including its trace_context (the latest caller's context wins)
+  on conflict (task_key, dedupe_key, queue) do update set
+    payload = excluded.payload,
+    run_at = excluded.run_at,
+    priority = excluded.priority,
+    cron_expression = excluded.cron_expression,
+    singleton_on = excluded.singleton_on,
+    trace_context = excluded.trace_context
+  returning pgconductor._private_executions.id;
+end;
+$function$
+;
+
+-- Update invoke function to pass trace_context through
+-- Preserves all existing logic, just adds trace_context parameter and column
+--
+-- Inserts a single execution row and returns its id. Three paths:
+--   * p_dedupe_seconds set, p_dedupe_next_slot = false (throttle):
+--       insert into the current time slot; on conflict nothing is inserted
+--       and the function returns zero rows.
+--   * p_dedupe_seconds set, p_dedupe_next_slot <> false (debounce):
+--       upsert into the next time slot.
+--   * p_dedupe_seconds null (regular):
+--       upsert keyed on (task_key, dedupe_key, queue).
+-- Defaults applied here: run_at falls back to _private_current_time(),
+-- priority falls back to 0. p_trace_context is stored unchanged in the
+-- trace_context column on every path.
+create or replace function pgconductor.invoke(
+    p_task_key text,
+    p_queue text default 'default',
+    p_payload jsonb default null,
+    p_run_at timestamptz default null,
+    p_dedupe_key text default null,
+    p_dedupe_seconds integer default null,
+    p_dedupe_next_slot boolean default false,
+    p_cron_expression text default null,
+    p_priority integer default null,
+    p_trace_context jsonb default null
+)
+ returns table(id uuid)
+ language plpgsql
+ volatile
+ set search_path to ''
+as $function$
+declare
+  v_now timestamptz;
+  v_singleton_on timestamptz;
+  v_next_singleton_on timestamptz;
+  v_run_at timestamptz;
+  -- NOTE(review): v_new_id is declared but never referenced in this body
+  v_new_id uuid;
+begin
+  v_now := pgconductor._private_current_time();
+  v_run_at := coalesce(p_run_at, v_now);
+
+  -- clear locked dedupe key before insert (supersede pattern)
+  -- rows that are currently locked lose their dedupe_key and are marked failed,
+  -- so they no longer conflict with the insert below
+  if p_dedupe_key is not null then
+    update pgconductor._private_executions
+    set
+      dedupe_key = null,
+      locked_by = null,
+      locked_at = null,
+      failed_at = v_now,
+      last_error = 'superseded by reinvoke'
+    where dedupe_key = p_dedupe_key
+      and task_key = p_task_key
+      and queue = p_queue
+      and locked_at is not null;
+  end if;
+
+  -- singleton throttle/debounce logic
+  if p_dedupe_seconds is not null then
+    -- calculate current time slot (pg-boss formula)
+    v_singleton_on := 'epoch'::timestamptz + '1 second'::interval * (
+      p_dedupe_seconds * floor(
+        extract(epoch from v_now) / p_dedupe_seconds
+      )
+    );
+
+    if p_dedupe_next_slot = false then
+      -- throttle: try current slot, return empty if blocked
+      return query
+      insert into pgconductor._private_executions (
+        id,
+        task_key,
+        queue,
+        payload,
+        run_at,
+        dedupe_key,
+        singleton_on,
+        cron_expression,
+        priority,
+        trace_context
+      ) values (
+        pgconductor._private_portable_uuidv7(),
+        p_task_key,
+        p_queue,
+        p_payload,
+        v_run_at,
+        p_dedupe_key,
+        v_singleton_on,
+        p_cron_expression,
+        coalesce(p_priority, 0),
+        p_trace_context
+      )
+      -- NOTE(review): this conflict target presumably matches a partial unique
+      -- index created in an earlier migration -- confirm against 0000000001
+      on conflict (task_key, singleton_on, coalesce(dedupe_key, ''), queue)
+      where singleton_on is not null and completed_at is null and failed_at is null and cancelled = false
+      do nothing
+      returning _private_executions.id;
+      return;
+    else
+      -- debounce: upsert into next slot
+      v_next_singleton_on := v_singleton_on + (p_dedupe_seconds || ' seconds')::interval;
+
+      -- run_at is set to the start of the next slot here; p_run_at / v_run_at
+      -- is not used on this path
+      return query
+      insert into pgconductor._private_executions (
+        id,
+        task_key,
+        queue,
+        payload,
+        run_at,
+        dedupe_key,
+        singleton_on,
+        cron_expression,
+        priority,
+        trace_context
+      ) values (
+        pgconductor._private_portable_uuidv7(),
+        p_task_key,
+        p_queue,
+        p_payload,
+        v_next_singleton_on,
+        p_dedupe_key,
+        v_next_singleton_on,
+        p_cron_expression,
+        coalesce(p_priority, 0),
+        p_trace_context
+      )
+      on conflict (task_key, singleton_on, coalesce(dedupe_key, ''), queue)
+      where singleton_on is not null and completed_at is null and failed_at is null and cancelled = false
+      -- the pending row in that slot takes the newest payload, run_at, priority
+      -- and trace_context; cron_expression is left as it was
+      do update set
+        payload = excluded.payload,
+        run_at = excluded.run_at,
+        priority = excluded.priority,
+        trace_context = excluded.trace_context
+      returning _private_executions.id;
+      return;
+    end if;
+  end if;
+
+  -- regular invoke (no singleton)
+  if p_dedupe_key is not null then
+    -- clear keys that are currently locked so a subsequent insert can succeed.
+    -- same predicate as the update at the top of the function, but failed_at
+    -- comes from a fresh _private_current_time() call rather than v_now
+    update pgconductor._private_executions as e
+    set
+      dedupe_key = null,
+      locked_by = null,
+      locked_at = null,
+      failed_at = pgconductor._private_current_time(),
+      last_error = 'superseded by reinvoke'
+    where e.dedupe_key = p_dedupe_key
+      and e.task_key = p_task_key
+      and e.queue = p_queue
+      and e.locked_at is not null;
+  end if;
+
+  -- singleton_on is not listed, so it takes the column default on this path
+  return query insert into pgconductor._private_executions as e (
+    id,
+    task_key,
+    queue,
+    payload,
+    run_at,
+    dedupe_key,
+    cron_expression,
+    priority,
+    trace_context
+  ) values (
+    pgconductor._private_portable_uuidv7(),
+    p_task_key,
+    p_queue,
+    p_payload,
+    v_run_at,
+    p_dedupe_key,
+    p_cron_expression,
+    coalesce(p_priority, 0),
+    p_trace_context
+  )
+  -- re-invoking with the same (task_key, dedupe_key, queue) overwrites the
+  -- existing row, including its stored trace_context
+  on conflict (task_key, dedupe_key, queue) do update set
+    payload = excluded.payload,
+    run_at = excluded.run_at,
+    priority = excluded.priority,
+    cron_expression = excluded.cron_expression,
+    trace_context = excluded.trace_context
+  returning e.id;
+end;
+$function$
+;
diff --git a/package.json b/package.json
index 7edec01..bfab382 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
   "name": "pgconductor",
   "type": "module",
-  "workspaces": ["packages/pgconductor-js"],
+
"workspaces": ["packages/pgconductor-js", "packages/pgconductor-js-instrumentation"], "catalog": { "postgres": "3.4.7", "prettier": "3.4.2", diff --git a/packages/pgconductor-js-instrumentation/package.json b/packages/pgconductor-js-instrumentation/package.json new file mode 100644 index 0000000..9167c04 --- /dev/null +++ b/packages/pgconductor-js-instrumentation/package.json @@ -0,0 +1,55 @@ +{ + "name": "@pgconductor/js-instrumentation", + "version": "0.1.0", + "description": "OpenTelemetry instrumentation for pgconductor-js", + "module": "dist/index.js", + "type": "module", + "files": [ + "dist", + "README.md" + ], + "repository": { + "type": "git", + "url": "https://github.com/psteinroe/pgconductor" + }, + "keywords": [ + "postgres", + "opentelemetry", + "instrumentation", + "tracing", + "observability", + "pgconductor" + ], + "author": "psteinroe", + "license": "MIT", + "publishConfig": { + "access": "public", + "tag": "latest", + "provenance": true + }, + "scripts": { + "build": "bun build src/index.ts --outdir dist --target node", + "test": "bun test", + "typecheck": "tsc --noEmit" + }, + "devDependencies": { + "@testcontainers/postgresql": "catalog:", + "@types/bun": "catalog:", + "@opentelemetry/sdk-trace-base": "^1.30.0", + "@opentelemetry/sdk-trace-node": "^1.30.0", + "postgres": "catalog:", + "zod": "catalog:", + "pgconductor-js": "workspace:*" + }, + "peerDependencies": { + "typescript": "catalog:", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/instrumentation": "^0.57.0", + "pgconductor-js": ">0.1.0" + }, + "dependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/instrumentation": "^0.57.0", + "@opentelemetry/semantic-conventions": "^1.30.0" + } +} diff --git a/packages/pgconductor-js-instrumentation/src/index.ts b/packages/pgconductor-js-instrumentation/src/index.ts new file mode 100644 index 0000000..d330532 --- /dev/null +++ b/packages/pgconductor-js-instrumentation/src/index.ts @@ -0,0 +1,4 @@ +export { 
PgConductorInstrumentation } from "./instrumentation"; +export * from "./semantic-conventions"; +export * from "./types"; +export { VERSION } from "./version"; diff --git a/packages/pgconductor-js-instrumentation/src/instrumentation.ts b/packages/pgconductor-js-instrumentation/src/instrumentation.ts new file mode 100644 index 0000000..1d02544 --- /dev/null +++ b/packages/pgconductor-js-instrumentation/src/instrumentation.ts @@ -0,0 +1,701 @@ +import { + type Span, + type Link, + SpanKind, + SpanStatusCode, + context, + propagation, + trace, + ROOT_CONTEXT, +} from "@opentelemetry/api"; +import { + InstrumentationBase, + InstrumentationNodeModuleDefinition, + isWrapped, +} from "@opentelemetry/instrumentation"; + +import type { PgConductorInstrumentationConfig, TraceContext } from "./types"; +import { VERSION } from "./version"; +import * as SemanticConventions from "./semantic-conventions"; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type AnyFunction = (...args: any[]) => any; + +/** + * OpenTelemetry instrumentation for Postgres Conductor. 
+ * + * Provides tracing for: + * - Task invocations (producer spans) + * - Task executions (consumer spans) + * - Step operations (internal spans) + * - Sleep/invoke child operations + */ +export class PgConductorInstrumentation extends InstrumentationBase { + constructor(config: PgConductorInstrumentationConfig = {}) { + super(SemanticConventions.INSTRUMENTATION_NAME, VERSION, config); + } + + protected init(): InstrumentationNodeModuleDefinition[] { + return [ + new InstrumentationNodeModuleDefinition( + "pgconductor-js", + [">=0.1.0"], + (moduleExports) => { + this.patchConductor(moduleExports); + this.patchWorker(moduleExports); + this.patchTaskContext(moduleExports); + return moduleExports; + }, + (moduleExports) => { + this.unpatchConductor(moduleExports); + this.unpatchWorker(moduleExports); + this.unpatchTaskContext(moduleExports); + return moduleExports; + }, + ), + ]; + } + + override setConfig(config: PgConductorInstrumentationConfig): void { + super.setConfig(config); + } + + override getConfig(): PgConductorInstrumentationConfig { + return this._config; + } + + private get propagateContext(): boolean { + return this.getConfig().propagateContext !== false; + } + + private get requireParentSpan(): boolean { + return this.getConfig().requireParentSpan === true; + } + + /** + * Manually patch pgconductor-js module exports. + * Use this in test environments where automatic module hooking doesn't work (e.g., Bun). 
+ */ + patchModule(moduleExports: { + Conductor?: { prototype: Record }; + Worker?: { prototype: Record }; + TaskContext?: { prototype: Record }; + }): void { + if (moduleExports.Conductor) { + this.patchConductor( + moduleExports as { Conductor: { prototype: Record } }, + ); + } + if (moduleExports.Worker) { + this.patchWorker(moduleExports as { Worker: { prototype: Record } }); + } + if (moduleExports.TaskContext) { + this.patchTaskContext( + moduleExports as { TaskContext: { prototype: Record } }, + ); + } + } + + /** + * Manually unpatch pgconductor-js module exports. + */ + unpatchModule(moduleExports: { + Conductor?: { prototype: Record }; + Worker?: { prototype: Record }; + TaskContext?: { prototype: Record }; + }): void { + if (moduleExports.Conductor) { + this.unpatchConductor( + moduleExports as { Conductor: { prototype: Record } }, + ); + } + if (moduleExports.Worker) { + this.unpatchWorker(moduleExports as { Worker: { prototype: Record } }); + } + if (moduleExports.TaskContext) { + this.unpatchTaskContext( + moduleExports as { TaskContext: { prototype: Record } }, + ); + } + } + + // ==================== Conductor Patching ==================== + + private patchConductor(moduleExports: { + Conductor: { prototype: Record }; + }): void { + const Conductor = moduleExports.Conductor; + if (!Conductor?.prototype) return; + + if (isWrapped(Conductor.prototype.invoke)) { + this._unwrap(Conductor.prototype, "invoke"); + } + this._wrap(Conductor.prototype, "invoke", this.patchConductorInvoke.bind(this)); + + if (isWrapped(Conductor.prototype.emit)) { + this._unwrap(Conductor.prototype, "emit"); + } + this._wrap(Conductor.prototype, "emit", this.patchConductorEmit.bind(this)); + } + + private unpatchConductor(moduleExports: { + Conductor: { prototype: Record }; + }): void { + const Conductor = moduleExports.Conductor; + if (!Conductor?.prototype) return; + + if (isWrapped(Conductor.prototype.invoke)) { + this._unwrap(Conductor.prototype, "invoke"); + } + if 
(isWrapped(Conductor.prototype.emit)) {
+      this._unwrap(Conductor.prototype, "emit");
+    }
+  }
+
+  /**
+   * Builds the wrapper installed over `Conductor.prototype.invoke`.
+   *
+   * The wrapper starts a PRODUCER span named `<queue> <task> publish`, optionally
+   * injects the span's trace context into the call (`opts.trace_context` for a
+   * single invoke, `item.trace_context` on every item for a batch invoke), and
+   * records the returned execution id or batch size on the span.
+   *
+   * The span is always ended. If the wrapped call rejects, the error is recorded
+   * on the span and rethrown unchanged.
+   *
+   * @param original - The unpatched `invoke` implementation.
+   * @returns The instrumented replacement.
+   */
+  private patchConductorInvoke(original: AnyFunction): AnyFunction {
+    // eslint-disable-next-line @typescript-eslint/no-this-alias
+    const instrumentation = this;
+    return async function (
+      this: unknown,
+      task: { name: string; queue?: string },
+      payloadOrItems: unknown,
+      opts?: Record,
+    ) {
+      const queue = (task.queue as string) || "default";
+      const taskName = task.name;
+      const spanName = `${queue} ${taskName} publish`;
+
+      // Check if we should create a span
+      if (instrumentation.requireParentSpan && !trace.getActiveSpan()) {
+        return original.call(this, task, payloadOrItems, opts);
+      }
+
+      const tracer = instrumentation.tracer;
+      const span = tracer.startSpan(spanName, {
+        kind: SpanKind.PRODUCER,
+        attributes: {
+          [SemanticConventions.MESSAGING_SYSTEM]: SemanticConventions.MESSAGING_SYSTEM_VALUE,
+          [SemanticConventions.MESSAGING_OPERATION]:
+            SemanticConventions.MESSAGING_OPERATION_PUBLISH,
+          [SemanticConventions.MESSAGING_DESTINATION_NAME]: queue,
+          [SemanticConventions.PGCONDUCTOR_TASK_NAME]: taskName,
+          [SemanticConventions.PGCONDUCTOR_TASK_QUEUE]: queue,
+        },
+      });
+
+      // Call produce hook
+      const config = instrumentation.getConfig();
+      if (config.produceHook) {
+        const result = config.produceHook(span, taskName, payloadOrItems);
+        if (result === false) {
+          span.end();
+          return original.call(this, task, payloadOrItems, opts);
+        }
+      }
+
+      // Context carrying the new span; used both for injection and for the call itself.
+      const ctxWithSpan = trace.setSpan(context.active(), span);
+
+      try {
+        // Inject trace context if propagation is enabled
+        // Important: inject from the span's context, not from active context
+        // because the span hasn't been set in active context yet
+        let traceContext: TraceContext | undefined;
+        if (instrumentation.propagateContext) {
+          traceContext = {};
+          propagation.inject(ctxWithSpan, traceContext);
+        }
+
+        // Merge trace_context into opts or items
+        let modifiedOpts = opts;
+        let modifiedItems = payloadOrItems;
+
+        if (traceContext) {
+          if (Array.isArray(payloadOrItems)) {
+            // Batch invoke - add trace_context to each item
+            modifiedItems = (payloadOrItems as Record[]).map((item) => ({
+              ...item,
+              trace_context: traceContext,
+            }));
+          } else {
+            // Single invoke - add to opts
+            modifiedOpts = { ...opts, trace_context: traceContext };
+          }
+        }
+
+        const result = await context.with(ctxWithSpan, () =>
+          original.call(this, task, modifiedItems, modifiedOpts),
+        );
+
+        // Set execution ID if returned
+        if (typeof result === "string") {
+          span.setAttribute(SemanticConventions.PGCONDUCTOR_EXECUTION_ID, result);
+        } else if (Array.isArray(result)) {
+          span.setAttribute(SemanticConventions.PGCONDUCTOR_BATCH_SIZE, result.length);
+        }
+
+        span.setStatus({ code: SpanStatusCode.OK });
+        return result;
+      } catch (error) {
+        // A rejected invoke must still produce a finished, error-marked span.
+        instrumentation.recordError(span, error);
+        throw error;
+      } finally {
+        span.end();
+      }
+    };
+  }
+
+  /**
+   * Builds the wrapper installed over `Conductor.prototype.emit`.
+   *
+   * The wrapper starts a PRODUCER span named `default emit <event>` around the
+   * call. The span is always ended; a rejection is recorded on the span and
+   * rethrown unchanged.
+   *
+   * @param original - The unpatched `emit` implementation.
+   * @returns The instrumented replacement.
+   */
+  private patchConductorEmit(original: AnyFunction): AnyFunction {
+    // eslint-disable-next-line @typescript-eslint/no-this-alias
+    const instrumentation = this;
+    return async function (this: unknown, event: string, payload: unknown) {
+      const spanName = `default emit ${event}`;
+
+      if (instrumentation.requireParentSpan && !trace.getActiveSpan()) {
+        return original.call(this, event, payload);
+      }
+
+      const tracer = instrumentation.tracer;
+      const span = tracer.startSpan(spanName, {
+        kind: SpanKind.PRODUCER,
+        attributes: {
+          [SemanticConventions.MESSAGING_SYSTEM]: SemanticConventions.MESSAGING_SYSTEM_VALUE,
+          [SemanticConventions.MESSAGING_OPERATION]:
+            SemanticConventions.MESSAGING_OPERATION_PUBLISH,
+          [SemanticConventions.PGCONDUCTOR_TASK_NAME]: event,
+        },
+      });
+
+      try {
+        const result = await context.with(trace.setSpan(context.active(), span), () =>
+          original.call(this, event, payload),
+        );
+        span.setStatus({ code: SpanStatusCode.OK });
+        return result;
+      } catch (error) {
+        // The call is awaited, so it can reject; without this the span would never end.
+        instrumentation.recordError(span, error);
+        throw error;
+      } finally {
+        span.end();
+      }
+    };
+  }
+
+  // ==================== Worker Patching ====================
+
+  /**
+   * Wraps `Worker.prototype.executeSingleTask` and `Worker.prototype.executeBatchTask`.
+   */
+  private patchWorker(moduleExports: { Worker: { prototype: Record }
}): void {
+    const Worker = moduleExports.Worker;
+    if (!Worker?.prototype) return;
+
+    // Unwrap first so patching twice never stacks one wrapper on top of another.
+    if (isWrapped(Worker.prototype.executeSingleTask)) {
+      this._unwrap(Worker.prototype, "executeSingleTask");
+    }
+    this._wrap(Worker.prototype, "executeSingleTask", this.patchExecuteSingleTask.bind(this));
+
+    if (isWrapped(Worker.prototype.executeBatchTask)) {
+      this._unwrap(Worker.prototype, "executeBatchTask");
+    }
+    this._wrap(Worker.prototype, "executeBatchTask", this.patchExecuteBatchTask.bind(this));
+  }
+
+  /**
+   * Removes the `executeSingleTask` / `executeBatchTask` wrappers from
+   * `Worker.prototype`, touching only methods that are currently wrapped.
+   */
+  private unpatchWorker(moduleExports: {
+    Worker: { prototype: Record };
+  }): void {
+    const Worker = moduleExports.Worker;
+    if (!Worker?.prototype) return;
+
+    if (isWrapped(Worker.prototype.executeSingleTask)) {
+      this._unwrap(Worker.prototype, "executeSingleTask");
+    }
+    if (isWrapped(Worker.prototype.executeBatchTask)) {
+      this._unwrap(Worker.prototype, "executeBatchTask");
+    }
+  }
+
+  /**
+   * Builds the wrapper installed over `Worker.prototype.executeSingleTask`.
+   *
+   * The wrapper starts a CONSUMER span named `<queue> <task_key> process`. When
+   * propagation is enabled and the execution carries a `trace_context`, the
+   * producer span is attached as a link (not as the parent). The returned
+   * `status` is copied to the span ("unknown" when absent) and mapped to a span
+   * status: failed / permanently_failed -> ERROR; completed / released /
+   * invoke_child -> OK; anything else leaves the status unset.
+   *
+   * @param original - The unpatched `executeSingleTask` implementation.
+   * @returns The instrumented replacement.
+   */
+  private patchExecuteSingleTask(original: AnyFunction): AnyFunction {
+    // eslint-disable-next-line @typescript-eslint/no-this-alias
+    const instrumentation = this;
+    return async function (
+      this: { queueName: string },
+      task: { name: string },
+      exec: {
+        id: string;
+        task_key: string;
+        queue: string;
+        payload: unknown;
+        trace_context?: TraceContext | null;
+      },
+    ) {
+      const queue = this.queueName || exec.queue;
+      const taskKey = exec.task_key;
+      const spanName = `${queue} ${taskKey} process`;
+
+      const tracer = instrumentation.tracer;
+
+      // The consumer span is parented to whatever is active in the worker;
+      // the producer is only linked, never used as the parent.
+      const parentContext = context.active();
+      const links: Link[] = [];
+
+      if (instrumentation.propagateContext && exec.trace_context) {
+        const extractedContext = propagation.extract(ROOT_CONTEXT, exec.trace_context);
+        const spanContext = trace.getSpanContext(extractedContext);
+        if (spanContext) {
+          // Link to the producer span instead of making it a parent
+          links.push({ context: spanContext });
+        }
+      }
+
+      const span = tracer.startSpan(
+        spanName,
+        {
+          kind: SpanKind.CONSUMER,
+          attributes: {
+            [SemanticConventions.MESSAGING_SYSTEM]: SemanticConventions.MESSAGING_SYSTEM_VALUE,
+            [SemanticConventions.MESSAGING_OPERATION]:
+              SemanticConventions.MESSAGING_OPERATION_PROCESS,
+            [SemanticConventions.MESSAGING_DESTINATION_NAME]: queue,
+            [SemanticConventions.PGCONDUCTOR_TASK_NAME]: taskKey,
+            [SemanticConventions.PGCONDUCTOR_TASK_QUEUE]: queue,
+            [SemanticConventions.PGCONDUCTOR_EXECUTION_ID]: exec.id,
+          },
+          links,
+        },
+        parentContext,
+      );
+
+      // Call consume hook
+      const config = instrumentation.getConfig();
+      if (config.consumeHook) {
+        const result = config.consumeHook(span, exec);
+        if (result === false) {
+          // Hook opted out: finish the span right away and run the task outside of it.
+          span.end();
+          return original.call(this, task, exec);
+        }
+      }
+
+      return context.with(trace.setSpan(parentContext, span), async () => {
+        try {
+          const result = (await original.call(this, task, exec)) as {
+            status: string;
+            error?: string;
+          };
+
+          // Set status based on result
+          const status = result?.status;
+          span.setAttribute(SemanticConventions.PGCONDUCTOR_EXECUTION_STATUS, status || "unknown");
+
+          if (status === "failed" || status === "permanently_failed") {
+            span.setStatus({ code: SpanStatusCode.ERROR, message: result?.error });
+            if (result?.error) {
+              span.setAttribute(SemanticConventions.PGCONDUCTOR_ERROR_MESSAGE, result.error);
+            }
+            span.setAttribute(
+              SemanticConventions.PGCONDUCTOR_ERROR_RETRYABLE,
+              status !== "permanently_failed",
+            );
+          } else if (status === "completed" || status === "released" || status === "invoke_child") {
+            // released / invoke_child are control flow (sleep, child invocation), not failures.
+            span.setStatus({ code: SpanStatusCode.OK });
+          }
+
+          return result;
+        } catch (error) {
+          instrumentation.recordError(span, error);
+          throw error;
+        } finally {
+          span.end();
+        }
+      });
+    };
+  }
+
+  /**
+   * Builds the wrapper installed over `Worker.prototype.executeBatchTask`:
+   * one CONSUMER span per batch call.
+   *
+   * @param original - The unpatched `executeBatchTask` implementation.
+   * @returns The instrumented replacement.
+   */
+  private patchExecuteBatchTask(original: AnyFunction): AnyFunction {
+    // eslint-disable-next-line @typescript-eslint/no-this-alias
+    const instrumentation = this;
+    return async function (
+      this: { queueName: string },
+      task: {
name: string },
+      taskKey: string,
+      executions: { id: string; queue: string; trace_context?: TraceContext | null }[],
+    ) {
+      const queue = this.queueName || executions[0]?.queue || "default";
+      const spanName = `${queue} ${taskKey} process batch`;
+
+      const tracer = instrumentation.tracer;
+
+      // Build links from all execution trace contexts
+      const links: Link[] = [];
+      if (instrumentation.propagateContext) {
+        for (const exec of executions) {
+          if (exec.trace_context) {
+            const extractedContext = propagation.extract(ROOT_CONTEXT, exec.trace_context);
+            const spanContext = trace.getSpanContext(extractedContext);
+            if (spanContext) {
+              links.push({ context: spanContext });
+            }
+          }
+        }
+      }
+
+      const span = tracer.startSpan(spanName, {
+        kind: SpanKind.CONSUMER,
+        attributes: {
+          [SemanticConventions.MESSAGING_SYSTEM]: SemanticConventions.MESSAGING_SYSTEM_VALUE,
+          [SemanticConventions.MESSAGING_OPERATION]:
+            SemanticConventions.MESSAGING_OPERATION_PROCESS,
+          [SemanticConventions.MESSAGING_DESTINATION_NAME]: queue,
+          [SemanticConventions.PGCONDUCTOR_TASK_NAME]: taskKey,
+          [SemanticConventions.PGCONDUCTOR_TASK_QUEUE]: queue,
+          [SemanticConventions.PGCONDUCTOR_BATCH_SIZE]: executions.length,
+        },
+        links,
+      });
+
+      return context.with(trace.setSpan(context.active(), span), async () => {
+        try {
+          const results = (await original.call(this, task, taskKey, executions)) as {
+            status: string;
+            error?: string;
+          }[];
+
+          // Count statuses. Guard against a non-array result or null entries so the
+          // instrumentation never throws where the wrapped method did not.
+          let completed = 0;
+          let failed = 0;
+          for (const result of Array.isArray(results) ? results : []) {
+            const status = result?.status;
+            if (status === "completed") completed++;
+            else if (status === "failed" || status === "permanently_failed") failed++;
+          }
+
+          // ERROR only when at least one execution failed and none completed;
+          // a partially successful batch is still reported as OK.
+          if (failed > 0 && completed === 0) {
+            span.setStatus({ code: SpanStatusCode.ERROR, message: "All executions failed" });
+          } else {
+            span.setStatus({ code: SpanStatusCode.OK });
+          }
+
+          return results;
+        } catch (error) {
+          instrumentation.recordError(span, error);
+          throw error;
+        } finally {
+          span.end();
+        }
+      });
+    };
+  }
+
+  // ==================== TaskContext Patching ====================
+
+  /**
+   * Wraps `TaskContext.prototype.step`, `invoke` and `sleep`.
+   * Does nothing when `TaskContext` or its prototype is missing.
+   */
+  private patchTaskContext(moduleExports: {
+    TaskContext: { prototype: Record };
+  }): void {
+    const TaskContext = moduleExports.TaskContext;
+    if (!TaskContext?.prototype) return;
+
+    // Unwrap first so patching twice never stacks one wrapper on top of another.
+    if (isWrapped(TaskContext.prototype.step)) {
+      this._unwrap(TaskContext.prototype, "step");
+    }
+    this._wrap(TaskContext.prototype, "step", this.patchStep.bind(this));
+
+    if (isWrapped(TaskContext.prototype.invoke)) {
+      this._unwrap(TaskContext.prototype, "invoke");
+    }
+    this._wrap(TaskContext.prototype, "invoke", this.patchContextInvoke.bind(this));
+
+    if (isWrapped(TaskContext.prototype.sleep)) {
+      this._unwrap(TaskContext.prototype, "sleep");
+    }
+    this._wrap(TaskContext.prototype, "sleep", this.patchSleep.bind(this));
+  }
+
+  /**
+   * Removes the `step` / `invoke` / `sleep` wrappers from `TaskContext.prototype`,
+   * touching only methods that are currently wrapped.
+   */
+  private unpatchTaskContext(moduleExports: {
+    TaskContext: { prototype: Record };
+  }): void {
+    const TaskContext = moduleExports.TaskContext;
+    if (!TaskContext?.prototype) return;
+
+    if (isWrapped(TaskContext.prototype.step)) {
+      this._unwrap(TaskContext.prototype, "step");
+    }
+    if (isWrapped(TaskContext.prototype.invoke)) {
+      this._unwrap(TaskContext.prototype, "invoke");
+    }
+    if (isWrapped(TaskContext.prototype.sleep)) {
+      this._unwrap(TaskContext.prototype, "sleep");
+    }
+  }
+
+  /**
+   * Builds the wrapper installed over `TaskContext.prototype.step`: an INTERNAL
+   * span named `<task_key> step <name>` around each step call.
+   *
+   * @param original - The unpatched `step` implementation.
+   * @returns The instrumented replacement.
+   */
+  private patchStep(original: AnyFunction): AnyFunction {
+    // eslint-disable-next-line @typescript-eslint/no-this-alias
+    const instrumentation = this;
+    return async function (
+      this: { opts: { execution: { task_key: string; id: string } } },
+      name: string,
+      fn: () => unknown,
+    ) {
+      const taskKey = this.opts?.execution?.task_key || "unknown";
+      const executionId = this.opts?.execution?.id || "unknown";
+      const spanName = `${taskKey} step ${name}`;
+
+      const tracer = instrumentation.tracer;
+      const span = tracer.startSpan(spanName, {
+        kind: SpanKind.INTERNAL,
+        attributes: {
+          [SemanticConventions.PGCONDUCTOR_TASK_NAME]:
taskKey,
+          [SemanticConventions.PGCONDUCTOR_STEP_NAME]: name,
+          [SemanticConventions.PGCONDUCTOR_EXECUTION_ID]: executionId,
+        },
+      });
+
+      return context.with(trace.setSpan(context.active(), span), async () => {
+        try {
+          // The original step() will check if cached and return early
+          // We detect caching by checking if fn was called
+          let fnCalled = false;
+          const wrappedFn = async () => {
+            fnCalled = true;
+            return fn();
+          };
+
+          const result = await original.call(this, name, wrappedFn);
+
+          const cached = !fnCalled;
+          span.setAttribute(SemanticConventions.PGCONDUCTOR_STEP_CACHED, cached);
+
+          const config = instrumentation.getConfig();
+          if (config.stepHook) {
+            config.stepHook(span, name, cached);
+          }
+
+          span.setStatus({ code: SpanStatusCode.OK });
+          return result;
+        } catch (error) {
+          instrumentation.recordError(span, error);
+          throw error;
+        } finally {
+          span.end();
+        }
+      });
+    };
+  }
+
+  /**
+   * Builds the wrapper installed over `TaskContext.prototype.invoke`.
+   *
+   * The wrapper starts an INTERNAL span named `<parent task_key> invoke <child task>`.
+   * The span is ended exactly once: when the call returns, when it throws (the
+   * error is recorded and rethrown), or when the execution's abort signal fires
+   * (status attribute "invoke_child").
+   *
+   * @param original - The unpatched `invoke` implementation.
+   * @returns The instrumented replacement.
+   */
+  private patchContextInvoke(original: AnyFunction): AnyFunction {
+    // eslint-disable-next-line @typescript-eslint/no-this-alias
+    const instrumentation = this;
+    return async function (
+      this: {
+        opts: {
+          execution: { task_key: string; id: string };
+          abortController?: { signal: AbortSignal };
+        };
+      },
+      key: string,
+      task: { name: string; queue?: string },
+      payload: unknown,
+      timeout?: number,
+    ) {
+      const parentTaskKey = this.opts?.execution?.task_key || "unknown";
+      const executionId = this.opts?.execution?.id || "unknown";
+      const childTaskName = task.name;
+      const spanName = `${parentTaskKey} invoke ${childTaskName}`;
+
+      const tracer = instrumentation.tracer;
+      const span = tracer.startSpan(spanName, {
+        kind: SpanKind.INTERNAL,
+        attributes: {
+          [SemanticConventions.PGCONDUCTOR_TASK_NAME]: parentTaskKey,
+          [SemanticConventions.PGCONDUCTOR_EXECUTION_ID]: executionId,
+          [SemanticConventions.PGCONDUCTOR_STEP_NAME]: key,
+        },
+      });
+
+      // Invoke can either:
+      // 1. Return cached result if step exists in DB
+      // 2. Trigger abort (child-invocation) and return a never-resolving promise
+      // The helper ends the span in both cases, and also when the call throws.
+      return instrumentation.runHangupAware(
+        span,
+        this.opts?.abortController?.signal,
+        "invoke_child",
+        () => original.call(this, key, task, payload, timeout),
+      );
+    };
+  }
+
+  /**
+   * Builds the wrapper installed over `TaskContext.prototype.sleep`.
+   *
+   * The wrapper starts an INTERNAL span named `<task_key> sleep`. The span is
+   * ended exactly once: when the call returns, when it throws (the error is
+   * recorded and rethrown), or when the execution's abort signal fires (status
+   * attribute "released").
+   *
+   * @param original - The unpatched `sleep` implementation.
+   * @returns The instrumented replacement.
+   */
+  private patchSleep(original: AnyFunction): AnyFunction {
+    // eslint-disable-next-line @typescript-eslint/no-this-alias
+    const instrumentation = this;
+    return async function (
+      this: {
+        opts: {
+          execution: { task_key: string; id: string };
+          abortController?: { signal: AbortSignal };
+        };
+      },
+      id: string,
+      ms: number,
+    ) {
+      const taskKey = this.opts?.execution?.task_key || "unknown";
+      const executionId = this.opts?.execution?.id || "unknown";
+      const spanName = `${taskKey} sleep`;
+
+      const tracer = instrumentation.tracer;
+      const span = tracer.startSpan(spanName, {
+        kind: SpanKind.INTERNAL,
+        attributes: {
+          [SemanticConventions.PGCONDUCTOR_TASK_NAME]: taskKey,
+          [SemanticConventions.PGCONDUCTOR_EXECUTION_ID]: executionId,
+          [SemanticConventions.PGCONDUCTOR_STEP_NAME]: id,
+        },
+      });
+
+      // Sleep can either:
+      // 1. Return immediately if already cached (step exists in DB)
+      // 2. Trigger abort and return a never-resolving promise (not cached)
+      // The helper ends the span in both cases, and also when the call throws.
+      return instrumentation.runHangupAware(
+        span,
+        this.opts?.abortController?.signal,
+        "released",
+        () => original.call(this, id, ms),
+      );
+    };
+  }
+
+  // ==================== Helpers ====================
+
+  /**
+   * Awaits `run` with `span` set as the active span and ends the span exactly once.
+   *
+   * Three outcomes are handled:
+   * - `run` resolves: span status OK, value returned.
+   * - `run` throws or rejects: the error is recorded on the span and rethrown.
+   * - `signal` aborts first (hangup is control flow, not an error): the span gets
+   *   `hangupStatus` as its execution status, status OK, and is ended from the
+   *   listener, because the pending promise may never settle.
+   *
+   * The abort listener is removed once `run` settles.
+   *
+   * @param span - Span to finish.
+   * @param signal - Abort signal of the running execution, if any.
+   * @param hangupStatus - Execution status recorded when the signal aborts.
+   * @param run - Invokes the wrapped method.
+   * @returns Whatever `run` resolves to.
+   */
+  private async runHangupAware(
+    span: Span,
+    signal: AbortSignal | undefined,
+    hangupStatus: string,
+    run: () => unknown,
+  ): Promise<unknown> {
+    // Both the abort listener and the settle path may try to finish the span.
+    let ended = false;
+    const endSpan = (): void => {
+      if (ended) return;
+      ended = true;
+      span.end();
+    };
+
+    const onAbort = (): void => {
+      if (ended) return;
+      span.setAttribute(SemanticConventions.PGCONDUCTOR_EXECUTION_STATUS, hangupStatus);
+      span.setStatus({ code: SpanStatusCode.OK });
+      endSpan();
+    };
+    signal?.addEventListener("abort", onAbort, { once: true });
+
+    try {
+      const result = await context.with(trace.setSpan(context.active(), span), run);
+      if (!ended) {
+        span.setStatus({ code: SpanStatusCode.OK });
+      }
+      return result;
+    } catch (error) {
+      if (!ended) {
+        this.recordError(span, error);
+      }
+      throw error;
+    } finally {
+      signal?.removeEventListener("abort", onAbort);
+      endSpan();
+    }
+  }
+
+  /**
+   * Marks `span` as failed: records the exception, sets ERROR status with the
+   * error message, and stores the message in the pgconductor error attribute.
+   * Non-Error values are converted with `String(error)`.
+   */
+  private recordError(span: Span, error: unknown): void {
+    const err = error instanceof Error ? error : new Error(String(error));
+    span.recordException(err);
+    span.setStatus({ code: SpanStatusCode.ERROR, message: err.message });
+    span.setAttribute(SemanticConventions.PGCONDUCTOR_ERROR_MESSAGE, err.message);
+  }
+}
diff --git a/packages/pgconductor-js-instrumentation/src/semantic-conventions.ts b/packages/pgconductor-js-instrumentation/src/semantic-conventions.ts
new file mode 100644
index 0000000..b48206f
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/src/semantic-conventions.ts
@@ -0,0 +1,45 @@
+/**
+ * Semantic conventions for Postgres Conductor instrumentation.
+ * Following OpenTelemetry messaging semantic conventions where applicable.
+ */
+
+// Standard OTel messaging attributes
+export const MESSAGING_SYSTEM = "messaging.system";
+export const MESSAGING_OPERATION = "messaging.operation";
+export const MESSAGING_DESTINATION_NAME = "messaging.destination.name";
+
+// Postgres Conductor specific attributes
+export const PGCONDUCTOR_TASK_NAME = "pgconductor.task.name";
+export const PGCONDUCTOR_TASK_QUEUE = "pgconductor.task.queue";
+
+// Execution attributes (ATTEMPT is not set by the instrumentation class in this change)
+export const PGCONDUCTOR_EXECUTION_ID = "pgconductor.execution.id";
+export const PGCONDUCTOR_EXECUTION_ATTEMPT = "pgconductor.execution.attempt";
+export const PGCONDUCTOR_EXECUTION_STATUS = "pgconductor.execution.status";
+
+// Step attributes (step(), ctx.invoke() and sleep() spans)
+export const PGCONDUCTOR_STEP_NAME = "pgconductor.step.name";
+export const PGCONDUCTOR_STEP_CACHED = "pgconductor.step.cached";
+
+// Parent/child execution attributes (not set by the instrumentation class in this change)
+export const PGCONDUCTOR_PARENT_EXECUTION_ID = "pgconductor.parent.execution_id";
+export const PGCONDUCTOR_CHILD_EXECUTION_ID = "pgconductor.child.execution_id";
+
+// Number of executions in a batch invoke / batch process span
+export const PGCONDUCTOR_BATCH_SIZE = "pgconductor.batch.size";
+
+// Error attributes set on failed spans
+export const PGCONDUCTOR_ERROR_MESSAGE = "pgconductor.error.message";
+export const PGCONDUCTOR_ERROR_RETRYABLE = "pgconductor.error.retryable";
+
+// Messaging system value
+export const MESSAGING_SYSTEM_VALUE = "pgconductor";
+
+// Messaging operation values
+export const MESSAGING_OPERATION_PUBLISH = "publish";
+export const MESSAGING_OPERATION_PROCESS = "process";
+
+// Execution status values
+// NOTE(review): the instrumentation class compares against string literals
+// (including "permanently_failed", which has no constant here) - consider unifying.
+export const EXECUTION_STATUS_COMPLETED = "completed";
+export const EXECUTION_STATUS_FAILED = "failed";
+export const EXECUTION_STATUS_RELEASED = "released";
+export const EXECUTION_STATUS_INVOKE_CHILD = "invoke_child";
+export const EXECUTION_STATUS_CANCELLED = "cancelled";
+
+// Instrumentation name
+// NOTE(review): bun.lock names the package "@pgconductor/js-instrumentation";
+// confirm that the different scope used here is intentional.
+export const INSTRUMENTATION_NAME = "@pgconductor-js/instrumentation";
diff --git a/packages/pgconductor-js-instrumentation/src/types.ts b/packages/pgconductor-js-instrumentation/src/types.ts
new file mode 100644
index 0000000..d14c967
--- /dev/null
+++
b/packages/pgconductor-js-instrumentation/src/types.ts
@@ -0,0 +1,93 @@
+import type { Span } from "@opentelemetry/api";
+import type { InstrumentationConfig } from "@opentelemetry/instrumentation";
+
+/**
+ * Configuration options for PgConductorInstrumentation.
+ */
+export interface PgConductorInstrumentationConfig extends InstrumentationConfig {
+  /**
+   * If true, producer spans (Conductor.invoke / Conductor.emit) are only created
+   * when there is an active span; otherwise the call runs untraced.
+   * @default false
+   */
+  requireParentSpan?: boolean;
+
+  /**
+   * If true, trace context will be propagated through executions.
+   * This enables linking producer spans (invoke) to consumer spans (process).
+   * @default true
+   */
+  propagateContext?: boolean;
+
+  /**
+   * Hook called right after the producer span for Conductor.invoke is started.
+   * Return false to end the span immediately and run the invoke untraced.
+   * @param span - The span that was just started
+   * @param taskName - The task name being invoked
+   * @param payload - The task payload (or the item array for a batch invoke)
+   */
+  produceHook?: (span: Span, taskName: string, payload: unknown) => void | false;
+
+  /**
+   * Hook called right after the consumer span for a single-task execution is started.
+   * Return false to end the span immediately and run the task outside of it.
+   * @param span - The span that was just started
+   * @param execution - The execution being processed
+   */
+  consumeHook?: (
+    span: Span,
+    execution: {
+      id: string;
+      task_key: string;
+      queue: string;
+      payload: unknown;
+    },
+  ) => void | false;
+
+  /**
+   * Hook called after a step call returned successfully, before its span ends.
+   * @param span - The step span
+   * @param name - The step name
+   * @param cached - Whether the step result was cached
+   */
+  stepHook?: (span: Span, name: string, cached: boolean) => void;
+}
+
+/**
+ * Trace context stored in executions table.
+ */
+export interface TraceContext {
+  traceparent?: string;
+  tracestate?: string;
+}
+
+/**
+ * Task invocation info for producer spans.
+ */
+export interface TaskInvocationInfo {
+  taskName: string;
+  queue: string;
+  payload?: unknown;
+  executionId?: string;
+}
+
+/**
+ * Execution info for consumer spans.
+ */
+export interface ExecutionInfo {
+  id: string;
+  task_key: string;
+  queue: string;
+  payload: unknown;
+  attempt?: number;
+  trace_context?: TraceContext | null;
+}
+
+/**
+ * Step execution info for step spans.
+ */
+export interface StepInfo {
+  name: string;
+  executionId: string;
+  taskKey: string;
+  cached: boolean;
+}
diff --git a/packages/pgconductor-js-instrumentation/src/version.ts b/packages/pgconductor-js-instrumentation/src/version.ts
new file mode 100644
index 0000000..52c905c
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/src/version.ts
@@ -0,0 +1 @@
+export const VERSION = "0.1.0";
diff --git a/packages/pgconductor-js-instrumentation/tests/context-propagation.test.ts b/packages/pgconductor-js-instrumentation/tests/context-propagation.test.ts
new file mode 100644
index 0000000..5902854
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/tests/context-propagation.test.ts
@@ -0,0 +1,268 @@
+import { describe, test, expect, beforeAll, afterAll, beforeEach, afterEach } from "bun:test";
+import { SpanKind, trace, context } from "@opentelemetry/api";
+import {
+  setupOtel,
+  cleanupOtel,
+  resetSpans,
+  getSpans,
+  findSpanByName,
+  type TestContext,
+} from "./setup";
+import { TestDatabasePool, type TestDatabase } from "./fixtures/test-database";
+import { Conductor, Orchestrator, TaskSchemas } from "pgconductor-js";
+import { z } from "zod";
+
+// Task schemas shared by every test in this file; all tasks live on the "default" queue.
+const testTaskDefinitions = [
+  {
+    name: "simple-task",
+    queue: "default",
+    payload: z.object({ value: z.number() }),
+    returns: z.number(),
+  },
+  {
+    name: "parent-task",
+    queue: "default",
+    payload: z.object({ value: z.number() }),
+    returns: z.number(),
+  },
+  {
+    name: "child-task",
+    queue: "default",
+    payload: z.object({ value: z.number() }),
+    returns: z.number(),
+  },
+] as const;
+
+describe("Context Propagation", () => {
+  let otelCtx: TestContext;
+  let pool: TestDatabasePool;
+  let db: TestDatabase;
+
+  // One OTel setup and one database pool for the whole suite (60s timeout for startup).
+  beforeAll(async () => {
+    otelCtx = setupOtel({ propagateContext: true });
+    pool = await TestDatabasePool.create();
+  }, 60000);
+
+  afterAll(async () => {
+    await cleanupOtel(otelCtx);
+    await pool?.destroy();
+  });
+
+  // Every test gets its own child database and starts with no recorded spans.
+  beforeEach(async () => {
+    db = await pool.child();
+  });
+
+  afterEach(async () => {
+    resetSpans(otelCtx);
+    await db?.destroy();
+  });
+
+  // NOTE(review): the assertions below check span kind and trace-id length only;
+  // they do not verify that a trace context was actually injected.
+  test("invoke() creates producer span with trace context injection", async () => {
+    const conductor = Conductor.create({
+      sql: db.sql,
+      tasks: TaskSchemas.fromSchema(testTaskDefinitions),
+      context: {},
+    });
+
+    conductor.createTask({ name: "simple-task" }, { invocable: true }, async (event) => {
+      return event.payload.value * 2;
+    });
+
+    await conductor.ensureInstalled();
+
+    // Create a parent span to test context propagation
+    const tracer = trace.getTracer("test");
+    const parentSpan = tracer.startSpan("test-parent");
+
+    await context.with(trace.setSpan(context.active(), parentSpan), async () => {
+      await conductor.invoke({ name: "simple-task" }, { value: 42 });
+    });
+
+    parentSpan.end();
+
+    const spans = getSpans(otelCtx);
+    const publishSpan = findSpanByName(spans, "default simple-task publish");
+
+    expect(publishSpan).toBeDefined();
+    expect(publishSpan?.kind).toBe(SpanKind.PRODUCER);
+
+    // Verify the publish span has a valid trace ID
+    expect(publishSpan?.spanContext().traceId).toBeDefined();
+    expect(publishSpan?.spanContext().traceId.length).toBe(32);
+  });
+
+  test("execute() creates consumer span linked to producer", async () => {
+    const conductor = Conductor.create({
+      sql: db.sql,
+      tasks: TaskSchemas.fromSchema(testTaskDefinitions),
+      context: {},
+    });
+
+    const simpleTask = conductor.createTask(
+      { name: "simple-task" },
+      { invocable: true },
+      async (event) => {
+        return event.payload.value * 2;
+      },
+    );
+
+    await conductor.ensureInstalled();
+
+    const worker = conductor.createWorker({
+      queue: "default",
+      tasks: [simpleTask],
+      config: { pollIntervalMs: 50 },
+    });
+
+    const orchestrator = Orchestrator.create({
+      conductor,
+      workers: [worker],
+    });
+
+    await orchestrator.start();
+
+    // Invoke task
+    await conductor.invoke({ name: "simple-task" }, { value: 42 });
+
+    // Wait for processing (fixed delay; the worker polls every 50ms)
+    await new Promise((resolve) => setTimeout(resolve, 500));
+
+    await orchestrator.stop();
+
+    const spans = getSpans(otelCtx);
+    const publishSpan = findSpanByName(spans, "default simple-task publish");
+    const processSpan = findSpanByName(spans, "default simple-task process");
+
+    expect(publishSpan).toBeDefined();
+    expect(processSpan).toBeDefined();
+
+    // Consumer span should have links to producer span (not parent-child)
+    const links = processSpan?.links || [];
+    expect(links.length).toBeGreaterThan(0);
+
+    // Verify the link points to the producer span's trace
+    expect(links[0]?.context.traceId).toBe(publishSpan?.spanContext().traceId);
+  });
+
+  test("ctx.invoke() creates internal span for child invocation", async () => {
+    const conductor = Conductor.create({
+      sql: db.sql,
+      tasks: TaskSchemas.fromSchema(testTaskDefinitions),
+      context: {},
+    });
+
+    const childTask = conductor.createTask(
+      { name: "child-task" },
+      { invocable: true },
+      async (event) => {
+        return event.payload.value * 3;
+      },
+    );
+
+    // The parent forwards its payload value to the child through ctx.invoke().
+    const parentTask = conductor.createTask(
+      { name: "parent-task" },
+      { invocable: true },
+      async (event, ctx) => {
+        const childResult = await ctx.invoke(
+          "invoke-child",
+          { name: "child-task" },
+          { value: event.payload.value },
+        );
+        return childResult;
+      },
+    );
+
+    await conductor.ensureInstalled();
+
+    const worker = conductor.createWorker({
+      queue: "default",
+      tasks: [parentTask, childTask],
+      config: { pollIntervalMs: 50 },
+    });
+
+    const orchestrator = Orchestrator.create({
+      conductor,
+      workers: [worker],
+    });
+
+    // Set fake time to speed up child execution
+    await db.client.setFakeTime({ date: new Date() });
+
+    await orchestrator.start();
+    await conductor.invoke({ name: "parent-task" }, { value: 10 });
+
+    // Wait for parent to start and invoke child
+    await new
Promise((resolve) => setTimeout(resolve, 300));
+
+    // Advance time to allow child to complete and parent to resume
+    for (let i = 0; i < 5; i++) {
+      await new Promise((resolve) => setTimeout(resolve, 100));
+      const newTime = new Date(Date.now() + 1000 * (i + 1));
+      await db.client.setFakeTime({ date: newTime });
+    }
+
+    await orchestrator.stop();
+    await db.client.clearFakeTime();
+
+    const spans = getSpans(otelCtx);
+
+    // Find parent process span and the ctx.invoke internal span
+    const parentProcessSpan = findSpanByName(spans, "default parent-task process");
+    const invokeSpan = findSpanByName(spans, "parent-task invoke child-task");
+
+    expect(parentProcessSpan).toBeDefined();
+    expect(invokeSpan).toBeDefined();
+
+    // The invoke span should be INTERNAL kind (internal operation within task)
+    expect(invokeSpan!.kind).toBe(SpanKind.INTERNAL);
+
+    // The invoke span should be part of the same trace as the parent process span
+    expect(invokeSpan!.spanContext().traceId).toBe(parentProcessSpan!.spanContext().traceId);
+  });
+
+  test("no link created when propagateContext=false", async () => {
+    // Create new context with propagation disabled
+    const noPropCtx = setupOtel({ propagateContext: false });
+
+    // Tear the extra OTel context down even when setup or an assertion throws,
+    // so it cannot leak into later tests.
+    try {
+      const conductor = Conductor.create({
+        sql: db.sql,
+        tasks: TaskSchemas.fromSchema(testTaskDefinitions),
+        context: {},
+      });
+
+      const simpleTask = conductor.createTask(
+        { name: "simple-task" },
+        { invocable: true },
+        async (event) => {
+          return event.payload.value * 2;
+        },
+      );
+
+      await conductor.ensureInstalled();
+
+      const worker = conductor.createWorker({
+        queue: "default",
+        tasks: [simpleTask],
+        config: { pollIntervalMs: 50 },
+      });
+
+      const orchestrator = Orchestrator.create({
+        conductor,
+        workers: [worker],
+      });
+
+      await orchestrator.start();
+      await conductor.invoke({ name: "simple-task" }, { value: 42 });
+      await new Promise((resolve) => setTimeout(resolve, 500));
+      await orchestrator.stop();
+
+      const spans = getSpans(noPropCtx);
+      const processSpan = findSpanByName(spans, "default simple-task process");
+
+      // Without this check the link assertion below passes vacuously when the
+      // task was never processed (or its span was recorded elsewhere).
+      expect(processSpan).toBeDefined();
+
+      // When propagation is disabled, there should be no links
+      const links = processSpan?.links || [];
+      expect(links.length).toBe(0);
+    } finally {
+      await cleanupOtel(noPropCtx);
+    }
+  });
+});
diff --git a/packages/pgconductor-js-instrumentation/tests/error-handling.test.ts b/packages/pgconductor-js-instrumentation/tests/error-handling.test.ts
new file mode 100644
index 0000000..b962497
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/tests/error-handling.test.ts
@@ -0,0 +1,251 @@
+import { describe, test, expect, beforeAll, afterAll, beforeEach, afterEach } from "bun:test";
+import { SpanStatusCode } from "@opentelemetry/api";
+import {
+  setupOtel,
+  cleanupOtel,
+  resetSpans,
+  getSpans,
+  findSpanByName,
+  getSpanAttribute,
+  type TestContext,
+} from "./setup";
+import * as SemanticConventions from "../src/semantic-conventions";
+import { TestDatabasePool, type TestDatabase } from "./fixtures/test-database";
+import { Conductor, Orchestrator, TaskSchemas } from "pgconductor-js";
+import { z } from "zod";
+
+const testTaskDefinitions = [
+  {
+    name: "error-task",
+    queue: "default",
+    payload: z.object({}),
+    returns: z.object({ done: z.boolean() }),
+  },
+  {
+    name: "retry-task",
+    queue: "default",
+    payload: z.object({ failCount: z.number() }),
+    returns: z.string(),
+  },
+] as const;
+
+describe("Error Handling", () => {
+  let otelCtx: TestContext;
+  let pool: TestDatabasePool;
+  let db: TestDatabase;
+
+  beforeAll(async () => {
+    otelCtx = setupOtel();
+    pool = await TestDatabasePool.create();
+  }, 60000);
+
+  afterAll(async () => {
+    await cleanupOtel(otelCtx);
+    await pool?.destroy();
+  });
+
+  beforeEach(async () => {
+    db = await pool.child();
+  });
+
+  afterEach(async () => {
+    resetSpans(otelCtx);
+    await db?.destroy();
+  });
+
+  test("failed task records exception on span", async () => {
+    const conductor = Conductor.create({
+      sql: db.sql,
+      tasks:
TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const errorTask = conductor.createTask( + { name: "error-task" }, + { invocable: true }, + async () => { + throw new Error("Intentional test error"); + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [errorTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "error-task" }, {}); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const processSpan = findSpanByName(spans, "default error-task process"); + + expect(processSpan).toBeDefined(); + expect(processSpan?.status.code).toBe(SpanStatusCode.ERROR); + + // Check error attributes + const errorMsg = getSpanAttribute(processSpan!, SemanticConventions.PGCONDUCTOR_ERROR_MESSAGE); + expect(errorMsg).toContain("Intentional test error"); + }); + + test("retryable error has error.retryable=true", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + let attempts = 0; + const retryTask = conductor.createTask( + { name: "retry-task" }, + { invocable: true }, + async (event) => { + attempts++; + if (attempts < event.payload.failCount) { + throw new Error(`Attempt ${attempts} failed`); + } + return `Success on attempt ${attempts}`; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [retryTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "retry-task" }, { failCount: 3 }); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + 
const spans = getSpans(otelCtx); + const processSpans = spans.filter((s) => s.name === "default retry-task process"); + + // First execution should fail with retryable=true + expect(processSpans.length).toBeGreaterThan(0); + const failedSpan = processSpans.find((s) => s.status.code === SpanStatusCode.ERROR); + expect(failedSpan).toBeDefined(); + const retryable = getSpanAttribute( + failedSpan!, + SemanticConventions.PGCONDUCTOR_ERROR_RETRYABLE, + ); + expect(retryable).toBe(true); + }); + + test("error spans record error attributes", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + // Task that always fails + const errorTask = conductor.createTask( + { name: "error-task" }, + { invocable: true }, + async () => { + throw new Error("Always fails"); + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [errorTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "error-task" }, {}); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const processSpans = spans.filter((s) => s.name === "default error-task process"); + + // Should have at least one error span + expect(processSpans.length).toBeGreaterThan(0); + + // Find an error span + const errorSpan = processSpans.find((s) => s.status.code === SpanStatusCode.ERROR); + expect(errorSpan).toBeDefined(); + + // Error span should have error message attribute + const errorMsg = getSpanAttribute(errorSpan!, SemanticConventions.PGCONDUCTOR_ERROR_MESSAGE); + expect(errorMsg).toContain("Always fails"); + + // Error span should have retryable attribute (true for first attempts) + const retryable = getSpanAttribute(errorSpan!, 
SemanticConventions.PGCONDUCTOR_ERROR_RETRYABLE);
+    expect(retryable).toBeDefined();
+  });
+
+  test("cancelled task has appropriate status", async () => {
+    const conductor = Conductor.create({
+      sql: db.sql,
+      tasks: TaskSchemas.fromSchema(testTaskDefinitions),
+      context: {},
+    });
+
+    // Task that takes a while
+    const errorTask = conductor.createTask(
+      { name: "error-task" },
+      { invocable: true },
+      async (_event, ctx) => {
+        await ctx.sleep("long-sleep", 60000); // 1 minute sleep
+        return { done: true };
+      },
+    );
+
+    await conductor.ensureInstalled();
+
+    const worker = conductor.createWorker({
+      queue: "default",
+      tasks: [errorTask],
+      config: { pollIntervalMs: 50 },
+    });
+
+    const orchestrator = Orchestrator.create({
+      conductor,
+      workers: [worker],
+    });
+
+    await orchestrator.start();
+    const executionId = await conductor.invoke({ name: "error-task" }, {});
+
+    // Wait for task to start
+    await new Promise((resolve) => setTimeout(resolve, 200));
+
+    // Cancel the execution
+    await conductor.cancel(executionId as string);
+
+    await new Promise((resolve) => setTimeout(resolve, 300));
+    await orchestrator.stop();
+
+    // Verify cancel was called (the span should reflect the operation)
+    const spans = getSpans(otelCtx);
+    expect(spans.length).toBeGreaterThan(0);
+  });
+});
diff --git a/packages/pgconductor-js-instrumentation/tests/fixtures/test-database.ts b/packages/pgconductor-js-instrumentation/tests/fixtures/test-database.ts
new file mode 100644
index 0000000..f533b29
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/tests/fixtures/test-database.ts
@@ -0,0 +1,180 @@
+import postgres, { type Sql } from "postgres";
+import { GenericContainer, type StartedTestContainer, Wait } from "testcontainers";
+
+/**
+ * A throwaway Postgres database created for a single test.
+ * `sql` is opened with `max: 1`, i.e. a single connection to that database.
+ */
+export class TestDatabase {
+  public readonly sql: Sql;
+  public readonly name: string;
+  public readonly url: string;
+  private readonly masterUrl: string;
+
+  private constructor(sql: Sql, name: string, masterUrl: string, url: string) {
+    this.sql = sql;
+    this.name = name;
+    this.masterUrl = masterUrl;
+    this.url = url;
+  }
+
+  /**
+   * Helper object for time manipulation in tests.
+   */
+  public readonly client = {
+    setFakeTime: async ({ date }: { date: Date }): Promise<void> => {
+      await this.sql.unsafe(`set pgconductor.fake_now = '${date.toISOString()}'`);
+    },
+    clearFakeTime: async (): Promise<void> => {
+      await this.sql.unsafe(`set pgconductor.fake_now = ''`);
+    },
+  };
+
+  /**
+   * Create a uniquely named database on the server behind `masterUrl`
+   * and connect to it.
+   */
+  static async create(masterUrl: string): Promise<TestDatabase> {
+    const name = `pgc_test_${crypto.randomUUID().replace(/-/g, "_")}`;
+
+    const master = postgres(masterUrl, { max: 1 });
+
+    try {
+      await master.unsafe(`CREATE DATABASE ${name}`);
+    } finally {
+      await master.end();
+    }
+
+    // Replace only the database segment; keep any query string (e.g. ?sslmode=...)
+    const testDbUrl = masterUrl.replace(/\/[^/?]+(\?.*)?$/, `/${name}$1`);
+    // Use max: 1 to ensure only one connection, making fake time work reliably
+    const sql = postgres(testDbUrl, { max: 1 });
+
+    try {
+      await sql`CREATE EXTENSION IF NOT EXISTS "uuid-ossp"`;
+    } catch (err) {
+      // Do not leak the connection when setup fails half-way
+      await sql.end();
+      throw err;
+    }
+
+    return new TestDatabase(sql, name, masterUrl, testDbUrl);
+  }
+
+  /** Close the connection but keep the database. */
+  async close(): Promise<void> {
+    await this.sql.end();
+  }
+
+  /** Close the connection and drop the database. */
+  async destroy(): Promise<void> {
+    await this.sql.end();
+
+    const master = postgres(this.masterUrl, { max: 1 });
+    try {
+      await master.unsafe(`DROP DATABASE IF EXISTS ${this.name}`);
+    } finally {
+      await master.end();
+    }
+  }
+}
+
+/**
+ * Owns the Postgres server used by a test file and hands out per-test databases.
+ * Uses DATABASE_URL when set, otherwise starts a postgres:15 testcontainer.
+ */
+export class TestDatabasePool {
+  private readonly container: StartedTestContainer | null;
+  private readonly masterUrl: string;
+  private readonly children: TestDatabase[] = [];
+
+  private constructor(container: StartedTestContainer | null, masterUrl: string) {
+    this.container = container;
+    this.masterUrl = masterUrl;
+  }
+
+  /** Open a short-lived connection and run `SELECT 1` to prove the server is reachable. */
+  private static async assertReachable(url: string): Promise<void> {
+    const probe = postgres(url, { max: 1, connect_timeout: 5 });
+    try {
+      await probe`SELECT 1`;
+    } finally {
+      await probe.end();
+    }
+  }
+
+  static async create(): Promise<TestDatabasePool> {
+    // If DATABASE_URL is set (e.g., in CI), use it instead of starting a container
+    const databaseUrl = process.env.DATABASE_URL;
+
+    if (databaseUrl) {
+      // CI mode: use existing Postgres service
+      await TestDatabasePool.assertReachable(databaseUrl);
+
+      // Return pool without container (container will be null)
+      return new TestDatabasePool(null, databaseUrl);
+    }
+
+    // Local mode: start testcontainer
+    const containerConfig = new GenericContainer("postgres:15")
+      .withEnvironment({
+        POSTGRES_PASSWORD: "postgres",
+        POSTGRES_USER: "postgres",
+        POSTGRES_DB: "postgres",
+      })
+      .withCommand(["postgres", "-c", "wal_level=logical"])
+      .withExposedPorts(5432)
+      .withWaitStrategy(Wait.forLogMessage(/database system is ready to accept connections/, 2));
+
+    const container = await containerConfig.start();
+
+    const host = container.getHost();
+    const port = container.getMappedPort(5432);
+    const masterUrl = `postgres://postgres:postgres@${host}:${port}/postgres`;
+
+    try {
+      await TestDatabasePool.assertReachable(masterUrl);
+    } catch (err) {
+      // Do not leave an orphaned container behind when the probe fails
+      await container.stop();
+      throw err;
+    }
+
+    return new TestDatabasePool(container, masterUrl);
+  }
+
+  /** Create a fresh database and remember it for `destroy()`. */
+  async child(): Promise<TestDatabase> {
+    const db = await TestDatabase.create(this.masterUrl);
+    this.children.push(db);
+    return db;
+  }
+
+  /**
+   * Drop every database handed out by `child()` and stop the container (local mode).
+   * The container is stopped even when closing or dropping a database fails.
+   */
+  async destroy(): Promise<void> {
+    try {
+      // Close all child connections
+      await Promise.all(this.children.map((child) => child.sql.end()));
+
+      // Drop all databases
+      const master = postgres(this.masterUrl, { max: 1 });
+      try {
+        await Promise.all(
+          this.children.map((child) => master.unsafe(`DROP DATABASE IF EXISTS ${child.name}`)),
+        );
+      } finally {
+        await master.end();
+      }
+
+      this.children.length = 0;
+    } finally {
+      // Stop container if running in local mode
+      if (this.container) {
+        await this.container.stop();
+      }
+    }
+  }
+}
diff --git a/packages/pgconductor-js-instrumentation/tests/fixtures/test-tasks.ts b/packages/pgconductor-js-instrumentation/tests/fixtures/test-tasks.ts
new file mode 100644
index 0000000..baff4fa
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/tests/fixtures/test-tasks.ts
@@ -0,0 +1,49 @@
+import { z } from "zod";
+
+/**
+ * Simple task definitions for testing.
+ */ +export const testTaskDefinitions = [ + { + name: "simple-task", + queue: "default", + payload: z.object({ value: z.number() }), + returns: z.number(), + }, + { + name: "step-task", + queue: "default", + payload: z.object({ input: z.string() }), + returns: z.string(), + }, + { + name: "sleep-task", + queue: "default", + payload: z.object({}), + returns: z.void(), + }, + { + name: "invoke-child-task", + queue: "default", + payload: z.object({ childValue: z.number() }), + returns: z.number(), + }, + { + name: "child-task", + queue: "default", + payload: z.object({ value: z.number() }), + returns: z.number(), + }, + { + name: "error-task", + queue: "default", + payload: z.object({}), + returns: z.void(), + }, + { + name: "retry-task", + queue: "default", + payload: z.object({ failCount: z.number() }), + returns: z.string(), + }, +] as const; diff --git a/packages/pgconductor-js-instrumentation/tests/instrumentation.test.ts b/packages/pgconductor-js-instrumentation/tests/instrumentation.test.ts new file mode 100644 index 0000000..5ff5902 --- /dev/null +++ b/packages/pgconductor-js-instrumentation/tests/instrumentation.test.ts @@ -0,0 +1,274 @@ +import { describe, test, expect, beforeAll, afterAll, beforeEach, afterEach } from "bun:test"; +import { SpanKind, SpanStatusCode } from "@opentelemetry/api"; +import { + setupOtel, + cleanupOtel, + resetSpans, + getSpans, + findSpanByName, + getSpanAttribute, + type TestContext, +} from "./setup"; +import * as SemanticConventions from "../src/semantic-conventions"; +import { TestDatabasePool, type TestDatabase } from "./fixtures/test-database"; +import { Conductor, Orchestrator, TaskSchemas } from "pgconductor-js"; +import { z } from "zod"; + +const testTaskDefinitions = [ + { + name: "simple-task", + queue: "default", + payload: z.object({ value: z.number() }), + returns: z.number(), + }, +] as const; + +describe("PgConductorInstrumentation", () => { + let otelCtx: TestContext; + let pool: TestDatabasePool; + let db: 
TestDatabase; + + beforeAll(async () => { + otelCtx = setupOtel(); + pool = await TestDatabasePool.create(); + }, 60000); + + afterAll(async () => { + await cleanupOtel(otelCtx); + await pool?.destroy(); + }); + + beforeEach(async () => { + db = await pool.child(); + }); + + afterEach(async () => { + resetSpans(otelCtx); + await db?.destroy(); + }); + + test("registers instrumentation without error", () => { + expect(otelCtx.instrumentation).toBeDefined(); + expect(otelCtx.instrumentation.instrumentationName).toBe( + SemanticConventions.INSTRUMENTATION_NAME, + ); + }); + + test("can be enabled and disabled", () => { + otelCtx.instrumentation.disable(); + otelCtx.instrumentation.enable(); + expect(otelCtx.instrumentation).toBeDefined(); + }); + + test("creates producer span with correct attributes", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + conductor.createTask({ name: "simple-task" }, { invocable: true }, async (event) => { + if (event.name === "pgconductor.invoke") { + return event.payload.value * 2; + } + return 0; + }); + + await conductor.ensureInstalled(); + + await conductor.invoke({ name: "simple-task" }, { value: 42 }); + + const spans = getSpans(otelCtx); + const publishSpan = findSpanByName(spans, "default simple-task publish"); + + expect(publishSpan).toBeDefined(); + expect(publishSpan?.kind).toBe(SpanKind.PRODUCER); + expect(getSpanAttribute(publishSpan!, SemanticConventions.MESSAGING_SYSTEM)).toBe( + SemanticConventions.MESSAGING_SYSTEM_VALUE, + ); + expect(getSpanAttribute(publishSpan!, SemanticConventions.MESSAGING_OPERATION)).toBe( + SemanticConventions.MESSAGING_OPERATION_PUBLISH, + ); + expect(getSpanAttribute(publishSpan!, SemanticConventions.PGCONDUCTOR_TASK_NAME)).toBe( + "simple-task", + ); + expect(getSpanAttribute(publishSpan!, SemanticConventions.PGCONDUCTOR_TASK_QUEUE)).toBe( + "default", + ); + }); + + test("sets execution ID on 
span", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + conductor.createTask({ name: "simple-task" }, { invocable: true }, async (event) => { + if (event.name === "pgconductor.invoke") { + return event.payload.value * 2; + } + return 0; + }); + + await conductor.ensureInstalled(); + + const executionId = await conductor.invoke({ name: "simple-task" }, { value: 1 }); + + const spans = getSpans(otelCtx); + const publishSpan = findSpanByName(spans, "default simple-task publish"); + + expect(publishSpan).toBeDefined(); + expect(getSpanAttribute(publishSpan!, SemanticConventions.PGCONDUCTOR_EXECUTION_ID)).toBe( + executionId, + ); + }); + + test("sets span status to OK on success", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + conductor.createTask({ name: "simple-task" }, { invocable: true }, async (event) => { + if (event.name === "pgconductor.invoke") { + return event.payload.value * 2; + } + return 0; + }); + + await conductor.ensureInstalled(); + + await conductor.invoke({ name: "simple-task" }, { value: 1 }); + + const spans = getSpans(otelCtx); + const publishSpan = findSpanByName(spans, "default simple-task publish"); + + expect(publishSpan).toBeDefined(); + expect(publishSpan?.status.code).toBe(SpanStatusCode.OK); + }); + + test("creates consumer span with correct attributes", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const simpleTask = conductor.createTask( + { name: "simple-task" }, + { invocable: true }, + async (event) => { + if (event.name === "pgconductor.invoke") { + return event.payload.value * 2; + } + return 0; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [simpleTask], + config: 
{ pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "simple-task" }, { value: 5 }); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const processSpan = findSpanByName(spans, "default simple-task process"); + + expect(processSpan).toBeDefined(); + expect(processSpan?.kind).toBe(SpanKind.CONSUMER); + expect(getSpanAttribute(processSpan!, SemanticConventions.MESSAGING_SYSTEM)).toBe( + SemanticConventions.MESSAGING_SYSTEM_VALUE, + ); + expect(getSpanAttribute(processSpan!, SemanticConventions.MESSAGING_OPERATION)).toBe( + SemanticConventions.MESSAGING_OPERATION_PROCESS, + ); + }); + + test("sets execution status attribute", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const simpleTask = conductor.createTask( + { name: "simple-task" }, + { invocable: true }, + async (event) => { + if (event.name === "pgconductor.invoke") { + return event.payload.value * 2; + } + return 0; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [simpleTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "simple-task" }, { value: 10 }); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const processSpan = findSpanByName(spans, "default simple-task process"); + + expect(processSpan).toBeDefined(); + expect(getSpanAttribute(processSpan!, SemanticConventions.PGCONDUCTOR_EXECUTION_STATUS)).toBe( + "completed", + ); + }); + + test("respects requireParentSpan config", async () => { + // Create conductor 
with requireParentSpan
+    const strictCtx = setupOtel({ requireParentSpan: true });
+    try {
+      const conductor = Conductor.create({
+        sql: db.sql,
+        tasks: TaskSchemas.fromSchema(testTaskDefinitions),
+        context: {},
+      });
+
+      conductor.createTask({ name: "simple-task" }, { invocable: true }, async (event) => {
+        if (event.name === "pgconductor.invoke") {
+          return event.payload.value * 2;
+        }
+        return 0;
+      });
+
+      await conductor.ensureInstalled();
+
+      // Invoke without parent span - should not create span
+      await conductor.invoke({ name: "simple-task" }, { value: 1 });
+      // With requireParentSpan=true, no span should be created without a parent
+      expect(getSpans(strictCtx).length).toBe(0);
+    } finally {
+      // Always tear down the second SDK so a failed assertion cannot leak it
+      await cleanupOtel(strictCtx);
+    }
+  });
+});
diff --git a/packages/pgconductor-js-instrumentation/tests/setup.ts b/packages/pgconductor-js-instrumentation/tests/setup.ts
new file mode 100644
index 0000000..cee2184
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/tests/setup.ts
@@ -0,0 +1,107 @@
+import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
+import {
+  InMemorySpanExporter,
+  SimpleSpanProcessor,
+  type ReadableSpan,
+} from "@opentelemetry/sdk-trace-base";
+import { PgConductorInstrumentation, type PgConductorInstrumentationConfig } from "../src";
+import * as pgconductorJs from "pgconductor-js";
+
+export interface TestContext {
+  spanExporter: InMemorySpanExporter;
+  instrumentation: PgConductorInstrumentation;
+  tracerProvider: NodeTracerProvider;
+}
+
+/**
+ * Set up OpenTelemetry SDK for testing.
+ * Manually patches pgconductor-js modules since Bun doesn't support Node.js module hooks.
+ */
+export function setupOtel(config?: PgConductorInstrumentationConfig): TestContext {
+  const spanExporter = new InMemorySpanExporter();
+
+  const tracerProvider = new NodeTracerProvider();
+  tracerProvider.addSpanProcessor(new SimpleSpanProcessor(spanExporter));
+  tracerProvider.register();
+
+  const instrumentation = new PgConductorInstrumentation(config);
+  instrumentation.setTracerProvider(tracerProvider);
+  instrumentation.enable();
+
+  // Manually patch modules for Bun compatibility
+  // Node.js module hooking doesn't work in Bun
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  instrumentation.patchModule(pgconductorJs as any);
+
+  return {
+    spanExporter,
+    instrumentation,
+    tracerProvider,
+  };
+}
+
+/**
+ * Clean up OpenTelemetry SDK.
+ *
+ * The tracer provider is shut down even if unpatching or disabling throws,
+ * so a failed teardown step does not skip the provider shutdown.
+ */
+export async function cleanupOtel(ctx: TestContext): Promise<void> {
+  try {
+    // Manually unpatch modules
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    ctx.instrumentation.unpatchModule(pgconductorJs as any);
+    ctx.instrumentation.disable();
+  } finally {
+    await ctx.tracerProvider.shutdown();
+  }
+}
+
+/**
+ * Reset span exporter for next test.
+ */
+export function resetSpans(ctx: TestContext): void {
+  ctx.spanExporter.reset();
+}
+
+/**
+ * Get finished spans.
+ */
+export function getSpans(ctx: TestContext): ReadableSpan[] {
+  return ctx.spanExporter.getFinishedSpans();
+}
+
+/**
+ * Find span by name.
+ */
+export function findSpanByName(spans: ReadableSpan[], name: string): ReadableSpan | undefined {
+  return spans.find((s) => s.name === name);
+}
+
+/**
+ * Find spans by name pattern.
+ *
+ * `lastIndex` is reset before every check so a global (`/g`) or sticky (`/y`)
+ * pattern is matched from the start of each span name instead of resuming
+ * where the previous match ended.
+ */
+export function findSpansByPattern(spans: ReadableSpan[], pattern: RegExp): ReadableSpan[] {
+  return spans.filter((s) => {
+    pattern.lastIndex = 0;
+    return pattern.test(s.name);
+  });
+}
+
+/**
+ * Get attribute value from span.
+ * Array-valued attributes yield their first element; a missing key yields undefined.
+ */
+export function getSpanAttribute(
+  span: ReadableSpan,
+  key: string,
+): string | number | boolean | undefined {
+  const attr = span.attributes[key];
+  if (attr === undefined) return undefined;
+  if (Array.isArray(attr)) return attr[0] as string | number | boolean;
+  return attr as string | number | boolean;
+}
diff --git a/packages/pgconductor-js-instrumentation/tests/step-spans.test.ts b/packages/pgconductor-js-instrumentation/tests/step-spans.test.ts
new file mode 100644
index 0000000..facf560
--- /dev/null
+++ b/packages/pgconductor-js-instrumentation/tests/step-spans.test.ts
@@ -0,0 +1,288 @@
+import { describe, test, expect, beforeAll, afterAll, beforeEach, afterEach } from "bun:test";
+import { SpanKind, SpanStatusCode } from "@opentelemetry/api";
+import {
+  setupOtel,
+  cleanupOtel,
+  resetSpans,
+  getSpans,
+  findSpanByName,
+  findSpansByPattern,
+  getSpanAttribute,
+  type TestContext,
+} from "./setup";
+import * as SemanticConventions from "../src/semantic-conventions";
+import { TestDatabasePool, type TestDatabase } from "./fixtures/test-database";
+import { Conductor, Orchestrator, TaskSchemas } from "pgconductor-js";
+import { z } from "zod";
+
+const testTaskDefinitions = [
+  {
+    name: "step-task",
+    queue: "default",
+    payload: z.object({ input: z.string() }),
+    returns: z.string(),
+  },
+  {
+    name: "sleep-task",
+    queue: "default",
+    payload: z.object({}),
+    returns: z.object({ completed: z.boolean() }),
+  },
+] as const;
+
+describe("Step Spans", () => {
+  let otelCtx: TestContext;
+  let pool: TestDatabasePool;
+  let db: TestDatabase;
+
+  beforeAll(async () => {
+    otelCtx = setupOtel();
+    pool = await TestDatabasePool.create();
+  }, 60000);
+
+  afterAll(async () => {
+    await cleanupOtel(otelCtx);
+    await pool?.destroy();
+  });
+
+  beforeEach(async () => {
+    db = await pool.child();
+  });
+
+  afterEach(async () => {
+    resetSpans(otelCtx);
+    await db?.destroy();
+  });
+
+  test("step() creates span with step.name attribute", async () => {
+
const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const stepTask = conductor.createTask( + { name: "step-task" }, + { invocable: true }, + async (event, ctx) => { + const result = await ctx.step("process-input", async () => { + return `processed: ${event.payload.input}`; + }); + return result; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [stepTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "step-task" }, { input: "hello" }); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const stepSpan = findSpanByName(spans, "step-task step process-input"); + + expect(stepSpan).toBeDefined(); + expect(stepSpan?.kind).toBe(SpanKind.INTERNAL); + expect(getSpanAttribute(stepSpan!, SemanticConventions.PGCONDUCTOR_STEP_NAME)).toBe( + "process-input", + ); + expect(getSpanAttribute(stepSpan!, SemanticConventions.PGCONDUCTOR_TASK_NAME)).toBe( + "step-task", + ); + }); + + test("uncached step has step.cached=false", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const stepTask = conductor.createTask( + { name: "step-task" }, + { invocable: true }, + async (event, ctx) => { + const result = await ctx.step("first-run", async () => { + return `result: ${event.payload.input}`; + }); + return result; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [stepTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await 
conductor.invoke({ name: "step-task" }, { input: "test" }); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const stepSpan = findSpanByName(spans, "step-task step first-run"); + + expect(stepSpan).toBeDefined(); + // On first execution, step is not cached + expect(getSpanAttribute(stepSpan!, SemanticConventions.PGCONDUCTOR_STEP_CACHED)).toBe(false); + }); + + test("sleep() creates span with step.name attribute", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const sleepTask = conductor.createTask( + { name: "sleep-task" }, + { invocable: true }, + async (_event, ctx) => { + await ctx.sleep("wait-period", 100); + return { completed: true }; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [sleepTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "sleep-task" }, {}); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const sleepSpan = findSpanByName(spans, "sleep-task sleep"); + + expect(sleepSpan).toBeDefined(); + expect(sleepSpan?.kind).toBe(SpanKind.INTERNAL); + expect(getSpanAttribute(sleepSpan!, SemanticConventions.PGCONDUCTOR_STEP_NAME)).toBe( + "wait-period", + ); + }); + + test("nested steps create nested spans", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const stepTask = conductor.createTask( + { name: "step-task" }, + { invocable: true }, + async (event, ctx) => { + const step1 = await ctx.step("step-1", async () => { + return `first: ${event.payload.input}`; + }); + const step2 = await 
ctx.step("step-2", async () => { + return `second: ${step1}`; + }); + return step2; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [stepTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "step-task" }, { input: "nested" }); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const stepSpans = findSpansByPattern(spans, /step-task step/); + + expect(stepSpans.length).toBe(2); + + const step1Span = findSpanByName(spans, "step-task step step-1"); + const step2Span = findSpanByName(spans, "step-task step step-2"); + + expect(step1Span).toBeDefined(); + expect(step2Span).toBeDefined(); + }); + + test("step error recorded on span", async () => { + const conductor = Conductor.create({ + sql: db.sql, + tasks: TaskSchemas.fromSchema(testTaskDefinitions), + context: {}, + }); + + const stepTask = conductor.createTask( + { name: "step-task" }, + { invocable: true }, + async (_event, ctx) => { + await ctx.step("error-step", async () => { + throw new Error("Step failed"); + }); + return "never reached"; + }, + ); + + await conductor.ensureInstalled(); + + const worker = conductor.createWorker({ + queue: "default", + tasks: [stepTask], + config: { pollIntervalMs: 50 }, + }); + + const orchestrator = Orchestrator.create({ + conductor, + workers: [worker], + }); + + await orchestrator.start(); + await conductor.invoke({ name: "step-task" }, { input: "will-fail" }); + await new Promise((resolve) => setTimeout(resolve, 500)); + await orchestrator.stop(); + + const spans = getSpans(otelCtx); + const stepSpan = findSpanByName(spans, "step-task step error-step"); + + expect(stepSpan).toBeDefined(); + expect(stepSpan?.status.code).toBe(SpanStatusCode.ERROR); + 
expect(stepSpan?.status.message).toContain("Step failed"); + }); +}); diff --git a/packages/pgconductor-js/package.json b/packages/pgconductor-js/package.json index a3adf91..e9916b4 100644 --- a/packages/pgconductor-js/package.json +++ b/packages/pgconductor-js/package.json @@ -4,6 +4,12 @@ "description": "Durable execution using only Postgres", "module": "dist/index.js", "type": "module", + "exports": { + ".": { + "types": "./src/index.ts", + "default": "./src/index.ts" + } + }, "bin": { "pgconductor": "./dist/cli/index.js" }, diff --git a/packages/pgconductor-js/src/database-client.ts b/packages/pgconductor-js/src/database-client.ts index c388fa2..9a813a2 100644 --- a/packages/pgconductor-js/src/database-client.ts +++ b/packages/pgconductor-js/src/database-client.ts @@ -36,6 +36,7 @@ export interface ExecutionSpec { parent_execution_id?: string | null; parent_step_key?: string | null; parent_timeout_ms?: number | null; + trace_context?: TraceContext | null; } export interface TaskSpec { @@ -60,8 +61,13 @@ export interface Execution { dedupe_key?: string | null; cron_expression?: string | null; slot_group_number?: number | null; + trace_context?: TraceContext | null; } +export type TraceContext = { + [key: string]: string | undefined; +}; + // todo: move all of this to query-builder too or create new types.ts file export type ExecutionResult = diff --git a/packages/pgconductor-js/src/generated/sql.ts b/packages/pgconductor-js/src/generated/sql.ts index 94bedab..55c7804 100644 --- a/packages/pgconductor-js/src/generated/sql.ts +++ b/packages/pgconductor-js/src/generated/sql.ts @@ -1187,5 +1187,268 @@ as $_$ values (p_event_key, p_payload) returning id; $_$; +`, + "0000000003_trace_context.sql": String.raw` +-- Add trace_context column for OpenTelemetry context propagation +-- This stores W3C traceparent/tracestate for distributed tracing + +-- Add column to executions table +alter table pgconductor._private_executions +add column if not exists trace_context jsonb; 
+
+-- Add a jsonb trace_context attribute to the execution_spec type (read below as spec.trace_context)
+alter type pgconductor.execution_spec add attribute trace_context jsonb;
+
+drop function pgconductor.invoke; -- recreated below with an additional p_trace_context parameter
+
+-- invoke_batch(specs): inserts one execution row per element of specs and returns the ids of the inserted or conflict-updated rows
+-- Preserves all existing logic, just adds trace_context column (filled from spec.trace_context on insert and on conflict-update)
+create or replace function pgconductor.invoke_batch(
+    specs pgconductor.execution_spec[]
+)
+ returns table(id uuid)
+ language plpgsql
+ volatile
+ set search_path to ''
+as $function$
+declare
+    v_now timestamptz;
+begin
+    v_now := pgconductor._private_current_time();
+
+    -- clear locked dedupe keys before batch insert: matching locked rows are unlocked, marked failed and lose their dedupe_key
+    update pgconductor._private_executions as e
+    set
+        dedupe_key = null,
+        locked_by = null,
+        locked_at = null,
+        failed_at = v_now,
+        last_error = 'superseded by reinvoke'
+    from unnest(specs) as spec
+    where e.dedupe_key = spec.dedupe_key
+        and e.task_key = spec.task_key
+        and e.queue = coalesce(spec.queue, 'default')
+        and e.locked_at is not null
+        and spec.dedupe_key is not null;
+
+    -- batch insert all executions; a row that already exists for (task_key, dedupe_key, queue) is updated in place
+    -- note: duplicate dedupe_keys within same batch will cause error
+    -- users should deduplicate client-side if needed
+    return query
+    insert into pgconductor._private_executions (
+        id,
+        task_key,
+        queue,
+        payload,
+        run_at,
+        dedupe_key,
+        singleton_on,
+        cron_expression,
+        priority,
+        trace_context
+    )
+    select
+        pgconductor._private_portable_uuidv7(),
+        spec.task_key,
+        coalesce(spec.queue, 'default'),
+        spec.payload,
+        coalesce(spec.run_at, v_now),
+        spec.dedupe_key,
+        case -- singleton_on: v_now floored to a multiple of spec.dedupe_seconds, or null when dedupe_seconds is not set
+            when spec.dedupe_seconds is not null then
+                'epoch'::timestamptz + '1 second'::interval * (
+                    spec.dedupe_seconds * floor(
+                        extract(epoch from v_now) / spec.dedupe_seconds
+                    )
+                )
+            else null
+        end,
+        spec.cron_expression,
+        coalesce(spec.priority, 0),
+        spec.trace_context
+    from unnest(specs) as spec
+    on conflict (task_key, dedupe_key, queue) do update set
+        payload = excluded.payload,
+        run_at = excluded.run_at,
+        priority = excluded.priority,
+        cron_expression = excluded.cron_expression,
+        singleton_on = excluded.singleton_on,
+        trace_context = excluded.trace_context
+    returning pgconductor._private_executions.id;
+end;
+$function$
+;
+
+-- invoke(...): inserts (or conflict-updates) a single execution and returns its id; the throttle path returns no row on conflict
+-- Preserves all existing logic, just adds trace_context parameter and column
+create or replace function pgconductor.invoke(
+    p_task_key text,
+    p_queue text default 'default',
+    p_payload jsonb default null,
+    p_run_at timestamptz default null,
+    p_dedupe_key text default null,
+    p_dedupe_seconds integer default null,
+    p_dedupe_next_slot boolean default false,
+    p_cron_expression text default null,
+    p_priority integer default null,
+    p_trace_context jsonb default null
+)
+ returns table(id uuid)
+ language plpgsql
+ volatile
+ set search_path to ''
+as $function$
+declare
+    v_now timestamptz;
+    v_singleton_on timestamptz;
+    v_next_singleton_on timestamptz;
+    v_run_at timestamptz;
+    v_new_id uuid; -- NOTE(review): declared but not referenced anywhere in this function body
+begin
+    v_now := pgconductor._private_current_time();
+    v_run_at := coalesce(p_run_at, v_now);
+
+    -- clear locked dedupe key before insert (supersede pattern): the locked row is unlocked, marked failed and loses its dedupe_key
+    if p_dedupe_key is not null then
+        update pgconductor._private_executions
+        set
+            dedupe_key = null,
+            locked_by = null,
+            locked_at = null,
+            failed_at = v_now,
+            last_error = 'superseded by reinvoke'
+        where dedupe_key = p_dedupe_key
+            and task_key = p_task_key
+            and queue = p_queue
+            and locked_at is not null;
+    end if;
+
+    -- singleton throttle/debounce logic (both branches return before the regular invoke below)
+    if p_dedupe_seconds is not null then
+        -- calculate current time slot (pg-boss formula): v_now floored to a multiple of p_dedupe_seconds
+        v_singleton_on := 'epoch'::timestamptz + '1 second'::interval * (
+            p_dedupe_seconds * floor(
+                extract(epoch from v_now) / p_dedupe_seconds
+            )
+        );
+
+        if p_dedupe_next_slot = false then
+            -- throttle: try current slot, return empty if blocked (on conflict do nothing)
+            return query
+            insert into pgconductor._private_executions (
+                id,
+                task_key,
+                queue,
+                payload,
+                run_at,
+                dedupe_key,
+                singleton_on,
+                cron_expression,
+                priority,
+                trace_context
+            ) values (
+                pgconductor._private_portable_uuidv7(),
+                p_task_key,
+                p_queue,
+                p_payload,
+                v_run_at,
+                p_dedupe_key,
+                v_singleton_on,
+                p_cron_expression,
+                coalesce(p_priority, 0),
+                p_trace_context
+            )
+            on conflict (task_key, singleton_on, coalesce(dedupe_key, ''), queue)
+            where singleton_on is not null and completed_at is null and failed_at is null and cancelled = false
+            do nothing
+            returning _private_executions.id;
+            return;
+        else
+            -- debounce: upsert into next slot; run_at is the start of that slot (p_run_at is not used on this path)
+            v_next_singleton_on := v_singleton_on + (p_dedupe_seconds || ' seconds')::interval;
+
+            return query
+            insert into pgconductor._private_executions (
+                id,
+                task_key,
+                queue,
+                payload,
+                run_at,
+                dedupe_key,
+                singleton_on,
+                cron_expression,
+                priority,
+                trace_context
+            ) values (
+                pgconductor._private_portable_uuidv7(),
+                p_task_key,
+                p_queue,
+                p_payload,
+                v_next_singleton_on,
+                p_dedupe_key,
+                v_next_singleton_on,
+                p_cron_expression,
+                coalesce(p_priority, 0),
+                p_trace_context
+            )
+            on conflict (task_key, singleton_on, coalesce(dedupe_key, ''), queue)
+            where singleton_on is not null and completed_at is null and failed_at is null and cancelled = false
+            do update set
+                payload = excluded.payload,
+                run_at = excluded.run_at,
+                priority = excluded.priority,
+                trace_context = excluded.trace_context
+            returning _private_executions.id;
+            return;
+        end if;
+    end if;
+
+    -- regular invoke (no singleton): singleton_on is not set by the insert below
+    if p_dedupe_key is not null then
+        -- clear keys that are currently locked so a subsequent insert can succeed. NOTE(review): same match conditions as the update at the top of this function
+        update pgconductor._private_executions as e
+        set
+            dedupe_key = null,
+            locked_by = null,
+            locked_at = null,
+            failed_at = pgconductor._private_current_time(),
+            last_error = 'superseded by reinvoke'
+        where e.dedupe_key = p_dedupe_key
+            and e.task_key = p_task_key
+            and e.queue = p_queue
+            and e.locked_at is not null;
+    end if;
+
+    return query insert into pgconductor._private_executions as e (
+        id,
+        task_key,
+        queue,
+        payload,
+        run_at,
+        dedupe_key,
+        cron_expression,
+        priority,
+        trace_context
+    ) values (
+        pgconductor._private_portable_uuidv7(),
+        p_task_key,
+        p_queue,
+        p_payload,
+        v_run_at,
+        p_dedupe_key,
+        p_cron_expression,
+        coalesce(p_priority, 0),
+        p_trace_context
+    )
+    on conflict (task_key, dedupe_key, queue) do update set
+        payload = excluded.payload,
+        run_at = excluded.run_at,
+        priority = excluded.priority,
+        cron_expression = excluded.cron_expression,
+        trace_context = excluded.trace_context
+    returning e.id;
+end;
+$function$
+;
 `,
 });
diff --git a/packages/pgconductor-js/src/index.ts b/packages/pgconductor-js/src/index.ts
index 9689989..44c4b92 100644
--- a/packages/pgconductor-js/src/index.ts
+++ b/packages/pgconductor-js/src/index.ts
@@ -2,5 +2,7 @@ export { Conductor } from "./conductor";
 export { Orchestrator } from "./orchestrator";
 export { Worker } from "./worker";
 export { Task } from "./task";
+export { TaskContext } from "./task-context";
 export { SchemaManager } from "./schema-manager";
 export { MigrationStore } from "./migration-store";
+export { TaskSchemas } from "./schemas";
diff --git a/packages/pgconductor-js/src/query-builder.ts b/packages/pgconductor-js/src/query-builder.ts
index a7f3a0f..074dc50 100644
--- a/packages/pgconductor-js/src/query-builder.ts
+++ b/packages/pgconductor-js/src/query-builder.ts
@@ -288,7 +288,8 @@ export class QueryBuilder {
       _private_executions.last_error,
       _private_executions.dedupe_key,
       _private_executions.cron_expression,
-      null as slot_group_number
+      null as slot_group_number,
+      _private_executions.trace_context
     `;
   }
 
@@ -462,7 +463,8 @@ export class QueryBuilder {
       e.last_error,
       e.dedupe_key,
       e.cron_expression,
-      p.slot_group_number
+      p.slot_group_number,
+      e.trace_context
     `;
   }
 
@@ -923,7 +925,8 @@ export class QueryBuilder {
         p_dedupe_seconds := ${dedupe_seconds}::integer,
         p_dedupe_next_slot := ${dedupe_next_slot}::boolean,
         p_cron_expression := ${spec.cron_expression || null}::text,
-        p_priority := ${spec.priority || null}::integer
+        p_priority := ${spec.priority || null}::integer,
+        p_trace_context := ${spec.trace_context ? this.sql.json(spec.trace_context) : null}::jsonb
       )
     `;
   }
@@ -1029,6 +1032,7 @@ export class QueryBuilder {
         dedupe_next_slot,
         cron_expression: spec.cron_expression || null,
         priority: spec.priority,
+        trace_context: spec.trace_context || null,
       };
     });
 
diff --git a/packages/pgconductor-js/src/versions.ts b/packages/pgconductor-js/src/versions.ts
index 3273eeb..a8bc96b 100644
--- a/packages/pgconductor-js/src/versions.ts
+++ b/packages/pgconductor-js/src/versions.ts
@@ -1,3 +1,3 @@
 /* This file is auto-generated by `just build-migrations`; DO NOT EDIT */
 export const PACKAGE_VERSION = "0.1.0";
-export const MIGRATION_NUMBER = 2;
+export const MIGRATION_NUMBER = 3;