From 6348c02a99c36eed5b411b7d1c62b933eda9241e Mon Sep 17 00:00:00 2001 From: Kenneth Date: Sat, 31 Jan 2026 12:45:41 -0700 Subject: [PATCH 1/3] Just a few changes --- tools/pserver/pserver.py | 11 +++++------ tools/pserver/pstream_base.py | 2 +- tools/pserver/pstream_json.py | 8 ++------ tools/pserver/pstreamer.py | 10 +++++----- 4 files changed, 13 insertions(+), 18 deletions(-) diff --git a/tools/pserver/pserver.py b/tools/pserver/pserver.py index 3bf80d83..4362b038 100644 --- a/tools/pserver/pserver.py +++ b/tools/pserver/pserver.py @@ -1,13 +1,15 @@ #!/usr/bin/env python3 import socket import sys +from tkinter import SE from putils import * from pstreamer import PStreamer from pstream_json import PStreamJSON class PServer: - def __init__(self): - self.verbose = 0 + def __init__(self, host = "127.0.0.1", port = 3000): + self.host = host + self.port = port def send_stream(self, client_socket, client_address, streamer: PStreamer): """ @@ -29,7 +31,6 @@ def send_stream(self, client_socket, client_address, streamer: PStreamer): continue client_socket.sendall(data) - #pinfo("PServer", f"Sent: {data.decode('utf-8').strip()}") counter += 1 except (BrokenPipeError, ConnectionResetError) as e: @@ -39,10 +40,8 @@ def send_stream(self, client_socket, client_address, streamer: PStreamer): finally: client_socket.close() -def main(): - host = '127.0.0.1' - port = 3000 +def main(): if len(sys.argv) > 1: port = int(sys.argv[1]) diff --git a/tools/pserver/pstream_base.py b/tools/pserver/pstream_base.py index 2eedd66d..41edf96d 100644 --- a/tools/pserver/pstream_base.py +++ b/tools/pserver/pstream_base.py @@ -51,6 +51,6 @@ def put_data(self, data: bytes): def get_data(self, timeout: Optional[float] = None) -> Optional[bytes]: """Get data from the queue.""" try: - return self.data_queue.get(timeout=timeout) + return self.data_queue.get(timeout=timeout) #if timeout is None, it blocks until an item is available except queue.Empty: return None \ No newline at end of file diff --git a/tools/pserver/pstream_json.py b/tools/pserver/pstream_json.py index 926424d2..11c90c47 100644 --- a/tools/pserver/pstream_json.py +++ b/tools/pserver/pstream_json.py @@ -7,7 +7,7 @@ class PStreamJSON(PStreamBase): """JSON stream implementation that generates sensor data.""" def __init__(self): - super().__init__() + super().__init__() #gives other methods access to the base class self.counter = 0 self.sample_data = [ { @@ -48,8 +48,4 @@ def get_next_data(self) -> bytes: self.counter += 1 # Convert to JSON string with newline delimiter ('\n') - json_str = json.dumps(json_obj) - message = json_str + '\n' - - - return message.encode('utf-8') \ No newline at end of file + return (json.dumps(json_obj) + '\n').encode('utf-8') \ No newline at end of file diff --git a/tools/pserver/pstreamer.py b/tools/pserver/pstreamer.py index aefcb45d..326c6b12 100644 --- a/tools/pserver/pstreamer.py +++ b/tools/pserver/pstreamer.py @@ -12,11 +12,11 @@ class PStreamer: and makes it available via a queue. """ - def __init__(self): + def __init__(self, interval: float = 1.0): self.stream: Optional[PStreamBase] = None self.thread: Optional[threading.Thread] = None self._stop_event = threading.Event() - self.stream_interval = 1.0 # *** By Default: 1 second between data generation! *** + self.stream_interval = interval # *** By Default: 1 second between data generation! *** def build_stream(self, stream: PStreamBase) -> 'PStreamer': """ @@ -46,8 +46,8 @@ def set_interval(self, interval: float) -> 'PStreamer': def _stream_worker(self): """Internal worker method that runs in a separate thread.""" - if self.stream is None: - raise ValueError("Stream not built. Call build_stream() first.") + #if self.stream is None: + #raise ValueError("Stream not built. Call build_stream() first.") self.stream.start() print(f"[PStreamer] Stream thread started with {self.stream.__class__.__name__}") @@ -84,7 +84,7 @@ def start(self): def stop(self): """Stop the streaming thread.""" - if self.thread is None or not self.thread.is_alive(): + if not self.is_running(): print("[PStreamer] Stream not running") return From f85377bc7ea32222c9a45cd91f8d51543660e6e4 Mon Sep 17 00:00:00 2001 From: Kenneth Date: Tue, 3 Feb 2026 16:16:30 -0700 Subject: [PATCH 2/3] Simulating random corruption in the data stream --- tools/pserver/pstream_json.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tools/pserver/pstream_json.py b/tools/pserver/pstream_json.py index 11c90c47..35da1eab 100644 --- a/tools/pserver/pstream_json.py +++ b/tools/pserver/pstream_json.py @@ -2,6 +2,8 @@ import json import time from pstream_base import PStreamBase +import random + class PStreamJSON(PStreamBase): """JSON stream implementation that generates sensor data.""" @@ -27,6 +29,8 @@ def __init__(self): } ] + + def get_next_data(self) -> bytes: """Generate the next JSON data packet.""" # Cycle through sample data @@ -45,6 +49,19 @@ def get_next_data(self) -> bytes: elif json_obj["sensor"] == "pressure": json_obj["value"] = 1013.25 + (self.counter % 10) * 0.1 + # Simulating corrupt data + if random.random(): + corruption_type = random.choice(["nonphysical", "noise", "missing", "no_change"]) + + if corruption_type == "nonphysical": + json_obj["value"] == random.uniform(-9999, 9999) + elif corruption_type == "noise": + json_obj["value"] *= random.uniform(1.5, 3.0) + elif corruption_type == "missing": + del json_obj["value"] + elif corruption_type == "no_change": + pass + self.counter += 1 # Convert to JSON string with newline delimiter ('\n') From 03d356ba7fddf1785ed1aa96cca2d4c1f02b7713 Mon Sep 17 00:00:00 2001 From: Kenneth Date: Tue, 3 Feb 2026 17:51:42 -0700 Subject: [PATCH 3/3] Added randome delays to data stream --- tools/pserver/pstream_json.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tools/pserver/pstream_json.py b/tools/pserver/pstream_json.py index 35da1eab..b8d93fd2 100644 --- a/tools/pserver/pstream_json.py +++ b/tools/pserver/pstream_json.py @@ -3,6 +3,7 @@ import time from pstream_base import PStreamBase import random +import time class PStreamJSON(PStreamBase): @@ -50,7 +51,7 @@ def get_next_data(self) -> bytes: json_obj["value"] = 1013.25 + (self.counter % 10) * 0.1 # Simulating corrupt data - if random.random(): + if random.random() < 0.5: # 50% chance of seeing corruption in data corruption_type = random.choice(["nonphysical", "noise", "missing", "no_change"]) if corruption_type == "nonphysical": @@ -64,5 +65,13 @@ def get_next_data(self) -> bytes: self.counter += 1 + """ + Simulating delay in data stream. + Note: If there is a delay, the server will state: No data received + """ + if random.random() < 0.5: # 50% chance of delay + pause_time = 1 + time.sleep(pause_time) + # Convert to JSON string with newline delimiter ('\n') return (json.dumps(json_obj) + '\n').encode('utf-8') \ No newline at end of file