Heuristic record end of cycle log time.#351
Conversation
Greptile SummaryThis PR adds a heuristic mechanism to the gRPC log aggregation server to detect and record when a per-cycle application log file has finished receiving data, emitting an
Confidence Score: 5/5Safe to merge. The heuristic completion logic is consistent with the dual-lock design, the new shutdown paths are correctly exception-isolated, and CycleInfoReporter.shutdown() is idempotent so the multi-call pattern on the signal path causes no functional harm. No defects were found on any changed path. The cycle-log completion detection correctly defers marking a cycle complete until it is no longer the latest, and the shutdown path forces completion for all cycles including the current one. Lock discipline is maintained throughout the new scan method, and the test suite covers the key edge cases. grpc_log_server.py is the most complex file in the diff and is worth a careful read, particularly the interaction between buffer_lock acquisition in _scan_cycle_log_completions and the shutdown flush sequence, but no issues were found there. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant W as Worker Node
participant GS as LogAggregationServicer
participant PF as PeriodicFlushThread
participant CIR as CycleInfoReporter
W->>GS: StreamLogs(train_cycle3.log chunk)
GS->>GS: "last_chunk_time = now (under buffer_lock)"
GS->>GS: append to buffer
PF->>GS: _flush_file_buffer_locked(train_cycle3.log)
GS->>GS: "last_flush_time = now (under buffer_lock)"
PF->>GS: _scan_cycle_log_completions()
GS->>GS: skip: cycle3 is latest in family
W->>GS: StreamLogs(train_cycle4.log chunk)
GS->>GS: "latest_cycle_by_family[train.log] = 4"
PF->>GS: _scan_cycle_log_completions()
GS->>GS: "cycle3 < latest(4) → eligible"
GS->>GS: "scan_time - last_chunk_time >= idle_timeout?"
GS-->>GS: emit Heuristic cycle log complete: train_cycle3.log
Note over GS,CIR: On shutdown (SignalException or normal exit)
CIR->>CIR: shutdown_cycle_info_reporter_safely()
CIR->>CIR: report_cycle_end()
GS->>GS: "_scan_cycle_log_completions(include_latest=True, force_idle=True)"
GS-->>GS: emit Heuristic cycle log complete: train_cycle4.log
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant W as Worker Node
participant GS as LogAggregationServicer
participant PF as PeriodicFlushThread
participant CIR as CycleInfoReporter
W->>GS: StreamLogs(train_cycle3.log chunk)
GS->>GS: "last_chunk_time = now (under buffer_lock)"
GS->>GS: append to buffer
PF->>GS: _flush_file_buffer_locked(train_cycle3.log)
GS->>GS: "last_flush_time = now (under buffer_lock)"
PF->>GS: _scan_cycle_log_completions()
GS->>GS: skip: cycle3 is latest in family
W->>GS: StreamLogs(train_cycle4.log chunk)
GS->>GS: "latest_cycle_by_family[train.log] = 4"
PF->>GS: _scan_cycle_log_completions()
GS->>GS: "cycle3 < latest(4) → eligible"
GS->>GS: "scan_time - last_chunk_time >= idle_timeout?"
GS-->>GS: emit Heuristic cycle log complete: train_cycle3.log
Note over GS,CIR: On shutdown (SignalException or normal exit)
CIR->>CIR: shutdown_cycle_info_reporter_safely()
CIR->>CIR: report_cycle_end()
GS->>GS: "_scan_cycle_log_completions(include_latest=True, force_idle=True)"
GS-->>GS: emit Heuristic cycle log complete: train_cycle4.log
Reviews (4): Last reviewed commit: "Address review feedback for log completi..." | Re-trigger Greptile |
e7ed6d9 to
3a373cf
Compare
No description provided.