diff --git a/lib/octocatalog-diff/util/parallel.rb b/lib/octocatalog-diff/util/parallel.rb index 36e7ba58..4f4197ed 100644 --- a/lib/octocatalog-diff/util/parallel.rb +++ b/lib/octocatalog-diff/util/parallel.rb @@ -129,22 +129,26 @@ def self.run_tasks_parallel(result, task_array, logger) # Waiting for children and handling results while pidmap.any? - this_pid, exit_obj = Process.wait2(0) - next unless this_pid && pidmap.key?(this_pid) - index = pidmap[this_pid][:index] - exitstatus = exit_obj.exitstatus - raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil? - raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero? - - input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat")) - result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad - time_delta = Time.now - pidmap[this_pid][:start_time] - pidmap.delete(this_pid) - - logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes" - - next if result[index].status - return result[index].exception + pidmap.each do |pid| + status = Process.waitpid2(pid[0], Process::WNOHANG) + next if status.nil? + this_pid, exit_obj = status + next unless this_pid && pidmap.key?(this_pid) + index = pidmap[this_pid][:index] + exitstatus = exit_obj.exitstatus + raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil? + raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero? + + input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat")) + result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad + time_delta = Time.now - pidmap[this_pid][:start_time] + pidmap.delete(this_pid) + + logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes" + + next if result[index].status + return result[index].exception + end end logger.debug 'All child processes completed with no exceptions raised' diff --git a/spec/octocatalog-diff/tests/util/parallel_spec.rb b/spec/octocatalog-diff/tests/util/parallel_spec.rb index 82ba232d..3cb2a8b7 100644 --- a/spec/octocatalog-diff/tests/util/parallel_spec.rb +++ b/spec/octocatalog-diff/tests/util/parallel_spec.rb @@ -16,6 +16,65 @@ end context 'with parallel processing' do + it 'should only Process.wait() its own children' do + class Foo + def one(arg, _logger = nil) + 'one ' + arg + end + + def two(arg, _logger = nil) + 'two ' + arg + end + + def dont_wait_me_bro(sleep_for = 1) + # do we need a rescue block here? + pid = fork do + sleep sleep_for + Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting + end + pid + end + + def wait_on_me(pid) + status = nil + # just in case status never equals anything + count = 100 + while status.nil? || count > 0 + count -= 1 + status = Process.waitpid2(pid, Process::WNOHANG) + end + status + end + end + + c = Foo.new + # start my non-parallel process first + just_a_guy = c.dont_wait_me_bro + + one = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:one), args: 'abc', description: 'test1') + two = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:two), args: 'def', description: 'test2') + result = OctocatalogDiff::Util::Parallel.run_tasks([one, two], nil, true) + expect(result).to be_a_kind_of(Array) + expect(result.size).to eq(2) + + one_result = result[0] + expect(one_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result) + expect(one_result.status).to eq(true) + expect(one_result.exception).to eq(nil) + expect(one_result.output).to match(/^one abc/) + + two_result = result[1] + expect(two_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result) + expect(two_result.status).to eq(true) + expect(two_result.exception).to eq(nil) + expect(two_result.output).to match(/^two def/) + + # just_a_guy should still be need to be waited + result = c.wait_on_me(just_a_guy) + expect(result).to be_a_kind_of(Array) + # test result and check for error conditions + end + it 'should parallelize and return task results' do class Foo def one(arg, _logger = nil)