Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ec79e47
Update __init__.py,condition.py scheduler.py
nadzhou Dec 6, 2025
9b9e0a8
Update
nadzhou Dec 6, 2025
ce5ea18
Update scheduler.py
nadzhou Dec 6, 2025
eb7046a
Update scheduler.py
nadzhou Dec 6, 2025
7a1cef8
Update condition.py,test_condition.py
nadzhou Dec 6, 2025
fda7dce
Update scheduler.py
nadzhou Dec 6, 2025
e04d5da
Update condition.py
nadzhou Dec 6, 2025
f777c64
Update condition.py
nadzhou Dec 14, 2025
585b677
Update condition.py
nadzhou Dec 14, 2025
332c4d9
Update condition.py
nadzhou Dec 14, 2025
59badc6
Update condition.py
nadzhou Dec 14, 2025
a48d28f
Update condition.py,test_condition.py
nadzhou Dec 14, 2025
d8f98d5
Update condition.py
nadzhou Dec 15, 2025
d2e2af9
Update condition.py,test_condition.py
nadzhou Dec 21, 2025
ea75275
Update
nadzhou Dec 21, 2025
f2773f2
Update condition.py,test_condition.py
nadzhou Dec 21, 2025
2a34eb0
Update condition.py,test_condition.py
nadzhou Dec 21, 2025
b29efe7
Update condition.py,test_condition.py
nadzhou Dec 23, 2025
2b73e57
Update test_condition.py
nadzhou Dec 23, 2025
8ebeb4c
Update condition.py,test_condition.py
nadzhou Dec 27, 2025
98f0c55
Update test_condition.py
nadzhou Dec 27, 2025
0e49188
Update condition.py,test_condition.py
nadzhou Jan 5, 2026
b700092
Merge remote-tracking branch 'upstream/main' into add-condition-primi…
nadzhou Jan 5, 2026
055fb8b
Update condition.py,test_condition.py
nadzhou Jan 5, 2026
b64bc9c
Clean up obsolete pins in CI (#9172)
crusaderky Jan 13, 2026
443c479
Bump JamesIves/github-pages-deploy-action from 4.7.6 to 4.8.0 (#9177)
dependabot[bot] Jan 13, 2026
596283b
Merge commit from fork
jacobtomlinson Jan 16, 2026
8b9a691
Version 2026.1.1
jacobtomlinson Jan 16, 2026
d253ab7
Python 3.14 support (#9169)
crusaderky Jan 20, 2026
984db2b
utils_test.popen: allow avoiding path modification, use sysconfig for…
jameslamb Jan 22, 2026
563cde3
Require PyArrow >=16 (#9185)
crusaderky Jan 29, 2026
f4de852
Version 2026.1.2
TomAugspurger Jan 30, 2026
dc5de7f
Pin sphinx=8 (#9190)
crusaderky Feb 3, 2026
2a24066
Type hints for Future.status (#9188)
nadzhou Feb 3, 2026
11c2851
XFAIL test_handle_null_partitions_2 (#9191)
crusaderky Feb 3, 2026
afd0d53
Update condition.py,test_condition.py
nadzhou Feb 8, 2026
4a6b1f8
Merge branch 'main' into add-condition-primitive
nadzhou Feb 8, 2026
7cb4be0
Update condition.py
nadzhou Feb 8, 2026
52ab4ce
Update condition.py,test_condition.py
nadzhou Feb 8, 2026
938568c
Update condition.py,test_condition.py
nadzhou Feb 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,4 @@
"widgets",
"worker_client",
]
from distributed.condition import Condition
238 changes: 238 additions & 0 deletions distributed/condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
from __future__ import annotations

import asyncio
import logging
import uuid
from collections import defaultdict
from contextlib import suppress

from distributed.lock import Lock
from distributed.utils import log_errors
from distributed.worker import get_client

logger = logging.getLogger(__name__)


class ConditionExtension:
"""Scheduler extension for managing distributed Conditions.

Waiters are keyed by (name, waiter_id) so multiple waiters from the
same client can coexist without overwriting each other's events.
"""

def __init__(self, scheduler):
self.scheduler = scheduler
# name -> {waiter_id -> asyncio.Event}
self.waiters = defaultdict(dict)
self._closed = False

self.scheduler.handlers.update(
{
"condition_wait": self.wait,
"condition_notify": self.notify,
"condition_notify_all": self.notify_all,
}
)
self.scheduler.extensions["conditions"] = self

async def close(self):
"""Cancel all pending waiters so scheduler can shut down cleanly."""
self._closed = True
for name in list(self.waiters):
for event in self.waiters[name].values():
event.set()
del self.waiters[name]

@log_errors
async def wait(self, name=None, waiter_id=None, **kwargs):
"""Register a waiter and block until notified or closed."""
if self._closed:
return

event = asyncio.Event()
self.waiters[name][waiter_id] = event

try:
await event.wait()
finally:
with suppress(KeyError):
del self.waiters[name][waiter_id]
if not self.waiters[name]:
del self.waiters[name]

async def notify(self, name=None, n=1, **kwargs):
"""Wake up to n waiters."""
if name not in self.waiters:
return

notified = 0
for wid in list(self.waiters[name]):
if notified >= n:
break
event = self.waiters[name].get(wid)
if event and not event.is_set():
event.set()
notified += 1

async def notify_all(self, name=None, **kwargs):
"""Wake all waiters."""
if name not in self.waiters:
return

for event in self.waiters[name].values():
if not event.is_set():
event.set()


class Condition:
"""Distributed Condition Variable

A distributed version of asyncio.Condition. Allows one or more clients
to wait until notified by another client.

Like asyncio.Condition, this must be used with a lock. The lock is
released before waiting and reacquired afterwards.

Parameters
----------
name : str, optional
Name of the condition. If not provided, a random name is generated.
client : Client, optional
Client instance. If not provided, uses the default client.
lock : Lock, optional
Lock to use with this condition. If not provided, creates a new Lock.

Examples
--------
>>> from distributed import Client, Condition
>>> client = Client() # doctest: +SKIP
>>> condition = Condition() # doctest: +SKIP

>>> async with condition: # doctest: +SKIP
... await condition.wait()

>>> async with condition: # doctest: +SKIP
... condition.notify() # Wake one waiter
"""

def __init__(self, name=None, client=None, lock=None):
self._client = client
self.name = name or "condition-" + uuid.uuid4().hex

if lock is None:
lock = Lock()
elif not isinstance(lock, Lock):
raise TypeError(f"lock must be a Lock, not {type(lock)}")

self._lock = lock

@property
def client(self):
if not self._client:
try:
self._client = get_client()
except ValueError:
pass
return self._client

def _verify_running(self):
if not self.client:
raise RuntimeError(
f"{type(self)} object not properly initialized. "
"Ensure it's created within a Client context."
)

async def __aenter__(self):
await self.acquire()
return self

async def __aexit__(self, exc_type, exc, tb):
await self.release()

def __repr__(self):
return f"<Condition: {self.name}>"

async def acquire(self, timeout=None):
self._verify_running()
return await self._lock.acquire(timeout=timeout)

async def release(self):
self._verify_running()
return await self._lock.release()

async def locked(self):
return await self._lock.locked()

async def wait(self, timeout=None):
"""Wait until notified.

Releases the underlying lock, waits until notified, then reacquires
the lock before returning. Must be called with the lock held.

Returns True if woken by notify, False on timeout.
"""
self._verify_running()
await self.release()

# Each wait() call gets a unique ID so the scheduler can track
# multiple waiters from the same client independently.
waiter_id = uuid.uuid4().hex

try:
coro = self.client.scheduler.condition_wait(
name=self.name, waiter_id=waiter_id
)
if timeout is not None:
try:
await asyncio.wait_for(coro, timeout=timeout)
return True
except asyncio.TimeoutError:
return False
else:
await coro
return True
finally:
# Always reacquire lock — mirrors asyncio.Condition semantics
try:
await self.acquire()
except asyncio.CancelledError:
with suppress(Exception):
await asyncio.shield(self.acquire())
raise

async def wait_for(self, predicate, timeout=None):
"""Wait until predicate() returns True.

Returns the predicate result (True unless timeout).
"""
result = predicate()
if result:
return result

if timeout is not None:
import time

deadline = time.monotonic() + timeout
while not result:
remaining = deadline - time.monotonic()
if remaining <= 0:
return predicate()
if not await self.wait(timeout=remaining):
return predicate()
result = predicate()
else:
while not result:
await self.wait()
result = predicate()

return result

async def notify(self, n=1):
"""Wake up n waiters (default: 1)."""
self._verify_running()
await self.client.scheduler.condition_notify(name=self.name, n=n)

async def notify_all(self):
"""Wake up all waiters."""
self._verify_running()
await self.client.scheduler.condition_notify_all(name=self.name)
2 changes: 2 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
)
from distributed.comm.addressing import addresses_from_user_args
from distributed.compatibility import PeriodicCallback
from distributed.condition import ConditionExtension
from distributed.core import (
ErrorMessage,
OKMessage,
Expand Down Expand Up @@ -195,6 +196,7 @@
"semaphores": SemaphoreExtension,
"events": EventExtension,
"amm": ActiveMemoryManagerExtension,
"conditions": ConditionExtension,
"memory_sampler": MemorySamplerExtension,
"shuffle": ShuffleSchedulerPlugin,
"spans": SpansSchedulerExtension,
Expand Down
Loading
Loading