-
Notifications
You must be signed in to change notification settings - Fork 0
WIP: add replacement for dask #1111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
paul-nechifor
commented
Jan 25, 2026
Greptile OverviewGreptile SummaryThis PR implements an alternative to Dask for module deployment using a multiprocessing-based worker system with forkserver. The implementation allows toggling between Dask and the new system via the Key Changes:
Issues Found:
Confidence Score: 2/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant MB as ModuleBlueprintSet
participant MC as ModuleCoordinator
participant WM as WorkerManager
participant W as Worker
participant P as Process (forkserver)
participant M as Module Instance
participant RPC as RPCClient
Note over MB,MC: Module Deployment (--no-dask mode)
MB->>MB: Collect module specs
MB->>MC: deploy_parallel(module_specs)
MC->>MC: Check if WorkerManager
MC->>WM: deploy_parallel(module_specs)
loop For each module_spec
WM->>W: Create Worker(module_class, args, kwargs)
W->>W: Assign worker_id
W->>P: Start forkserver Process
activate P
P->>M: Instantiate module_class(*args, **kwargs)
P->>M: Set worker = worker_id
P->>P: Start _worker_loop()
end
loop For each worker
W->>P: Send set_ref request via pipe
P->>M: Set instance.ref = actor
P-->>W: Return worker_id
W->>W: Mark ready
end
loop For each worker
W->>RPC: Create RPCClient(actor, module_class)
RPC->>RPC: Start LCMRPC service
WM-->>MC: Return RPCClient
end
MC-->>MB: Return list of RPCClients
Note over MB,RPC: RPC Method Call
MB->>RPC: module.start()
RPC->>RPC: Create RpcCall
RPC->>W: actor.__getattr__("start")
W->>P: Send getattr request via pipe
P->>M: getattr(instance, "start")
P-->>W: Return method
W-->>RPC: Return ActorFuture(method)
RPC->>RPC: Call method via LCMRPC
Note over MC,P: Shutdown
MC->>RPC: module.stop()
RPC->>P: call_nowait("stop")
P->>M: instance.stop()
MC->>WM: close_all()
WM->>W: shutdown()
W->>P: Send shutdown request
P-->>W: Acknowledge
W->>P: Join/terminate process
deactivate P
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
6 files reviewed, 6 comments
|
|
||
| from typing import TypeVar | ||
|
|
||
| from dask.distributed import Actor, get_worker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_worker is still imported from dask but is used in Module.set_ref() on line 424. When --no-dask mode is enabled, this import will succeed but calling get_worker() will fail at runtime since no Dask worker context exists.
| from dask.distributed import Actor, get_worker | |
| from dask.distributed import Actor |
| from typing_extensions import TypeVar as TypeVarExtension | ||
|
|
||
| if TYPE_CHECKING: | ||
| from dimos.core.introspection.module import ModuleInfo | ||
|
|
||
| from typing import TypeVar |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TypeVar is imported twice - once from typing_extensions as TypeVarExtension (line 29), and again from typing (line 34). The second import shadows the first one.
| from typing_extensions import TypeVar as TypeVarExtension | |
| if TYPE_CHECKING: | |
| from dimos.core.introspection.module import ModuleInfo | |
| from typing import TypeVar | |
| from typing_extensions import TypeVar as TypeVarExtension | |
| if TYPE_CHECKING: | |
| from dimos.core.introspection.module import ModuleInfo | |
| from typing import TypeVar |
| def __getattr__(self, name: str) -> ActorFuture: | ||
| """Proxy attribute access to the worker process.""" | ||
| if name.startswith("_"): | ||
| raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") | ||
|
|
||
| result = self._send_request_to_worker({"type": "getattr", "name": name}) | ||
| return ActorFuture(result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The __getattr__ method always returns an ActorFuture, but attribute access should return the actual attribute value, not a future. This works for method calls (where the future wraps the return value), but for non-callable attributes, callers would get an ActorFuture wrapper instead of the value itself.
| elif req_type == "getattr": | ||
| response["result"] = getattr(instance, request["name"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This getattr request returns the attribute value directly without wrapping method calls. When the attribute is a method, it returns the bound method object, not the result of calling it. This means the Actor.__getattr__ will wrap a method object in an ActorFuture, not the result of calling the method.
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from concurrent.futures import ThreadPoolExecutor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
traitlets.Any is imported but typing.Any should be used instead for type annotations
| from concurrent.futures import ThreadPoolExecutor | |
| from typing import Any |
Additional Comments (1)
|
945e0ba to
85bb311
Compare