Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions amber/src/main/python/core/python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

from core.models.internal_queue import InternalQueue
from core.runnables import MainLoop, NetworkReceiver, NetworkSender, Heartbeat
from core.util.runnable.runnable import Runnable
from core.util.stoppable.stoppable import Stoppable
from core.util.runnable import Runnable
from core.util.stoppable import Stoppable


class PythonWorker(Runnable, Stoppable):
Expand Down
2 changes: 1 addition & 1 deletion amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from core.util import Stoppable
from core.util.console_message.replace_print import replace_print
from core.util.console_message.timestamp import current_time_in_local_timezone
from core.util.runnable.runnable import Runnable
from core.util.runnable import Runnable
from proto.org.apache.texera.amber.engine.architecture.rpc import (
ConsoleMessage,
ConsoleMessageType,
Expand Down
4 changes: 2 additions & 2 deletions amber/src/main/python/core/runnables/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from overrides import overrides
from threading import Event

from core.util.runnable.runnable import Runnable
from core.util.stoppable.stoppable import Stoppable
from core.util.runnable import Runnable
from core.util.stoppable import Stoppable


class Heartbeat(Runnable, Stoppable):
Expand Down
2 changes: 1 addition & 1 deletion amber/src/main/python/core/runnables/network_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
)
from core.proxy import ProxyServer
from core.util import Stoppable, get_one_of
from core.util.runnable.runnable import Runnable
from core.util.runnable import Runnable
from proto.org.apache.texera.amber.engine.architecture.rpc import EmbeddedControlMessage
from proto.org.apache.texera.amber.engine.common import (
PythonControlMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
from core.util import Stoppable, get_one_of
from core.util.runnable.runnable import Runnable
from core.util.runnable import Runnable
from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
from proto.org.apache.texera.amber.core import (
ActorVirtualIdentity,
Expand Down
2 changes: 1 addition & 1 deletion amber/src/main/python/core/util/buffer/buffer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from abc import ABCMeta

from core.util.protocol.base_protocols import FlushedGetable, Putable
from core.util.base_protocols import FlushedGetable, Putable


class IBuffer(FlushedGetable, Putable, metaclass=ABCMeta):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from core.util.customized_queue.inner import inner
from core.util.customized_queue.queue_base import IKeyedQueue
from core.util.thread.atomic import AtomicInteger
from core.util.atomic import AtomicInteger

K = TypeVar("K")
T = TypeVar("T")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from abc import ABCMeta
from dataclasses import dataclass

from core.util.protocol.base_protocols import (
from core.util.base_protocols import (
Putable,
Getable,
EmtpyCheckable,
Expand Down
16 changes: 0 additions & 16 deletions amber/src/main/python/core/util/operator/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from overrides import overrides

from core.util.customized_queue.queue_base import IQueue, QueueControl, QueueElement
from core.util.runnable.runnable import Runnable
from core.util.stoppable.stoppable import Stoppable
from core.util.runnable import Runnable
from .stoppable import Stoppable


class StoppableQueueBlockingRunnable(Runnable, Stoppable):
Expand Down
16 changes: 0 additions & 16 deletions amber/src/main/python/core/util/thread/__init__.py

This file was deleted.

125 changes: 125 additions & 0 deletions amber/src/test/python/core/test_python_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest

import core.python_worker as pw


class _FakeReceiver:
def __init__(self, input_queue, host):
self.input_queue = input_queue
self.host = host
self.proxy_server = type(
"FakeProxyServer", (), {"get_port_number": staticmethod(lambda: 12345)}
)()
self._shutdown_cb = None

def register_shutdown(self, cb):
self._shutdown_cb = cb

def run(self):
pass

def stop(self):
pass


class _FakeSender:
def __init__(self, output_queue, host, port, handshake_port):
self.output_queue = output_queue
self.host = host
self.port = port
self.handshake_port = handshake_port
self.stopped = False

def run(self):
pass

def stop(self):
self.stopped = True


class _FakeMainLoop:
def __init__(self, worker_id, input_queue, output_queue):
self.worker_id = worker_id
self.stopped = False

def run(self):
pass

def stop(self):
self.stopped = True


class _FakeHeartbeat:
def __init__(self, host, port, interval, stop_event):
self.host = host
self.port = port
self.interval = interval
self.stop_event = stop_event
self.stopped = False

def run(self):
pass

def stop(self):
self.stopped = True


@pytest.fixture
def stub_network(monkeypatch):
monkeypatch.setattr(pw, "NetworkReceiver", _FakeReceiver)
monkeypatch.setattr(pw, "NetworkSender", _FakeSender)
monkeypatch.setattr(pw, "MainLoop", _FakeMainLoop)
monkeypatch.setattr(pw, "Heartbeat", _FakeHeartbeat)


class TestPythonWorker:
@pytest.mark.timeout(5)
def test_construction_wires_dependencies(self, stub_network):
worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999)

# NetworkSender must receive the handshake port from the receiver's
# proxy server — this is the Java→Python wiring contract.
assert worker._network_sender.handshake_port == 12345
assert worker._network_sender.port == 9999
# The receiver's shutdown callback is wired to worker.stop so a
# client-side disconnect tears the worker down.
assert worker._network_receiver._shutdown_cb == worker.stop

@pytest.mark.timeout(5)
def test_stop_cascades_to_main_loop_sender_and_heartbeat(self, stub_network):
worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999)

worker.stop()

assert worker._main_loop.stopped is True
assert worker._network_sender.stopped is True
assert worker._heartbeat.stopped is True

@pytest.mark.timeout(5)
def test_run_sets_stop_event_after_main_loop_returns(self, stub_network):
worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999)

# All fakes' run() return immediately, so run() drains all threads
# without blocking. The contract is that the heartbeat stop event
# is set after the main loop / sender threads join, so the
# heartbeat thread can exit cleanly.
worker.run()

assert worker._stop_event.is_set()
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import pytest

from core.util.thread.atomic import AtomicInteger
from core.util.atomic import AtomicInteger


class TestAtomicIntegerSingleThreaded:
Expand Down
Loading