Skip to content

Commit 6c4ca08

Browse files
committed
Ensure we don't leak any child tasks in client loop.
1 parent e369f19 commit 6c4ca08

File tree

3 files changed

+42
-3
lines changed

3 files changed

+42
-3
lines changed

fixtures/async/container/supervisor/a_server.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ def around
5858
let(:monitors) {[registration_monitor]}
5959
let(:server) {Async::Container::Supervisor::Server.new(endpoint: @bound_endpoint, monitors: monitors)}
6060

61+
def restart_supervisor
62+
Console.info(self, "Supervisor stopping...")
63+
@server_task&.stop
64+
65+
Console.info(self, "Supervisor starting....")
66+
@server_task = reactor.async do
67+
server.run
68+
end
69+
end
70+
6171
before do
6272
@bound_endpoint = endpoint.bound
6373

lib/async/container/supervisor/client.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ def connect
4848

4949
# Run the client in a loop, reconnecting if necessary.
5050
def run
51-
Async(annotation: "Supervisor Client", transient: true) do
51+
Async(annotation: "Supervisor Client", transient: true) do |task|
5252
loop do
5353
connection = connect!
5454

55-
Async do
55+
connected_task = task.async do
5656
connected!(connection)
5757
end
5858

@@ -61,6 +61,10 @@ def run
6161
Console.error(self, "Connection failed:", exception: error)
6262
sleep(rand)
6363
ensure
64+
# Ensure any tasks that were created during connection are stopped:
65+
connected_task&.stop
66+
67+
# Close the connection itself:
6468
connection&.close
6569
end
6670
end

test/async/container/client.rb

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
describe Async::Container::Supervisor::Client do
1010
include Async::Container::Supervisor::AServer
11-
include Sus::Fixtures::Console::CapturedLogger
11+
# include Sus::Fixtures::Console::CapturedLogger
1212

1313
let(:client) {subject.new(endpoint: endpoint)}
1414

@@ -32,5 +32,30 @@
3232

3333
client_task.stop
3434
end
35+
36+
it "does not leak fibers when connected! creates tasks and reconnection occurs" do
37+
state = Thread::Queue.new
38+
39+
mock(client) do |mock|
40+
mock.replace(:connected!) do
41+
state << :connected
42+
43+
Async do
44+
sleep
45+
ensure
46+
state << :disconnected
47+
end
48+
end
49+
end
50+
51+
client_task = client.run
52+
expect(state.pop).to be == :connected
53+
54+
# Interrupt the supervisor:
55+
restart_supervisor
56+
57+
expect(state.pop).to be == :disconnected
58+
expect(state.pop).to be == :connected
59+
end
3560
end
3661
end

0 commit comments

Comments
 (0)