Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aiko_services/main/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def remove_hook_handler(self, hook_name, hook_function):
pass

@abstractmethod
def run_hook(self, hook_name):
def run_hook(self, hook_name, variables=None):
pass

@abstractmethod
Expand Down
15 changes: 15 additions & 0 deletions src/aiko_services/main/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@
_PIPELINE_HOOK_PROCESS_ELEMENT_POST = PIPELINE_HOOK_PROCESS_ELEMENT_POST+"0"
PIPELINE_HOOK_PROCESS_FRAME = "pipeline.process_frame:"
_PIPELINE_HOOK_PROCESS_FRAME = PIPELINE_HOOK_PROCESS_FRAME+"0"
PIPELINE_HOOK_PROCESS_FRAME_COMPLETE = "pipeline.process_frame_complete:"
_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE = PIPELINE_HOOK_PROCESS_FRAME_COMPLETE+"0"
PIPELINE_HOOK_DESTROY_STREAM = "pipeline.destroy_stream:"
_PIPELINE_HOOK_DESTROY_STREAM = PIPELINE_HOOK_DESTROY_STREAM+"0"

_GRACE_TIME = 60 # seconds
_LOGGER = aiko.logger(__name__)
Expand Down Expand Up @@ -698,6 +702,8 @@ def __init__(self, context):
self.add_hook(_PIPELINE_HOOK_PROCESS_ELEMENT)
self.add_hook(_PIPELINE_HOOK_PROCESS_ELEMENT_POST)
self.add_hook(_PIPELINE_HOOK_PROCESS_FRAME)
self.add_hook(_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE)
self.add_hook(_PIPELINE_HOOK_DESTROY_STREAM)

self.pipeline_graph = self._create_pipeline_graph(context.definition)
self.share["element_count"] = self.pipeline_graph.element_count
Expand Down Expand Up @@ -1059,6 +1065,7 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True):
if stream_id in self.DEBUG: # DEBUG: 2024-12-02
del self.DEBUG[stream_id]

diagnostic = {}
graph_path = self.pipeline_graph.get_path(self.share["graph_path"])
for node in graph_path:
element, element_name, local, _ = \
Expand All @@ -1078,6 +1085,10 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True):
in_destroy_stream=True))
if stream.state == StreamState.ERROR:
break

self.run_hook(_PIPELINE_HOOK_DESTROY_STREAM,
lambda: {"stream": stream,
"diagnostic": diagnostic.get("diagnostic") if diagnostic else None})
finally:
if use_thread_local:
stream.lock.release()
Expand Down Expand Up @@ -1361,6 +1372,10 @@ def _process_frame_common(self, stream_dict, frame_data_in, new_frame) \
graph_node_list = loop_graph

if frame_complete:
self.run_hook(_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE,
lambda: {
"stream": stream,
"frame_data_out": frame_data_out})
stream_info = {
"stream_id": stream.stream_id,
"frame_id": stream.frame_id,
Expand Down
17 changes: 14 additions & 3 deletions src/aiko_services/tests/unit/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def process_frame(self, stream) -> Tuple[aiko.StreamEvent, dict]:
aiko.process.terminate() # TODO: Improve Aiko Services Process exit
return aiko.StreamEvent.OKAY, {}

def do_create_pipeline(pipeline_definition_json):
def do_create_pipeline(pipeline_definition_json, hooks=None,
frame_data=FRAME_DATA,
stream_id=None, parameters=None):
file = None
with tempfile.NamedTemporaryFile(delete=True, mode="w") as file:
file.write(pipeline_definition_json)
Expand All @@ -34,8 +36,17 @@ def do_create_pipeline(pipeline_definition_json):

pipeline = aiko.PipelineImpl.create_pipeline(
file.name, pipeline_definition, name=None, graph_path=None,
stream_id=None, parameters=PARAMETERS,
frame_id=0, frame_data=FRAME_DATA,
stream_id=stream_id, parameters=parameters or PARAMETERS,
frame_id=0, frame_data=frame_data,
grace_time=GRACE_TIME, queue_response=None)

if hooks is not None:
for hook, hook_function in hooks.items():
pipeline.add_hook_handler(hook, hook_function)

pipeline.run(mqtt_connection_required=False)

# Clean up global hooks
if hooks is not None:
for hook, hook_function in hooks.items():
pipeline.remove_hook_handler(hook, hook_function)
1 change: 1 addition & 0 deletions src/aiko_services/tests/unit/test_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ def test_hook(): # by developer
hook_test = aiko.compose_instance(HookTest, init_args)
aiko.event.add_timer_handler(hook_test.hook_check, 0.0)
aiko.process.run(mqtt_connection_required=False)
aiko.event.remove_timer_handler(hook_test.hook_check)
81 changes: 81 additions & 0 deletions src/aiko_services/tests/unit/test_pipeline_hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import aiko_services as aiko
from aiko_services.main.pipeline import (
_PIPELINE_HOOK_PROCESS_ELEMENT,
_PIPELINE_HOOK_PROCESS_ELEMENT_POST,
_PIPELINE_HOOK_PROCESS_FRAME,
_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE,
_PIPELINE_HOOK_DESTROY_STREAM,
)
from aiko_services.main.stream import StreamEvent
from aiko_services.tests.unit import do_create_pipeline

PIPELINE_DEFINITION = """{
"version": 0, "name": "p_test", "runtime": "python",
"graph": ["(A)"],
"elements": [
{ "name": "A",
"input": [{ "name": "a", "type": "int" }],
"output": [{ "name": "a", "type": "int" }],
"deploy": {
"local": { "module": "aiko_services.tests.unit.test_pipeline_hooks" }
}
}
]
}
"""

class A(aiko.PipelineElement):
def __init__(self, context):
context.call_init(self, "PipelineElement", context)

def frame_generator(self, stream, frame_id):
if frame_id == 0:
return StreamEvent.OKAY, {"a": 1}
else:
return StreamEvent.STOP, {}

def start_stream(self, stream, stream_id):
self.create_frames(stream, frame_generator=self.frame_generator)
return StreamEvent.OKAY, None

def process_frame(self, stream, a):
return StreamEvent.OKAY, {"a": a}

def stop_stream(self, stream, stream_id):
self.stop()
return StreamEvent.OKAY, None


def test_pipeline_hooks():
hooks_called = []

def hook_process_frame(hook_name, component, logger, variables, options):
hooks_called.append(hook_name)
assert variables["frame_data_in"] == {"a": 1}

def hook_process_element(hook_name, component, logger, variables, options):
hooks_called.append(hook_name)
assert variables["inputs"] == {"a": 1}

def hook_process_element_post(hook_name, component, logger, variables, options):
hooks_called.append(hook_name)
assert variables["frame_data_out"] == {"a": 1}

def hook_process_frame_complete(hook_name, component, logger, variables, options):
hooks_called.append(hook_name)
assert variables["frame_data_out"] == {"a": 1}

def hook_destroy_stream(hook_name, component, logger, variables, options):
hooks_called.append(hook_name)
assert variables["diagnostic"] == None

hooks = {
_PIPELINE_HOOK_PROCESS_FRAME: hook_process_frame,
_PIPELINE_HOOK_PROCESS_ELEMENT: hook_process_element,
_PIPELINE_HOOK_PROCESS_ELEMENT_POST: hook_process_element_post,
_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE: hook_process_frame_complete,
_PIPELINE_HOOK_DESTROY_STREAM: hook_destroy_stream,
}
do_create_pipeline(PIPELINE_DEFINITION, hooks=hooks, frame_data=None,
stream_id="0")
assert hooks_called == list(hooks.keys())