Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
services:
tiled:
# see the file ./tiled/deploy/config.yml for detailed configuration of tiled
image: ghcr.io/bluesky/tiled:main
ports:
- "8000:8000"
environment:
- TILED_SINGLE_USER_API_KEY=${TILED_SINGLE_USER_API_KEY}
volumes:
- ./services/tiled/deploy:/deploy:Z
- ./data:/data:Z
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://tiled:8000/healthz"]
interval: 10s
timeout: 5s
retries: 3
networks:
aps_net:
# see the file ./tiled/deploy/config.yml for detailed configuration of tiled
image: ghcr.io/bluesky/tiled:main
ports:
- "8000:8000"
environment:
- TILED_SINGLE_USER_API_KEY=${TILED_SINGLE_USER_API_KEY}
volumes:
- ./services/tiled/deploy:/deploy:Z
- ./data:/data:Z
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://tiled:8000/healthz"]
interval: 10s
timeout: 5s
retries: 3
networks:
aps_net:

processor:
command: python -m tr_ap_xps.apps.processor_cli
command: python -m tr_ap_xps.apps.processor_cli_tpx
build:
context: .
dockerfile: Dockerfile_processor
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = [
]
description = "a package to perform computations and suggestion during a time-resolved AP-XPS experiment"
readme = "README.md"
requires-python = ">=3.10"
requires-python = ">=3.11,<3.14"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
Expand Down
24 changes: 19 additions & 5 deletions settings.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
# xps_operator:
# log_level: "INFO"
# tiled_uri: "http://localhost:8000"
# lv_zmq_listener:
# zmq_pub_address: "tcp://localhost"
# zmq_pub_port: 5555
# tpx_zmq_listener:
# zmq_pub_address: "tcp://localhost"
# zmq_pub_port: 5657
# websockets_publisher:
# host: "0.0.0.0"
# port: 8001
xps_operator:
log_level: "INFO"
tiled_uri: "http://localhost:8000"
log_level: "DEBUG"
tiled_uri: "https://tiled-staging.computing.als.lbl.gov"
tiled_api_key: afecf30c4fcf142a9c30142dc8b7d5212a55a100715c580f034afcddc317be822174ead4
lv_zmq_listener:
zmq_pub_address: "tcp://localhost"
zmq_pub_port: 5555
websockets_publisher:
host: "0.0.0.0"
port: 8001
tpx_zmq_listener:
zmq_pub_address: "tcp://131.243.191.226"
zmq_pub_port: 5657
websocket_url: "ws://localhost:8766"
65 changes: 38 additions & 27 deletions src/_tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,70 @@
import time
from multiprocessing import Process

import pytest
import zmq


def start_processor_cli():
subprocess.run(["python", "processor_cli.py"])
"""Start the processor CLI as a subprocess."""
subprocess.run(
["python", "-m", "tr_ap_xps.apps.processor_cli", "listen"],
check=False, # Don't fail if it exits early
)


def start_zmq_publisher(port):
"""Start a ZMQ publisher that sends test messages."""
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind(f"tcp://*:{port}")
while True:
# Give subscribers time to connect (ZMQ slow joiner problem)
time.sleep(1)
# Send a few messages
for i in range(5):
socket.send_string("test message")
time.sleep(1)


def start_tiled_server():
subprocess.run(["python", "tiled_server.py"])
time.sleep(0.5)
socket.close()
context.term()


def test_integration():
# Start processor_cli in a background process
processor_cli_process = Process(target=start_processor_cli)
processor_cli_process.start()
time.sleep(2) # Give it time to start

"""Integration test for ZMQ message flow."""
# Dynamically assign a port for ZMQ publisher
context = zmq.Context()
temp_socket = context.socket(zmq.PUB)
port = temp_socket.bind_to_random_port("tcp://*")
temp_socket.close()
context.term()

# Start zmq publisher in a background process with the random port
zmq_publisher_process = Process(target=start_zmq_publisher, args=(port,))
zmq_publisher_process.start()
time.sleep(2) # Give it time to start

# Start tiled server in a background process
tiled_server_process = Process(target=start_tiled_server)
tiled_server_process.start()
time.sleep(2) # Give it time to start
time.sleep(0.5) # Give it time to start

# Set up zmq subscriber to receive messages
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(f"tcp://localhost:{port}")
socket.setsockopt_string(zmq.SUBSCRIBE, "")
#
# Check if messages are received and processed
message = socket.recv_string()
assert message == "test message"

# Clean up
processor_cli_process.terminate()
zmq_publisher_process.terminate()
tiled_server_process.terminate()
# Set timeout to prevent hanging
socket.setsockopt(zmq.RCVTIMEO, 5000) # 5 second timeout

# Give ZMQ time to establish connection (slow joiner problem)
time.sleep(1)

try:
# Check if messages are received and processed
# The timeout is set above, so this won't hang forever
message = socket.recv_string()
assert message == "test message"
except zmq.Again:
pytest.fail("No message received from ZMQ publisher within timeout")
finally:
# Clean up
socket.close()
context.term()
zmq_publisher_process.join(timeout=2) # Wait for process to finish
if zmq_publisher_process.is_alive():
zmq_publisher_process.terminate()
zmq_publisher_process.join()
40 changes: 39 additions & 1 deletion src/_tests/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def test_listen_zmq_interface(mock_operator, monkeypatch):
zmq_socket = setup_zmq() # Ensure setup_zmq supports async if needed

async with run_simulator(num_frames=1):
asyncio.sleep(2)
await asyncio.sleep(2)
listener = XPSLabviewZMQListener(mock_operator, zmq_socket)

# Start the listener in an asyncio task
Expand Down Expand Up @@ -64,3 +64,41 @@ async def test_listen_zmq_interface(mock_operator, monkeypatch):
assert isinstance(
call_args[2][0][0], XPSStop
), f"Second argument is not an instance of XPSStop: {call_args[2][0][0]}"


@pytest.mark.asyncio
async def test_listen_timepix_zmq_interface(mock_operator, monkeypatch):
# monkeypatch.setattr("tr_ap_xps.labview.app_settings.lv_zmq_listener.zmq_pub_port", "6000")
zmq_socket = setup_zmq() # Ensure setup_zmq supports async if needed

async with run_simulator(num_frames=1):
await asyncio.sleep(2)
listener = XPSLabviewZMQListener(mock_operator, zmq_socket)

# Start the listener in an asyncio task
listener_task = asyncio.create_task(listener.start())

# Give the listener time to process messages
await asyncio.sleep(3)

# Stop the listener and wait for it to clean up
await listener.stop()
try:
await listener_task
except asyncio.CancelledError:
pass

# Ensure process was called three times. We expect 1 event.
assert mock_operator.process.call_count == 3

# Validate that the arguments are instances of specific classes
call_args = mock_operator.process.call_args_list
assert isinstance(
call_args[0][0][0], XPSStart
), f"First argument is not an instance of XPSStart: {call_args[0][0][0]}"
assert isinstance(
call_args[1][0][0], XPSRawEvent
), f"Second argument is not an instance of XPSRawEvent: {call_args[1][0][0]}"
assert isinstance(
call_args[2][0][0], XPSStop
), f"Second argument is not an instance of XPSStop: {call_args[2][0][0]}"
78 changes: 78 additions & 0 deletions src/tr_ap_xps/apps/processor_cli_tpx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import logging
import signal

import typer
from tiled.client import from_uri
from tiled.client.node import Container

from ..config import settings
# from ..labview import XPSLabviewZMQListener, setup_zmq
from ..timepix import XPSTimepixZMQListener, setup_zmq
from ..log_utils import setup_logger
from ..pipeline.xps_operator import XPSOperator
from ..tiled import TiledPublisher
from ..websockets import XPSWSResultPublisher

app = typer.Typer()
logger = logging.getLogger("tr_ap_xps")
setup_logger(logger)

app_settings = settings.xps_operator

def tiled_runs_container() -> Container:
try:
client = from_uri(app_settings.tiled_uri, api_key=app_settings.tiled_api_key)
if client.get("runs") is None: # TODO test case
client.create_container("runs")
return client["runs"]
except Exception as e:
logger.error(f"Error connecting to Tiled: {e}")


@app.command()
async def listen() -> None:
try:
logger.setLevel(app_settings.log_level.upper())
logger.debug("DEBUG LOGGING SET")
logger.info(
f"tpx_zmq_socket_address: {app_settings.tpx_zmq_listener.zmq_pub_address}"
)
logger.info(f"tpx_zmq_socket_port: {app_settings.tpx_zmq_listener.zmq_pub_port}")
logger.info(f"tiled_uri: {app_settings.tiled_uri}")
logger.info(
f"tiled_api_key: {'****' if app_settings.tiled_api_key else 'NOT SET!!!'}"
)

received_sigterm = {"received": False} # Define the variable received_sigterm

# setup websocket server
operator = XPSOperator()
ws_publisher = XPSWSResultPublisher(app_settings.websocket_url)
# tiled_pub = TiledPublisher(tiled_runs_container())

operator.add_publisher(ws_publisher)
# operator.add_publisher(tiled_pub)
# connect to labview zmq

tpx_zmq_socket = setup_zmq()
listener = XPSTimepixZMQListener(operator=operator, zmq_socket=tpx_zmq_socket)

# Wait for both tasks to complete
await asyncio.gather(listener.start(), ws_publisher.start())

def handle_sigterm(signum, frame):
logger.info("SIGTERM received, stopping...")
received_sigterm["received"] = True
asyncio.create_task(listener.stop())
asyncio.create_task(ws_publisher.stop())

# Register the handler for SIGTERM
signal.signal(signal.SIGTERM, handle_sigterm)
except Exception as e:
logger.error(f"Error setting up XPS processor {e}")
raise e


if __name__ == "__main__":
asyncio.run(listen())
48 changes: 32 additions & 16 deletions src/tr_ap_xps/pipeline/xps_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from arroyopy.operator import Operator
from arroyopy.schemas import Message

from ..schemas import DataFrameModel, XPSRawEvent, XPSResultStop, XPSStart, XPSStop
from ..schemas import XPSResult, XPSRawEvent, XPSResultStop, XPSStart, XPSStop
from ..timing import timer
from .xps_processor import XPSProcessor

Expand All @@ -17,8 +17,9 @@ class XPSOperator(Operator):

"""

def __init__(self) -> None:
def __init__(self, build_heatmaps: bool = False) -> None:
self.xps_processor = None
self.build_heatmaps = build_heatmaps

async def process(self, message: Message) -> None:
"""
Expand All @@ -40,19 +41,34 @@ async def process(self, message: Message) -> None:
await self.publish(message)

elif isinstance(message, XPSRawEvent):
if not self.xps_processor:
logger.error(
"Received XPSRawEvent without an active XPSProcessor. Started after labview started?"

if self.build_heatmaps:
if not self.xps_processor:
logger.error(
"Received XPSRawEvent without an active XPSProcessor. Started after labview started?"
)
return
result: XPSResult = await asyncio.to_thread(
self.xps_processor.process_frame, message
)
else:
result = XPSResult(
shot_num=message.image_info.frame_number,
integrated_frames=message.image,
rolling_mean=None,
rolling_std=None,
frame_number=message.image_info.frame_number,
detected_peaks=None,
vfft=None,
ifft=None,
shot_recent=None,
shot_mean=None,
shot_std=None,
)
return
result: XPSRawEvent = await asyncio.to_thread(
self.xps_processor.process_frame, message
)
if result:
await self.publish(result)

elif isinstance(message, XPSStop):
data_frame_model = DataFrameModel(df=timer.timing_dataframe)
new_msg = XPSResultStop(function_timings=data_frame_model)
await self.publish(new_msg)
self.xps_processor = None
await self.publish(result)
# elif isinstance(message, XPSStop):
# data_frame_model = DataFrameModel(df=timer.timing_dataframe)
# new_msg = XPSResultStop(function_timings=data_frame_model)
# await self.publish(new_msg)
# self.xps_processor = None
Loading