40 changes: 40 additions & 0 deletions spec/std/fiber/execution_context/parallel_spec.cr
@@ -1,5 +1,6 @@
{% skip_file unless flag?(:execution_context) %}
require "spec"
require "wait_group"

describe Fiber::ExecutionContext::Parallel do
it ".new" do
@@ -41,4 +42,43 @@ describe Fiber::ExecutionContext::Parallel do
Fiber::ExecutionContext::Parallel.new("test", size: 5..1)
end
end

it "#resize" do
ctx = Fiber::ExecutionContext::Parallel.new("ctx", 1)
running = Atomic(Bool).new(true)
wg = WaitGroup.new

10.times do
wg.add(1)

ctx.spawn do
while running.get(:relaxed)
sleep(10.microseconds)
end
ensure
wg.done
end
end

# it grows
ctx.resize(4)
ctx.capacity.should eq(4)

# it shrinks
ctx.resize(2)
ctx.capacity.should eq(2)

# it doesn't change
ctx.resize(2)
ctx.capacity.should eq(2)

10.times do
n = rand(1..4)
ctx.resize(n)
ctx.capacity.should eq(n)
end

running.set(false)
wg.wait
end
end
26 changes: 26 additions & 0 deletions spec/std/fiber/execution_context/runnables_spec.cr
@@ -71,6 +71,32 @@ describe Fiber::ExecutionContext::Runnables do
end
end

describe "#drain" do
it "drains the local queue into the global queue" do
fibers = 6.times.map { |i| new_fake_fiber("f#{i}") }.to_a

# local enqueue + overflow
g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
r = Fiber::ExecutionContext::Runnables(6).new(g)

# empty
r.drain
g.size.should eq(0)

# full
fibers.each { |f| r.push(f) }
r.drain
r.shift?.should be_nil
g.size.should eq(6)

# refill half (1 pop + 2 grab) and drain again
g.unsafe_grab?(r, divisor: 1)
r.drain
r.shift?.should be_nil
g.size.should eq(5)
end
end

describe "#bulk_push" do
it "fills the local queue" do
l = Fiber::List.new
89 changes: 77 additions & 12 deletions src/fiber/execution_context/parallel.cr
@@ -69,7 +69,6 @@ module Fiber::ExecutionContext

@parked = Atomic(Int32).new(0)
@spinning = Atomic(Int32).new(0)
@capacity : Int32

# :nodoc:
protected def self.default(maximum : Int32) : self
@@ -102,12 +101,12 @@
@condition = Thread::ConditionVariable.new

@global_queue = GlobalQueue.new(@mutex)
@schedulers = Array(Scheduler).new(@capacity)
@threads = Array(Thread).new(@capacity)
@schedulers = Array(Scheduler).new(capacity)
@threads = Array(Thread).new(capacity)

@rng = Random::PCG32.new

start_schedulers
start_schedulers(capacity)
@threads << hijack_current_thread(@schedulers.first) if hijack

ExecutionContext.execution_contexts.push(self)
@@ -120,7 +119,7 @@

# The maximum number of threads that can be started.
def capacity : Int32
@capacity
@schedulers.size
end

# :nodoc:
@@ -140,7 +139,7 @@
# OPTIMIZE: consider storing schedulers to an array-like object that would
# use an atomic/fence to make sure that @size can only be incremented
# *after* the value has been written to @buffer.
private def start_schedulers
private def start_schedulers(capacity)
capacity.times do |index|
@schedulers << Scheduler.new(self, "#{@name}-#{index}")
end
@@ -176,6 +175,71 @@
end
end

# Resizes the context to the new *maximum* parallelism.
#
# The new *maximum* can grow, in which case more schedulers are created to
# eventually increase the parallelism.
#
# The new *maximum* can also shrink, in which case the overflow schedulers
# are removed and told to shut down immediately. The actual shutdown is
# cooperative, so running schedulers won't stop until their current fiber
# tries to switch to another fiber.
def resize(maximum : Int32) : Nil
raise ArgumentError.new("Parallelism can't be less than one.") if maximum < 1
removed_schedulers = nil

@mutex.synchronize do
# can run in parallel to #steal that dereferences @schedulers (once)
# without locking the mutex, so we dup the schedulers, mutate the copy,
# and eventually assign the copy as @schedulers; this way #steal can
# safely access the array (never mutated).
new_capacity = maximum
old_threads = @threads
old_schedulers = @schedulers
old_capacity = capacity

if new_capacity > old_capacity
@schedulers = Array(Scheduler).new(new_capacity) do |index|
old_schedulers[index]? || Scheduler.new(self, "#{@name}-#{index}")
end
threads = Array(Thread).new(new_capacity)
old_threads.each { |thread| threads << thread }
@threads = threads
elsif new_capacity < old_capacity
# tell the overflow schedulers to shutdown
removed_schedulers = old_schedulers[new_capacity..]
removed_schedulers.each(&.shutdown!)

# resize
@schedulers = old_schedulers[0...new_capacity]
@threads = old_threads[0...new_capacity]

# reset @parked counter (we wake all parked threads) so they can
# shut down (if told to):
woken_threads = @parked.get(:relaxed)
@parked.set(0, :relaxed)

# update @spinning prior to unpark threads; we use acquire release
# semantics to make sure that all the above stores are visible before
# the following wakeup calls (maybe not needed, but let's err on the
# safe side)
@spinning.add(woken_threads, :acquire_release)

# wake every waiting thread:
@condition.broadcast
@event_loop.interrupt
end
end

return unless removed_schedulers

# drain the local queues of removed schedulers since they're no longer
# available for stealing
removed_schedulers.each do |scheduler|
[email protected]
end
end
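
For readers of the diff, a minimal usage sketch of the new public API (hypothetical code, not part of this change; the context name, the sizes, and the use of System.cpu_count as the source of the new maximum are illustrative):

    # Build a parallel context with room for 2 schedulers, then resize it.
    ctx = Fiber::ExecutionContext::Parallel.new("workers", 2)
    ctx.capacity # => 2

    # Grow: extra schedulers are created up front; threads only start on demand.
    ctx.resize(System.cpu_count.to_i32)

    # Shrink: overflow schedulers are told to shut down cooperatively and their
    # local queues are drained back into the global queue.
    ctx.resize(1)
    ctx.capacity # => 1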

# :nodoc:
def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
@@ -200,11 +264,12 @@
protected def steal(& : Scheduler ->) : Nil
return if capacity == 1

schedulers = @schedulers
i = @rng.next_int
n = @schedulers.size
n = schedulers.size

n.times do |j|
if scheduler = @schedulers[(i &+ j) % n]?
if scheduler = schedulers[(i &+ j) % n]?
yield scheduler
end
end
@@ -271,8 +336,8 @@
# we must also decrement the number of parked threads because another
# thread could lock the mutex and increment @spinning again before the
# signaled thread is resumed
spinning = @spinning.add(1, :acquire_release)
parked = @parked.sub(1, :acquire_release)
@spinning.add(1, :acquire_release)
@parked.sub(1, :acquire_release)

@condition.signal
end
@@ -282,11 +347,11 @@
# check if we can start another thread; no need for atomics, the values
# shall be rather stable over time and we check them again inside the
# mutex
return if @threads.size == capacity
return if @threads.size >= capacity

@mutex.synchronize do
index = @threads.size
return if index == capacity # check again
return if index >= capacity # check again

@threads << start_thread(@schedulers[index])
end
28 changes: 26 additions & 2 deletions src/fiber/execution_context/parallel/scheduler.cr
@@ -29,13 +29,18 @@ module Fiber::ExecutionContext
@spinning = false
@waiting = false
@parked = false
@shutdown = false

protected def initialize(@execution_context, @name)
@global_queue = @execution_context.global_queue
@runnables = Runnables(256).new(@global_queue)
@event_loop = @execution_context.event_loop
end

protected def shutdown! : Nil
@shutdown = true
end
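
To restate the cooperative shutdown mechanism in isolation (a rough, self-contained sketch; Worker and its members are invented names, not APIs from this patch): requesting shutdown only sets a flag, and the worker reacts at its next scheduling point by handing leftover local work back to a shared place, which mirrors the run_loop and park changes shown below.

    class Worker
      getter shared = [] of String

      def initialize
        @shutdown = false
        @local = ["job-1", "job-2"]
      end

      # Only sets a flag; the running loop keeps going until it checks it.
      def shutdown!
        @shutdown = true
      end

      def run
        until @shutdown
          # normal scheduling work would happen here
        end
        # hand leftover local work back before exiting (compare Runnables#drain)
        @shared.concat(@local)
        @local.clear
      end
    end

    w = Worker.new
    w.shutdown! # request shutdown up front so the example terminates
    w.run
    w.shared # => ["job-1", "job-2"]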

# :nodoc:
def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
raise RuntimeError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
@@ -86,6 +91,8 @@ module Fiber::ExecutionContext
end

private def quick_dequeue? : Fiber?
return if @shutdown

# every once in a while: dequeue from global queue to avoid two fibers
# constantly respawning each other to completely occupy the local queue
if (@tick &+= 1) % 61 == 0
@@ -121,8 +128,21 @@
Crystal.trace :sched, "started"

loop do
if @shutdown
spin_stop
@runnables.drain

# we may have been the last running scheduler, waiting on the event
# loop while there are pending events for example; let's resume a
# scheduler to take our place
@execution_context.wake_scheduler

Crystal.trace :sched, "shutdown"
break
end

if fiber = find_next_runnable
spin_stop if @spinning
spin_stop
resume fiber
else
# the event loop enqueued a fiber (or was interrupted) or the
@@ -145,6 +165,8 @@

# nothing to do: start spinning
spinning do
return if @shutdown

yield @global_queue.grab?(@runnables, divisor: @execution_context.size)

if @execution_context.lock_evloop? { @event_loop.run(pointerof(list), blocking: false) }
@@ -189,10 +211,12 @@
# loop: park the thread until another scheduler or another context
# enqueues a fiber
@execution_context.park_thread do
# don't park the thread when told to shutdown
return if @shutdown

# by the time we acquire the lock, another thread may have enqueued
# fiber(s) and already tried to wake up a thread (race) so we must
# check again; we don't check the scheduler's local queue (it's empty)

yield @global_queue.unsafe_grab?(@runnables, divisor: @execution_context.size)
yield try_steal?

51 changes: 40 additions & 11 deletions src/fiber/execution_context/runnables.cr
@@ -75,25 +75,54 @@ module Fiber::ExecutionContext

# first, try to grab half of the fibers from local queue
batch = uninitialized Fiber[N] # actually N // 2 + 1 but that doesn't compile
n.times do |i|
batch.to_unsafe[i] = @buffer.to_unsafe[(head &+ i) % N]
end
_, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire)
_, success = try_grab(batch.to_unsafe, head, n)
return false unless success

# append fiber to the batch
# append fiber to the batch and push to global queue
batch.to_unsafe[n] = fiber
push_to_global_queue(batch.to_unsafe, n &+ 1)
true
end

# link the fibers
# Transfers every fiber in the local runnables queue to the global queue.
# This will grab the global lock.
#
# Can be executed by any scheduler.
def drain : Nil
batch = uninitialized Fiber[N]
n = 0

head = @head.get(:acquire) # sync with other consumers
loop do
tail = @tail.get(:acquire) # sync with the producer

n = (tail &- head)
return if n == 0 # queue is empty

# try to grab everything from local queue
head, success = try_grab(batch.to_unsafe, head, n)
break if success
end

push_to_global_queue(batch.to_unsafe, n)
end

private def try_grab(batch, head, n)
n.times do |i|
batch.to_unsafe[i].list_next = batch.to_unsafe[i &+ 1]
batch[i] = @buffer.to_unsafe[(head &+ i) % N]
end
list = Fiber::List.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32)
@head.compare_and_set(head, head &+ n, :acquire_release, :acquire)
end

# now put the batch on global queue (grabs the global lock)
@global_queue.bulk_push(pointerof(list))
private def push_to_global_queue(batch, n)
# link the fibers
(n &- 1).times do |i|
batch[i].list_next = batch[i &+ 1]
end
list = Fiber::List.new(batch[0], batch[n &- 1], size: n.to_i32)

true
# and put the batch on global queue (grabs the global lock)
@global_queue.bulk_push(pointerof(list))
end
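
A note on the retry in #drain above: Crystal's Atomic#compare_and_set returns an {old_value, success} tuple, so on failure the loop can continue from the value another consumer already installed. A stripped-down sketch of that consumer-side pattern, using a bare counter instead of the ring buffer (illustrative only, not part of this change):

    head = Atomic(Int32).new(0)
    tail = 10 # pretend a producer has published 10 items

    observed = head.get(:acquire)
    loop do
      count = tail - observed
      break if count == 0 # nothing left to grab

      # Try to claim everything in [observed, observed + count). On failure,
      # another consumer advanced head first and compare_and_set hands back
      # the value it saw, so the next iteration recomputes the batch size.
      observed, success = head.compare_and_set(observed, observed + count, :acquire_release, :acquire)
      break if success
    end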

# Tries to enqueue all the fibers in *list* into the local queue. If the