diff --git a/tools/pserver/pserver.py b/tools/pserver/pserver.py index 3bf80d83..4362b038 100644 --- a/tools/pserver/pserver.py +++ b/tools/pserver/pserver.py @@ -1,13 +1,15 @@ #!/usr/bin/env python3 import socket import sys +from tkinter import SE from putils import * from pstreamer import PStreamer from pstream_json import PStreamJSON class PServer: - def __init__(self): - self.verbose = 0 + def __init__(self, host = "127.0.0.1", port = 3000): + self.host = host + self.port = port def send_stream(self, client_socket, client_address, streamer: PStreamer): """ @@ -29,7 +31,6 @@ def send_stream(self, client_socket, client_address, streamer: PStreamer): continue client_socket.sendall(data) - #pinfo("PServer", f"Sent: {data.decode('utf-8').strip()}") counter += 1 except (BrokenPipeError, ConnectionResetError) as e: @@ -39,10 +40,8 @@ def send_stream(self, client_socket, client_address, streamer: PStreamer): finally: client_socket.close() -def main(): - host = '127.0.0.1' - port = 3000 +def main(): if len(sys.argv) > 1: port = int(sys.argv[1]) diff --git a/tools/pserver/pstream_base.py b/tools/pserver/pstream_base.py index 2eedd66d..41edf96d 100644 --- a/tools/pserver/pstream_base.py +++ b/tools/pserver/pstream_base.py @@ -51,6 +51,6 @@ def put_data(self, data: bytes): def get_data(self, timeout: Optional[float] = None) -> Optional[bytes]: """Get data from the queue.""" try: - return self.data_queue.get(timeout=timeout) + return self.data_queue.get(timeout=timeout) #if timeout is None, it blocks until an item is available except queue.Empty: return None \ No newline at end of file diff --git a/tools/pserver/pstream_json.py b/tools/pserver/pstream_json.py index 926424d2..b8d93fd2 100644 --- a/tools/pserver/pstream_json.py +++ b/tools/pserver/pstream_json.py @@ -2,12 +2,15 @@ import json import time from pstream_base import PStreamBase +import random +import time + class PStreamJSON(PStreamBase): """JSON stream implementation that generates sensor data.""" def __init__(self): - super().__init__() + super().__init__() #gives other methods access to the base class self.counter = 0 self.sample_data = [ { @@ -27,6 +30,8 @@ def __init__(self): } ] + + def get_next_data(self) -> bytes: """Generate the next JSON data packet.""" # Cycle through sample data @@ -45,11 +50,28 @@ def get_next_data(self) -> bytes: elif json_obj["sensor"] == "pressure": json_obj["value"] = 1013.25 + (self.counter % 10) * 0.1 + # Simulating corrupt data + if random.random() < 0.5: # 50% chance of seeing corruption in data + corruption_type = random.choice(["nonphysical", "noise", "missing", "no_change"]) + + if corruption_type == "nonphysical": + json_obj["value"] == random.uniform(-9999, 9999) + elif corruption_type == "noise": + json_obj["value"] *= random.uniform(1.5, 3.0) + elif corruption_type == "missing": + del json_obj["value"] + elif corruption_type == "no_change": + pass + self.counter += 1 - # Convert to JSON string with newline delimiter ('\n') - json_str = json.dumps(json_obj) - message = json_str + '\n' - + """ + Simulating delay in data stream. + Note: If there is a delay, the server will state: No data received + """ + if random.random() < 0.5: # 50% chance of delay + pause_time = 1 + time.sleep(pause_time) - return message.encode('utf-8') \ No newline at end of file + # Convert to JSON string with newline delimiter ('\n') + return (json.dumps(json_obj) + '\n').encode('utf-8') \ No newline at end of file diff --git a/tools/pserver/pstreamer.py b/tools/pserver/pstreamer.py index aefcb45d..326c6b12 100644 --- a/tools/pserver/pstreamer.py +++ b/tools/pserver/pstreamer.py @@ -12,11 +12,11 @@ class PStreamer: and makes it available via a queue. """ - def __init__(self): + def __init__(self, interval: float = 1.0): self.stream: Optional[PStreamBase] = None self.thread: Optional[threading.Thread] = None self._stop_event = threading.Event() - self.stream_interval = 1.0 # *** By Default: 1 second between data generation! *** + self.stream_interval = interval # *** By Default: 1 second between data generation! *** def build_stream(self, stream: PStreamBase) -> 'PStreamer': """ @@ -46,8 +46,8 @@ def set_interval(self, interval: float) -> 'PStreamer': def _stream_worker(self): """Internal worker method that runs in a separate thread.""" - if self.stream is None: - raise ValueError("Stream not built. Call build_stream() first.") + #if self.stream is None: + #raise ValueError("Stream not built. Call build_stream() first.") self.stream.start() print(f"[PStreamer] Stream thread started with {self.stream.__class__.__name__}") @@ -84,7 +84,7 @@ def start(self): def stop(self): """Stop the streaming thread.""" - if self.thread is None or not self.thread.is_alive(): + if not self.is_running(): print("[PStreamer] Stream not running") return