Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,24 @@ A repository with code for a online video acquisition/encoding pipeline

## ZeroMQ Architecture

![ZeroMQ Architecture](assets/zmq_architecture.svg)
![ZeroMQ Architecture](assets/zmq_architecture.svg)

## Notes on workflow startup

Button -> EnableExperiment
EnableExperiment(unit) -> StartLogging(unit) -> StartExperiment (unit)
-> StartLogging -> IsExperimentRunning(bool)

In main:
zmqRequest (master) -> EnableExperiment

In satellite:
zmqRequest (satellite) -> EnableExperiment

## Refactored

In master:

Button | zmqRequest -> TryStart ( _ = >{
zmqRequest (satellite) -> EnableExperiment(satellite) -> HasStarted .zip(). timeout}
) -> EnableExperiment -> StartLogging -> StartExperiment -> Trigger
2 changes: 1 addition & 1 deletion docs/api.session.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
api.session
-------------

.. autopydantic_model:: aind_behavior_services.session.AindBehaviorSessionModel
.. autopydantic_model:: aind_behavior_services.session.Session
:members:
:undoc-members:
:show-inheritance:
5 changes: 4 additions & 1 deletion examples/clabe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ default_behavior_picker:
config_library_dir: '\\allen\aind\scratch\AindBehavior.db\AindBehaviorJustFrames'

robocopy:
destination: '\\allen\aind\scratch\data'
destination: '\\allen\aind\scratch\data'

xml_rpc_client:
token: '42'
21 changes: 11 additions & 10 deletions examples/examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import os
from pathlib import Path

import aind_behavior_services.rig as rig
from aind_behavior_services.session import AindBehaviorSessionModel
from aind_behavior_services.rig import cameras, harp
from aind_behavior_services.session import Session

from aind_behavior_just_frames.rig import AindJustFramesRig


def main(path_seed: str = "./local/{schema}.json"):
this_session = AindBehaviorSessionModel(
this_session = Session(
date=datetime.datetime.now(tz=datetime.timezone.utc),
experiment="AindVideoEncodingBenchmarks",
subject="Test",
Expand All @@ -19,38 +19,39 @@ def main(path_seed: str = "./local/{schema}.json"):
experimenter=["Foo", "Bar"],
)

video_writer = rig.cameras.VideoWriterFfmpeg(
video_writer = cameras.VideoWriterFfmpeg(
frame_rate=120,
container_extension="mp4",
# input and output arguments can be overridden by the user
)

this_rig = AindJustFramesRig(
data_directory=Path("C:/Data"),
computer_name="this_computer",
rig_name="this_rig",
triggered_camera_controller_0=rig.cameras.CameraController[rig.cameras.SpinnakerCamera](
triggered_camera_controller_0=cameras.CameraController[cameras.SpinnakerCamera](
frame_rate=120,
cameras={
"FaceCamera": rig.cameras.SpinnakerCamera(
"FaceCamera": cameras.SpinnakerCamera(
serial_number="SerialNumber",
binning=1,
exposure=5000,
gain=0,
video_writer=video_writer,
adc_bit_depth=rig.cameras.SpinnakerCameraAdcBitDepth.ADC10BIT,
adc_bit_depth=cameras.SpinnakerCameraAdcBitDepth.ADC10BIT,
),
"SideCamera": rig.cameras.SpinnakerCamera(
"SideCamera": cameras.SpinnakerCamera(
serial_number="SerialNumber",
binning=1,
exposure=5000,
gain=0,
video_writer=video_writer,
adc_bit_depth=rig.cameras.SpinnakerCameraAdcBitDepth.ADC10BIT,
adc_bit_depth=cameras.SpinnakerCameraAdcBitDepth.ADC10BIT,
),
},
),
triggered_camera_controller_1=None,
harp_behavior=rig.harp.HarpBehavior(port_name="COM3"),
harp_behavior=harp.HarpBehavior(port_name="COM3"),
)

os.makedirs(os.path.dirname(path_seed), exist_ok=True)
Expand Down
131 changes: 131 additions & 0 deletions examples/experiment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import asyncio
import dataclasses
import logging
from pathlib import Path

import clabe.xml_rpc
from aind_behavior_services.session import Session
from clabe import resource_monitor
from clabe.apps import AindBehaviorServicesBonsaiApp, BonsaiApp
from clabe.data_transfer import robocopy
from clabe.launcher import Launcher, experiment
from clabe.pickers import DefaultBehaviorPicker, DefaultBehaviorPickerSettings

from aind_behavior_just_frames.rig import AindJustFramesRig, SatelliteRig

logger = logging.getLogger(__name__)


@experiment(name="just_frames_with_satellites")
async def my_experiment(launcher: Launcher) -> None:
picker = DefaultBehaviorPicker(launcher=launcher, settings=DefaultBehaviorPickerSettings())
session = picker.pick_session(Session)
rig = picker.pick_rig(AindJustFramesRig)
launcher.register_session(session, rig.data_directory)

monitor = resource_monitor.ResourceMonitor(
constrains=[
resource_monitor.available_storage_constraint_factory(launcher.data_directory, 2e11),
]
)

# Validate resources
monitor.run()
has_satellites = len(rig.satellite_rigs) > 0
satellites: dict[str, SatelliteRigConnection] = {}
if has_satellites:
SATELLITE_UPLOAD_ROOT = "."
for s in rig.satellite_rigs:
xml_client = clabe.xml_rpc.XmlRpcClient(
settings=clabe.xml_rpc.XmlRpcClientSettings(
token="just-frames", server_url=f"{s.zmq_protocol_config.address}:8000"
)
)
this_session = xml_client.upload_model(
session, SATELLITE_UPLOAD_ROOT / f"{session.session_name}_session.json"
)
this_rig = xml_client.upload_model(s, SATELLITE_UPLOAD_ROOT / f"{session.session_name}_rig.json")

assert this_session.filename is not None, "Failed to upload session to satellite rig."
assert this_rig.filename is not None, "Failed to upload rig to satellite rig."
additional_externalized_properties = {
"RigPath": this_rig.filename,
"SessionPath": this_session.filename,
}
satellite_bonsai_app = BonsaiApp(
workflow=Path(r"./src/main.bonsai"),
additional_externalized_properties=additional_externalized_properties,
)
satellites[s.rig_name] = SatelliteRigConnection(
rig=s,
xml_rpc_client=xml_client,
bonsai_app=satellite_bonsai_app,
xml_rpc_executor=clabe.xml_rpc.XmlRpcExecutor(client=xml_client),
)

bonsai_app = AindBehaviorServicesBonsaiApp(
workflow=Path(r"./src/main.bonsai"),
rig=rig,
session=session,
)

tasks = {
satellite.rig.rig_name: satellite.xml_rpc_executor.run_async(satellite.bonsai_app.command)
for satellite in satellites.values()
}
tasks[rig.rig_name] = bonsai_app.run_async()
results = await asyncio.gather(*tasks.values())

for rig_id, result in dict(zip(tasks.keys(), results)).items():
if result.exit_code != 0:
logger.error(
"RigId %s 's, App exited with error code %d. With stdout %s and stderr %s",
rig_id,
result.exit_code,
result.stdout,
result.stderr,
)
else:
logger.info("RigId %s 's, App completed successfully with stdout %s", rig_id, result.stdout)
logger.debug("RigId %s 's, App completed successfully with stderr %s", rig_id, result.stderr)

launcher.copy_logs()

settings = robocopy.RobocopySettings()
assert launcher.session.session_name is not None, "Session name is None"
settings.destination = Path(settings.destination) / launcher.session.subject / launcher.session.session_name
robocopy_tasks = {
satellite.rig.rig_name: satellite.xml_rpc_executor.run_async(
_make_robocopy_from_rig(settings, satellite.rig, launcher.session.session_name).command
)
for satellite in satellites.values()
}
robocopy_tasks[rig.rig_name] = robocopy.RobocopyService(
source=rig.data_directory / launcher.session.session_name, settings=settings
).run_async()
await asyncio.gather(*robocopy_tasks.values())
return


@dataclasses.dataclass
class SatelliteRigConnection:
rig: SatelliteRig
xml_rpc_client: clabe.xml_rpc.XmlRpcClient
xml_rpc_executor: clabe.xml_rpc.XmlRpcExecutor
bonsai_app: BonsaiApp


def _make_robocopy_from_rig(
robocopy_settings: robocopy.RobocopySettings, rig: AindJustFramesRig | SatelliteRig, session_name: str
) -> robocopy.RobocopyService:
# For videos, we flatten everything in the behavior-videos directory
# Everything else gets dumped in the behavior directory under .satellites/rig_name/
source = {
rig.data_directory / session_name / "behavior-videos": Path(robocopy_settings.destination) / "behavior-videos",
rig.data_directory / session_name / "behavior": Path(robocopy_settings.destination)
/ "behavior"
/ "satellites"
/ rig.rig_name,
}
settings = robocopy_settings.model_copy(update={"destination": None}) # we will set destination per-path
return robocopy.RobocopyService(source=source, settings=settings)
10 changes: 6 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ version = "0.5.0rc2"
readme = {file = "README.md", content-type = "text/markdown"}

dependencies = [
"aind_behavior_services @git+https://github.com/AllenNeuralDynamics/Aind.Behavior.Services@90c77e682922288bc7779c4f8cde5d6f15115899",
"aind_behavior_services>=0.13",
"pydantic-settings"
]

[project.optional-dependencies]

data = ["contraqctor >= 0.5.3 ,<0.6.0"]
data = ["contraqctor >=0.5.3, <0.6.0"]

launcher = [
"aind-clabe[aind-services] @ git+https://github.com/AllenNeuralDynamics/clabe@test-services-13",
"aind_behavior_just_frames[data]",]
"aind-clabe[aind-services]>=0.10.0",
"aind_behavior_just_frames[data]"
]

[dependency-groups]

Expand Down
Loading