
🧠 (SCS/EEE) Avoid re-locking ledger mutex during batch flush; ensure conduit keys exist even when floodgate is closed #35

@OppaAI

Description


⚠️ Issue: Deadlock on Batch Flush (Non-Reentrant Lock)

The _async_worker thread currently triggers a deadlock when the _ledger_queue reaches BATCH_SIZE. The worker acquires _ledger_lock to append the data, then calls _flush_ledger_batch() which immediately attempts to acquire the same lock. Since the lock is not reentrant, the logging thread hangs indefinitely.

🎯 Location:

robot/scs/eee.py -> _async_worker()
robot/scs/eee.py -> _flush_ledger_batch()

🦠 Symptoms:

  • System Freeze: Log processing stops entirely once the 50th message (the BATCH_SIZE threshold) is reached.
  • Shutdown Stall: The system fails to exit because EEEAggregator.shutdown() waits for a worker thread that is deadlocked.
  • Missing Data: Logs are captured in the master log (text) but never appear in the SQLite Ledger after the first batch fills.
  • Bursty Failure: This only triggers during high-volume events—exactly when you need the logs the most to figure out what's breaking.

🩺 Diagnosis:

The system is suffering from Lock Contention (Self-Acquisition). In Python, a standard threading.Lock cannot be acquired multiple times by the same thread. By calling a "Lock-Protected" method from within a "Lock-Protected" block, we’ve created a circular dependency on a single resource.
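The non-reentrancy claim is easy to verify in isolation with the standard library: a `threading.Lock` held by a thread cannot be acquired again by that same thread, while a `threading.RLock` counts re-entries.

```python
import threading

# A plain Lock: second acquire by the holding thread would block forever;
# with blocking disabled it simply fails.
lock = threading.Lock()
lock.acquire()
plain_reacquire = lock.acquire(blocking=False)
print(plain_reacquire)  # False
lock.release()

# An RLock: the same thread may re-acquire, provided it releases
# once per acquire.
rlock = threading.RLock()
rlock.acquire()
rlock_reacquire = rlock.acquire(blocking=False)
print(rlock_reacquire)  # True
rlock.release()
rlock.release()
```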

💡 Proposal:

Atomic Queue Swap or Recursive Locking

  • Option A (Preferred for Performance): Refactor the worker to minimize lock hold time. Instead of holding the lock for the entire flush, use it only to swap the queue out:

    ```python
    with EEEAggregator._ledger_lock:
        # Just take the data and get out!
        batch_to_flush = list(EEEAggregator._ledger_queue)
        EEEAggregator._ledger_queue.clear()

    if batch_to_flush:
        # Perform the slow SQLite I/O WITHOUT holding the lock
        EEEAggregator._perform_sqlite_insert(batch_to_flush)
    ```

  • Option B (The Quick Fix): Change _ledger_lock = threading.Lock() to threading.RLock(). An RLock allows the same thread to acquire the lock multiple times without hanging.
  • Internal/External Pattern: Rename the current flush to _flush_ledger_batch_unlocked() and have the public _flush_ledger_batch() wrap it in the lock. The worker, which already holds the lock, then calls the unlocked version directly.
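The Internal/External pattern can be sketched as follows. Class and method names follow the issue; the body is illustrative (a plain list stands in for the SQLite ledger), not the real `EEEAggregator` implementation.

```python
import threading

class EEEAggregator:
    _ledger_lock = threading.Lock()
    _ledger_queue = []
    flushed = []  # stand-in for the real SQLite insert target

    @classmethod
    def _flush_ledger_batch_unlocked(cls):
        # Internal: caller MUST already hold _ledger_lock.
        cls.flushed.extend(cls._ledger_queue)
        cls._ledger_queue.clear()

    @classmethod
    def _flush_ledger_batch(cls):
        # External entry point: takes the lock, then delegates.
        with cls._ledger_lock:
            cls._flush_ledger_batch_unlocked()

    @classmethod
    def _async_worker(cls, item, batch_size=3):
        with cls._ledger_lock:
            cls._ledger_queue.append(item)
            if len(cls._ledger_queue) >= batch_size:
                # Already holding the lock, so call the unlocked
                # variant directly: no self-acquisition, no deadlock.
                cls._flush_ledger_batch_unlocked()

for i in range(3):
    EEEAggregator._async_worker(i)
print(EEEAggregator.flushed)  # [0, 1, 2]
```

Note that Option A still has the edge for throughput, since the slow I/O happens outside the lock entirely, while this pattern keeps the flush inside the critical section.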

Metadata

Labels: bug (Something isn't working)
