Disclaimer: This is an unofficial, community-maintained SDK. It is not affiliated with, endorsed by, or supported by Bloomberg LP.
A pure-JavaScript Node.js SDK for BlazingMQ — Bloomberg's high-performance, open-source message queue system.
This SDK implements the BlazingMQ binary wire protocol natively over TCP. No C++ bindings,
no native modules, no node-gyp — just TypeScript that speaks the protocol directly.
npm install blazingmq-node
- Quick Start
- How It Works
- API Reference
- Configuration
- Error Handling
- Examples
- Running Tests
- Running the Broker
- Contributing
- License
import { Producer, AckResult } from 'blazingmq-node';
const producer = new Producer({ broker: 'tcp://localhost:30114' });
await producer.start();
await producer.openQueue('bmq://bmq.test.mem.priority/my-queue');
const ack = await producer.publishAndWait({
queueUri: 'bmq://bmq.test.mem.priority/my-queue',
payload: 'Hello, World!',
properties: {
eventType: 'greeting',
priority: 1,
isVip: true,
},
});
console.log('ACK:', AckResult[ack.status]); // "SUCCESS"
await producer.stop();import { Consumer, PropertyType } from 'blazingmq-node';
const consumer = new Consumer({
broker: 'tcp://localhost:30114',
onMessage: (msg, handle) => {
console.log('Received:', msg.data.toString());
for (const [key, entry] of msg.properties) {
console.log(` ${key}: ${entry.value} (${PropertyType[entry.type]})`);
}
handle.confirm(); // Acknowledge the message
},
});
await consumer.start();
await consumer.subscribe({
queueUri: 'bmq://bmq.test.mem.priority/my-queue',
options: { maxUnconfirmedMessages: 1024, consumerPriority: 1 },
});import { Consumer } from 'blazingmq-node';
const consumer = new Consumer({ broker: 'tcp://localhost:30114' });
await consumer.start();
await consumer.subscribe({ queueUri: 'bmq://bmq.test.mem.priority/my-queue' });
for await (const { message, handle } of consumer) {
console.log(message.data.toString());
handle.confirm();
}The SDK is built in four layers, each with a clear responsibility:
┌─────────────────────────────────────────────────────┐
│ Layer 4: High-Level APIs │
│ Producer · Consumer · Admin │
├─────────────────────────────────────────────────────┤
│ Layer 3: Session │
│ (Negotiation, queue lifecycle, ACK correlation, │
│ publish/subscribe, heartbeat) │
├─────────────────────────────────────────────────────┤
│ Layer 2: Protocol Codec │
│ (Binary encoding/decoding for all event types) │
├─────────────────────────────────────────────────────┤
│ Layer 1: Transport │
│ (TCP socket, event framing, stream reassembly) │
└─────────────────────────────────────────────────────┘
│
TCP Socket
│
┌─────────┴─────────┐
│ BlazingMQ Broker │
└───────────────────┘
Layer 1 (Transport) manages the raw TCP connection. It accumulates incoming bytes and reassembles them into complete BlazingMQ events using the 8-byte EventHeader framing protocol.
Layer 2 (Protocol Codec) is a stateless library of pure functions that encode/decode every binary message type: EventHeaders, PUT/PUSH/ACK/CONFIRM messages, control events (JSON), message properties, CRC32-C checksums, and 4-byte-aligned padding.
Layer 3 (Session) is the protocol state machine. It handles the negotiation handshake, manages queue lifecycle (open/configure/close), publishes messages with correlation IDs for ACK matching, dispatches incoming PUSH messages to callbacks, and responds to heartbeats.
Layer 4 (High-Level APIs) wraps the Session into ergonomic, task-oriented interfaces: Producer (publish), Consumer (subscribe with callbacks or async iterators), and Admin (queue management).
BlazingMQ uses a custom binary protocol over TCP. Every transmission is an event — a discrete message with an 8-byte header:
Byte: 0 1 2 3 4 5 6 7
┌───────────────────────────┬───────┬───────┬───────┬───────┐
│ Fragment(1) │ Length(31) │PV│Type│ HdrW │ TySpc │ Rsvd │
└───────────────────────────┴───────┴───────┴───────┴───────┘
The Length field (31 bits, big-endian) tells the transport layer how many bytes to accumulate before it has a complete event. The Type field (6 bits) determines how the payload is encoded:
| Type | Name | Direction | Encoding | Purpose |
|---|---|---|---|---|
| 1 | CONTROL | Bidirectional | JSON | Negotiation, queue ops, disconnect |
| 2 | PUT | Client→Broker | Binary | Publish a message |
| 3 | CONFIRM | Client→Broker | Binary | Confirm a received message |
| 4 | PUSH | Broker→Client | Binary | Deliver a message to consumer |
| 5 | ACK | Broker→Client | Binary | Confirm a published message |
| 11 | HEARTBEAT_REQ | Broker→Client | None | Liveness check (header only) |
| 12 | HEARTBEAT_RSP | Client→Broker | None | Liveness response (header only) |
Control events carry JSON payloads (UTF-8, forward-slashes escaped, padded
to 4-byte boundaries). The first exchange is always a negotiation: the client
sends clientIdentity, the broker responds with brokerResponse.
Binary events (PUT, PUSH, ACK, CONFIRM) use fixed-size headers with bit-packed fields. For example, a PUT message has a 36-byte header containing flags, queue ID, 24-bit correlation ID, CRC32-C checksum, and compression type.
All data is big-endian and 4-byte aligned using a fill-with-count
padding scheme: padding bytes contain their own count (e.g., 3 bytes of
0x03 for 3-byte padding).
📖 For the complete wire format specification, see docs/protocol.md.
The transport layer solves TCP stream reassembly. TCP delivers a continuous
byte stream, but BlazingMQ communicates in discrete events. The BmqConnection
class:
- Accumulates incoming TCP data into a buffer.
- Reads the 4-byte length from the EventHeader.
- Waits until the full event has arrived.
- Extracts the event and emits it via Node.js
EventEmitter. - Loops to handle any remaining data.
This is a classic length-prefixed framing state machine. The connection also supports automatic reconnection with exponential backoff.
📖 See docs/transport.md for details.
The Session is the protocol state machine:
- Negotiation — Exchanges
clientIdentity/brokerResponseon connect. - Queue lifecycle — open (register + configure subscription), close
(deconfigure + deregister), using JSON control messages with
rIdcorrelation. - Publishing — Encodes PUT events with 24-bit correlation IDs. When the broker ACKs, the ID is matched to the original publish callback.
- Consuming — Parses PUSH events, looks up the queue URI, and invokes the registered message callback.
- Heartbeat — Immediately responds to HEARTBEAT_REQ with HEARTBEAT_RSP.
📖 See docs/session.md for the full specification.
Publishing (Client → Broker → ACK):
publishAndWait("Hello")
→ encode PUT event (36-byte header + payload + CRC32-C + padding)
→ TCP write
→ broker stores message, sends ACK event
→ parse ACK, match correlationId
→ resolve promise with Ack { status: SUCCESS }
Consuming (Broker → Client → CONFIRM):
broker has message for subscribed queue
→ TCP delivers PUSH event
→ parse PushHeader, extract GUID + payload + properties
→ invoke onMessage callback
→ user calls handle.confirm()
→ encode CONFIRM event (queueId + GUID)
→ TCP write → broker marks message confirmed
const producer = new Producer(options?: ProducerOptions);| Method | Returns | Description |
|---|---|---|
start() |
Promise<void> |
Connect to broker |
stop() |
Promise<void> |
Close all queues and disconnect |
openQueue(uri, options?) |
Promise<void> |
Open a queue for writing |
closeQueue(uri) |
Promise<void> |
Close a queue |
publish(options) |
number |
Fire-and-forget publish (returns correlationId) |
publishAndWait(options, timeout?) |
Promise<Ack> |
Publish and wait for broker ACK |
publishBatch(messages) |
number[] |
Publish multiple messages |
PublishOptions:
{
queueUri?: string; // Queue URI (or use ProducerOptions.defaultQueueUri)
payload: string | Buffer; // Message payload
properties?: Record<string, boolean | number | bigint | string | Buffer>;
propertyTypeOverrides?: Record<string, PropertyType>;
compression?: CompressionAlgorithmType;
onAck?: (ack: Ack) => void; // Per-message ACK callback
}const consumer = new Consumer(options?: ConsumerOptions);| Method | Returns | Description |
|---|---|---|
start() |
Promise<void> |
Connect to broker |
stop() |
Promise<void> |
Drain, close all queues, disconnect |
subscribe(options) |
Promise<void> |
Subscribe to a queue |
unsubscribe(uri) |
Promise<void> |
Unsubscribe (drain + close) |
reconfigure(uri, options) |
Promise<void> |
Change queue options |
confirm(message) |
void |
Confirm a message |
isSubscribed(uri) |
boolean |
Check subscription status |
[Symbol.asyncIterator]() |
AsyncIterator |
Async iteration over messages |
ConsumerOptions:
{
broker?: string;
onMessage?: (message: Message, handle: MessageHandle) => void;
autoConfirm?: boolean; // Auto-confirm after callback (default: false)
maxIteratorBufferSize?: number; // Async iterator buffer limit (default: 10000)
}const admin = new Admin(options?: AdminOptions);| Method | Returns | Description |
|---|---|---|
start() |
Promise<void> |
Connect to broker |
stop() |
Promise<void> |
Close all managed queues, disconnect |
createQueue(uri, options?) |
Promise<void> |
Open queue with read+write |
deleteQueue(uri) |
Promise<void> |
Close a managed queue |
configureQueue(uri, options) |
Promise<void> |
Reconfigure queue |
drainQueue(uri) |
Promise<void> |
Pause consumption |
restoreQueue(uri, options?) |
Promise<void> |
Resume after drain |
getQueueInfo() |
QueueInfo[] |
Get all managed queue info |
pingBroker() |
boolean |
Check connection health |
The BrokerAdmin class connects to the broker's admin port for monitoring
and management operations not available through the standard client protocol.
import { BrokerAdmin } from 'blazingmq-node';
const admin = new BrokerAdmin({
host: 'localhost',
port: 30114,
timeout: 10000,
});
// Check connectivity
const isUp = await admin.ping();
// Cluster health
const clusters = await admin.listClusters();
const status = await admin.getClusterStatus(clusters[0]);
console.log('Healthy:', status.isHealthy);
console.log('Nodes:', status.nodeStatuses.length);
console.log('Leader:', status.electorInfo.leaderNode);
// Domain and queue management
const domain = await admin.getDomainInfo('bmq.test.mem.priority');
const internals = await admin.getQueueInternals('bmq.test.mem.priority', 'orders');
const purged = await admin.purgeQueue('bmq.test.mem.priority', 'orders');
// Statistics
const stats = await admin.getStats();
console.log('Clients:', stats.clientsCount, 'Queues:', stats.queuesCount);Cluster Management:
| Method | Returns | Description |
|---|---|---|
listClusters() |
Promise<string[]> |
List active clusters |
getClusterStatus(cluster) |
Promise<ClusterStatus> |
Full cluster health |
forceGcQueues(cluster) |
Promise<string> |
GC idle queues |
getClusterStorageSummary(cluster) |
Promise<ClusterStorageSummary> |
Storage usage per partition |
setPartitionState(cluster, id, enable) |
Promise<string> |
Enable/disable partition |
Domain Management:
| Method | Returns | Description |
|---|---|---|
getDomainInfo(domain) |
Promise<DomainInfo> |
Domain config, capacity, queues |
purgeDomain(domain) |
Promise<PurgeResult[]> |
Purge all queues in domain |
purgeQueue(domain, queue, appId?) |
Promise<PurgeResult> |
Purge specific queue |
getQueueInternals(domain, queue) |
Promise<QueueInternals> |
Handles, consumers, storage |
listQueueMessages(domain, queue, offset?, count?, appId?) |
Promise<QueueMessage[]> |
List messages |
reconfigureDomain(domain) |
Promise<string> |
Hot-reload domain config |
Statistics:
| Method | Returns | Description |
|---|---|---|
getStats() |
Promise<BrokerStats> |
Full broker stats (clients, queues, per-queue metrics) |
listTunables() |
Promise<string> |
List tunable parameters |
getTunable(param) |
Promise<string> |
Get tunable value |
setTunable(param, value) |
Promise<string> |
Set tunable value |
getBrokerConfig() |
Promise<BrokerConfig> |
Dump broker runtime config |
Danger Zone:
| Method | Returns | Description |
|---|---|---|
shutdown() |
Promise<string> |
Graceful broker shutdown |
terminate() |
Promise<string> |
Immediate termination |
See docs/broker-admin.md for the complete type reference and broker command documentation.
The Session class provides direct protocol access:
import { Session } from 'blazingmq-node';
const session = new Session({ broker: 'tcp://localhost:30114' });
session.setMessageCallback((msg, handle) => handle.confirm());
await session.start();
await session.openQueue({ queueUri: '...', read: true, write: true });
const ack = await session.postAndWaitForAck({ queueUri: '...', payload: 'Hello' });
await session.closeQueue('...');
await session.stop();{
broker?: string; // "tcp://host:port" (default: "tcp://localhost:30114")
reconnect?: boolean; // Auto-reconnect on disconnect (default: false)
messageCompressionAlgorithm?: CompressionAlgorithmType; // Default: NONE
timeouts?: {
connectTimeout?: number; // ms (default: 5000)
disconnectTimeout?: number; // ms (default: 5000)
openQueueTimeout?: number; // ms (default: 30000)
configureQueueTimeout?: number; // ms (default: 30000)
closeQueueTimeout?: number; // ms (default: 30000)
}
}{
maxUnconfirmedMessages?: number; // default: 1024
maxUnconfirmedBytes?: number; // default: 33554432 (32MB)
consumerPriority?: number; // default: 0 (higher = preferred)
suspendsOnBadHostHealth?: boolean; // default: false
}| Variable | Description | Default |
|---|---|---|
BMQ_BROKER_URI |
Broker URI | tcp://localhost:30114 |
bmq://<domain>/<queue-name>
Domains determine storage and routing behavior:
bmq.test.mem.priority— In-memory, priority-based (for testing)bmq.test.persistent.priority— Persistent, priority-based
The SDK provides a typed error hierarchy:
BlazingMQError — Base class for all SDK errors
├── BrokerTimeoutError — Operation timed out waiting for broker
├── ConnectionError — TCP connection or negotiation failure
├── BrokerRefusedError — Broker explicitly refused an operation
├── InvalidArgumentError — Invalid parameters passed to SDK
└── QueueError — Queue-specific operation failure
All async methods reject with typed errors. Session events provide non-fatal error notifications.
# Build first
npm run build
# End-to-end demo (producer + consumer in one script)
node examples/run-e2e.js
# Standalone producer
node examples/producer.js
# Standalone consumer (runs until Ctrl+C)
node examples/consumer.js# Unit tests (no broker required)
npm run test:unit
# Integration tests (requires broker on localhost:30114)
npm run test:integration
# All tests
npm test
# With coverage report
npm run test:coverage# From the blazingmq repository root:
docker compose -f docker/single-node/docker-compose.yaml up -dEnsure port 30114 is exposed:
services:
bmqbrkr:
ports:
- "30114:30114"src/
├── protocol/
│ ├── constants.ts # Enums, sizes, magic numbers
│ ├── codec.ts # Binary encoder/decoder (stateless pure functions)
│ └── index.ts
├── transport/
│ ├── connection.ts # TCP connection + event framing state machine
│ └── index.ts
├── session.ts # Core protocol state machine
├── producer.ts # High-level Producer API
├── consumer.ts # High-level Consumer API
├── admin.ts # High-level Admin API
├── broker-admin.ts # Broker management API (admin port commands)
├── errors.ts # Error class hierarchy
├── types.ts # TypeScript interfaces
└── index.ts # Public API barrel export
tests/
├── protocol.test.ts # Protocol codec unit tests
├── broker-admin.test.ts # BrokerAdmin unit tests (45 tests, mock TCP)
└── integration.test.ts # Integration tests (requires live broker)
docs/
├── architecture.md # Layered architecture overview
├── protocol.md # Wire protocol specification
├── transport.md # Transport layer documentation
├── session.md # Session layer documentation
└── broker-admin.md # BrokerAdmin API documentation
See CONTRIBUTING.md.