Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 74 additions & 26 deletions middleware/src/channels/routeRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,35 @@ import type { HttpMethod } from '@omadia/channel-sdk';

/**
* Per-channel Express route mount. Express itself has no clean way to
* un-register middleware, so we use an indirection: every channel-owned
* route is mounted ONCE at registration time and reads a per-channel
* `active` flag from this registry. Deactivate flips the flag; the handler
* short-circuits with 503 afterwards. Re-activation flips it back.
* un-register middleware, so we use an indirection: every channel-owned route
* is mounted ONCE at first registration and reads its handler from a mutable
* per-route record plus a per-channel `active` flag. Deactivate flips the flag;
* the handler short-circuits with 503 afterwards. Re-activation flips it back,
* and a re-registration by the same channel (hot-reinstall) rebinds the handler
* in place rather than mounting a second, shadowed route.
*
* Not the prettiest pattern, but it avoids restarting the HTTP server and
* keeps behaviour observable in logs. When Slice 2.5 adds dynamic load/
* unload we can swap this out for a mutable `app.use` router-swap variant.
* When a future slice adds dynamic load/unload we can swap this out for a
* mutable `app.use` router-swap variant.
*/

interface RegisteredRoute {
interface RouteMount {
method: HttpMethod;
path: string;
channelId: string;
handler: RequestHandler;
}

interface RouterMount {
prefix: string;
channelId: string;
router: Router;
}

export class ExpressRouteRegistry {
private readonly routes: RegisteredRoute[] = [];
/** Keyed by `${method} ${path}` — the Express dispatch key. */
private readonly routeMounts = new Map<string, RouteMount>();
/** Keyed by mount prefix. */
private readonly routerMounts = new Map<string, RouterMount>();
private readonly activeByChannel = new Map<string, boolean>();

constructor(private readonly app: Express) {}
Expand All @@ -40,22 +50,40 @@ export class ExpressRouteRegistry {
path: string,
handler: RequestHandler,
): void {
const key = `${method} ${path}`;
const existing = this.routeMounts.get(key);
if (existing) {
if (existing.channelId !== channelId) {
throw new Error(
`route '${key}' already owned by channel '${existing.channelId}'`,
);
}
// Same channel re-registering (hot-reinstall): rebind in place.
// Every hot-swap path (config reactivate, version upload) deactivates
// then re-activates the channel, which re-runs the plugin's activate()
// and lands here. The mounted wrapper reads `mount.handler` on each
// dispatch, so overwriting it swaps the freshly-loaded module's handler
// in — no second, shadowing app.<method>() mount (#395).
existing.handler = handler;
this.activeByChannel.set(channelId, true);
console.log(`[channels] route rebound ${key} (channel=${channelId})`);
return;
}

const mount: RouteMount = { method, path, channelId, handler };
const wrapper: RequestHandler = (
req: Request,
res: Response,
next: NextFunction,
) => {
if (!this.activeByChannel.get(channelId)) {
res.status(503).json({
code: 'channel.inactive',
message: `channel '${channelId}' is currently deactivated`,
});
if (!this.activeByChannel.get(mount.channelId)) {
this.sendChannelInactive(res, mount.channelId);
return;
}
void Promise.resolve(handler(req, res, next)).catch((err: unknown) => {
void Promise.resolve(mount.handler(req, res, next)).catch((err: unknown) => {
const message = err instanceof Error ? err.message : String(err);
console.error(
`[channel:${channelId}] unhandled route error on ${method} ${path}:`,
`[channel:${mount.channelId}] unhandled route error on ${method} ${path}:`,
message,
);
if (!res.headersSent) {
Expand Down Expand Up @@ -85,31 +113,44 @@ export class ExpressRouteRegistry {
break;
}

this.routes.push({ method, path, channelId, handler });
this.routeMounts.set(key, mount);
this.activeByChannel.set(channelId, true);
console.log(
`[channels] route registered ${method} ${path} (channel=${channelId})`,
);
}

/**
* Register a full Router under a prefix. Same active-flag gate —
* requests under the prefix return 503 while the channel is inactive.
* Register a full Router under a prefix. Same active-flag gate — requests
* under the prefix return 503 while the channel is inactive. Re-registration
* by the same channel rebinds the router in place, mirroring {@link register}.
*/
registerRouter(channelId: string, prefix: string, router: Router): void {
const existing = this.routerMounts.get(prefix);
if (existing) {
if (existing.channelId !== channelId) {
throw new Error(
`router prefix '${prefix}' already owned by channel '${existing.channelId}'`,
);
}
existing.router = router;
this.activeByChannel.set(channelId, true);
console.log(`[channels] router rebound at ${prefix} (channel=${channelId})`);
return;
}

const mount: RouterMount = { prefix, channelId, router };
this.app.use(
prefix,
(req: Request, res: Response, next: NextFunction) => {
if (!this.activeByChannel.get(channelId)) {
res.status(503).json({
code: 'channel.inactive',
message: `channel '${channelId}' is currently deactivated`,
});
if (!this.activeByChannel.get(mount.channelId)) {
this.sendChannelInactive(res, mount.channelId);
return;
}
router(req, res, next);
mount.router(req, res, next);
},
);
this.routerMounts.set(prefix, mount);
this.activeByChannel.set(channelId, true);
console.log(
`[channels] router registered at ${prefix} (channel=${channelId})`,
Expand All @@ -121,6 +162,13 @@ export class ExpressRouteRegistry {
this.activeByChannel.set(channelId, active);
}

private sendChannelInactive(res: Response, channelId: string): void {
res.status(503).json({
code: 'channel.inactive',
message: `channel '${channelId}' is currently deactivated`,
});
}

/** Mark every route owned by this channel as inactive (returns 503). */
deactivateChannel(channelId: string): void {
this.activeByChannel.set(channelId, false);
Expand All @@ -129,7 +177,7 @@ export class ExpressRouteRegistry {

/** For introspection / dev-tools. */
describe(): Array<{ method: HttpMethod; path: string; channelId: string; active: boolean }> {
return this.routes.map((r) => ({
return Array.from(this.routeMounts.values()).map((r) => ({
method: r.method,
path: r.path,
channelId: r.channelId,
Expand Down
106 changes: 106 additions & 0 deletions middleware/test/channelRouteRebind.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { strict as assert } from 'node:assert';
import type { AddressInfo } from 'node:net';
import { after, before, describe, it } from 'node:test';

import express, { type Express } from 'express';

import { ExpressRouteRegistry } from '../src/channels/routeRegistry.js';

/** Regression coverage for #395: a plugin hot-reinstall must rebind the
* inbound handler in place, not serve the stale first-mounted route. */
describe('ExpressRouteRegistry · hot-reinstall handler rebind (#395)', () => {
let app: Express;
let registry: ExpressRouteRegistry;
let baseUrl: string;
let server: ReturnType<Express['listen']>;

const CHANNEL = '@test/channel-395';
const PATH = '/api/test-395/messages';

before(async () => {
app = express();
app.use(express.json());
registry = new ExpressRouteRegistry(app);
server = app.listen(0);
await new Promise<void>((resolve) => server.once('listening', resolve));
const { port } = server.address() as AddressInfo;
baseUrl = `http://127.0.0.1:${port}`;
});

after(() => {
server.close();
});

const post = async (): Promise<{ status: number; body: unknown }> => {
const res = await fetch(`${baseUrl}${PATH}`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: '{}',
});
return { status: res.status, body: await res.json() };
};

it('serves the freshly-registered handler after a re-registration', async () => {
registry.register(CHANNEL, 'POST', PATH, (_req, res) => {
res.json({ version: 'v1' });
});
registry.setActive(CHANNEL, true);
assert.deepEqual((await post()).body, { version: 'v1' });

// Hot reinstall → v2: deactivate, then re-register with the new handler.
registry.deactivateChannel(CHANNEL);
registry.register(CHANNEL, 'POST', PATH, (_req, res) => {
res.json({ version: 'v2' });
});
registry.setActive(CHANNEL, true);

assert.deepEqual(
(await post()).body,
{ version: 'v2' },
'inbound handler must serve the reinstalled module, not the stale one',
);
});

it('does not stack a second Express route on re-registration', async () => {
const mountsFor = () =>
registry.describe().filter((r) => r.path === PATH && r.method === 'POST');
assert.equal(mountsFor().length, 1, 'exactly one mount before re-register');
registry.register(CHANNEL, 'POST', PATH, (_req, res) => {
res.json({ version: 'v3' });
});
assert.equal(mountsFor().length, 1, 'still one mount after re-register');
assert.deepEqual((await post()).body, { version: 'v3' });
});

it('rejects a different channel claiming an already-owned path/prefix', () => {
assert.throws(
() =>
registry.register('@test/other-channel', 'POST', PATH, (_req, res) => {
res.json({ version: 'intruder' });
}),
/already owned by channel '@test\/channel-395'/,
);

const PREFIX = '/api/test-395-router';
registry.registerRouter(CHANNEL, PREFIX, express.Router());
assert.throws(
() => registry.registerRouter('@test/other-channel', PREFIX, express.Router()),
/already owned by channel '@test\/channel-395'/,
);
// Same channel re-registering (hot-reinstall) must not throw.
assert.doesNotThrow(() =>
registry.registerRouter(CHANNEL, PREFIX, express.Router()),
);
});

it('gates a deactivated channel with 503 until re-activation', async () => {
registry.deactivateChannel(CHANNEL);
const down = await post();
assert.equal(down.status, 503);
assert.equal((down.body as { code?: string }).code, 'channel.inactive');

registry.setActive(CHANNEL, true);
const up = await post();
assert.equal(up.status, 200);
});
});
Loading