diff --git a/src/aiko_services/main/pipeline.py b/src/aiko_services/main/pipeline.py index cfb9725..401825b 100755 --- a/src/aiko_services/main/pipeline.py +++ b/src/aiko_services/main/pipeline.py @@ -644,7 +644,7 @@ def create_stream(self, stream_id, graph_path=None, pass @abstractmethod - def destroy_stream(self, stream_id, graceful=False): + def destroy_stream(self, stream_id, graceful=False, use_thread_local=True, diagnostic={}): pass @abstractmethod @@ -1014,7 +1014,10 @@ def create_stream(self, stream_id, graph_path=None, self._disable_thread_local("create_stream()") return True - def destroy_stream(self, stream_id, graceful=False, use_thread_local=True): + def destroy_stream(self, stream_id, + graceful=False, + use_thread_local=True, + diagnostic={}): stream_id = str(stream_id) # TODO: Proper solution for handling of remote Pipeline proxy @@ -1049,7 +1052,7 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True): stream.lock.acquire("destroy_stream()") if graceful and len(stream.frames): - arguments = [stream_id, graceful, use_thread_local] + arguments = [stream_id, graceful, use_thread_local, diagnostic] self._post_message( ActorTopic.IN, "destroy_stream", arguments, delay=3.0) return False @@ -1059,14 +1062,18 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True): if stream_id in self.DEBUG: # DEBUG: 2024-12-02 del self.DEBUG[stream_id] + destroy_stream_state = stream.state + graph_path = self.pipeline_graph.get_path(self.share["graph_path"]) for node in graph_path: element, element_name, local, _ = \ PipelineGraph.get_element(node) if local: ## Local element ## try: - stream_event, diagnostic = element.stop_stream( + stream_event, stop_stream_data = element.stop_stream( stream, stream_id) + if stream_event == StreamEvent.ERROR: + diagnostic = stop_stream_data except Exception as exception: self.logger.error("Exception in " \ "pipeline.destroy_stream() --> stop_stream()") @@ -1077,7 +1084,23 @@ def destroy_stream(self, stream_id, graceful=False, use_thread_local=True): element_name, stream, stream_event, diagnostic, in_destroy_stream=True)) if stream.state == StreamState.ERROR: - break + destroy_stream_state = StreamState.ERROR + elif destroy_stream_state == StreamState.ERROR: + stream.state = StreamState.ERROR + + # Notify listeners that the stream has stopped + stop_state = stream.state + if stop_state >= StreamState.RUN: + stop_state = StreamState.STOP + stream_info = { + "stream_id": stream.stream_id, + "frame_id": stream.frame_id, + "state": stop_state} + if stream.queue_response: + stream.queue_response.put((stream_info, diagnostic)) + if stream.topic_response: + actor = get_actor_mqtt(stream.topic_response, Pipeline) + actor.process_frame_response(stream_info, diagnostic) finally: if use_thread_local: stream.lock.release() @@ -1580,7 +1603,8 @@ def get_stream_id(): if not in_destroy_stream: # avoid destroy_stream() recursion if stream.lock._in_use: stream.lock.release() - self.destroy_stream(get_stream_id(), use_thread_local=False) + stream.state = StreamState.ERROR + self.destroy_stream(get_stream_id(), use_thread_local=False, diagnostic=diagnostic) return stream_state