Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7465a5f
Fix: Auto-cleanup existing database containers before running prep
claude Nov 24, 2025
979daf8
feat: add Auth0 authentication integration to site template
cherriechang Nov 21, 2025
e1b93f2
fix: prevent Header from overwriting Auth0 user ID with session-based ID
cherriechang Nov 21, 2025
f94b994
fix: resolve RabbitMQ heartbeat timeout in workers
claude Nov 23, 2025
a5290a0
docs: add changeset for RabbitMQ heartbeat fix
claude Nov 23, 2025
07dfbfa
fix: use Docker service names instead of localhost for worker DB conn…
claude Nov 25, 2025
1ec95ed
chore: update yarn.lock after dependency install
claude Nov 25, 2025
e2c178d
finalize setting up experiment workers for local deployment; ensure
cherriechang Nov 26, 2025
b0883db
add: support for experiment workers to connect to RDS via SSL encryption
cherriechang Dec 1, 2025
ccca6d4
fix: experiments sharing the same experiment worker due to bug in ecs…
cherriechang Dec 1, 2025
6e5a71c
add .claude to .gitignore
cherriechang Dec 2, 2025
b46d185
log the warning if this check fails for a reason other than grep find…
becky-gilbert Mar 20, 2026
0bc9e5f
fix undefined var e in catch block (pre-existing bug)
becky-gilbert Mar 20, 2026
55c4cb3
switch from compose stop/rm to down - same thing but also removes nam…
becky-gilbert Mar 20, 2026
6a31819
Merge branch 'fix/aws-deployment' into fix/experiment-workers and fix…
becky-gilbert Mar 25, 2026
7e4b3a1
use DB_HOST/TRANS_HOST from pushkinYAML so that the docker service na…
becky-gilbert Mar 25, 2026
929ecb3
remove references to old DB_SSL env variable and sslConfig in workers …
becky-gilbert Apr 7, 2026
0c50a41
merge fix/prep-graceful-handle-container-exist to fix db password iss…
becky-gilbert Apr 8, 2026
cb3be45
amqp connection heartbeat should be set via query param
becky-gilbert Apr 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .changeset/fix-rabbitmq-heartbeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"pushkin-worker": patch
"@pushkin-templates/site-basic": patch
---

Fix RabbitMQ heartbeat timeout causing worker crashes

**Bug Fix:**
- Resolves "Heartbeat timeout" errors that prevented experiment workers from completing database operations
- Workers would crash with "Error: Heartbeat timeout at Heart.<anonymous>" during experiment execution

**Changes:**
- Added `heartbeat: 30` configuration to `amqp.connect()` in pushkin-worker to send heartbeats every 30 seconds
- Upgraded RabbitMQ from version 3.6 to 3.12 in docker-compose.dev.yml template
- Added `RABBITMQ_HEARTBEAT: '30'` environment variable to RabbitMQ service configuration

**Impact:**
This fix ensures stable RabbitMQ connections during long-running experiment tasks and prevents connection timeouts that were blocking database persistence of user data and experiment results.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@ playwright/.cache/

# CLAUDE files
CLAUDE.local.md
CLAUDE/*
CLAUDE/*
.claude
153 changes: 77 additions & 76 deletions packages/pushkin-api/src/api.js
Original file line number Diff line number Diff line change
@@ -1,87 +1,88 @@
import express from 'express';
import amqp from 'amqplib';
import { v4 as uuid } from 'uuid';
import bodyParser from 'body-parser';
import cors from 'cors';
import cookieSession from 'cookie-session';
import fs from 'fs';
import express from "express";
import amqp from "amqplib";
import { v4 as uuid } from "uuid";
import bodyParser from "body-parser";
import cors from "cors";
import cookieSession from "cookie-session";
import fs from "fs";

export default class PushkinAPI {
constructor(expressPort, amqpAddress, key) {
this.expressPort = expressPort;
this.amqpAddress = amqpAddress;
this.initialized = false;
constructor(expressPort, amqpAddress, key) {
this.expressPort = expressPort;
this.amqpAddress = amqpAddress;
this.initialized = false;

// Check if .env file exists
if (fs.existsSync('.env')) {
// If .env file exists, append cookie secret key variable
fs.appendFileSync('.env', `COOKIE_SESSION_SECRET=${key}\n`);
console.log('Cookie session secret key appended to .env file.');
} else {
// If .env file does not exist, create it and set cookie secret key variable
fs.writeFileSync('.env', `COOKIE_SESSION_SECRET=${key}\n`);
console.log('Secret key generated and stored in .env file.');
}

this.app = express();
this.app.set('trust-proxy', 1);
this.app.use(cookieSession({
name: 'session',
maxAge: 24 * 60 * 60 * 1000,
keys: [key]
}));
this.app.use( (req, res, next) => {
req.session.id = req.session.id || uuid();
console.log(`API got request for ${req}`);
next();
});
this.app.use(bodyParser.json());
this.app.use(cors());
this.expressListening = false;
this.server = null;
this.app.get('/', function (req, res) {
res.send('👨‍🔬💬👩‍🔬')
})
// Check if .env file exists
if (fs.existsSync(".env")) {
// If .env file exists, append cookie secret key variable
fs.appendFileSync(".env", `COOKIE_SESSION_SECRET=${key}\n`);
console.log("Cookie session secret key appended to .env file.");
} else {
// If .env file does not exist, create it and set cookie secret key variable
fs.writeFileSync(".env", `COOKIE_SESSION_SECRET=${key}\n`);
console.log("Secret key generated and stored in .env file.");
}

async init() {
return new Promise((resolve, reject) => {
amqp.connect(this.amqpAddress)
.then(conn => {
this.conn = conn;
this.initialized = true;
console.log('API init connected');
resolve();
})
.catch(err => {
reject(`Error connecting to message queue: ${err}`);
});
});
}
this.app = express();
this.app.set("trust-proxy", 1);
this.app.use(
cookieSession({
name: "session",
maxAge: 24 * 60 * 60 * 1000,
keys: [key],
}),
);
this.app.use((req, res, next) => {
req.session.id = req.session.id || uuid();
console.log(`API got request for ${req}`);
next();
});
this.app.use(bodyParser.json());
this.app.use(cors());
this.expressListening = false;
this.server = null;
this.app.get("/", function (req, res) {
res.send("👨‍🔬💬👩‍🔬");
});
}

useController(route, controller) {
if (this.expressListening)
throw new Error('Unable to add controllers after the API has started.');
console.log('API using controller');
this.app.use(route, controller); }
async init() {
return new Promise((resolve, reject) => {
amqp
.connect(this.amqpAddress)
.then((conn) => {
this.conn = conn;
this.initialized = true;
console.log("API init connected");
resolve();
})
.catch((err) => {
reject(`Error connecting to message queue: ${err}`);
});
});
}

usePushkinController(route, pushkinController) {
if (this.expressListening)
throw new Error('Unable to add controllers after the API has started.');
if (!this.initialized)
throw new Error('The API must first be initialized by calling .init().');
this.useController(route, pushkinController.getConnFunction()(this.conn));
}
useController(route, controller) {
if (this.expressListening)
throw new Error("Unable to add controllers after the API has started.");
console.log("API using controller");
this.app.use(route, controller);
}

//enableCoreRoutes() { this.usePushkinController('/api', coreRouter); }
usePushkinController(route, pushkinController) {
if (this.expressListening)
throw new Error("Unable to add controllers after the API has started.");
if (!this.initialized) throw new Error("The API must first be initialized by calling .init().");
this.useController(route, pushkinController.getConnFunction()(this.conn));
}

start() {
if (!this.initialized)
throw new Error('The API hasn\'t been successfully initialized');
this.expressListening = true;
this.server = this.app.listen(this.expressPort, async () => {
console.log(`Pushkin API listening on port ${this.expressPort}`);
});
}
//enableCoreRoutes() { this.usePushkinController('/api', coreRouter); }

start() {
if (!this.initialized) throw new Error("The API hasn't been successfully initialized");
this.expressListening = true;
this.server = this.app.listen(this.expressPort, async () => {
console.log(`Pushkin API listening on port ${this.expressPort}`);
});
}
}
1 change: 0 additions & 1 deletion packages/pushkin-cli/src/commands/aws/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2161,7 +2161,6 @@ const ecsTaskCreator = async (
DB_DB: dbInfoByTask["Main"].name,
DB_PASS: dbInfoByTask["Main"].password,
DB_URL: dbInfoByTask["Main"].endpoint,
DB_SSL: "true",
//"TRANS_URL": `postgres://${dbInfoByTask['Transaction'].username}:${dbInfoByTask['Transaction'].password}@${dbInfoByTask['Transaction'].endpoint}:/${dbInfoByTask['Transaction'].port}/${dbInfoByTask['Transaction'].name}`
TRANS_HOST: dbInfoByTask["Transaction"].endpoint,
TRANS_USER: dbInfoByTask["Transaction"].username,
Expand Down
55 changes: 31 additions & 24 deletions packages/pushkin-cli/src/commands/prep/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import path from "path";
import fs from "graceful-fs";
import jsYaml from "js-yaml";
import util from "util";
import { execSync } from 'child_process'; // eslint-disable-line
import { execSync } from "child_process"; // eslint-disable-line
const exec = util.promisify(require("child_process").exec);
const execFile = util.promisify(require("child_process").execFile);
import pacMan from "../../pMan.js"; //which package manager is available?
Expand Down Expand Up @@ -45,8 +45,11 @@ export const updatePasswords = async () => {
composeFileData.services.test_transaction_db.environment.POSTGRES_PASSWORD = transactionDBPass;
exps.forEach((exp) => {
const workerName = exp.toLowerCase().concat("_worker");
composeFileData.services[workerName].environment.DB_PASS = testDBPass;
composeFileData.services[workerName].environment.TRANS_PASS = transactionDBPass;
// Check if worker service exists before trying to update it
if (composeFileData.services[workerName] && composeFileData.services[workerName].environment) {
composeFileData.services[workerName].environment.DB_PASS = testDBPass;
composeFileData.services[workerName].environment.TRANS_PASS = transactionDBPass;
}
});
const newComposeFile = fs.promises.writeFile(
"pushkin/docker-compose.dev.yml",
Expand Down Expand Up @@ -102,29 +105,33 @@ const publishLocalPackage = async (modDir, modName, verbose) => {
let pluginsToAdd = [];
let pluginsToUpgrade = [];
plugins.forEach((plugin) => {
// Create a regex to find the version number specified after the import statement
let versionMatch = new RegExp(`(?<=${plugin}'; \/\/ version:).+?(?= \/\/)`, "g");
let pluginVersion = "";
if (expJs.includes(`${plugin}'; // version:`)) {
pluginVersion = expJs.match(versionMatch)[0];
}
// If any jsPsych plugins are not yet added to package.json, add them
if (!packageJson.dependencies[plugin]) {
if (pluginVersion === "") {
// Just add the plugin name if no version/tag is specified
pluginsToAdd.push(plugin);
} else {
// Append the version/tag if specified
pluginsToAdd.push(plugin + "@" + pluginVersion);
// Check if plugins exist before iterating (match returns null if no matches)
if (plugins) {
// Create a regex to find the version number specified after the import statement
let versionMatch = new RegExp(`(?<=${plugin}'; \/\/ version:).+?(?= \/\/)`, "g");
let pluginVersion = "";
if (expJs.includes(`${plugin}'; // version:`)) {
pluginVersion = expJs.match(versionMatch)[0];
}
} else {
// package is already added to package.json
// Check if version/tag is specified and differs from the one in package.json
if (pluginVersion !== "" && packageJson.dependencies[plugin] !== pluginVersion) {
pluginsToUpgrade.push(plugin + "@" + pluginVersion);
// If any jsPsych plugins are not yet added to package.json, add them
if (!packageJson.dependencies[plugin]) {
if (pluginVersion === "") {
// Just add the plugin name if no version/tag is specified
pluginsToAdd.push(plugin);
} else {
// Append the version/tag if specified
pluginsToAdd.push(plugin + "@" + pluginVersion);
}
} else {
// package is already added to package.json
// Check if version/tag is specified and differs from the one in package.json
if (pluginVersion !== "" && packageJson.dependencies[plugin] !== pluginVersion) {
pluginsToUpgrade.push(plugin + "@" + pluginVersion);
}
}
}
});

// If any plugins need to be added or upgraded, do so
if (pluginsToAdd.length > 0) {
if (verbose)
Expand Down Expand Up @@ -553,8 +560,8 @@ export const prep = async (experimentsDir, coreDir, verbose) => {
pushkinYAML.databases.localtransactiondb.host;
compFile.services[workerName].environment.TRANS_DB =
pushkinYAML.databases.localtransactiondb.name;
compFile.services[workerName].environment.TRANS_PORT =
pushkinYAML.databases.localtransactiondb.port;
// Use internal container port (5432), not host-mapped port
compFile.services[workerName].environment.TRANS_PORT = "5432";

let workerBuild;
try {
Expand Down
73 changes: 63 additions & 10 deletions packages/pushkin-cli/src/commands/setupdb/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -482,24 +482,77 @@ async function migrateExperimentsDB(dbMigrationsMap, coreDBs, verbose) {
return runMigrations(dbMigrationsMap, coreDBs, verbose);
}

/**
 * Ensures a clean state by detecting and removing existing database containers.
 * This prevents issues with stale containers that may have different credentials
 * (e.g. a leftover test_db whose POSTGRES_PASSWORD no longer matches the one in
 * docker-compose.dev.yml).
 * @param {boolean} verbose Output extra debugging info
 * @returns {Promise<void>} Resolves after cleanup (or after determining none is needed);
 *   never rejects — failures are logged as warnings so setup can proceed.
 */
async function ensureCleanState(verbose) {
  if (verbose) console.log("--verbose flag set inside ensureCleanState()");

  try {
    // Check if database containers are running or stopped.
    // `docker ps -a` lists containers in any state; the grep pattern matches both
    // compose v1 (underscore) and v2 (hyphen) container-name separators.
    const { stdout } = await exec(
      `docker ps -a --format "{{.Names}}" | grep -E "pushkin[-_](test_db|test_transaction_db)[-_]"`
    );

    if (stdout.trim()) {
      // Found existing containers - clean them up
      console.log('⚠️ Found existing database containers. Cleaning up...');

      const dockerPath = path.join(process.cwd(), "pushkin");
      const dockerConfig = "docker-compose.dev.yml";

      try {
        // Remove containers and named volumes so new containers start with fresh credentials
        await compose.down({
          cwd: dockerPath,
          config: dockerConfig,
          commandOptions: ["--volumes"], // removes named volumes (e.g. test_transaction_db_volume)
        });
      } catch (err) {
        // Best-effort: a failed teardown shouldn't abort setup; surface it only in verbose mode.
        if (verbose) console.warn("Warning removing containers:", err.message);
      }

      console.log('✓ Cleanup complete. Starting fresh databases...');
    } else {
      if (verbose) console.log("No existing database containers found. Proceeding with fresh setup.");
    }
  } catch (e) {
    // If grep finds nothing, it returns exit code 1, which throws an error
    // This is expected when no containers exist, so we can safely ignore it
    // (exit code 1 with empty stderr = "no matches"; exit code >1 or stderr = real failure).
    if (e.code === 1 && e.stderr === '') {
      if (verbose) console.log("No existing database containers found (grep returned no matches).");
    } else {
      // Actual error - log it but don't fail the entire setup
      console.warn("Warning: Could not check for existing containers:", e.message);
    }
  }
}

/**
* Set up all databases by running migrations and seeds.
* This is the main orchestration function that:
* 1. Starts Docker containers for test databases:
* 1. Ensures clean state (removes stale containers/volumes)
* 2. Starts Docker containers for test databases:
* - test_db: Main database for experiments and user accounts
* - test_transaction_db: Audit log database for query tracking/debugging
* 2. Collects all migrations from experiments and users
* 3. Waits for database containers to be healthy
* 4. Runs migrations and seeds in parallel
* 5. Stops all database containers when done
* 3. Collects all migrations from experiments and users
* 4. Waits for database containers to be healthy
* 5. Runs migrations and seeds in parallel
* 6. Stops all database containers when done
* @param {object} coreDBs - Database configurations object from pushkin.yaml
* @param {string} usersDir - Path to users directory
* @param {string} mainExpDir - Path to main experiments directory
* @param {boolean} verbose - Whether to enable verbose logging
* @returns {Promise} Promise that resolves when setup is complete
*/
export async function setupdb(coreDBs, usersDir, mainExpDir, verbose) {
// === 1. Start Docker containers for test databases ===
// === 1. Ensure clean state (remove stale containers/volumes) ===
await ensureCleanState(verbose);

// === 2. Start Docker containers for test databases ===
// test_db: Main database for experiments/users
// test_transaction_db: Audit log (records all queries for debugging)
// Note: Knex requires all migrations for the same DB to be run together
Expand All @@ -509,20 +562,20 @@ export async function setupdb(coreDBs, usersDir, mainExpDir, verbose) {
config: "docker-compose.dev.yml",
});

// === 2. Collect all migrations from experiments and users ===
// === 3. Collect all migrations from experiments and users ===
const dbMigrationsMap = await getMigrations(usersDir, mainExpDir, false, verbose);

// === 3. Wait for database containers to start before proceeding ===
// === 4. Wait for database containers to start before proceeding ===
await dbPromise;

// === 4. Run migrations and seeds in parallel ===
// === 5. Run migrations and seeds in parallel ===
const setupTransactionsTable = migrateTransactionsDB(coreDBs, verbose);
const experimentMigrationsPromise = migrateExperimentsDB(dbMigrationsMap, coreDBs, verbose);
await Promise.all([experimentMigrationsPromise, setupTransactionsTable]);

if (verbose) console.log("Finished running all migrations. Shutting down database containers.");

// === 5. Stop all database containers ===
// === 6. Stop all database containers ===
return await compose.stop({
cwd: path.join(process.cwd(), "pushkin"),
config: "docker-compose.dev.yml",
Expand Down
Loading