diff --git a/amber/src/main/python/core/python_worker.py b/amber/src/main/python/core/python_worker.py index bcd0652d596..d1467de6dbe 100644 --- a/amber/src/main/python/core/python_worker.py +++ b/amber/src/main/python/core/python_worker.py @@ -20,8 +20,8 @@ from core.models.internal_queue import InternalQueue from core.runnables import MainLoop, NetworkReceiver, NetworkSender, Heartbeat -from core.util.runnable.runnable import Runnable -from core.util.stoppable.stoppable import Stoppable +from core.util.runnable import Runnable +from core.util.stoppable import Stoppable class PythonWorker(Runnable, Stoppable): diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 089a162228a..3998a3ff9ad 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -30,7 +30,7 @@ from core.util import Stoppable from core.util.console_message.replace_print import replace_print from core.util.console_message.timestamp import current_time_in_local_timezone -from core.util.runnable.runnable import Runnable +from core.util.runnable import Runnable from proto.org.apache.texera.amber.engine.architecture.rpc import ( ConsoleMessage, ConsoleMessageType, diff --git a/amber/src/main/python/core/runnables/heartbeat.py b/amber/src/main/python/core/runnables/heartbeat.py index 9199518f3f4..1e4c45837d4 100644 --- a/amber/src/main/python/core/runnables/heartbeat.py +++ b/amber/src/main/python/core/runnables/heartbeat.py @@ -24,8 +24,8 @@ from overrides import overrides from threading import Event -from core.util.runnable.runnable import Runnable -from core.util.stoppable.stoppable import Stoppable +from core.util.runnable import Runnable +from core.util.stoppable import Stoppable class Heartbeat(Runnable, Stoppable): diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py index 659cd65c78d..8ba4fbe1472 100644 --- a/amber/src/main/python/core/runnables/network_receiver.py +++ b/amber/src/main/python/core/runnables/network_receiver.py @@ -43,7 +43,7 @@ ) from core.proxy import ProxyServer from core.util import Stoppable, get_one_of -from core.util.runnable.runnable import Runnable +from core.util.runnable import Runnable from proto.org.apache.texera.amber.engine.architecture.rpc import EmbeddedControlMessage from proto.org.apache.texera.amber.engine.common import ( PythonControlMessage, diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index e49c0316cc7..6122bbb8b98 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -38,7 +38,7 @@ from core.models.internal_queue import DataElement, ECMElement from core.storage.document_factory import DocumentFactory from core.util import Stoppable, get_one_of -from core.util.runnable.runnable import Runnable +from core.util.runnable import Runnable from core.util.virtual_identity import get_from_actor_id_for_input_port_storage from proto.org.apache.texera.amber.core import ( ActorVirtualIdentity, diff --git a/amber/src/main/python/core/util/thread/atomic.py b/amber/src/main/python/core/util/atomic.py similarity index 100% rename from amber/src/main/python/core/util/thread/atomic.py rename to amber/src/main/python/core/util/atomic.py diff --git a/amber/src/main/python/core/util/protocol/base_protocols.py b/amber/src/main/python/core/util/base_protocols.py similarity index 100% rename from amber/src/main/python/core/util/protocol/base_protocols.py rename to amber/src/main/python/core/util/base_protocols.py diff --git a/amber/src/main/python/core/util/buffer/buffer_base.py b/amber/src/main/python/core/util/buffer/buffer_base.py index 244b7357407..10a44d28aae 100644 --- a/amber/src/main/python/core/util/buffer/buffer_base.py +++ b/amber/src/main/python/core/util/buffer/buffer_base.py @@ -17,7 +17,7 @@ from abc import ABCMeta -from core.util.protocol.base_protocols import FlushedGetable, Putable +from core.util.base_protocols import FlushedGetable, Putable class IBuffer(FlushedGetable, Putable, metaclass=ABCMeta): diff --git a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py index 3b46e6db4d7..735f0f6dc0d 100644 --- a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py +++ b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py @@ -23,7 +23,7 @@ from core.util.customized_queue.inner import inner from core.util.customized_queue.queue_base import IKeyedQueue -from core.util.thread.atomic import AtomicInteger +from core.util.atomic import AtomicInteger K = TypeVar("K") T = TypeVar("T") diff --git a/amber/src/main/python/core/util/customized_queue/queue_base.py b/amber/src/main/python/core/util/customized_queue/queue_base.py index 47b8aac94e4..4ee96312cba 100644 --- a/amber/src/main/python/core/util/customized_queue/queue_base.py +++ b/amber/src/main/python/core/util/customized_queue/queue_base.py @@ -18,7 +18,7 @@ from abc import ABCMeta from dataclasses import dataclass -from core.util.protocol.base_protocols import ( +from core.util.base_protocols import ( Putable, Getable, EmtpyCheckable, diff --git a/amber/src/main/python/core/util/expression_evaluator/__init__.py b/amber/src/main/python/core/util/expression_evaluator.py similarity index 100% rename from amber/src/main/python/core/util/expression_evaluator/__init__.py rename to amber/src/main/python/core/util/expression_evaluator.py diff --git a/amber/src/main/python/core/util/operator/__init__.py b/amber/src/main/python/core/util/operator/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/amber/src/main/python/core/util/operator/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/amber/src/main/python/core/util/runnable/runnable.py b/amber/src/main/python/core/util/runnable.py similarity index 100% rename from amber/src/main/python/core/util/runnable/runnable.py rename to amber/src/main/python/core/util/runnable.py diff --git a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py index d20073631b3..992ad596fe0 100644 --- a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py +++ b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py @@ -19,8 +19,8 @@ from overrides import overrides from core.util.customized_queue.queue_base import IQueue, QueueControl, QueueElement -from core.util.runnable.runnable import Runnable -from core.util.stoppable.stoppable import Stoppable +from core.util.runnable import Runnable +from .stoppable import Stoppable class StoppableQueueBlockingRunnable(Runnable, Stoppable): diff --git a/amber/src/main/python/core/util/thread/__init__.py b/amber/src/main/python/core/util/thread/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/amber/src/main/python/core/util/thread/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/amber/src/main/python/core/util/virtual_identity/__init__.py b/amber/src/main/python/core/util/virtual_identity.py similarity index 100% rename from amber/src/main/python/core/util/virtual_identity/__init__.py rename to amber/src/main/python/core/util/virtual_identity.py diff --git a/amber/src/test/python/core/test_python_worker.py b/amber/src/test/python/core/test_python_worker.py new file mode 100644 index 00000000000..25658fc78b7 --- /dev/null +++ b/amber/src/test/python/core/test_python_worker.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +import core.python_worker as pw + + +class _FakeReceiver: + def __init__(self, input_queue, host): + self.input_queue = input_queue + self.host = host + self.proxy_server = type( + "FakeProxyServer", (), {"get_port_number": staticmethod(lambda: 12345)} + )() + self._shutdown_cb = None + + def register_shutdown(self, cb): + self._shutdown_cb = cb + + def run(self): + pass + + def stop(self): + pass + + +class _FakeSender: + def __init__(self, output_queue, host, port, handshake_port): + self.output_queue = output_queue + self.host = host + self.port = port + self.handshake_port = handshake_port + self.stopped = False + + def run(self): + pass + + def stop(self): + self.stopped = True + + +class _FakeMainLoop: + def __init__(self, worker_id, input_queue, output_queue): + self.worker_id = worker_id + self.stopped = False + + def run(self): + pass + + def stop(self): + self.stopped = True + + +class _FakeHeartbeat: + def __init__(self, host, port, interval, stop_event): + self.host = host + self.port = port + self.interval = interval + self.stop_event = stop_event + self.stopped = False + + def run(self): + pass + + def stop(self): + self.stopped = True + + +@pytest.fixture +def stub_network(monkeypatch): + monkeypatch.setattr(pw, "NetworkReceiver", _FakeReceiver) + monkeypatch.setattr(pw, "NetworkSender", _FakeSender) + monkeypatch.setattr(pw, "MainLoop", _FakeMainLoop) + monkeypatch.setattr(pw, "Heartbeat", _FakeHeartbeat) + + +class TestPythonWorker: + @pytest.mark.timeout(5) + def test_construction_wires_dependencies(self, stub_network): + worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999) + + # NetworkSender must receive the handshake port from the receiver's + # proxy server — this is the Java→Python wiring contract. + assert worker._network_sender.handshake_port == 12345 + assert worker._network_sender.port == 9999 + # The receiver's shutdown callback is wired to worker.stop so a + # client-side disconnect tears the worker down. + assert worker._network_receiver._shutdown_cb == worker.stop + + @pytest.mark.timeout(5) + def test_stop_cascades_to_main_loop_sender_and_heartbeat(self, stub_network): + worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999) + + worker.stop() + + assert worker._main_loop.stopped is True + assert worker._network_sender.stopped is True + assert worker._heartbeat.stopped is True + + @pytest.mark.timeout(5) + def test_run_sets_stop_event_after_main_loop_returns(self, stub_network): + worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999) + + # All fakes' run() return immediately, so run() drains all threads + # without blocking. The contract is that the heartbeat stop event + # is set after the main loop / sender threads join, so the + # heartbeat thread can exit cleanly. + worker.run() + + assert worker._stop_event.is_set() diff --git a/amber/src/test/python/core/util/thread/test_atomic.py b/amber/src/test/python/core/util/test_atomic.py similarity index 99% rename from amber/src/test/python/core/util/thread/test_atomic.py rename to amber/src/test/python/core/util/test_atomic.py index fa6238e0eb2..0f6b393233c 100644 --- a/amber/src/test/python/core/util/thread/test_atomic.py +++ b/amber/src/test/python/core/util/test_atomic.py @@ -19,7 +19,7 @@ import pytest -from core.util.thread.atomic import AtomicInteger +from core.util.atomic import AtomicInteger class TestAtomicIntegerSingleThreaded: diff --git a/amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py b/amber/src/test/python/core/util/test_expression_evaluator.py similarity index 100% rename from amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py rename to amber/src/test/python/core/util/test_expression_evaluator.py diff --git a/amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py similarity index 100% rename from amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py rename to amber/src/test/python/core/util/test_virtual_identity.py