From b8d15b738bea794d0a9c64d90b2a963ab3a49e5a Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 7 May 2025 21:28:57 -0700 Subject: [PATCH 1/3] Update to using taps --- synapse-api | 2 +- synapse/cli/rpc.py | 19 +++++++++ synapse/cli/tap_listen.py | 89 +++++++++++++++++++++++++++++++++++++++ synapse/client/device.py | 9 ++++ 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 synapse/cli/tap_listen.py diff --git a/synapse-api b/synapse-api index 8dbfdb1..25c382e 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 8dbfdb126a833161926d0bc47a6d888859901539 +Subproject commit 25c382e40fdec56f15bbac783b31c2bd189b2983 diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index f955c5d..0d2fc20 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -13,6 +13,7 @@ from rich.console import Console from rich.pretty import pprint +from rich.table import Table from synapse.cli.query import StreamingQueryClient from synapse.utils.log import log_entry_to_str @@ -39,6 +40,9 @@ def add_commands(subparsers): e.add_argument("config_file", type=str) e.set_defaults(func=configure) + e = subparsers.add_parser("taps", help="List available taps") + e.set_defaults(func=list_taps) + f = subparsers.add_parser("logs", help="Get logs from the device") f.add_argument("--output", "-o", type=str, help="Optional file to write logs to") f.add_argument( @@ -81,6 +85,21 @@ def add_commands(subparsers): f.set_defaults(func=get_logs) +def list_taps(args): + console = Console() + taps = syn.Device(args.uri, args.verbose).list_taps() + + table = Table(title="Available Taps") + table.add_column("Name", style="cyan") + table.add_column("Message Type", style="green") + table.add_column("Endpoint", style="green") + + for tap in taps.taps: + table.add_row(tap.name, tap.message_type, tap.endpoint) + + console.print(table) + + def info(args): console = Console() with console.status("Getting device information...", spinner="bouncingBall"): diff --git a/synapse/cli/tap_listen.py b/synapse/cli/tap_listen.py new file mode 100644 index 0000000..25f3ef8 --- /dev/null +++ b/synapse/cli/tap_listen.py @@ -0,0 +1,89 @@ +import zmq +import socket +import threading +import time +import argparse +import binascii + + +def listen_zmq(endpoint, topic=""): + """Listen for ZMQ messages""" + context = zmq.Context() + subscriber = context.socket(zmq.SUB) + subscriber.connect(endpoint) + + # Set subscription filter (empty = all messages) + subscriber.setsockopt_string(zmq.SUBSCRIBE, topic) + + print(f"ZMQ Subscriber connected to {endpoint} with topic '{topic or 'ALL'}'") + + while True: + try: + # If multi-part message with topic + if topic: + # Get the topic first + topic_msg = subscriber.recv() + # Then get the data + message = subscriber.recv() + else: + # Single message + message = subscriber.recv() + + print(f"Got ZMQ data: {len(message)} bytes") + # Print first 20 bytes as hex for debugging + print(f" Data preview: {binascii.hexlify(message[:20]).decode()}") + except Exception as e: + print(f"ZMQ Error: {e}") + time.sleep(1) + + +def listen_udp(ip, port): + """Listen for UDP messages""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind((ip, port)) + + print(f"UDP Listener bound to {ip}:{port}") + + while True: + try: + data, addr = sock.recvfrom(8192) # Buffer size is 8KB + print(f"Got UDP data from {addr}: {len(data)} bytes") + # Print first 20 bytes as hex for debugging + print(f" Data preview: {binascii.hexlify(data[:20]).decode()}") + except Exception as e: + print(f"UDP Error: {e}") + time.sleep(1) + + +def main(): + parser = argparse.ArgumentParser(description="Listen to ZMQ and UDP taps") + + # ZMQ options + parser.add_argument( + "--port", + type=str, + default="tcp://10.40.62.57", + help="ZMQ endpoint to connect to", + ) + + args = parser.parse_args() + + # Start ZMQ listener thread + connection = f"tcp://10.40.62.57:{args.port}" + zmq_topic = "" + zmq_thread = threading.Thread( + target=listen_zmq, args=(connection, zmq_topic), daemon=True + ) + zmq_thread.start() + + # Keep main thread alive + try: + print("Listening for messages (press Ctrl+C to exit)...") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("Shutting down...") + + +if __name__ == "__main__": + main() diff --git a/synapse/client/device.py b/synapse/client/device.py index 3b292e2..20911b8 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -16,6 +16,7 @@ from synapse.api.synapse_pb2_grpc import SynapseDeviceStub from synapse.client.config import Config from synapse.utils.log import log_level_to_pb +from synapse.api.tap_pb2 import ListTapsRequest DEFAULT_SYNAPSE_PORT = 647 @@ -185,6 +186,14 @@ def stream_query( self.logger.error(f"Error during StreamQuery: {str(e)}") yield StreamQueryResponse(code=StatusCode.kQueryFailed) + def list_taps(self) -> ListTapResponse: + try: + request = ListTapsRequest() + return self.rpc.ListTaps(request) + except grpc.RpcError as e: + self.logger.error("Error: %s", e.details) + return None + def _handle_status_response(self, status): if status.code != StatusCode.kOk: self.logger.error("Error %d: %s", status.code, status.message) From 112aba43e8b4b51a2d251197d347373ba4b1bd11 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Thu, 8 May 2025 16:15:36 -0700 Subject: [PATCH 2/3] Updated to use the new taps api --- .gitignore | 1 + synapse-api | 2 +- synapse/cli/rpc.py | 12 ++++++++---- synapse/client/device.py | 9 --------- 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index dbe68da..da7ebd1 100644 --- a/.gitignore +++ b/.gitignore @@ -184,3 +184,4 @@ output_*.json *.jsonl .scienv synapse_data* +.synapse_deploy_cache.json diff --git a/synapse-api b/synapse-api index 25c382e..c3d730e 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 25c382e40fdec56f15bbac783b31c2bd189b2983 +Subproject commit c3d730e6fac727b1ce2d3eef66202f185d736a37 diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 0d2fc20..9b297e1 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -87,14 +87,18 @@ def add_commands(subparsers): def list_taps(args): console = Console() - taps = syn.Device(args.uri, args.verbose).list_taps() - table = Table(title="Available Taps") + request = QueryRequest() + request.query_type = QueryRequest.kListTaps + request.list_taps_query.SetInParent() + response = syn.Device(args.uri, args.verbose).query(request) + + table = Table(title="Available Taps", show_lines=True) table.add_column("Name", style="cyan") table.add_column("Message Type", style="green") - table.add_column("Endpoint", style="green") + table.add_column("Endpoint (will be abstracted)", style="green") - for tap in taps.taps: + for tap in response.list_taps_response.taps: table.add_row(tap.name, tap.message_type, tap.endpoint) console.print(table) diff --git a/synapse/client/device.py b/synapse/client/device.py index 20911b8..3b292e2 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -16,7 +16,6 @@ from synapse.api.synapse_pb2_grpc import SynapseDeviceStub from synapse.client.config import Config from synapse.utils.log import log_level_to_pb -from synapse.api.tap_pb2 import ListTapsRequest DEFAULT_SYNAPSE_PORT = 647 @@ -186,14 +185,6 @@ def stream_query( self.logger.error(f"Error during StreamQuery: {str(e)}") yield StreamQueryResponse(code=StatusCode.kQueryFailed) - def list_taps(self) -> ListTapResponse: - try: - request = ListTapsRequest() - return self.rpc.ListTaps(request) - except grpc.RpcError as e: - self.logger.error("Error: %s", e.details) - return None - def _handle_status_response(self, status): if status.code != StatusCode.kOk: self.logger.error("Error %d: %s", status.code, status.message) From 5ec89c1de7d82120644acf855c61b7a3225a83ce Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Thu, 8 May 2025 16:59:22 -0700 Subject: [PATCH 3/3] refactor --- synapse/cli/rpc.py | 38 ++++++++--------- synapse/cli/tap_listen.py | 89 --------------------------------------- 2 files changed, 19 insertions(+), 108 deletions(-) delete mode 100644 synapse/cli/tap_listen.py diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 9b297e1..b298b8a 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -85,25 +85,6 @@ def add_commands(subparsers): f.set_defaults(func=get_logs) -def list_taps(args): - console = Console() - - request = QueryRequest() - request.query_type = QueryRequest.kListTaps - request.list_taps_query.SetInParent() - response = syn.Device(args.uri, args.verbose).query(request) - - table = Table(title="Available Taps", show_lines=True) - table.add_column("Name", style="cyan") - table.add_column("Message Type", style="green") - table.add_column("Endpoint (will be abstracted)", style="green") - - for tap in response.list_taps_response.taps: - table.add_row(tap.name, tap.message_type, tap.endpoint) - - console.print(table) - - def info(args): console = Console() with console.status("Getting device information...", spinner="bouncingBall"): @@ -267,3 +248,22 @@ def parse_datetime(time_str: Optional[str]) -> Optional[datetime]: finally: if output_file: output_file.close() + + +def list_taps(args): + console = Console() + + request = QueryRequest() + request.query_type = QueryRequest.kListTaps + request.list_taps_query.SetInParent() + response = syn.Device(args.uri, args.verbose).query(request) + + table = Table(title="Available Taps", show_lines=True) + table.add_column("Name", style="cyan") + table.add_column("Message Type", style="green") + table.add_column("Endpoint", style="green") + + for tap in response.list_taps_response.taps: + table.add_row(tap.name, tap.message_type, tap.endpoint) + + console.print(table) diff --git a/synapse/cli/tap_listen.py b/synapse/cli/tap_listen.py deleted file mode 100644 index 25f3ef8..0000000 --- a/synapse/cli/tap_listen.py +++ /dev/null @@ -1,89 +0,0 @@ -import zmq -import socket -import threading -import time -import argparse -import binascii - - -def listen_zmq(endpoint, topic=""): - """Listen for ZMQ messages""" - context = zmq.Context() - subscriber = context.socket(zmq.SUB) - subscriber.connect(endpoint) - - # Set subscription filter (empty = all messages) - subscriber.setsockopt_string(zmq.SUBSCRIBE, topic) - - print(f"ZMQ Subscriber connected to {endpoint} with topic '{topic or 'ALL'}'") - - while True: - try: - # If multi-part message with topic - if topic: - # Get the topic first - topic_msg = subscriber.recv() - # Then get the data - message = subscriber.recv() - else: - # Single message - message = subscriber.recv() - - print(f"Got ZMQ data: {len(message)} bytes") - # Print first 20 bytes as hex for debugging - print(f" Data preview: {binascii.hexlify(message[:20]).decode()}") - except Exception as e: - print(f"ZMQ Error: {e}") - time.sleep(1) - - -def listen_udp(ip, port): - """Listen for UDP messages""" - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind((ip, port)) - - print(f"UDP Listener bound to {ip}:{port}") - - while True: - try: - data, addr = sock.recvfrom(8192) # Buffer size is 8KB - print(f"Got UDP data from {addr}: {len(data)} bytes") - # Print first 20 bytes as hex for debugging - print(f" Data preview: {binascii.hexlify(data[:20]).decode()}") - except Exception as e: - print(f"UDP Error: {e}") - time.sleep(1) - - -def main(): - parser = argparse.ArgumentParser(description="Listen to ZMQ and UDP taps") - - # ZMQ options - parser.add_argument( - "--port", - type=str, - default="tcp://10.40.62.57", - help="ZMQ endpoint to connect to", - ) - - args = parser.parse_args() - - # Start ZMQ listener thread - connection = f"tcp://10.40.62.57:{args.port}" - zmq_topic = "" - zmq_thread = threading.Thread( - target=listen_zmq, args=(connection, zmq_topic), daemon=True - ) - zmq_thread.start() - - # Keep main thread alive - try: - print("Listening for messages (press Ctrl+C to exit)...") - while True: - time.sleep(1) - except KeyboardInterrupt: - print("Shutting down...") - - -if __name__ == "__main__": - main()