From 85bb311469684f82b3d59ecb58e07cc9c4b9ec6d Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Thu, 22 Jan 2026 05:37:38 +0200 Subject: [PATCH 1/3] add replacement for dask --- dimos/core/blueprints.py | 5 +- dimos/core/global_config.py | 1 + dimos/core/module.py | 12 +- dimos/core/module_coordinator.py | 56 ++++++-- dimos/core/test_modules.py | 5 +- dimos/core/test_worker.py | 153 ++++++++++++++++++++++ dimos/core/worker.py | 212 +++++++++++++++++++++++++++++++ dimos/core/worker_manager.py | 76 +++++++++++ dimos/utils/sequential_ids.py | 27 ++++ pyproject.toml | 3 + uv.lock | 17 +++ 11 files changed, 544 insertions(+), 23 deletions(-) create mode 100644 dimos/core/test_worker.py create mode 100644 dimos/core/worker.py create mode 100644 dimos/core/worker_manager.py create mode 100644 dimos/utils/sequential_ids.py diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py index 1fa51629bf..83cc002f60 100644 --- a/dimos/core/blueprints.py +++ b/dimos/core/blueprints.py @@ -202,12 +202,15 @@ def _verify_no_name_conflicts(self) -> None: def _deploy_all_modules( self, module_coordinator: ModuleCoordinator, global_config: GlobalConfig ) -> None: + module_specs: list[tuple[type[Module], tuple[Any, ...], dict[str, Any]]] = [] for blueprint in self.blueprints: kwargs = {**blueprint.kwargs} sig = inspect.signature(blueprint.module.__init__) if "global_config" in sig.parameters: kwargs["global_config"] = global_config - module_coordinator.deploy(blueprint.module, *blueprint.args, **kwargs) + module_specs.append((blueprint.module, blueprint.args, kwargs)) + + module_coordinator.deploy_parallel(module_specs) def _connect_transports(self, module_coordinator: ModuleCoordinator) -> None: # Gather all the In/Out connections with remapping applied. diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index 205c38c361..34dfd0da18 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -48,6 +48,7 @@ class GlobalConfig(BaseSettings): robot_rotation_diameter: float = 0.6 planner_strategy: NavigationStrategy = "simple" planner_robot_speed: float | None = None + dask: bool = True model_config = SettingsConfigDict( env_file=".env", diff --git a/dimos/core/module.py b/dimos/core/module.py index 08e428d3c7..f68233d50d 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -26,12 +26,15 @@ overload, ) +from typing_extensions import TypeVar as TypeVarExtension + if TYPE_CHECKING: from dimos.core.introspection.module import ModuleInfo +from typing import TypeVar + from dask.distributed import Actor, get_worker from reactivex.disposable import CompositeDisposable -from typing_extensions import TypeVar from dimos.core import colors from dimos.core.core import T, rpc @@ -82,7 +85,7 @@ class ModuleConfig: frame_id: str | None = None -ModuleConfigT = TypeVar("ModuleConfigT", bound=ModuleConfig, default=ModuleConfig) +ModuleConfigT = TypeVarExtension("ModuleConfigT", bound=ModuleConfig, default=ModuleConfig) class ModuleBase(Configurable[ModuleConfigT], SkillContainer, Resource): @@ -355,7 +358,7 @@ def get_rpc_calls(self, *methods: str) -> RpcCall | tuple[RpcCall, ...]: # type return result[0] if len(result) == 1 else result -class DaskModule(ModuleBase[ModuleConfigT]): +class Module(ModuleBase[ModuleConfigT]): ref: Actor worker: int @@ -454,5 +457,4 @@ def dask_register_subscriber(self, output_name: str, subscriber: RemoteIn[T]) -> getattr(self, output_name).transport.dask_register_subscriber(subscriber) -# global setting -Module = DaskModule +ModuleT = TypeVar("ModuleT", bound="Module") diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 9f38fabe05..984b57007e 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -12,22 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +from concurrent.futures import ThreadPoolExecutor import time -from typing import TypeVar + +from traitlets import Any from dimos import core -from dimos.core import DimosCluster, Module +from dimos.core import DimosCluster from dimos.core.global_config import GlobalConfig +from dimos.core.module import Module, ModuleT from dimos.core.resource import Resource - -T = TypeVar("T", bound="Module") +from dimos.core.rpc_client import RPCClient +from dimos.core.worker_manager import WorkerManager class ModuleCoordinator(Resource): - _client: DimosCluster | None = None + _client: DimosCluster | WorkerManager | None = None + _global_config: GlobalConfig _n: int | None = None _memory_limit: str = "auto" - _deployed_modules: dict[type[Module], Module] = {} + _deployed_modules: dict[type[Module], RPCClient] = {} def __init__( self, @@ -37,9 +41,13 @@ def __init__( cfg = global_config or GlobalConfig() self._n = n if n is not None else cfg.n_dask_workers self._memory_limit = cfg.memory_limit + self._global_config = cfg def start(self) -> None: - self._client = core.start(self._n, self._memory_limit) + if self._global_config.dask: + self._client = core.start(self._n, self._memory_limit) + else: + self._client = WorkerManager() def stop(self) -> None: for module in reversed(self._deployed_modules.values()): @@ -47,19 +55,41 @@ def stop(self) -> None: self._client.close_all() # type: ignore[union-attr] - def deploy(self, module_class: type[T], *args, **kwargs) -> T: # type: ignore[no-untyped-def] + def deploy(self, module_class: type[ModuleT], *args: Any, **kwargs: Any) -> RPCClient: if not self._client: raise ValueError("Not started") - module = self._client.deploy(module_class, *args, **kwargs) # type: ignore[attr-defined] + module = self._client.deploy(module_class, *args, **kwargs) # type: ignore[union-attr] self._deployed_modules[module_class] = module - return module # type: ignore[no-any-return] + return module + + def deploy_parallel( + self, module_specs: list[tuple[type[ModuleT], tuple[Any, ...], dict[str, Any]]] + ) -> list[RPCClient]: + if not self._client: + raise ValueError("Not started") + + if isinstance(self._client, WorkerManager): + modules = self._client.deploy_parallel(module_specs) + for (module_class, _, _), module in zip(module_specs, modules, strict=True): + self._deployed_modules[module_class] = module + return modules # type: ignore[return-value] + else: + return [ + self.deploy(module_class, *args, **kwargs) + for module_class, args, kwargs in module_specs + ] def start_all_modules(self) -> None: - for module in self._deployed_modules.values(): - module.start() + modules = list(self._deployed_modules.values()) + if isinstance(self._client, WorkerManager): + with ThreadPoolExecutor(max_workers=len(modules)) as executor: + list(executor.map(lambda m: m.start(), modules)) + else: + for module in modules: + module.start() - def get_instance(self, module: type[T]) -> T | None: + def get_instance(self, module: type[ModuleT]) -> ModuleT | None: return self._deployed_modules.get(module) # type: ignore[return-value] def loop(self) -> None: diff --git a/dimos/core/test_modules.py b/dimos/core/test_modules.py index 7bd995c857..f239df918e 100644 --- a/dimos/core/test_modules.py +++ b/dimos/core/test_modules.py @@ -89,13 +89,10 @@ def is_module_subclass( target_classes = { "Module", "ModuleBase", - "DaskModule", "dimos.core.Module", "dimos.core.ModuleBase", - "dimos.core.DaskModule", "dimos.core.module.Module", "dimos.core.module.ModuleBase", - "dimos.core.module.DaskModule", } def find_qualified_name(base: str, context_module: str | None = None) -> str: @@ -291,7 +288,7 @@ def get_all_module_subclasses(): filtered_results = [] for class_name, filepath, has_start, has_stop, forbidden_methods in results: # Skip base module classes themselves - if class_name in ("Module", "ModuleBase", "DaskModule", "SkillModule"): + if class_name in ("Module", "ModuleBase", "SkillModule"): continue # Skip test-only modules (those defined in test_ files) diff --git a/dimos/core/test_worker.py b/dimos/core/test_worker.py new file mode 100644 index 0000000000..98a7c5782d --- /dev/null +++ b/dimos/core/test_worker.py @@ -0,0 +1,153 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from dimos.core import In, Module, Out, rpc +from dimos.core.worker_manager import WorkerManager +from dimos.msgs.geometry_msgs import Vector3 + + +class SimpleModule(Module): + output: Out[Vector3] + input: In[Vector3] + + counter: int = 0 + + @rpc + def start(self) -> None: + pass + + @rpc + def increment(self) -> int: + self.counter += 1 + return self.counter + + @rpc + def get_counter(self) -> int: + return self.counter + + +class AnotherModule(Module): + value: int = 100 + + @rpc + def start(self) -> None: + pass + + @rpc + def add(self, n: int) -> int: + self.value += n + return self.value + + @rpc + def get_value(self) -> int: + return self.value + + +class ThirdModule(Module): + multiplier: int = 1 + + @rpc + def start(self) -> None: + pass + + @rpc + def multiply(self, n: int) -> int: + self.multiplier *= n + return self.multiplier + + @rpc + def get_multiplier(self) -> int: + return self.multiplier + + +@pytest.fixture +def worker_manager(): + manager = WorkerManager() + try: + yield manager + finally: + manager.close_all() + + +@pytest.mark.integration +def test_worker_manager_basic(worker_manager): + module = worker_manager.deploy(SimpleModule) + module.start() + + result = module.increment() + assert result == 1 + + result = module.increment() + assert result == 2 + + result = module.get_counter() + assert result == 2 + + module.stop() + + +@pytest.mark.integration +def test_worker_manager_multiple_different_modules(worker_manager): + module1 = worker_manager.deploy(SimpleModule) + module2 = worker_manager.deploy(AnotherModule) + + module1.start() + module2.start() + + # Each module has its own state + module1.increment() + module1.increment() + module2.add(10) + + assert module1.get_counter() == 2 + assert module2.get_value() == 110 + + # Stop modules to clean up threads + module1.stop() + module2.stop() + + +@pytest.mark.integration +def test_worker_manager_parallel_deployment(worker_manager): + modules = worker_manager.deploy_parallel( + [ + (SimpleModule, (), {}), + (AnotherModule, (), {}), + (ThirdModule, (), {}), + ] + ) + + assert len(modules) == 3 + module1, module2, module3 = modules + + # Start all modules + module1.start() + module2.start() + module3.start() + + # Each module has its own state + module1.increment() + module2.add(50) + module3.multiply(5) + + assert module1.get_counter() == 1 + assert module2.get_value() == 150 + assert module3.get_multiplier() == 5 + + # Stop modules + module1.stop() + module2.stop() + module3.stop() diff --git a/dimos/core/worker.py b/dimos/core/worker.py new file mode 100644 index 0000000000..431addf3ee --- /dev/null +++ b/dimos/core/worker.py @@ -0,0 +1,212 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing as mp +from multiprocessing.connection import Connection +import traceback +from typing import Any + +from dimos.core.module import ModuleT +from dimos.core.rpc_client import RPCClient +from dimos.utils.actor_registry import ActorRegistry +from dimos.utils.logging_config import setup_logger +from dimos.utils.sequential_ids import SequentialIds + +logger = setup_logger() + + +class ActorFuture: + """Mimics Dask's ActorFuture - wraps a result with .result() method.""" + + def __init__(self, value: Any) -> None: + self._value = value + + def result(self, _timeout: float | None = None) -> Any: + return self._value + + +class Actor: + """Proxy that forwards method calls to the worker process.""" + + def __init__(self, conn: Connection, module_class: type[ModuleT], worker_id: int) -> None: + self._conn = conn + self._cls = module_class + self._worker_id = worker_id + + def _send_request_to_worker(self, request: dict[str, Any]) -> Any: + self._conn.send(request) + response = self._conn.recv() + if response.get("error"): + raise RuntimeError(f"Worker error: {response['error']}") + return response.get("result") + + def set_ref(self, ref: Any) -> ActorFuture: + """Set the actor reference on the remote module.""" + result = self._send_request_to_worker({"type": "set_ref", "ref": ref}) + return ActorFuture(result) + + def __getattr__(self, name: str) -> ActorFuture: + """Proxy attribute access to the worker process.""" + if name.startswith("_"): + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") + + result = self._send_request_to_worker({"type": "getattr", "name": name}) + return ActorFuture(result) + + +# Global forkserver context. Using `forkserver` instead of `fork` because it +# avoids CUDA context corruption issues. +_forkserver_ctx: Any = None + + +def _get_forkserver_context() -> Any: + global _forkserver_ctx + if _forkserver_ctx is None: + _forkserver_ctx = mp.get_context("forkserver") + return _forkserver_ctx + + +_seq_ids = SequentialIds() + + +class Worker: + def __init__( + self, + module_class: type[ModuleT], + args: tuple[Any, ...] = (), + kwargs: dict[Any, Any] | None = None, + ) -> None: + self._module_class: type[ModuleT] = module_class + self._args: tuple[Any, ...] = args + self._kwargs: dict[Any, Any] = kwargs or {} + self._process: Any = None + self._conn: Connection | None = None + self._actor: Actor | None = None + self._worker_id: int = _seq_ids.next() + self._ready: bool = False + + def start_process(self) -> None: + parent_conn, child_conn = mp.Pipe() + self._conn = parent_conn + + ctx = _get_forkserver_context() + self._process = ctx.Process( + target=_worker_entrypoint, + args=(child_conn, self._module_class, self._args, self._kwargs, self._worker_id), + daemon=True, + ) + self._process.start() + self._actor = Actor(parent_conn, self._module_class, self._worker_id) + + def wait_until_ready(self) -> None: + if self._ready: + return + if self._actor is None: + raise RuntimeError("Worker process not started") + + worker_id = self._actor.set_ref(self._actor).result() + ActorRegistry.update(str(self._actor), str(worker_id)) + self._ready = True + + logger.info( + "Deployed module.", module=self._module_class.__name__, worker_id=self._worker_id + ) + + def deploy(self) -> None: + self.start_process() + self.wait_until_ready() + + def get_instance(self) -> RPCClient: + if self._actor is None: + raise RuntimeError("Worker not deployed") + return RPCClient(self._actor, self._module_class) + + def shutdown(self) -> None: + if self._conn is not None: + try: + self._conn.send({"type": "shutdown"}) + self._conn.recv() + except (BrokenPipeError, EOFError): + pass + finally: + self._conn.close() + self._conn = None + + if self._process is not None: + self._process.join(timeout=2) + if self._process.is_alive(): + self._process.terminate() + self._process.join(timeout=1) + self._process = None + + +def _worker_entrypoint( + conn: Connection, + module_class: type[ModuleT], + args: tuple[Any, ...], + kwargs: dict[Any, Any], + worker_id: int, +) -> None: + instance = None + + try: + instance = module_class(*args, **kwargs) + instance.worker = worker_id + + _worker_loop(conn, instance, worker_id) + except Exception as e: + logger.error(f"Worker process error: {e}", exc_info=True) + finally: + if instance is not None: + try: + instance.stop() + except Exception: + logger.error("Error during worker shutdown", exc_info=True) + + +def _worker_loop(conn: Connection, instance: Any, worker_id: int) -> None: + while True: + try: + if not conn.poll(timeout=0.1): + continue + request = conn.recv() + except EOFError: + break + + response: dict[str, Any] = {} + try: + req_type = request.get("type") + + if req_type == "set_ref": + instance.ref = request.get("ref") + response["result"] = worker_id + + elif req_type == "getattr": + response["result"] = getattr(instance, request["name"]) + + elif req_type == "shutdown": + response["result"] = True + conn.send(response) + break + + else: + response["error"] = f"Unknown request type: {req_type}" + + except Exception as e: + response["error"] = f"{e.__class__.__name__}: {e}\n{traceback.format_exc()}" + + try: + conn.send(response) + except (BrokenPipeError, EOFError): + break diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py new file mode 100644 index 0000000000..72ad880edd --- /dev/null +++ b/dimos/core/worker_manager.py @@ -0,0 +1,76 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing.resource_sharer as resource_sharer +from typing import Any + +from dimos.core.module import ModuleT +from dimos.core.rpc_client import RPCClient +from dimos.core.worker import Worker +from dimos.utils.actor_registry import ActorRegistry +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class WorkerManager: + def __init__(self) -> None: + self._workers: list[Worker] = [] + self._closed = False + + def deploy(self, module_class: type[ModuleT], *args: Any, **kwargs: Any) -> RPCClient: + if self._closed: + raise RuntimeError("WorkerManager is closed") + + worker = Worker(module_class, args=args, kwargs=kwargs) + worker.deploy() + self._workers.append(worker) + return worker.get_instance() + + def deploy_parallel( + self, module_specs: list[tuple[type[ModuleT], tuple[Any, ...], dict[Any, Any]]] + ) -> list[RPCClient]: + if self._closed: + raise RuntimeError("WorkerManager is closed") + + workers: list[Worker] = [] + for module_class, args, kwargs in module_specs: + worker = Worker(module_class, args=args, kwargs=kwargs) + worker.start_process() + workers.append(worker) + + for worker in workers: + worker.wait_until_ready() + self._workers.append(worker) + + return [worker.get_instance() for worker in workers] + + def close_all(self) -> None: + if self._closed: + return + self._closed = True + + logger.info("Shutting down all workers...") + + for worker in reversed(self._workers): + try: + worker.shutdown() + except Exception as e: + logger.error(f"Error shutting down worker: {e}", exc_info=True) + + self._workers.clear() + ActorRegistry.clear() + resource_sharer.stop() + + logger.info("All workers shut down") diff --git a/dimos/utils/sequential_ids.py b/dimos/utils/sequential_ids.py new file mode 100644 index 0000000000..d467e8a22d --- /dev/null +++ b/dimos/utils/sequential_ids.py @@ -0,0 +1,27 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from threading import RLock + + +class SequentialIds: + def __init__(self) -> None: + self._value = 0 + self._lock: RLock = RLock() + + def next(self) -> int: + with self._lock: + v = self._value + self._value += 1 + return v diff --git a/pyproject.toml b/pyproject.toml index b91802eb2c..74943dbd2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -246,6 +246,9 @@ dev = [ "types-tabulate>=0.9.0.20241207,<1", "types-tensorflow>=2.18.0.20251008,<3", "types-tqdm>=4.67.0.20250809,<5", + + # Tools + "py-spy", ] sim = [ diff --git a/uv.lock b/uv.lock index 7380a40780..dc844f5549 100644 --- a/uv.lock +++ b/uv.lock @@ -1481,6 +1481,7 @@ dev = [ { name = "mypy" }, { name = "pandas-stubs" }, { name = "pre-commit" }, + { name = "py-spy" }, { name = "pytest" }, { name = "pytest-asyncio" }, { name = "pytest-env" }, @@ -1683,6 +1684,7 @@ requires-dist = [ { name = "plotly", marker = "extra == 'manipulation'", specifier = ">=5.9.0" }, { name = "plum-dispatch", specifier = "==2.5.7" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = "==4.2.0" }, + { name = "py-spy", marker = "extra == 'dev'" }, { name = "pydantic" }, { name = "pydantic-settings", specifier = ">=2.11.0,<3" }, { name = "pygame", marker = "extra == 'sim'", specifier = ">=2.6.1" }, @@ -6230,6 +6232,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e0/a9/023730ba63db1e494a271cb018dcd361bd2c917ba7004c3e49d5daf795a2/py_cpuinfo-9.0.0-py3-none-any.whl", hash = "sha256:859625bc251f64e21f077d099d4162689c762b5d6a4c3c97553d56241c9674d5", size = 22335, upload-time = "2022-10-25T20:38:27.636Z" }, ] +[[package]] +name = "py-spy" +version = "0.4.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/19/e2/ff811a367028b87e86714945bb9ecb5c1cc69114a8039a67b3a862cef921/py_spy-0.4.1.tar.gz", hash = "sha256:e53aa53daa2e47c2eef97dd2455b47bb3a7e7f962796a86cc3e7dbde8e6f4db4", size = 244726, upload-time = "2025-07-31T19:33:25.172Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/14/e3/3a32500d845bdd94f6a2b4ed6244982f42ec2bc64602ea8fcfe900678ae7/py_spy-0.4.1-py2.py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:809094208c6256c8f4ccadd31e9a513fe2429253f48e20066879239ba12cd8cc", size = 3682508, upload-time = "2025-07-31T19:33:13.753Z" }, + { url = "https://files.pythonhosted.org/packages/4f/bf/e4d280e9e0bec71d39fc646654097027d4bbe8e04af18fb68e49afcff404/py_spy-0.4.1-py2.py3-none-macosx_11_0_arm64.whl", hash = "sha256:1fb8bf71ab8df95a95cc387deed6552934c50feef2cf6456bc06692a5508fd0c", size = 1796395, upload-time = "2025-07-31T19:33:15.325Z" }, + { url = "https://files.pythonhosted.org/packages/df/79/9ed50bb0a9de63ed023aa2db8b6265b04a7760d98c61eb54def6a5fddb68/py_spy-0.4.1-py2.py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee776b9d512a011d1ad3907ed53ae32ce2f3d9ff3e1782236554e22103b5c084", size = 2034938, upload-time = "2025-07-31T19:33:17.194Z" }, + { url = "https://files.pythonhosted.org/packages/53/a5/36862e3eea59f729dfb70ee6f9e14b051d8ddce1aa7e70e0b81d9fe18536/py_spy-0.4.1-py2.py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:532d3525538254d1859b49de1fbe9744df6b8865657c9f0e444bf36ce3f19226", size = 2658968, upload-time = "2025-07-31T19:33:18.916Z" }, + { url = "https://files.pythonhosted.org/packages/08/f8/9ea0b586b065a623f591e5e7961282ec944b5fbbdca33186c7c0296645b3/py_spy-0.4.1-py2.py3-none-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:4972c21890b6814017e39ac233c22572c4a61fd874524ebc5ccab0f2237aee0a", size = 2147541, upload-time = "2025-07-31T19:33:20.565Z" }, + { url = "https://files.pythonhosted.org/packages/68/fb/bc7f639aed026bca6e7beb1e33f6951e16b7d315594e7635a4f7d21d63f4/py_spy-0.4.1-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:6a80ec05eb8a6883863a367c6a4d4f2d57de68466f7956b6367d4edd5c61bb29", size = 2763338, upload-time = "2025-07-31T19:33:22.202Z" }, + { url = "https://files.pythonhosted.org/packages/e1/da/fcc9a9fcd4ca946ff402cff20348e838b051d69f50f5d1f5dca4cd3c5eb8/py_spy-0.4.1-py2.py3-none-win_amd64.whl", hash = "sha256:d92e522bd40e9bf7d87c204033ce5bb5c828fca45fa28d970f58d71128069fdc", size = 1818784, upload-time = "2025-07-31T19:33:23.802Z" }, +] + [[package]] name = "pyarrow" version = "22.0.0" From bc11f3f2538898705d811f6f744d2350c787458a Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Tue, 27 Jan 2026 08:00:58 +0200 Subject: [PATCH 2/3] fix mypy --- dimos/core/module_coordinator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 984b57007e..270ef5d54f 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -14,8 +14,7 @@ from concurrent.futures import ThreadPoolExecutor import time - -from traitlets import Any +from typing import Any from dimos import core from dimos.core import DimosCluster From 4cadde955e5d480270dc39b5eb500b59a7ea4d20 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Tue, 27 Jan 2026 08:47:21 +0200 Subject: [PATCH 3/3] fixed --- dimos/core/worker.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 431addf3ee..2a2d343a05 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -57,12 +57,7 @@ def set_ref(self, ref: Any) -> ActorFuture: return ActorFuture(result) def __getattr__(self, name: str) -> ActorFuture: - """Proxy attribute access to the worker process.""" - if name.startswith("_"): - raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") - - result = self._send_request_to_worker({"type": "getattr", "name": name}) - return ActorFuture(result) + raise NotImplementedError("Should not be needed anymore.") # Global forkserver context. Using `forkserver` instead of `fork` because it @@ -159,7 +154,6 @@ def _worker_entrypoint( worker_id: int, ) -> None: instance = None - try: instance = module_class(*args, **kwargs) instance.worker = worker_id @@ -181,7 +175,7 @@ def _worker_loop(conn: Connection, instance: Any, worker_id: int) -> None: if not conn.poll(timeout=0.1): continue request = conn.recv() - except EOFError: + except (EOFError, KeyboardInterrupt): break response: dict[str, Any] = {} @@ -192,9 +186,6 @@ def _worker_loop(conn: Connection, instance: Any, worker_id: int) -> None: instance.ref = request.get("ref") response["result"] = worker_id - elif req_type == "getattr": - response["result"] = getattr(instance, request["name"]) - elif req_type == "shutdown": response["result"] = True conn.send(response)