Skip to content

Commit b4b7d17

Browse files
Allow executing effects that have become unsubscribed to be canceled by listenerMiddleware.clearListeners (#5102)
* Add tests showing that clearListeners doesn't cancel running, unsubscribed listeners * Track executing listeners to allow cancelation for unsubscribed listeners
1 parent 95eda57 commit b4b7d17

File tree

2 files changed

+158
-3
lines changed

2 files changed

+158
-3
lines changed

packages/toolkit/src/listenerMiddleware/index.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,12 @@ const cancelActiveListeners = (
262262

263263
const createClearListenerMiddleware = (
264264
listenerMap: Map<string, ListenerEntry>,
265+
executingListeners: Map<ListenerEntry, number>,
265266
) => {
266267
return () => {
267-
listenerMap.forEach(cancelActiveListeners)
268-
268+
for (const listener of executingListeners.keys()) {
269+
cancelActiveListeners(listener);
270+
}
269271
listenerMap.clear()
270272
}
271273
}
@@ -339,6 +341,23 @@ export const createListenerMiddleware = <
339341
middlewareOptions: CreateListenerMiddlewareOptions<ExtraArgument> = {},
340342
) => {
341343
const listenerMap = new Map<string, ListenerEntry>()
344+
345+
// Track listeners whose effect is currently executing so clearListeners can
346+
// abort even listeners that have become unsubscribed while executing.
347+
const executingListeners = new Map<ListenerEntry, number>()
348+
const trackExecutingListener = (entry: ListenerEntry) => {
349+
const count = executingListeners.get(entry) ?? 0
350+
executingListeners.set(entry, count + 1)
351+
}
352+
const untrackExecutingListener = (entry: ListenerEntry) => {
353+
const count = executingListeners.get(entry) ?? 1
354+
if (count === 1) {
355+
executingListeners.delete(entry)
356+
} else {
357+
executingListeners.set(entry, count - 1)
358+
}
359+
}
360+
342361
const { extra, onError = defaultErrorHandler } = middlewareOptions
343362

344363
assertFunction(onError, 'onError')
@@ -401,6 +420,7 @@ export const createListenerMiddleware = <
401420

402421
try {
403422
entry.pending.add(internalTaskController)
423+
trackExecutingListener(entry)
404424
await Promise.resolve(
405425
entry.effect(
406426
action,
@@ -452,11 +472,15 @@ export const createListenerMiddleware = <
452472
await Promise.all(autoJoinPromises)
453473

454474
abortControllerWithReason(internalTaskController, listenerCompleted) // Notify that the task has completed
475+
untrackExecutingListener(entry)
455476
entry.pending.delete(internalTaskController)
456477
}
457478
}
458479

459-
const clearListenerMiddleware = createClearListenerMiddleware(listenerMap)
480+
const clearListenerMiddleware = createClearListenerMiddleware(
481+
listenerMap,
482+
executingListeners,
483+
)
460484

461485
const middleware: ListenerMiddleware<
462486
StateType,

packages/toolkit/src/listenerMiddleware/tests/effectScenarios.test.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,4 +364,135 @@ describe('Saga-style Effects Scenarios', () => {
364364

365365
expect(canceledCheck).toBe(true)
366366
})
367+
368+
test('long-running listener with immediate unsubscribe is cancelable', async () => {
369+
let runCount = 0
370+
let abortCount = 0
371+
372+
startListening({
373+
actionCreator: increment,
374+
effect: async (action, listenerApi) => {
375+
runCount++
376+
377+
// Stop listening for this action
378+
listenerApi.unsubscribe()
379+
380+
try {
381+
// Wait indefinitely
382+
await listenerApi.condition(() => false)
383+
} catch (err) {
384+
if (err instanceof TaskAbortError) {
385+
abortCount++
386+
}
387+
}
388+
},
389+
})
390+
391+
// First action starts the listener, which unsubscribes
392+
store.dispatch(increment())
393+
expect(runCount).toBe(1)
394+
395+
// Verify that the first action unsubscribed the listener
396+
store.dispatch(increment())
397+
expect(runCount).toBe(1)
398+
399+
// Now call clearListeners, which should abort the running effect, even
400+
// though the listener is no longer subscribed
401+
listenerMiddleware.clearListeners()
402+
await delay(0)
403+
404+
expect(abortCount).toBe(1)
405+
})
406+
407+
test('long-running listener with unsubscribe race is cancelable', async () => {
408+
let runCount = 0
409+
let abortCount = 0
410+
411+
startListening({
412+
actionCreator: increment,
413+
effect: async (action, listenerApi) => {
414+
runCount++
415+
416+
if (runCount === 2) {
417+
// On the second run, stop listening for this action
418+
listenerApi.unsubscribe()
419+
return
420+
}
421+
422+
try {
423+
// Wait indefinitely
424+
await listenerApi.condition(() => false)
425+
} catch (err) {
426+
if (err instanceof TaskAbortError) {
427+
abortCount++
428+
}
429+
}
430+
},
431+
})
432+
433+
// First action starts the hanging effect
434+
store.dispatch(increment())
435+
expect(runCount).toBe(1)
436+
437+
// Second action starts the fast effect, which unsubscribes
438+
store.dispatch(increment())
439+
expect(runCount).toBe(2)
440+
441+
// Third action should be a noop
442+
store.dispatch(increment())
443+
expect(runCount).toBe(2)
444+
445+
// The hanging effect should still be hanging
446+
expect(abortCount).toBe(0)
447+
448+
// Now call clearListeners, which should abort the hanging effect, even
449+
// though the listener is no longer subscribed
450+
listenerMiddleware.clearListeners()
451+
await delay(0)
452+
453+
expect(abortCount).toBe(1)
454+
})
455+
456+
test('long-running listener with immediate unsubscribe and forked child is cancelable', async () => {
457+
let outerAborted = false
458+
let innerAborted = false
459+
460+
startListening({
461+
actionCreator: increment,
462+
effect: async (action, listenerApi) => {
463+
// Stop listening for this action
464+
listenerApi.unsubscribe()
465+
466+
const pollingTask = listenerApi.fork(async (forkApi) => {
467+
try {
468+
// Cancellation-aware indefinite pause
469+
await forkApi.pause(new Promise(() => {}))
470+
} catch (err) {
471+
if (err instanceof TaskAbortError) {
472+
innerAborted = true
473+
}
474+
}
475+
})
476+
477+
try {
478+
// Wait indefinitely
479+
await listenerApi.condition(() => false)
480+
pollingTask.cancel()
481+
} catch (err) {
482+
if (err instanceof TaskAbortError) {
483+
outerAborted = true
484+
}
485+
}
486+
},
487+
})
488+
489+
store.dispatch(increment())
490+
await delay(0)
491+
492+
listenerMiddleware.clearListeners()
493+
await delay(0)
494+
495+
expect(outerAborted).toBe(true)
496+
expect(innerAborted).toBe(true)
497+
})
367498
})

0 commit comments

Comments
 (0)