Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
36af69b
Invoke lambda in the main thread and add support for infinite sequences
takahiro-blab Apr 24, 2025
28b74f9
Add test for infinite sequence
takahiro-blab Apr 24, 2025
5d5eb63
Add test for stopping by StopIteration
takahiro-blab Apr 24, 2025
9d9bd11
Add test for lambda called in same thread
takahiro-blab Apr 24, 2025
756685f
Add comment on JobFactory#runloop
takahiro-blab Apr 28, 2025
60ff3ac
Rewrite JobFactory#stopper from endless (one-line) methods to legacy …
takahiro-blab Apr 28, 2025
4823fcc
Extract merged options into a variable before calling Parallel.in_thr…
takahiro-blab Apr 28, 2025
3eda109
Add comment spec/cases/infinite_sequence.rb
takahiro-blab Apr 28, 2025
7e8a77c
Make thread queue handing use Thread#thread_variable_set and #thread_…
takahiro-blab Apr 28, 2025
20cb415
Update Readme.md
takahiro-blab Apr 28, 2025
7e76725
Add comment on JobFactory#enum_wrapper
takahiro-blab Apr 30, 2025
5eea888
Rename JobFactory's some methods and variables.
takahiro-blab May 1, 2025
ec0083b
Fix for support Ruby 3.0 and earlier.
takahiro-blab May 1, 2025
074eab7
Add comment
takahiro-blab May 1, 2025
70208e6
Rename last few runloop leftovers to worker_queue
takahiro-blab May 4, 2025
98a74e4
Make use a mutex and a counter instead of a Queue in Parallel.in_threads
takahiro-blab May 4, 2025
f3a9dd2
Update comment of Parallel.in_threads in lib/parallel.rb
takahiro-blab May 12, 2025
9b9f4bd
Rename `JobFactory#consume_worker_queue` to `consume_worker_queues`
takahiro-blab May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ items = [1,2,3]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }
```

Iterations can be stopped by raising StopIteration.

```Ruby
items = [1,2,3]
Parallel.each( -> { items.pop || raise(StopIteration) }) { |number| ... }
```

Also supports Enumerator instances as a source.

```Ruby
enumerator = Enumerator.new do |y|
y << 1; y << 2; y << 3
end
Parallel.each( enumerator ) { |number| ... }
```

Also supports `any?` or `all?`

```Ruby
Expand Down
78 changes: 70 additions & 8 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,27 @@ def wait

class JobFactory
def initialize(source, mutex)
@lambda = (source.respond_to?(:call) && source) || queue_wrapper(source)
@source = source.to_a unless @lambda # turn Range and other Enumerable-s into an Array
@lambda = enum_wrapper(source) || (source.respond_to?(:call) && source) || queue_wrapper(source)
@source = source.to_a unless @lambda # turn non-Enumerable-s into an Array
@runloop_queue = Thread::Queue.new if @lambda
@mutex = mutex
@index = -1
@stopped = false
end

def next
if producer?
queue_for_thread = Thread.current.thread_variable_get(:parallel_queue)
if @runloop_queue && queue_for_thread
return if @stopped
item = runloop_enq(queue_for_thread)
return if item == Stop
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it not like this ?

Suggested change
return if @stopped
item = runloop_enq(queue_for_thread)
return if item == Stop
item = runloop_enq(queue_for_thread)
@stopped = (item == Stop)
return if @stopped

Copy link
Author

@takahiro-blab takahiro-blab Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stopped will be set in JobFactory#runloop .

This JobFactory#next may be called from some threads at the same time.
So your @stopped = (item == Stop) needs exclusive control Mutex#synchronize .

Taking the value out of the @lambda and Setting the result to@stopped must be handled in the critical section or be handled in one thread, I think.
Otherwise, a certain thread may clear @stopped flag. This could be a bug.

So previous implementation of Parallel uses @mutex.synchronize .
This PR's code handles @lambda and check item == Stop in #runloop by one thread, so @mutex is given, but it's not used.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah thx, yeah this is a tricky section :)
can you leave a bit of inline comment for the gotchas

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added comment to this code.

index = @index += 1
elsif producer?
# - index and item stay in sync
# - do not call lambda after it has returned Stop
item, index = @mutex.synchronize do
return if @stopped
item = @lambda.call
item = call_lambda
@stopped = (item == Stop)
return if @stopped
[item, @index += 1]
Expand All @@ -123,6 +130,26 @@ def next
[item, index]
end

def runloop
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this make sense ?

Suggested change
def runloop
def run_runloop

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some method comments would help here too, what does it do exactly

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Please feel free to change anything.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def runloop
def consume_enumerator_queues

does this work ?

# consume items for from enumerator queues until they stop producing

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed the comment says "enumerator queues", but as far as I can tell, there's only one queue being consumed from in this method.
Does this enumerator queues mean JobFactory's @lambda, doesn't it?
Should this be singular instead?

return unless @runloop_queue

loop do
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this right ?

Suggested change
loop do
# every time a threads wants to start work, it adds a new queue, we pop the queue here until everything is done (stop)
# then push a new item into the queue for the thread to read and work on
loop do

queue = @runloop_queue.pop
return if queue == Stop
item = call_lambda
queue.push(item)
break if item == Stop
end
@stopped = true
# clear out all work queues by adding a "stop" to them which will stop the thread working on them
begin
while queue = @runloop_queue.pop(true)
queue.push(Stop) if queue != Stop # Unlock waiting threads.
end
rescue ThreadError # All threads are unlocked.
end
end

def size
if producer?
Float::INFINITY
Expand All @@ -142,15 +169,35 @@ def unpack(data)
producer? ? data : [@source[data], data]
end

def stopper
@runloop_queue&.push(Stop)
end

private

def call_lambda
@lambda.call
rescue StopIteration
Stop
end

def runloop_enq(queue_for_thread)
@runloop_queue.push(queue_for_thread)
queue_for_thread.pop # Wait until @lambda returns.
end

def producer?
@lambda
end

def queue_wrapper(array)
array.respond_to?(:num_waiting) && array.respond_to?(:pop) && -> { array.pop(false) }
end

def enum_wrapper(source)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe give some examples of what types this is trying to detect

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are asking about #enum_wrapper , aren't you?

This method aims converting Enumerator instance and objects including Enumerable to Method instance which calls #next , but objects which is accessible by [] method shouldn't be converted. It's because accessing by index is faster and it can avoid serializing problems, you know.

So, as first, checking [] method, if [] method is available, it returns false. Next, if #next method is available, returns Method instance.

For example:

enum_wrapper([1,2,3]) # -> false
enum_wrapper(1..5) # -> Method ( (1..5).method(:next) )
enum_wrapper(Prime.to_enum) # -> Method (See infinite_sequece.rb test case)

# Convert what is inaccessible by the index
!source.respond_to?(:[]) && source.respond_to?(:next) && source.method(:next)
end
end

class UserInterruptHandler
Expand Down Expand Up @@ -211,13 +258,24 @@ def restore_interrupt(old, signal)
class << self
def in_threads(options = { count: 2 })
threads = []
count, = extract_count_from_options(options)
count, options = extract_count_from_options(options)
finished_monitor = options[:runloop] && Queue.new(1..(count - 1)) # Insert values, one less in count than the number of threads.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain why 1 less

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because the last thread must raise ThreadError by calling empty queue's #pop method.

For example, if there are 5 worker threads (when option[:count] is 5), Queue finished_monitor will have 4 values.
Each of five worker threads will execute finished_monitor.pop(true), then 4 of them can get value from finished_monitor.
But the last one thread don't get value, and a ThreadError exception will be raised.
So JobFactory#stopper will be called only once.
(I just realized that, perhaps, multiple calls of JobFactory#stopper may has no side effect... If so, these could be written more concisely without using Queue finished_monitor.)

In rescue section, the last one thread will call stopper.call, which stops JobFactory#runloop.

Queue#pop raises ThreadError when true is given as argument and the queue is empty.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah got it, very complicated ... can you leave some short inline comment to explain a bit

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added comment to this code with below question.

stopper = options[:stopper]

Thread.handle_interrupt(Exception => :never) do
Thread.handle_interrupt(Exception => :immediate) do
count.times do |i|
threads << Thread.new { yield(i) }
threads << Thread.new do
yield(i)
ensure
begin
finished_monitor&.pop(true) # This must be executed even if the worker thread is killed (by #work_in_processes).
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain why it needs to be executed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is not case, JobFacotry#runloop will block in queue = @runloop_queue.pop and it cannot come back from #runloop.
To main thread's surely finishing options[:runloop]&.call (JobFactory#stopper), each of worker threads must call finished_monitor&.pop(true) even if it is killed. So it's in the ensure section.
(Please look at above question)

(JobFactory#stopper pushes Stop JobFactory's @runloop_queue, so it make #runloop finish.)

And, this logic is also necessary for terminating operations by Ctrl+C or workers' throwing Parallel::Kill.
(When Parallel::Kill or Break is thrown, worker threads will be killed by UserInterruptHandler.kill in #work_in_processes)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx, can you leave a bit of this inline for future archeologists :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added comment with above question.

rescue ThreadError # Queue#pop raises ThreadError when the queue is empty.
stopper&.call # Stop JobFactory#runloop
end
end
end
options[:runloop]&.call # Invoke lambda in caller thread, and provide jobs to thread queue.
threads.map(&:value)
end
ensure
Expand Down Expand Up @@ -431,7 +489,9 @@ def work_in_threads(job_factory, options, &block)
results_mutex = Mutex.new # arrays are not thread-safe on jRuby
exception = nil

in_threads(options) do |worker_num|
thread_options = options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper))
in_threads(thread_options) do |worker_num|
Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new)
self.worker_number = worker_num
# as long as there are more jobs, work on one of them
while !exception && (set = job_factory.next)
Expand Down Expand Up @@ -523,9 +583,11 @@ def work_in_processes(job_factory, options, &blk)
exception = nil

UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
in_threads(options) do |i|
thread_options = options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper))
in_threads(thread_options) do |i|
worker = workers[i]
worker.thread = Thread.current
Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new)
worked = false

begin
Expand Down
24 changes: 24 additions & 0 deletions spec/cases/infinite_sequence.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

# Reproduction case based on GitHub Issue #211
# Original code provided by @cyclotron3k in the issue
# using a enum that is infinite, so this will hang forever when trying to convert to an array

require 'prime'
require './spec/cases/helper'

private_key = 12344567899

results = []

[{ in_threads: 2 }, { in_threads: 0 }].each do |options|
primes = Prime.to_enum
Parallel.each(primes, options) do |prime|
if private_key % prime == 0
results << prime.to_s
raise Parallel::Break
end
end
end

print results.join(',')
21 changes: 21 additions & 0 deletions spec/cases/lambda_call_same_thread.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true
require './spec/cases/helper'

runner_thread = nil
all = [3, 2, 1]
my_proc = proc {
runner_thread ||= Thread.current
if Thread.current != runner_thread
raise "proc is called in different thread!"
end

all.pop || Parallel::Stop
}

class Callback
def self.call(x)
$stdout.sync = true
"ITEM-#{x}"
end
end
puts(Parallel.map(my_proc, in_threads: 2) { |(i, _id)| Callback.call i })
21 changes: 21 additions & 0 deletions spec/cases/lambda_can_stop_by_exception.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true
require './spec/cases/helper'

def generate_proc
count = 0
proc {
raise StopIteration if 3 <= count
count += 1
}
end

class Callback
def self.call(x)
$stdout.sync = true
"ITEM-#{x}"
end
end

[{ in_processes: 2 }, { in_threads: 2 }, { in_threads: 0 }].each do |options|
puts(Parallel.map(generate_proc, options) { |(i, _id)| Callback.call i })
end
12 changes: 12 additions & 0 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,18 @@ def cpus
end
end

it "can process infinite sequence enumerator" do
ruby("spec/cases/infinite_sequence.rb").split(',').should == ['139'] * 2
end

it "can be finished by lambda raising StopIteration" do
ruby("spec/cases/lambda_can_stop_by_exception.rb").should == "ITEM-1\nITEM-2\nITEM-3\n" * 3
end

it "must call lambda in same thread" do
ruby("spec/cases/lambda_call_same_thread.rb").should == "ITEM-1\nITEM-2\nITEM-3\n"
end

it "fails when running with a prefilled queue without stop since there are no threads to fill it" do
error = (RUBY_VERSION >= "2.0.0" ? "No live threads left. Deadlock?" : "deadlock detected (fatal)")
ruby("spec/cases/fatal_queue.rb 2>&1").should include error
Expand Down
Loading