Skip to content
36 changes: 20 additions & 16 deletions lib/octocatalog-diff/util/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,26 @@ def self.run_tasks_parallel(result, task_array, logger)

# Waiting for children and handling results
while pidmap.any?
this_pid, exit_obj = Process.wait2(0)
next unless this_pid && pidmap.key?(this_pid)
index = pidmap[this_pid][:index]
exitstatus = exit_obj.exitstatus
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?

input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
time_delta = Time.now - pidmap[this_pid][:start_time]
pidmap.delete(this_pid)

logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"

next if result[index].status
return result[index].exception
pidmap.each do |pid|
status = Process.waitpid2(pid[0], Process::WNOHANG)
next if status.nil?
this_pid, exit_obj = status
next unless this_pid && pidmap.key?(this_pid)
index = pidmap[this_pid][:index]
exitstatus = exit_obj.exitstatus
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?

input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
time_delta = Time.now - pidmap[this_pid][:start_time]
pidmap.delete(this_pid)

logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"

next if result[index].status
return result[index].exception
end
end

logger.debug 'All child processes completed with no exceptions raised'
Expand Down
59 changes: 59 additions & 0 deletions spec/octocatalog-diff/tests/util/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,65 @@
end

context 'with parallel processing' do
it 'should only Process.wait() its own children' do
class Foo
def one(arg, _logger = nil)
'one ' + arg
end

def two(arg, _logger = nil)
'two ' + arg
end

def dont_wait_me_bro(sleep_for = 1)
# do we need a rescue block here?
pid = fork do
sleep sleep_for
Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
end
pid
end

def wait_on_me(pid)
status = nil
# just in case status never equals anything
count = 100
while status.nil? || count > 0
count -= 1
status = Process.waitpid2(pid, Process::WNOHANG)
end
status
end
end

c = Foo.new
# start my non-parallel process first
just_a_guy = c.dont_wait_me_bro

one = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:one), args: 'abc', description: 'test1')
two = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:two), args: 'def', description: 'test2')
result = OctocatalogDiff::Util::Parallel.run_tasks([one, two], nil, true)
expect(result).to be_a_kind_of(Array)
expect(result.size).to eq(2)

one_result = result[0]
expect(one_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
expect(one_result.status).to eq(true)
expect(one_result.exception).to eq(nil)
expect(one_result.output).to match(/^one abc/)

two_result = result[1]
expect(two_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
expect(two_result.status).to eq(true)
expect(two_result.exception).to eq(nil)
expect(two_result.output).to match(/^two def/)

# just_a_guy should still be need to be waited
result = c.wait_on_me(just_a_guy)
expect(result).to be_a_kind_of(Array)
# test result and check for error conditions
end

it 'should parallelize and return task results' do
class Foo
def one(arg, _logger = nil)
Expand Down