Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions synapse/client/protobuf_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Optional, Type, TypeVar
from google.protobuf.message import Message

# Generic type for protobuf messages
T = TypeVar("T", bound=Message)


def parse_protobuf(data: bytes, message_type: Type[T]) -> Optional[T]:
"""Parse raw bytes into a protobuf message of the specified type.

Args:
data (bytes): Raw binary data.
message_type (Type[T]): The protobuf message class to use for parsing.

Returns:
Optional[T]: The parsed protobuf message, or None if parsing failed.
"""
if not data:
return None

try:
message = message_type()
message.ParseFromString(data)
return message
except Exception:
return None
172 changes: 172 additions & 0 deletions synapse/client/taps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import logging
import zmq
from typing import Optional, Generator

from synapse.api.query_pb2 import QueryRequest
from synapse.api.status_pb2 import StatusCode


class Tap(object):
def __init__(self, uri, verbose=False):
"""Initialize a Tap client to connect to the Synapse device.

Args:
uri (str): The URI of the Synapse device.
verbose (bool, optional): Whether to enable verbose logging. Defaults to False.
"""
self.uri = uri
self.verbose = verbose
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG if verbose else logging.INFO)

# ZMQ context (will be initialized upon connection)
self.zmq_context = None
self.zmq_socket = None
self.connected_tap = None

def list_taps(self):
"""List all available taps on the device.

Returns:
list: List of TapConnection objects.
"""
from synapse.client.device import Device

device = Device(self.uri, self.verbose)

request = QueryRequest()
request.query_type = QueryRequest.kListTaps
request.list_taps_query.SetInParent()

response = device.query(request)

if not response or response.status.code != StatusCode.kOk:
self.logger.error(
f"Failed to list taps: {response.status.message if response else 'No response'}"
)
return []

return response.list_taps_response.taps

def connect(self, name: str) -> bool:
"""Connect to a specific tap by name.

Args:
name (str): The name of the tap to connect to.

Returns:
bool: True if connected successfully, False otherwise.
"""
taps = self.list_taps()

# Find the tap with the specified name
selected_tap = None
for tap in taps:
if tap.name == name:
selected_tap = tap
break

if not selected_tap:
self.logger.error(f"Tap '{name}' not found")
return False

# Store the connected tap
self.connected_tap = selected_tap

# Initialize ZMQ context and socket
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.SUB)

# Replace the endpoint with our device URI if needed
endpoint = selected_tap.endpoint
if "://" in endpoint:
# Extract the protocol and port
protocol, address = endpoint.split("://")
_, port = address.split(":")

# Use the device URI with the same port
endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}"

try:
print(f"Connecting to tap '{name}' at {endpoint}")
self.zmq_socket.connect(endpoint)
self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages
return True
except zmq.ZMQError as e:
self.logger.error(f"Failed to connect to tap: {e}")
self._cleanup()
return False

def read(self, timeout_ms: int = 1000) -> Optional[bytes]:
"""Read raw data from the tap with timeout.

Args:
timeout_ms (int, optional): Timeout in milliseconds. Defaults to 1000.

Returns:
Optional[bytes]: Raw message data or None if timeout/error.
"""
if not self.zmq_socket:
self.logger.error("Not connected to any tap")
return None

try:
# Set socket timeout
self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms)

# Receive data (will timeout if no data available)
return self.zmq_socket.recv()
except zmq.Again:
# Timeout occurred
return None
except zmq.ZMQError as e:
self.logger.error(f"Error receiving message: {e}")
return None

def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]:
"""Stream raw data from the tap.

Args:
timeout_ms (int, optional): Timeout between messages in milliseconds. Defaults to 1000.

Yields:
Generator[bytes, None, None]: Stream of raw message data.
"""
if not self.zmq_socket:
self.logger.error("Not connected to any tap")
return

# Set socket timeout
self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms)

try:
while True:
try:
data = self.zmq_socket.recv()
yield data
except zmq.Again:
# Timeout occurred, continue to next iteration
continue
except KeyboardInterrupt:
self.logger.info("Stream interrupted")
except zmq.ZMQError as e:
self.logger.error(f"Error streaming messages: {e}")
finally:
# Don't close the socket here, let the user call disconnect()
pass

def disconnect(self):
"""Disconnect from the tap."""
self._cleanup()

def _cleanup(self):
"""Clean up ZMQ resources."""
if self.zmq_socket:
self.zmq_socket.close()
self.zmq_socket = None

if self.zmq_context:
self.zmq_context.term()
self.zmq_context = None

self.connected_tap = None
187 changes: 187 additions & 0 deletions synapse/examples/tap_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#!/usr/bin/env python3
import sys
import argparse
import binascii
import importlib
from typing import Type

from synapse.client.taps import Tap
from synapse.client.protobuf_helpers import parse_protobuf
from google.protobuf.message import Message


def load_message_type(message_type_path: str) -> Type[Message]:
"""Load a protobuf message type from its fully qualified path.

Args:
message_type_path (str): The fully qualified path to the message type (e.g. 'synapse.api.datatype_pb2.BroadbandFrame')

Returns:
Type[Message]: The message type class
"""
parts = message_type_path.split(".")
class_name = parts[-1]
module_name = ".".join(parts[:-1])

try:
module = importlib.import_module(module_name)
return getattr(module, class_name)
except (ImportError, AttributeError) as e:
raise ImportError(f"Could not load message type {message_type_path}: {e}")


def format_data(data):
"""Format raw bytes for display."""
if isinstance(data, bytes) or isinstance(data, bytearray):
# For binary data, show a hex summary
if len(data) > 100:
hex_data = binascii.hexlify(data[:50]).decode("ascii")
return f"Binary data ({len(data)} bytes): {hex_data}... [truncated]"
else:
hex_data = binascii.hexlify(data).decode("ascii")
return f"Binary data ({len(data)} bytes): {hex_data}"
else:
# For other types, use default string representation
return str(data)


def main():
parser = argparse.ArgumentParser(
description="Example for using the Synapse Tap API"
)
parser.add_argument(
"--uri", "-u", type=str, required=True, help="URI of the Synapse device"
)
parser.add_argument("--name", "-n", type=str, help="Name of the tap to connect to")
parser.add_argument(
"--verbose", "-v", action="store_true", help="Enable verbose logging"
)
parser.add_argument(
"--timeout",
"-t",
type=int,
default=5000,
help="Timeout in milliseconds (default: 5000)",
)
parser.add_argument(
"--count",
"-c",
type=int,
default=None,
help="Number of messages to receive (default: infinite)",
)
parser.add_argument(
"--message-type",
"-m",
type=str,
help="Fully qualified message type path (e.g. 'synapse.api.datatype_pb2.BroadbandFrame')",
)
args = parser.parse_args()

# Create a tap client
tap = Tap(args.uri, args.verbose)

# List available taps
print("Available taps:")
taps = tap.list_taps()

if not taps:
print("No taps found")
return 1

for i, tap_info in enumerate(taps):
print(
f" {i + 1}. {tap_info.name} - Type: {tap_info.message_type}, Endpoint: {tap_info.endpoint}"
)

# Determine which tap to connect to
tap_name = args.name
if not tap_name:
# If name not provided, ask the user to select one
try:
selection = int(input("\nSelect a tap by number: "))
if 1 <= selection <= len(taps):
tap_name = taps[selection - 1].name
else:
print(f"Invalid selection: {selection}")
return 1
except ValueError:
print("Invalid input. Please enter a number.")
return 1

# Load message type if provided
message_type_class = None
if args.message_type:
try:
message_type_class = load_message_type(args.message_type)
print(f"Using message type: {message_type_class.__name__}")
except ImportError as e:
print(f"Warning: {e}")
print("Will use raw data instead")

# Connect to the selected tap
print(f"\nConnecting to tap: {tap_name}")
if not tap.connect(tap_name):
print(f"Failed to connect to tap: {tap_name}")
return 1

print("Connected successfully!")

try:
if args.count:
# Receive a specific number of messages
print(f"\nReceiving {args.count} messages (timeout: {args.timeout}ms):")
for i in range(args.count):
raw_data = tap.read(args.timeout)
if raw_data:
print(f"Message {i + 1}:")

# Parse data if message type was provided
if message_type_class:
parsed = parse_protobuf(raw_data, message_type_class)
if parsed:
print(f"Parsed message: {parsed}")
else:
print("Failed to parse as protobuf message")
print(f"Raw data: {format_data(raw_data)}")
else:
print(f"Raw data: {format_data(raw_data)}")

print()
else:
print(f"Timeout waiting for message {i + 1}")
else:
# Stream messages until interrupted
print("\nStreaming messages (Ctrl+C to stop):")
count = 0
for raw_data in tap.stream(args.timeout):
count += 1
print(f"Message {count}:")

# Parse data if message type was provided
if message_type_class:
parsed = parse_protobuf(raw_data, message_type_class)
if parsed:
print(f"Parsed message: {parsed}")
else:
print("Failed to parse as protobuf message")
print(f"Raw data: {format_data(raw_data)}")
else:
print(f"Raw data: {format_data(raw_data)}")

print()

except KeyboardInterrupt:
print("\nStream interrupted by user")

finally:
# Clean up
print("\nDisconnecting...")
tap.disconnect()
print("Done")

return 0


if __name__ == "__main__":
sys.exit(main())