Skip to content

Commit 13bef97

Browse files
committed
Remove idle and saturated sets from scheduler
1 parent 39f91e3 commit 13bef97

File tree

4 files changed

+93
-239
lines changed

4 files changed

+93
-239
lines changed

distributed/scheduler.py

Lines changed: 7 additions & 113 deletions
Original file line number | Diff line number | Diff line change
@@ -1770,11 +1770,8 @@ def __init__(
17701770
self.clients["fire-and-forget"] = ClientState("fire-and-forget")
17711771
self.extensions = {}
17721772
self.host_info = host_info
1773-
self.idle = SortedDict()
1774-
self.idle_task_count = set()
17751773
self.n_tasks = 0
17761774
self.resources = resources
1777-
self.saturated = set()
17781775
self.tasks = tasks
17791776
self.replicated_tasks = {
17801777
ts for ts in self.tasks.values() if len(ts.who_has or ()) > 1
@@ -1865,7 +1862,6 @@ def __pdict__(self) -> dict[str, Any]:
18651862
return {
18661863
"bandwidth": self.bandwidth,
18671864
"resources": self.resources,
1868-
"saturated": self.saturated,
18691865
"unrunnable": self.unrunnable,
18701866
"queued": self.queued,
18711867
"n_tasks": self.n_tasks,
@@ -1879,7 +1875,6 @@ def __pdict__(self) -> dict[str, Any]:
18791875
"extensions": self.extensions,
18801876
"clients": self.clients,
18811877
"workers": self.workers,
1882-
"idle": self.idle,
18831878
"host_info": self.host_info,
18841879
}
18851880

@@ -2310,7 +2305,7 @@ def decide_worker_rootish_queuing_disabled(
23102305
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
23112306
assert math.isinf(self.WORKER_SATURATION) or not ts._queueable
23122307

2313-
pool = self.idle.values() if self.idle else self.running
2308+
pool = self.running
23142309
if not pool:
23152310
return None
23162311

@@ -2375,22 +2370,16 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
23752370
# then add that assertion here (and actually pass in the task).
23762371
assert not math.isinf(self.WORKER_SATURATION)
23772372

2378-
if not self.idle_task_count:
2379-
# All workers busy? Task gets/stays queued.
2373+
if not self.running:
23802374
return None
23812375

23822376
# Just pick the least busy worker.
23832377
# NOTE: this will lead to worst-case scheduling with regards to co-assignment.
2384-
ws = min(
2385-
self.idle_task_count,
2386-
key=lambda ws: len(ws.processing) / ws.nthreads,
2387-
)
2378+
ws = min(self.running, key=lambda ws: len(ws.processing) / ws.nthreads)
2379+
if _worker_full(ws, self.WORKER_SATURATION):
2380+
return None
23882381
if self.validate:
23892382
assert self.workers.get(ws.address) is ws
2390-
assert not _worker_full(ws, self.WORKER_SATURATION), (
2391-
ws,
2392-
_task_slots_available(ws, self.WORKER_SATURATION),
2393-
)
23942383
assert ws in self.running, (ws, self.running)
23952384

23962385
return ws
@@ -2434,7 +2423,7 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
24342423
# dependencies, but its group is also smaller than the cluster.
24352424

24362425
# Fastpath when there are no related tasks or restrictions
2437-
worker_pool = self.idle or self.workers
2426+
worker_pool = self.workers
24382427
# FIXME idle and workers are SortedDict's declared as dicts
24392428
# because sortedcontainers is not annotated
24402429
wp_vals = cast("Sequence[WorkerState]", worker_pool.values())
@@ -2927,7 +2916,6 @@ def _transition_waiting_queued(self, key: Key, stimulus_id: str) -> RecsMsgs:
29272916
ts = self.tasks[key]
29282917

29292918
if self.validate:
2930-
assert not self.idle_task_count, (ts, self.idle_task_count)
29312919
self._validate_ready(ts)
29322920

29332921
ts.state = "queued"
@@ -3158,63 +3146,6 @@ def is_rootish(self, ts: TaskState) -> bool:
31583146
and sum(map(len, tg.dependencies)) < self.rootish_tg_dependencies_threshold
31593147
)
31603148

3161-
def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None:
3162-
"""Update the status of the idle and saturated state
3163-
3164-
The scheduler keeps track of workers that are ..
3165-
3166-
- Saturated: have enough work to stay busy
3167-
- Idle: do not have enough work to stay busy
3168-
3169-
They are considered saturated if they both have enough tasks to occupy
3170-
all of their threads, and if the expected runtime of those tasks is
3171-
large enough.
3172-
3173-
If ``distributed.scheduler.worker-saturation`` is not ``inf``
3174-
(scheduler-side queuing is enabled), they are considered idle
3175-
if they have fewer tasks processing than the ``worker-saturation``
3176-
threshold dictates.
3177-
3178-
Otherwise, they are considered idle if they have fewer tasks processing
3179-
than threads, or if their tasks' total expected runtime is less than half
3180-
the expected runtime of the same number of average tasks.
3181-
3182-
This is useful for load balancing and adaptivity.
3183-
"""
3184-
if self.total_nthreads == 0 or ws.status == Status.closed:
3185-
return
3186-
if occ < 0:
3187-
occ = ws.occupancy
3188-
3189-
p = len(ws.processing)
3190-
3191-
self.saturated.discard(ws)
3192-
if ws.status != Status.running:
3193-
self.idle.pop(ws.address, None)
3194-
elif self.is_unoccupied(ws, occ, p):
3195-
self.idle[ws.address] = ws
3196-
else:
3197-
self.idle.pop(ws.address, None)
3198-
nc = ws.nthreads
3199-
if p > nc:
3200-
pending = occ * (p - nc) / (p * nc)
3201-
if 0.4 < pending > 1.9 * (self.total_occupancy / self.total_nthreads):
3202-
self.saturated.add(ws)
3203-
3204-
if not _worker_full(ws, self.WORKER_SATURATION) and ws.status == Status.running:
3205-
self.idle_task_count.add(ws)
3206-
else:
3207-
self.idle_task_count.discard(ws)
3208-
3209-
def is_unoccupied(
3210-
self, ws: WorkerState, occupancy: float, nprocessing: int
3211-
) -> bool:
3212-
nthreads = ws.nthreads
3213-
return (
3214-
nprocessing < nthreads
3215-
or occupancy < nthreads * (self.total_occupancy / self.total_nthreads) / 2
3216-
)
3217-
32183149
def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> float:
32193150
"""
32203151
Get the estimated communication cost (in s.) to compute the task
@@ -3402,7 +3333,6 @@ def _add_to_processing(
34023333
ts.processing_on = ws
34033334
ts.state = "processing"
34043335
self.acquire_resources(ts, ws)
3405-
self.check_idle_saturated(ws)
34063336
self.n_tasks += 1
34073337

34083338
if ts.actor:
@@ -3468,7 +3398,6 @@ def _exit_processing_common(self, ts: TaskState) -> WorkerState | None:
34683398
if self.workers.get(ws.address) is not ws: # may have been removed
34693399
return None
34703400

3471-
self.check_idle_saturated(ws)
34723401
self.release_resources(ts, ws)
34733402

34743403
return ws
@@ -4606,10 +4535,6 @@ async def add_worker(
46064535
metrics=metrics,
46074536
)
46084537

4609-
# Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot
4610-
# exist before this.
4611-
self.check_idle_saturated(ws)
4612-
46134538
self.stream_comms[address] = BatchedSend(interval="5ms", loop=self.loop)
46144539

46154540
awaitables = []
@@ -5227,13 +5152,11 @@ def stimulus_queue_slots_maybe_opened(self, *, stimulus_id: str) -> None:
52275152
so any tasks that became runnable are already in ``processing``. Otherwise,
52285153
overproduction can occur if queued tasks get scheduled before downstream tasks.
52295154
5230-
Must be called after `check_idle_saturated`; i.e. `idle_task_count` must be up to date.
52315155
"""
52325156
if not self.queued:
52335157
return
52345158
slots_available = sum(
5235-
_task_slots_available(ws, self.WORKER_SATURATION)
5236-
for ws in self.idle_task_count
5159+
_task_slots_available(ws, self.WORKER_SATURATION) for ws in self.running
52375160
)
52385161
if slots_available == 0:
52395162
return
@@ -5466,9 +5389,6 @@ async def remove_worker(
54665389
self.rpc.remove(address)
54675390
del self.stream_comms[address]
54685391
del self.aliases[ws.name]
5469-
self.idle.pop(ws.address, None)
5470-
self.idle_task_count.discard(ws)
5471-
self.saturated.discard(ws)
54725392
del self.workers[address]
54735393
self._workers_removed_total += 1
54745394
ws.status = Status.closed
@@ -5818,23 +5738,6 @@ def validate_state(self, allow_overlap: bool = False) -> None:
58185738
if not (set(self.workers) == set(self.stream_comms)):
58195739
raise ValueError("Workers not the same in all collections")
58205740

5821-
assert self.running.issuperset(self.idle.values()), (
5822-
self.running.copy(),
5823-
set(self.idle.values()),
5824-
)
5825-
assert self.running.issuperset(self.idle_task_count), (
5826-
self.running.copy(),
5827-
self.idle_task_count.copy(),
5828-
)
5829-
assert self.running.issuperset(self.saturated), (
5830-
self.running.copy(),
5831-
self.saturated.copy(),
5832-
)
5833-
assert self.saturated.isdisjoint(self.idle.values()), (
5834-
self.saturated.copy(),
5835-
set(self.idle.values()),
5836-
)
5837-
58385741
task_prefix_counts: defaultdict[str, int] = defaultdict(int)
58395742
for w, ws in self.workers.items():
58405743
assert isinstance(w, str), (type(w), w)
@@ -5845,14 +5748,10 @@ def validate_state(self, allow_overlap: bool = False) -> None:
58455748
assert ws in self.running
58465749
else:
58475750
assert ws not in self.running
5848-
assert ws.address not in self.idle
5849-
assert ws not in self.saturated
58505751

58515752
assert ws.long_running.issubset(ws.processing)
58525753
if not ws.processing:
58535754
assert not ws.occupancy
5854-
if ws.status == Status.running:
5855-
assert ws.address in self.idle
58565755
assert not ws.needs_what.keys() & ws.has_what
58575756
actual_needs_what: defaultdict[TaskState, int] = defaultdict(int)
58585757
for ts in ws.processing:
@@ -6136,7 +6035,6 @@ def handle_long_running(
61366035
ts.prefix.duration_average = (old_duration + compute_duration) / 2
61376036

61386037
ws.add_to_long_running(ts)
6139-
self.check_idle_saturated(ws)
61406038

61416039
self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
61426040

@@ -6164,16 +6062,12 @@ def handle_worker_status_change(
61646062

61656063
if ws.status == Status.running:
61666064
self.running.add(ws)
6167-
self.check_idle_saturated(ws)
61686065
self.transitions(
61696066
self.bulk_schedule_unrunnable_after_adding_worker(ws), stimulus_id
61706067
)
61716068
self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
61726069
else:
61736070
self.running.discard(ws)
6174-
self.idle.pop(ws.address, None)
6175-
self.idle_task_count.discard(ws)
6176-
self.saturated.discard(ws)
61776071
self._refresh_no_workers_since()
61786072

61796073
def handle_request_refresh_who_has(

0 commit comments

Comments
 (0)