Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions tools/pserver/pserver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#!/usr/bin/env python3
import socket
import sys
from tkinter import SE
from putils import *
from pstreamer import PStreamer
from pstream_json import PStreamJSON

class PServer:
def __init__(self):
self.verbose = 0
def __init__(self, host = "127.0.0.1", port = 3000):
self.host = host
self.port = port

def send_stream(self, client_socket, client_address, streamer: PStreamer):
"""
Expand All @@ -29,7 +31,6 @@ def send_stream(self, client_socket, client_address, streamer: PStreamer):
continue

client_socket.sendall(data)
#pinfo("PServer", f"Sent: {data.decode('utf-8').strip()}")
counter += 1

except (BrokenPipeError, ConnectionResetError) as e:
Expand All @@ -39,10 +40,8 @@ def send_stream(self, client_socket, client_address, streamer: PStreamer):
finally:
client_socket.close()

def main():
host = '127.0.0.1'
port = 3000

def main():
if len(sys.argv) > 1:
port = int(sys.argv[1])

Expand Down
2 changes: 1 addition & 1 deletion tools/pserver/pstream_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ def put_data(self, data: bytes):
def get_data(self, timeout: Optional[float] = None) -> Optional[bytes]:
"""Get data from the queue."""
try:
return self.data_queue.get(timeout=timeout)
return self.data_queue.get(timeout=timeout) #if timeout is None, it blocks until an item is available
except queue.Empty:
return None
34 changes: 28 additions & 6 deletions tools/pserver/pstream_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
import json
import time
from pstream_base import PStreamBase
import random
import time


class PStreamJSON(PStreamBase):
"""JSON stream implementation that generates sensor data."""

def __init__(self):
super().__init__()
super().__init__() #gives other methods access to the base class
self.counter = 0
self.sample_data = [
{
Expand All @@ -27,6 +30,8 @@ def __init__(self):
}
]



def get_next_data(self) -> bytes:
"""Generate the next JSON data packet."""
# Cycle through sample data
Expand All @@ -45,11 +50,28 @@ def get_next_data(self) -> bytes:
elif json_obj["sensor"] == "pressure":
json_obj["value"] = 1013.25 + (self.counter % 10) * 0.1

# Simulating corrupt data
if random.random() < 0.5: # 50% chance of seeing corruption in data
corruption_type = random.choice(["nonphysical", "noise", "missing", "no_change"])

if corruption_type == "nonphysical":
json_obj["value"] == random.uniform(-9999, 9999)
elif corruption_type == "noise":
json_obj["value"] *= random.uniform(1.5, 3.0)
elif corruption_type == "missing":
del json_obj["value"]
elif corruption_type == "no_change":
pass

self.counter += 1

# Convert to JSON string with newline delimiter ('\n')
json_str = json.dumps(json_obj)
message = json_str + '\n'

"""
Simulating delay in data stream.
Note: If there is a delay, the server will state: No data received
"""
if random.random() < 0.5: # 50% chance of delay
pause_time = 1
time.sleep(pause_time)

return message.encode('utf-8')
# Convert to JSON string with newline delimiter ('\n')
return (json.dumps(json_obj) + '\n').encode('utf-8')
10 changes: 5 additions & 5 deletions tools/pserver/pstreamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ class PStreamer:
and makes it available via a queue.
"""

def __init__(self):
def __init__(self, interval: float = 1.0):
self.stream: Optional[PStreamBase] = None
self.thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self.stream_interval = 1.0 # *** By Default: 1 second between data generation! ***
self.stream_interval = interval # *** By Default: 1 second between data generation! ***

def build_stream(self, stream: PStreamBase) -> 'PStreamer':
"""
Expand Down Expand Up @@ -46,8 +46,8 @@ def set_interval(self, interval: float) -> 'PStreamer':

def _stream_worker(self):
"""Internal worker method that runs in a separate thread."""
if self.stream is None:
raise ValueError("Stream not built. Call build_stream() first.")
#if self.stream is None:
#raise ValueError("Stream not built. Call build_stream() first.")

self.stream.start()
print(f"[PStreamer] Stream thread started with {self.stream.__class__.__name__}")
Expand Down Expand Up @@ -84,7 +84,7 @@ def start(self):

def stop(self):
"""Stop the streaming thread."""
if self.thread is None or not self.thread.is_alive():
if not self.is_running():
print("[PStreamer] Stream not running")
return

Expand Down