From 86207228aad80e4ab4a5f0f5bfc363edcce36e27 Mon Sep 17 00:00:00 2001 From: c3y28 Date: Thu, 10 Jun 2021 15:53:31 +0800 Subject: [PATCH 1/2] add error strategy logic for ForEach operator --- observable_operator.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/observable_operator.go b/observable_operator.go index 5277b4a6..c2f4a29c 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -1153,6 +1153,7 @@ func (o *ObservableImpl) FlatMap(apply ItemToObservable, opts ...Option) Observa // ForEach subscribes to the Observable and receives notifications for each element. func (o *ObservableImpl) ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed { dispose := make(chan struct{}) + option := parseOptions(opts...) handler := func(ctx context.Context, src <-chan Item) { defer close(dispose) for { @@ -1167,7 +1168,9 @@ func (o *ObservableImpl) ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFu } if i.Error() { errFunc(i.E) - break + if option.getErrorStrategy() == StopOnError { + return + } } nextFunc(i.V) } From 5f53c4e62ac17aa1fc871add49ed371d9de95ba4 Mon Sep 17 00:00:00 2001 From: c3y28 Date: Thu, 10 Jun 2021 19:01:19 +0800 Subject: [PATCH 2/2] add error strategy testcases on ForEach operator --- observable_operator_test.go | 86 +++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/observable_operator_test.go b/observable_operator_test.go index fa9ac9cb..b365346b 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -854,6 +854,92 @@ func Test_Observable_ForEach_Error(t *testing.T) { assert.Equal(t, errFoo, gotErr) } +func Test_Observable_ForEach_ContinueOnError(t *testing.T) { + defer goleak.VerifyNone(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + count := 0 + var gotErr error + done := make(chan struct{}) + + obs := testObservable(ctx, 1, 2, 3, errFoo, 4) + var strategy OnErrorStrategy = ContinueOnError + + obs.ForEach(func(i interface{}) { + switch v := i.(type) { + case int: + count += v + case error: + // do nothing + default: + } + }, func(err error) { + gotErr = err + select { + case <-ctx.Done(): + return + default: + if strategy == StopOnError { + done <- struct{}{} + } + } + }, func() { + select { + case <-ctx.Done(): + return + case done <- struct{}{}: + } + }, WithContext(ctx), WithErrorStrategy(strategy)) + + // We avoid using the assertion API on purpose + <-done + assert.Equal(t, 10, count) + assert.Equal(t, errFoo, gotErr) +} + +func Test_Observable_ForEach_StopOnError(t *testing.T) { + defer goleak.VerifyNone(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + count := 0 + var gotErr error + done := make(chan struct{}) + + obs := testObservable(ctx, 1, 2, 3, errFoo, 4) + var strategy OnErrorStrategy = StopOnError + + obs.ForEach(func(i interface{}) { + switch v := i.(type) { + case int: + count += v + case error: + // do nothing + default: + } + }, func(err error) { + gotErr = err + select { + case <-ctx.Done(): + return + default: + if strategy == StopOnError { + done <- struct{}{} + } + } + }, func() { + select { + case <-ctx.Done(): + return + case done <- struct{}{}: + } + }, WithContext(ctx), WithErrorStrategy(strategy)) + + // We avoid using the assertion API on purpose + <-done + assert.Equal(t, 6, count) + assert.Equal(t, errFoo, gotErr) +} + func Test_Observable_ForEach_Done(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background())