
Refactor replay_mqtt_messages to single-threaded loop with hybrid busy-wait #38

Closed

Copilot wants to merge 2 commits into main from copilot/refactor-replay-mqtt-messages

Conversation

Contributor

Copilot AI commented Mar 4, 2026

The existing threading.Timer-per-message approach exhausts OS thread limits for large recordings, causing "RuntimeError: can't start new thread" failures and dropped messages.

Changes

  • Removed threading — replaced with a sequential loop that blocks until each message's target send time before publishing.
  • Drift-free timing — target times computed as t_start + accumulated_delay / REPLAY_SPEED (absolute offsets from start, not relative delays), preventing compounding timing errors.
  • Hybrid wait — sleeps for gaps > 10ms (BUSY_WAIT_THRESHOLD), then yields with time.sleep(0) for sub-10ms precision without pinning a CPU core.
  • High-throughput QoS=1 via MAX_INFLIGHT_MESSAGES = 30000, which prevents the Paho client from blocking on the inflight queue during bursts.
  • Throttled output — console prints rate-limited to PRINT_INTERVAL = 0.5s to avoid stdout I/O becoming a bottleneck.
  • File-not-found early return — original code would NameError on path after printing the error; now returns cleanly.
# Before: one thread per message
threading.Timer(replay_delay, send_message, args=(...)).start()

# After: single loop, blocks until target time, then publishes synchronously
target_time = t_start + accumulated_delay / REPLAY_SPEED
remaining = target_time - time.perf_counter()
if remaining > BUSY_WAIT_THRESHOLD:
    time.sleep(remaining - BUSY_WAIT_THRESHOLD)
while time.perf_counter() < target_time:
    time.sleep(0)
topic, payload_len = send_message_sync(publish_client, ...)
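
A fuller version of the same pattern, as a self-contained sketch (the wait_until helper, the replay driver, and the (delay, topic, payload) tuple format are illustrative, not the exact code in record/replay.py):

import time

REPLAY_SPEED = 1.0            # 1.0 = real time; 2.0 = replay twice as fast
BUSY_WAIT_THRESHOLD = 0.010   # coarse-sleep until within 10 ms of the target

def wait_until(target_time):
    # Sleep in bulk, then yield-spin for the final sub-10ms stretch.
    remaining = target_time - time.perf_counter()
    if remaining > BUSY_WAIT_THRESHOLD:
        time.sleep(remaining - BUSY_WAIT_THRESHOLD)
    while time.perf_counter() < target_time:
        time.sleep(0)  # yield to the scheduler instead of a hot spin

def replay(messages, publish):
    # messages: iterable of (delay_since_previous, topic, payload) tuples.
    t_start = time.perf_counter()
    accumulated_delay = 0.0
    for delay, topic, payload in messages:
        accumulated_delay += delay
        # Target is an absolute offset from t_start, so one late publish
        # does not push every subsequent message back.
        wait_until(t_start + accumulated_delay / REPLAY_SPEED)
        publish(topic, payload)

Because each target is anchored to t_start rather than to the previous send, a message that goes out a few milliseconds late simply leaves less time to wait before the next one; the lateness never compounds.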
Original prompt

The current implementation of replay_mqtt_messages in record/replay.py uses threading.Timer to schedule message publishing. For large files or high-frequency messages, this creates thousands of threads, leading to resource exhaustion (e.g., RuntimeError: can't start new thread) and dropped messages.

This PR replaces the multi-threaded approach with a single-threaded sequential loop using a hybrid busy-wait mechanism. This ensures:

  1. Reliability: No thread exhaustion, even for millions of messages.
  2. Precision: Uses a hybrid sleep (for >10ms gaps) and busy-wait (for <10ms gaps) to achieve sub-millisecond timing accuracy.
  3. Drift Prevention: Calculates target send times relative to the start time, preventing accumulated timing errors.
  4. Performance: Increases max_inflight_messages_set to 30,000 to allow high throughput for QoS=1 messages without blocking.
  5. Efficiency: Throttles console output to prevent I/O blocking.

Changes:

  • Removed threading usage.
  • Implemented send_message_sync for direct synchronous publishing.
  • Rewrote replay_mqtt_messages to use a high-resolution monotonic clock (time.perf_counter).
  • Added publish_client.max_inflight_messages_set(30000).

This pull request was created from Copilot chat.
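
For context, a minimal sketch of the client setup described above (paho-mqtt 1.x constructor; the broker address and the send_message_sync signature are assumptions for illustration, not the repository's exact code):

import json
import paho.mqtt.client as mqtt

MAX_INFLIGHT_MESSAGES = 30000

publish_client = mqtt.Client()  # paho-mqtt 1.x style constructor
publish_client.max_inflight_messages_set(MAX_INFLIGHT_MESSAGES)  # widen the QoS>0 inflight window
publish_client.connect("localhost", 1883)  # hypothetical broker address
publish_client.loop_start()  # network I/O runs on Paho's single background thread

def send_message_sync(client, topic, payload, qos=1):
    # Hypothetical signature: publish synchronously, report what was sent.
    if isinstance(payload, (dict, list)):
        payload = json.dumps(payload)
    client.publish(topic, payload, qos=qos)
    return topic, len(payload)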


…readed loop

Co-authored-by: prasadtalasila <9206466+prasadtalasila@users.noreply.github.com>
Copilot AI changed the title from "[WIP] Refactor replay_mqtt_messages to use single-threaded loop" to "Refactor replay_mqtt_messages to single-threaded loop with hybrid busy-wait" on Mar 4, 2026
Comment thread: record/replay.py
else:
    raise ValueError("Invalid payload format")

qos = record.get("qos", 1)
Contributor


This statement can be changed to

 qos = record.get("qos", 0)

if message dropping is seen.
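
For illustration, how the two defaults behave on a record that has no recorded QoS (hypothetical data, not from the PR):

record = {"topic": "sensors/temp", "payload": "21.5"}  # no "qos" key
record.get("qos", 1)  # -> 1: at-least-once; each publish occupies Paho's inflight window until acknowledged
record.get("qos", 0)  # -> 0: fire-and-forget; never queued for acknowledgement, so bursts cannot stall on it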

