Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ on:

jobs:
build:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [ 20.x ]
node-version: [ 24.x ]
steps:
- uses: actions/checkout@v3
name: Checkout repository
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ All the power, none of the complexity.

---

This is very much under construction, and should not be used by anyone, anywhere, for anything. If you do, you're on your own.
This is very much under construction, and should **not be used by anyone, anywhere, for anything**. If you do, you're on your own.

It's custom designed for pb33f and our needs for now.
16 changes: 5 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,18 @@
"scripts": {
"dev": "vite",
"build": "vite build",
"preview": "vite preview",
"test": "vitest",
"coverage": "vitest run --coverage"
"preview": "vite preview"
},
"devDependencies": {
"@vitest/coverage-c8": "^0.31.1",
"@vitest/ui": "^0.31.1",
"c8": "^7.13.0",
"i": "^0.3.7",
"typescript": "^5.0.2",
"vite": "5.4.6",
"vitest": "^0.31.1"
"typescript": "^5.9.2",
"vite": "^7.1.3",
"vite-plugin-dts": "^4.5.4"
},
"files": [
"dist"
],
"dependencies": {
"@stomp/stompjs": "^7.0.0",
"vite-plugin-dts": "^4.3.0"
"@stomp/stompjs": "^7.1.1"
}
}
29 changes: 28 additions & 1 deletion src/bus.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
import {Client, IPublishParams, StompConfig} from "@stomp/stompjs";
import {Client, IPublishParams, StompConfig, IFrame} from "@stomp/stompjs";
import {ranch} from "./bus_engine.js";

/**
* A callback for a message received on a channel
*/
export type BusCallback<T = any> = (message: Message<T>) => void

/**
* A callback for STOMP error frames
*/
export type StompErrorCallback = (error: StompError) => void

/**
* Error information extracted from STOMP ERROR frames
*/
export interface StompError {
/** The raw STOMP error frame */
frame: IFrame
/** Error message from frame body or headers */
message: string
/** Additional error details from headers */
details?: string
/** Error code if available */
code?: string
/** Timestamp when error occurred */
timestamp: Date
}

/**
* A subscription to a channel
*/
Expand Down Expand Up @@ -63,6 +84,12 @@ export interface Channel {

export class RanchConfig extends StompConfig {
public mapChannelsOnConnect?: boolean = true;

/**
* Enhanced callback invoked when STOMP ERROR frames are received
* This provides structured error information beyond the basic frame
*/
public onRanchStompError?: StompErrorCallback;
}

/**
Expand Down
75 changes: 72 additions & 3 deletions src/bus_engine.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,46 @@
import {Client, IPublishParams} from "@stomp/stompjs";
import {Client, IPublishParams, IFrame} from "@stomp/stompjs";
import {RanchUtils} from "./utils.js";
import {Bus, BusCallback, Channel, RanchConfig, Subscriber, Subscription} from "./bus.js";
import {Bus, BusCallback, Channel, RanchConfig, StompError, Subscriber, Subscription} from "./bus.js";

const ERROR_CHANNEL_NAME = 'errors';
const DEFAULT_ERROR_MESSAGE = 'An error occurred while communicating with the server';

export class ranch implements Bus {
private _channels: Channel[] = []
private _stompClient: Client | undefined = undefined;
private _preMappedChannels: Map<string, string>
private _activeMappings: Map<string, string>

constructor() {
this._preMappedChannels = new Map<string, string>()
this._activeMappings = new Map<string, string>()
}

/**
* Extracts error information from a STOMP ERROR frame
* @param frame The STOMP ERROR frame received from the server
* @returns Structured error information
*/
private _extractStompError(frame: IFrame): StompError {
// Extract message from frame body or headers
let message = DEFAULT_ERROR_MESSAGE;

// Check frame body first
if (frame.body && frame.body.trim()) {
message = frame.body.trim();
}
// Fallback to 'message' header
else if (frame.headers && frame.headers['message']) {
message = frame.headers['message'];
}

return {
frame,
message,
details: frame.headers?.['details'],
code: frame.headers?.['code'] || frame.headers?.['error-code'],
timestamp: new Date()
};
}

getClient(): Client | undefined {
Expand Down Expand Up @@ -57,10 +89,42 @@ export class ranch implements Bus {
config.onDisconnect(frame);
}
})

// Configure STOMP ERROR frame handler
this._stompClient.onStompError = (frame: IFrame) => {
const stompError = this._extractStompError(frame);

// Log error to console
console.error('STOMP Error received:', stompError.message, stompError);

// Publish to error channel for centralized error handling
const errorChannel = this.getChannel(ERROR_CHANNEL_NAME);
errorChannel.publish({
command: 'STOMP_ERROR',
payload: stompError
});

// Call user-defined enhanced error callback if provided
if (config.onRanchStompError) {
config.onRanchStompError(stompError);
}

// Also call base STOMP error handler if provided
if (config.onStompError) {
config.onStompError(frame);
}
}

this._stompClient.activate()
}

mapChannels() {
// Re-map existing active destinations (lost on reconnection)
this._activeMappings.forEach((channel: string, destination: string) => {
this._mapDestination(destination, channel, false)
});

// Map queued destinations
this._preMappedChannels.forEach((channel: string, destination: string) => {
this._mapDestination(destination, channel)
});
Expand All @@ -81,14 +145,19 @@ export class ranch implements Bus {
}
}

private _mapDestination(destination: string, channel: string) {
private _mapDestination(destination: string, channel: string, persistMapping: boolean = true) {
if (this._stompClient) {
this._stompClient.subscribe(destination, message => {
const chan = this._channels.find(c => c.name === channel)
if (chan) {
chan.publish({payload: JSON.parse(message.body)})
}
});

// Track successful mapping for reconnection
if (persistMapping) {
this._activeMappings.set(destination, channel)
}
}
}
}
Expand Down
Loading