Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

setup(
name="science-synapse",
version="2.2.7",
version="2.2.8",
description="Client library and CLI for the Synapse API",
author="Science Team",
author_email="team@science.xyz",
Expand Down
2 changes: 1 addition & 1 deletion synapse-api
45 changes: 43 additions & 2 deletions synapse/cli/taps.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from synapse.client.taps import Tap
from synapse.api.logging_pb2 import LogEntry
from synapse.utils.log import log_entry_to_str

from rich.console import Console
from rich.table import Table
Expand All @@ -16,19 +18,31 @@ def __init__(self, console: Console):
self.message_count = 0
self.total_bytes = 0
self.start_time = None
self.last_log_entry = None # Store the last decoded log entry

def start(self):
"""Start the monitoring session."""
self.start_time = time.time()
self.message_count = 0
self.total_bytes = 0
self.last_log_entry = None

def update(self, message_size: int) -> Text:
def update(self, message_size: int, message_data: bytes = None) -> Text:
"""Update statistics with a new message and return formatted display text."""
current_time = time.time()
self.message_count += 1
self.total_bytes += message_size

# Try to decode as LogEntry if message_data is provided
if message_data:
try:
log_entry = LogEntry()
log_entry.ParseFromString(message_data)
self.last_log_entry = log_entry
except Exception:
# Not a LogEntry or failed to parse, ignore
pass

# Calculate stats
elapsed_time = current_time - self.start_time
msgs_per_sec = self.message_count / elapsed_time if elapsed_time > 0 else 0
Expand Down Expand Up @@ -66,6 +80,14 @@ def _create_display_text(
stats_text.append(f"{latest_size:,} bytes", style="magenta")
stats_text.append(" | Runtime: ", style="bold")
stats_text.append(f"{time.time() - self.start_time:.1f}s", style="blue")

# Add log entry information if available
if self.last_log_entry:
stats_text.append("\n")
stats_text.append("Last Log: ", style="bold")
log_str = log_entry_to_str(self.last_log_entry)
stats_text.append(log_str, style="white")

return stats_text


Expand Down Expand Up @@ -108,10 +130,24 @@ def stream_taps(args):
print(f"Tap {args.tap_name} not found")
return

# Get the selected tap info to check message type
selected_tap = None
for t in taps:
if t.name == args.tap_name:
selected_tap = t
break

tap.connect(args.tap_name)

console = Console()
console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]")

# Show message type info if available
if selected_tap and selected_tap.message_type:
console.print(f"[dim]Message type: {selected_tap.message_type}[/]")
if selected_tap.message_type == "synapse.LogEntry":
console.print("[dim]Log messages will be decoded and displayed[/]")

console.print("[dim]Press Ctrl+C to stop[/]\n")

# Initialize health monitor
Expand All @@ -127,7 +163,12 @@ def stream_taps(args):
message_size = len(message)

# Update statistics and get formatted display
stats_text = monitor.update(message_size)
# Pass message data if this might be a LogEntry tap
message_data = None
if selected_tap and selected_tap.message_type == "synapse.LogEntry":
message_data = message

stats_text = monitor.update(message_size, message_data)

# Update the live display
live.update(stats_text)
Expand Down