Skip to content

Commit 742d422

Browse files
committed
fix leaks
1 parent eb347a2 commit 742d422

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

diff_parallel.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,12 @@ func newDiffScheduler(ctx context.Context, numWorkers int64, rootTasks ...*task)
154154
grp, ctx := errgroup.WithContext(ctx)
155155
s := &diffScheduler{
156156
numWorkers: numWorkers,
157-
stack: []*task{},
157+
stack: rootTasks,
158158
in: make(chan *task, numWorkers),
159159
out: make(chan *task, numWorkers),
160-
workerWg: &sync.WaitGroup{},
161160
grp: grp,
162161
}
163-
s.workerWg.Add(len(rootTasks))
164-
s.stack = append(s.stack, rootTasks...)
162+
s.taskWg.Add(len(rootTasks))
165163
return s, ctx
166164
}
167165

@@ -170,26 +168,35 @@ type diffScheduler struct {
170168
numWorkers int64
171169
// buffer holds tasks until they are processed
172170
stack []*task
173-
// tasks arrive here
174-
in chan *task
175-
// completed tasks exit here
176-
out chan *task
171+
// inbound and outbound tasks
172+
in, out chan *task
177173
// tracks number of inflight tasks
178-
workerWg *sync.WaitGroup
174+
taskWg sync.WaitGroup
179175
// launches workers and collects errors if any occur
180176
grp *errgroup.Group
181177
}
182178

183179
func (s *diffScheduler) enqueueTask(task *task) {
184-
s.workerWg.Add(1)
180+
s.taskWg.Add(1)
185181
s.in <- task
186182
}
187183

188184
func (s *diffScheduler) startScheduler(ctx context.Context) {
189185
s.grp.Go(func() error {
190-
defer close(s.out)
186+
defer func() {
187+
close(s.out)
188+
// Because the workers may have exited early (due to the context being canceled).
189+
for range s.out {
190+
s.taskWg.Done()
191+
}
192+
// Because the workers may have enqueued additional tasks.
193+
for range s.in {
194+
s.taskWg.Done()
195+
}
196+
// now, the waitgroup should be at 0, and the goroutine that was _waiting_ on it should have exited.
197+
}()
191198
go func() {
192-
s.workerWg.Wait()
199+
s.taskWg.Wait()
193200
close(s.in)
194201
}()
195202
for {
@@ -235,7 +242,7 @@ func (s *diffScheduler) startWorkers(ctx context.Context, out chan *Change) {
235242
}
236243

237244
func (s *diffScheduler) work(ctx context.Context, todo *task, results chan *Change) error {
238-
defer s.workerWg.Done()
245+
defer s.taskWg.Done()
239246

240247
prev := todo.prev
241248
prevCtx := todo.prevCtx

0 commit comments

Comments
 (0)