Skip to content

Commit 71efcdb

Browse files
committed
Add Parallel#resize(maximum)
Allows to grow and shrink a Parallel context. Takes care to replace `@schedulers` instead of mutating it, and to modify #steal to read it once, then always refer to the same array (never mutated) so we can safely steal without acquiring the mutex.
1 parent 3345cca commit 71efcdb

File tree

1 file changed

+60
-4
lines changed

1 file changed

+60
-4
lines changed

src/fiber/execution_context/parallel.cr

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,61 @@ module Fiber::ExecutionContext
164164
end
165165
end
166166

167+
# Resizes the context to the new *maximum* parallelism.
168+
#
169+
# The new *maximum* can grow, in which case more schedulers are started to
170+
# eventually increase the parallelism.
171+
#
172+
# The new *maximum* can also shrink, in which case the overflow schedulers
173+
# are removed and told to shutdown immediately. The actual shutdown is
174+
# cooperative, so running schedulers won't stop until their current fiber
175+
# tries to switch to another fiber.
176+
def resize(maximum : Int32) : Nil
177+
maximum = maximum.clamp(1..) # FIXME: raise if maximum < 1
178+
removed_schedulers = nil
179+
180+
@mutex.synchronize do
181+
# can run in parallel to #steal that dereferences @schedulers (once)
182+
# without locking the mutex, so we dup the schedulers, mutate the copy,
183+
# and eventually assign the copy as @schedulers; this way #steal should
184+
# only ever saw a complete and valid array.
185+
new_schedulers = nil
186+
new_capacity = maximum
187+
old_threads = @threads
188+
old_schedulers = @schedulers
189+
old_capacity = capacity
190+
191+
if new_capacity > old_capacity
192+
@schedulers = Array(Scheduler).new(new_capacity) do |index|
193+
old_schedulers[index]? || Scheduler.new(self, "#{@name}-#{index}")
194+
end
195+
threads = Array(Thread).new(new_capacity)
196+
old_threads.each { |thread| threads << thread }
197+
@threads = threads
198+
elsif new_capacity < old_capacity
199+
# tell the overflow schedulers to shutdown
200+
removed_schedulers = old_schedulers[new_capacity..]
201+
removed_schedulers.each(&.shutdown(:now))
202+
203+
# resize
204+
@schedulers = old_schedulers[0...new_capacity]
205+
@threads = old_threads[0...new_capacity]
206+
207+
# wakeup all waiting schedulers so they can shutdown
208+
@condition.broadcast
209+
@event_loop.interrupt
210+
end
211+
end
212+
213+
return unless removed_schedulers
214+
215+
# drain the local queues of removed schedulers because they're no longer
216+
# available for stealing
217+
removed_schedulers.each do |scheduler|
218+
scheduler.@runnables.drain
219+
end
220+
end
221+
167222
# :nodoc:
168223
def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
169224
raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
@@ -188,11 +243,12 @@ module Fiber::ExecutionContext
188243
protected def steal(& : Scheduler ->) : Nil
189244
return if capacity == 1
190245

246+
schedulers = @schedulers
191247
i = @rng.next_int
192-
n = @schedulers.size
248+
n = schedulers.size
193249

194250
n.times do |j|
195-
if scheduler = @schedulers[(i &+ j) % n]?
251+
if scheduler = schedulers[(i &+ j) % n]?
196252
yield scheduler
197253
end
198254
end
@@ -270,11 +326,11 @@ module Fiber::ExecutionContext
270326
# check if we can start another thread; no need for atomics, the values
271327
# shall be rather stable over time and we check them again inside the
272328
# mutex
273-
return if @threads.size == capacity
329+
return if @threads.size >= capacity
274330

275331
@mutex.synchronize do
276332
index = @threads.size
277-
return if index == capacity # check again
333+
return if index >= capacity # check again
278334

279335
@threads << start_thread(@schedulers[index])
280336
end

0 commit comments

Comments
 (0)