-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add Fiber::ExecutionContext::Parallel#resize
#15956
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
straight-shoota
merged 15 commits into
crystal-lang:master
from
ysbaddaden:feature/add-resize-to-execution-context-parallel
Sep 10, 2025
Merged
Changes from 5 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
654c74b
Add Runnables#drain to move all fibers to the global queue
ysbaddaden a1362ee
Capacity is actually schedulers.size
ysbaddaden 67696dd
Method to shutdown scheduler now
ysbaddaden 1a56f90
Add Parallel#resize(maximum)
ysbaddaden a7e38fa
Fix: raise if new capacity is less than 1
ysbaddaden dfd818c
Update src/fiber/execution_context/runnables.cr
ysbaddaden e7f4cec
Fix: must reset `@parked` before waking all parked threads
ysbaddaden 8898811
Replace Shutdown::NO with an invalid Shutdown value + check for expli…
ysbaddaden 3b8bc2f
Fix: abort spinning if shutdown now
ysbaddaden 3d09820
Fix: drop Shutdown enum and use a boolean for now
ysbaddaden 04085f8
Fix: issues around spinning threads count
ysbaddaden 0ad4cc8
Fix: couple ameba issues
ysbaddaden 93df822
Rename Parallel::Scheduler#shutdown!
ysbaddaden 89abdf4
Spec for Fiber::ExecutionContext::Parallel#resize
ysbaddaden 25e9e11
Spec: faster sleep/wakeup
ysbaddaden File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,7 +69,6 @@ module Fiber::ExecutionContext | |
|
|
||
| @parked = Atomic(Int32).new(0) | ||
| @spinning = Atomic(Int32).new(0) | ||
| @capacity : Int32 | ||
|
|
||
| # :nodoc: | ||
| protected def self.default(maximum : Int32) : self | ||
|
|
@@ -102,12 +101,12 @@ module Fiber::ExecutionContext | |
| @condition = Thread::ConditionVariable.new | ||
|
|
||
| @global_queue = GlobalQueue.new(@mutex) | ||
| @schedulers = Array(Scheduler).new(@capacity) | ||
| @threads = Array(Thread).new(@capacity) | ||
| @schedulers = Array(Scheduler).new(capacity) | ||
| @threads = Array(Thread).new(capacity) | ||
|
|
||
| @rng = Random::PCG32.new | ||
|
|
||
| start_schedulers | ||
| start_schedulers(capacity) | ||
| @threads << hijack_current_thread(@schedulers.first) if hijack | ||
|
|
||
| ExecutionContext.execution_contexts.push(self) | ||
|
|
@@ -120,7 +119,7 @@ module Fiber::ExecutionContext | |
|
|
||
| # The maximum number of threads that can be started. | ||
| def capacity : Int32 | ||
| @capacity | ||
| @schedulers.size | ||
| end | ||
|
|
||
| # :nodoc: | ||
|
|
@@ -140,7 +139,7 @@ module Fiber::ExecutionContext | |
| # OPTIMIZE: consider storing schedulers to an array-like object that would | ||
| # use an atomic/fence to make sure that @size can only be incremented | ||
| # *after* the value has been written to @buffer. | ||
| private def start_schedulers | ||
| private def start_schedulers(capacity) | ||
| capacity.times do |index| | ||
| @schedulers << Scheduler.new(self, "#{@name}-#{index}") | ||
| end | ||
|
|
@@ -176,6 +175,66 @@ module Fiber::ExecutionContext | |
| end | ||
| end | ||
|
|
||
| # Resizes the context to the new *maximum* parallelism. | ||
| # | ||
| # The new *maximum* can grow, in which case more schedulers are created to | ||
| # eventually increase the parallelism. | ||
| # | ||
| # The new *maximum* can also shrink, in which case the overflow schedulers | ||
| # are removed and told to shutdown immediately. The actual shutdown is | ||
| # cooperative, so running schedulers won't stop until their current fiber | ||
| # tries to switch to another fiber. | ||
| def resize(maximum : Int32) : Nil | ||
| raise ArgumentError.new("Parallelism can't be less than one.") if maximum < 1 | ||
| removed_schedulers = nil | ||
|
|
||
| @mutex.synchronize do | ||
| # can run in parallel to #steal that dereferences @schedulers (once) | ||
| # without locking the mutex, so we dup the schedulers, mutate the copy, | ||
| # and eventually assign the copy as @schedulers; this way #steal can | ||
| # safely access the array (never mutated). | ||
| new_schedulers = nil | ||
| new_capacity = maximum | ||
| old_threads = @threads | ||
| old_schedulers = @schedulers | ||
| old_capacity = capacity | ||
|
|
||
| if new_capacity > old_capacity | ||
| @schedulers = Array(Scheduler).new(new_capacity) do |index| | ||
| old_schedulers[index]? || Scheduler.new(self, "#{@name}-#{index}") | ||
| end | ||
| threads = Array(Thread).new(new_capacity) | ||
| old_threads.each { |thread| threads << thread } | ||
| @threads = threads | ||
| elsif new_capacity < old_capacity | ||
| # tell the overflow schedulers to shutdown | ||
| removed_schedulers = old_schedulers[new_capacity..] | ||
| removed_schedulers.each(&.shutdown(:now)) | ||
|
|
||
| # resize | ||
| @schedulers = old_schedulers[0...new_capacity] | ||
| @threads = old_threads[0...new_capacity] | ||
|
|
||
| # makes sure that the above writes to @schedulers and @threads are | ||
| # executed before continuing (maybe not needed, but let's err on the | ||
| # safe side) | ||
| Atomic.fence(:acquire_release) | ||
|
|
||
| # wakeup all waiting schedulers so they can shutdown | ||
| @condition.broadcast | ||
| @event_loop.interrupt | ||
| end | ||
| end | ||
|
|
||
| return unless removed_schedulers | ||
|
|
||
| # drain the local queues of removed schedulers since they're no longer | ||
| # available for stealing | ||
| removed_schedulers.each do |scheduler| | ||
| [email protected] | ||
| end | ||
| end | ||
|
|
||
| # :nodoc: | ||
| def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber | ||
| raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread | ||
|
|
@@ -200,11 +259,12 @@ module Fiber::ExecutionContext | |
| protected def steal(& : Scheduler ->) : Nil | ||
| return if capacity == 1 | ||
|
|
||
| schedulers = @schedulers | ||
| i = @rng.next_int | ||
| n = @schedulers.size | ||
| n = schedulers.size | ||
|
|
||
| n.times do |j| | ||
| if scheduler = @schedulers[(i &+ j) % n]? | ||
| if scheduler = schedulers[(i &+ j) % n]? | ||
| yield scheduler | ||
| end | ||
| end | ||
|
|
@@ -282,11 +342,11 @@ module Fiber::ExecutionContext | |
| # check if we can start another thread; no need for atomics, the values | ||
| # shall be rather stable over time and we check them again inside the | ||
| # mutex | ||
| return if @threads.size == capacity | ||
| return if @threads.size >= capacity | ||
|
|
||
| @mutex.synchronize do | ||
| index = @threads.size | ||
| return if index == capacity # check again | ||
| return if index >= capacity # check again | ||
|
|
||
| @threads << start_thread(@schedulers[index]) | ||
| end | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.