This document describes the Session layer — the protocol state machine that manages the BlazingMQ client lifecycle: negotiation, queue operations, message publishing, message consumption, ACK handling, and heartbeat monitoring.

    new Session(options)
          │
          ▼
    ┌────────────┐
    │  CREATED   │  No connection yet
    └─────┬──────┘
          │ start()
          ▼
    ┌────────────┐     ┌────────────┐
    │ CONNECTING │────>│ NEGOTIATE  │  TCP connected, send clientIdentity
    └────────────┘     └─────┬──────┘
                             │ brokerResponse received
                             ▼
                       ┌────────────┐
                       │  STARTED   │  Ready for queue ops
                       └─────┬──────┘
                             │ stop()
                             ▼
                       ┌────────────┐
                       │  STOPPING  │  Close queues, send disconnect
                       └─────┬──────┘
                             │
                             ▼
                       ┌────────────┐
                       │  STOPPED   │  All resources released
                       └────────────┘

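The lifecycle above can be read as a small transition table. The following is a minimal sketch, not the Session's actual code: the state names come from the diagram, while `NEXT_STATE` and `transition` are hypothetical helpers introduced for illustration. Only the happy path shown in the diagram is modeled; failure paths (for example a connection error during CONNECTING) are not described here and are left out.

```typescript
// States taken from the lifecycle diagram.
type SessionState =
  | "CREATED"
  | "CONNECTING"
  | "NEGOTIATE"
  | "STARTED"
  | "STOPPING"
  | "STOPPED";

// Happy-path successor of each state; STOPPED is terminal.
const NEXT_STATE: Record<SessionState, SessionState | null> = {
  CREATED: "CONNECTING",   // start()
  CONNECTING: "NEGOTIATE", // TCP connected, clientIdentity sent
  NEGOTIATE: "STARTED",    // brokerResponse received
  STARTED: "STOPPING",     // stop()
  STOPPING: "STOPPED",     // queues closed, disconnect sent
  STOPPED: null,
};

// Hypothetical helper: returns the new state, or throws on a move
// that the diagram does not allow.
function transition(from: SessionState, to: SessionState): SessionState {
  if (NEXT_STATE[from] !== to) {
    throw new Error(`Illegal session transition: ${from} -> ${to}`);
  }
  return to;
}
```

With this table, `transition("CREATED", "CONNECTING")` succeeds, while skipping negotiation with `transition("CREATED", "STARTED")` throws.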
When start() is called, the Session:
- Calls BmqConnection.connect() to establish the TCP connection.
- Sends a clientIdentity CONTROL event with SDK metadata.
- Waits for the brokerResponse CONTROL event.
- Extracts heartbeat configuration from the response.
- Starts the heartbeat monitor.
Why sdkLanguage: "E_JAVA"?
The BlazingMQ broker's protocol enum doesn't include a value for Node.js.
We use E_JAVA because the Java SDK also uses a pure-protocol implementation
(no C++ bindings), and the broker's behavior for E_JAVA clients is compatible
with our approach. The actual language doesn't affect protocol behavior.
Opening a queue is a two-step process:
- openQueue — Register the queue with the broker. The broker assigns internal routing based on the queue domain and flags.
- configureStream (consumers only) — Configure the subscription parameters: max unconfirmed messages/bytes, consumer priority.
The Session assigns a monotonically increasing queueId (starting from 0) to
each opened queue. This numeric ID is used in all subsequent binary messages
(PUT, PUSH, ACK, CONFIRM) instead of the full URI string.
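One way to picture the queueId bookkeeping is a pair of maps, one per lookup direction. This is an illustrative sketch only: `QueueIdRegistry` and its method names are hypothetical, and the choices to return the existing id when a URI is registered twice and to never reuse ids after a close are assumptions, not documented behavior.

```typescript
// Hypothetical registry mapping queue URIs to numeric queueIds and back.
class QueueIdRegistry {
  private nextId = 0; // the first opened queue gets queueId 0
  private readonly idByUri = new Map<string, number>();
  private readonly uriById = new Map<number, string>();

  // Assign the next id to a newly opened queue.
  // Assumption: registering a known URI returns its existing id.
  register(uri: string): number {
    const existing = this.idByUri.get(uri);
    if (existing !== undefined) return existing;
    const id = this.nextId++;
    this.idByUri.set(uri, id);
    this.uriById.set(id, uri);
    return id;
  }

  // URI -> id, used when building PUT and CONFIRM events.
  idFor(uri: string): number | undefined {
    return this.idByUri.get(uri);
  }

  // id -> URI, used when PUSH and ACK events arrive.
  uriFor(id: number): string | undefined {
    return this.uriById.get(id);
  }

  // Forget a closed queue. Ids are not reused, so the sequence stays monotonic.
  remove(uri: string): void {
    const id = this.idByUri.get(uri);
    if (id === undefined) return;
    this.idByUri.delete(uri);
    this.uriById.delete(id);
  }
}
```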
Closing a queue is also multi-step:
- configureStream (consumers) — Send empty subscriptions to drain the consumer.
- closeQueue — Send the close request with the original handle parameters (including the original readCount/writeCount values).
Every control request includes an rId (request ID) — a monotonically increasing
integer. The Session stores a PendingRequest object for each:

    interface PendingRequest {
      resolve: (value: any) => void;   // Promise resolve
      reject: (error: Error) => void;  // Promise reject
      timer: NodeJS.Timeout;           // Timeout timer
    }

When a control response arrives, its rId is matched against the pending requests
map. The timer is cleared and the promise is resolved or rejected.
If no response arrives within the timeout, the timer fires, the pending request
is removed, and the promise is rejected with a BrokerTimeoutError.
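The request/response correlation described above could look roughly like the sketch below. It is a simplified illustration under stated assumptions: `PendingRequestTracker` and its methods are hypothetical names, `BrokerTimeoutError` is redefined locally so the example is self-contained, and starting rId at 0 is an assumption. The timer type is written as `ReturnType<typeof setTimeout>` so the sketch does not depend on Node type definitions.

```typescript
// Local stand-in for the SDK's BrokerTimeoutError (constructor shape is assumed).
class BrokerTimeoutError extends Error {
  constructor(rId: number, timeoutMs: number) {
    super(`Request ${rId} timed out after ${timeoutMs} ms`);
    this.name = "BrokerTimeoutError";
  }
}

interface PendingRequest {
  resolve: (value: unknown) => void;
  reject: (error: Error) => void;
  timer: ReturnType<typeof setTimeout>;
}

// Hypothetical tracker for in-flight control requests, keyed by rId.
class PendingRequestTracker {
  private nextRId = 0; // assumption: the first request uses rId 0
  private readonly pending = new Map<number, PendingRequest>();

  get size(): number {
    return this.pending.size;
  }

  // Allocate an rId and a promise that settles on response or timeout.
  create(timeoutMs: number): { rId: number; promise: Promise<unknown> } {
    const rId = this.nextRId++;
    const promise = new Promise<unknown>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(rId);
        reject(new BrokerTimeoutError(rId, timeoutMs));
      }, timeoutMs);
      this.pending.set(rId, { resolve, reject, timer });
    });
    return { rId, promise };
  }

  // Remove the entry and cancel its timer; undefined if the rId is unknown
  // (for example because the request already timed out).
  private take(rId: number): PendingRequest | undefined {
    const entry = this.pending.get(rId);
    if (entry === undefined) return undefined;
    clearTimeout(entry.timer);
    this.pending.delete(rId);
    return entry;
  }

  // Successful control response.
  resolve(rId: number, value: unknown): boolean {
    const entry = this.take(rId);
    if (entry === undefined) return false;
    entry.resolve(value);
    return true;
  }

  // Failed control response (e.g. the broker refused the request).
  reject(rId: number, error: Error): boolean {
    const entry = this.take(rId);
    if (entry === undefined) return false;
    entry.reject(error);
    return true;
  }
}
```

Returning `false` for an unknown rId lets the caller log a late or duplicate response instead of failing; whether the Session does this is not stated above.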
When post() or postAndWaitForAck() is called:
- Look up the queue's numeric queueId from the URI.
- Assign a 24-bit correlationId (wraps at 0xFFFFFF).
- Encode message properties (if any) using the MessageProperties format.
- Build the PUT event: PutHeader (36 bytes) + properties + payload + padding.
- Compute CRC32-C checksum of the padded application data.
- Send the event via BmqConnection.send().
If postAndWaitForAck() was used, a PendingAck is stored with the correlationId
and a timeout timer. The promise resolves when the matching ACK arrives.
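The 24-bit correlationId counter can be sketched as follows. `CorrelationIdAllocator` is a hypothetical name, and two details are assumptions rather than documented behavior: that the first id is 1, and that the counter wraps back to 1 (skipping 0) after 0xFFFFFF.

```typescript
const MAX_CORRELATION_ID = 0xffffff; // largest value that fits in 24 bits

// Hypothetical allocator for PUT correlation ids.
class CorrelationIdAllocator {
  // `last` is the most recently issued id; 0 means none issued yet.
  constructor(private last: number = 0) {}

  // Returns 1, 2, ..., 0xFFFFFF, then wraps back to 1.
  // Assumption: 0 is skipped so it can stand for "no correlation id".
  next(): number {
    this.last = this.last >= MAX_CORRELATION_ID ? 1 : this.last + 1;
    return this.last;
  }
}
```

Because ids are reused after wrapping, a PendingAck should not be allowed to outlive roughly 16.7 million subsequent posts; the ACK timeout normally guarantees this.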
When an ACK event arrives from the broker:
- Parse the ACK event → extract status, correlationId, guid, queueId.
- Map the wire status code (4-bit) to an AckResult enum value.
- Look up the PendingAck by correlationId.
- Clear the timeout timer and invoke the stored callback.
- Emit an 'ack' event on the Session for any listeners.
When a PUSH event arrives:
- Parse the event → extract queueId, guid, payload, properties, compressionType.
- If compressed (ZLIB), decompress the application data.
- If the MESSAGE_PROPERTIES flag is set, decode properties from the application data.
- Look up the queueUri from the queueId.
- Create a Message object and a MessageHandle (with a confirm() closure).
- Invoke the registered message callback.
- Emit a 'message' event on the Session.
When handle.confirm() (or Session.confirm()) is called:
- Build a CONFIRM event with the queueId and guid.
- Send via BmqConnection.send().
The broker uses CONFIRM messages to track which messages have been processed, enabling at-least-once delivery semantics.
After negotiation, the Session starts a heartbeat monitor:
- The broker sends HEARTBEAT_REQ events at a configured interval (e.g., 3 seconds).
- The Session immediately responds with HEARTBEAT_RSP.
- The monitor checks if heartbeats are being received. If too many are missed (based on the broker's maxMissedHeartbeats configuration), a CONNECTION_LOST session event is emitted.
The heartbeat interval timer uses unref() so it doesn't prevent the Node.js
process from exiting naturally.
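The missed-heartbeat accounting can be separated from the timer, which makes it easy to reason about. The sketch below is an assumption-laden illustration, not the Session's monitor: `HeartbeatMonitor`, `noteHeartbeat`, and `tick` are hypothetical names, and treating the connection as lost exactly when the number of consecutive silent intervals reaches maxMissedHeartbeats is an assumed threshold.

```typescript
// Hypothetical monitor: counts consecutive intervals without a heartbeat.
class HeartbeatMonitor {
  private receivedSinceLastTick = false;
  private missed = 0;

  constructor(
    private readonly maxMissedHeartbeats: number,
    private readonly onConnectionLost: () => void
  ) {}

  // Call when a HEARTBEAT_REQ arrives from the broker.
  noteHeartbeat(): void {
    this.receivedSinceLastTick = true;
  }

  // Call once per heartbeat interval.
  tick(): void {
    if (this.receivedSinceLastTick) {
      this.missed = 0;
    } else {
      this.missed += 1;
      // Fire once, when the threshold is first reached (assumed semantics).
      if (this.missed === this.maxMissedHeartbeats) {
        this.onConnectionLost();
      }
    }
    this.receivedSinceLastTick = false;
  }
}
```

In Node.js this would typically be driven by `setInterval(() => monitor.tick(), intervalMs).unref()`, which matches the unref() behavior described above.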
When stop() is called:
- Close all open queues — sends configureStream (empty) and closeQueue for each.
- Send disconnect — notifies the broker of the intentional disconnection.
- Stop heartbeat monitor — clears the interval timer.
- Disconnect TCP — destroys the socket.
- Clean up state — clears all maps, rejects pending requests with "Session stopped".
This ordering ensures the broker properly cleans up its resources for this client.
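The shutdown order can be expressed as a short async function over injected steps. This is a sketch under assumptions: the `ShutdownSteps` interface and `stopSession` are hypothetical, and running the local cleanup in a `finally` block (so it happens even if a broker-facing step fails) is a design choice of this example rather than documented behavior.

```typescript
// Hypothetical hooks for the five shutdown steps described above.
interface ShutdownSteps {
  closeAllQueues(): Promise<void>; // configureStream (empty) + closeQueue per queue
  sendDisconnect(): Promise<void>; // tell the broker the disconnect is intentional
  stopHeartbeat(): void;           // clear the interval timer
  destroySocket(): void;           // tear down the TCP connection
  rejectPending(reason: Error): void; // clear maps, reject in-flight promises
}

async function stopSession(steps: ShutdownSteps): Promise<void> {
  try {
    // Broker-facing steps first, while the connection is still usable.
    await steps.closeAllQueues();
    await steps.sendDisconnect();
  } finally {
    // Local cleanup always runs, even if the broker did not answer.
    steps.stopHeartbeat();
    steps.destroySocket();
    steps.rejectPending(new Error("Session stopped"));
  }
}
```

Closing queues and sending the disconnect before destroying the socket is what gives the broker the chance to release its per-client state.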
Errors in the Session can arise from:
- TCP errors — forwarded from BmqConnection → SessionEvent(ERROR).
- Protocol errors — malformed events → logged as SessionEvent(ERROR), not thrown.
- Timeout errors — pending requests/ACKs timeout → promise rejection.
- Broker rejections — control response indicates failure → BrokerRefusedError.
- Callback errors — caught and emitted as SessionEvent(ERROR).
The Session never throws from event handlers — errors are always channeled through the SessionEvent mechanism or promise rejections.
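The "never throw from event handlers" rule for user callbacks can be illustrated with a small wrapper. `invokeSafely` and `ErrorSink` are hypothetical names; in the Session the sink would correspond to emitting a SessionEvent(ERROR).

```typescript
// Hypothetical sink; in the Session this would emit SessionEvent(ERROR).
type ErrorSink = (error: Error) => void;

// Run a user callback and route anything it throws to the sink
// instead of letting it propagate into the event-handling code.
function invokeSafely<T>(
  callback: (arg: T) => void,
  arg: T,
  emitError: ErrorSink
): void {
  try {
    callback(arg);
  } catch (thrown) {
    // Normalize non-Error throws (strings, numbers, ...) into Error objects.
    emitError(thrown instanceof Error ? thrown : new Error(String(thrown)));
  }
}
```

Note that this only covers synchronous throws. A callback that returns a rejected promise would need its rejection handled separately.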