From 21bbd0cbc82b90e03c4b0baf36e0b7579b11335b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 29 Mar 2021 18:31:35 +0200 Subject: [PATCH 1/7] rpc: tighter shutdown synchronization in client subscription This fixes a rare issue where the client subscription forwarding loop would attempt send on the subscription's channel after Unsubscribe has returned, leading to a panic if the subscription channel was already closed by the user. Example: sub, _ := client.Subscribe(..., channel, ...) sub.Unsubscribe() close(channel) The race occurred because Unsubscribe called quitWithServer to tell the forwarding loop to stop sending, but did not wait for the loop to actually stop. This is fixed by adding an additional channel to track the shutdown, on which Unsubscribe now waits. --- rpc/handler.go | 4 +- rpc/subscription.go | 115 ++++++++++++++++++++++++++++++-------------- 2 files changed, 80 insertions(+), 39 deletions(-) diff --git a/rpc/handler.go b/rpc/handler.go index 23023eaca1f..85f774a1b7f 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -189,7 +189,7 @@ func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { } for id, sub := range h.clientSubs { delete(h.clientSubs, id) - sub.quitWithError(false, err) + sub.close(err) } } @@ -281,7 +281,7 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) { return } if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { - go op.sub.start() + go op.sub.run() h.clientSubs[op.sub.subid] = op.sub } } diff --git a/rpc/subscription.go b/rpc/subscription.go index 233215d792f..ee6ac57acfa 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -208,23 +208,34 @@ type ClientSubscription struct { channel reflect.Value namespace string subid string - in chan json.RawMessage - quitOnce sync.Once // ensures quit is closed once - quit chan struct{} // quit is closed when the subscription exits - errOnce sync.Once // ensures err is closed once - err chan error + // The in channel receives notification values from client dispatcher. + in chan json.RawMessage + + // The error channel receives the error from the forwarding loop. + // It is closed by Unsubscribe. + err chan error + errOnce sync.Once + + // Closing of the subscription is requested by sending on 'quit'. This is handled by + // the forwarding loop, which closes 'forwardDone' when it has stopped sending to + // sub.channel. Finally, 'unsubDone' is closed after unsubscribing on the server side. + quit chan error + forwardDone chan struct{} + unsubDone chan struct{} } func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { sub := &ClientSubscription{ - client: c, - namespace: namespace, - etype: channel.Type().Elem(), - channel: channel, - quit: make(chan struct{}), - err: make(chan error, 1), - in: make(chan json.RawMessage), + client: c, + namespace: namespace, + etype: channel.Type().Elem(), + channel: channel, + in: make(chan json.RawMessage), + quit: make(chan error), + forwardDone: make(chan struct{}), + unsubDone: make(chan struct{}), + err: make(chan error, 1), } return sub } @@ -232,9 +243,9 @@ func newClientSubscription(c *Client, namespace string, channel reflect.Value) * // Err returns the subscription error channel. The intended use of Err is to schedule // resubscription when the client connection is closed unexpectedly. // -// The error channel receives a value when the subscription has ended due -// to an error. The received error is nil if Close has been called -// on the underlying client and no other error has occurred. +// The error channel receives a value when the subscription has ended due to an error. The +// received error is nil if Close has been called on the underlying client and no other +// error has occurred. // // The error channel is closed when Unsubscribe is called on the subscription. func (sub *ClientSubscription) Err() <-chan error { @@ -244,41 +255,64 @@ func (sub *ClientSubscription) Err() <-chan error { // Unsubscribe unsubscribes the notification and closes the error channel. // It can safely be called more than once. func (sub *ClientSubscription) Unsubscribe() { - sub.quitWithError(true, nil) - sub.errOnce.Do(func() { close(sub.err) }) -} - -func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { - sub.quitOnce.Do(func() { - // The dispatch loop won't be able to execute the unsubscribe call - // if it is blocked on deliver. Close sub.quit first because it - // unblocks deliver. - close(sub.quit) - if unsubscribeServer { - sub.requestUnsubscribe() - } - if err != nil { - if err == ErrClientQuit { - err = nil // Adhere to subscription semantics. - } - sub.err <- err + sub.errOnce.Do(func() { + select { + case sub.quit <- nil: + <-sub.unsubDone + case <-sub.unsubDone: } + close(sub.err) }) } +// deliver is called by the client's message dispatcher to send a notification value. func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { select { case sub.in <- result: return true - case <-sub.quit: + case <-sub.forwardDone: return false } } -func (sub *ClientSubscription) start() { - sub.quitWithError(sub.forward()) +// close is called by the client's message dispatcher when the connection is closed. +func (sub *ClientSubscription) close(err error) { + select { + case sub.quit <- err: + case <-sub.forwardDone: + } +} + +// run is the forwarding loop of the subscription. It runs in its own goroutine and +// is launched by the client's handler after the subscription has been created. +func (sub *ClientSubscription) run() { + defer close(sub.unsubDone) + + unsubscribe, err := sub.forward() + + // The client's dispatch loop won't be able to execute the unsubscribe call if it is + // blocked in sub.deliver() or sub.close(). Closing forwardDone unblocks them. + close(sub.forwardDone) + + // Call the unsubscribe method on the server. + if unsubscribe { + sub.requestUnsubscribe() + } + + // Send the error. + if err != nil { + if err == ErrClientQuit { + // ErrClientQuit gets here when Client.Close is called. This is reported as a + // nil error because it's not an error, but we can't close sub.err here, only + // Unsubscribe can. + err = nil + } + sub.err <- err + } } +// forward is the forwarding loop. It takes in RPC notifications and sends them +// on the subscription channel. func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { cases := []reflect.SelectCase{ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, @@ -287,6 +321,7 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { } buffer := list.New() defer buffer.Init() + for { var chosen int var recv reflect.Value @@ -301,7 +336,12 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { switch chosen { case 0: // <-sub.quit - return false, nil + var err error + if !recv.IsNil() { + err = recv.Interface().(error) + } + return false, err + case 1: // <-sub.in val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) if err != nil { @@ -311,6 +351,7 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { return true, ErrSubscriptionQueueOverflow } buffer.PushBack(val) + case 2: // sub.channel<- cases[2].Send = reflect.Value{} // Don't hold onto the value. buffer.Remove(buffer.Front()) From 7bb723a46a0b9c23ba3d67367c36b40b8ae88419 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 29 Mar 2021 18:38:21 +0200 Subject: [PATCH 2/7] rpc: add test for closing subscription channel after Unsubscribe --- rpc/client_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/rpc/client_test.go b/rpc/client_test.go index 5d301a07a77..d95fb617940 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -376,6 +376,34 @@ func TestClientCloseUnsubscribeRace(t *testing.T) { } } +// This checks that the subscribed channel can be closed after Unsubscribe. +// It is the reproducer for https://github.com/ethereum/go-ethereum/issues/22322 +func TestClientSubscriptionChannelClose(t *testing.T) { + t.Parallel() + + var ( + srv = NewServer() + service = new(notificationTestService) + httpsrv = httptest.NewServer(srv.WebsocketHandler(nil)) + wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") + ) + defer srv.Stop() + defer httpsrv.Close() + + srv.RegisterName("nftest", service) + client, _ := Dial(wsURL) + + for i := 0; i < 100; i++ { + ch := make(chan int, 100) + sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", maxClientSubscriptionBuffer-1, 1) + if err != nil { + t.Fatal(err) + } + sub.Unsubscribe() + close(ch) + } +} + // This test checks that Client doesn't lock up when a single subscriber // doesn't read subscription events. func TestClientNotificationStorm(t *testing.T) { From cce2dac5c37b3e5c31e872f530b75d8bb34d806a Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 29 Mar 2021 18:38:41 +0200 Subject: [PATCH 3/7] rpc: add missing Unsubscribe call in websocket test --- rpc/websocket_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index 37ed19476f1..9b08ddebe04 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -134,6 +134,7 @@ func TestClientWebsocketPing(t *testing.T) { if err != nil { t.Fatalf("client subscribe error: %v", err) } + defer sub.Unsubscribe() // Wait for the context's deadline to be reached before proceeding. // This is important for reproducing https://github.com/ethereum/go-ethereum/issues/19798 From 59893beb85db64efb95f197fe81c89d0833e7df7 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 29 Mar 2021 18:39:11 +0200 Subject: [PATCH 4/7] rpc: remove stray Init call Not sure why this was there, but it's useless: when forward exits, the list is no longer referenced and doesn't need to be cleared. --- rpc/subscription.go | 1 - 1 file changed, 1 deletion(-) diff --git a/rpc/subscription.go b/rpc/subscription.go index ee6ac57acfa..8d35466e42f 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -320,7 +320,6 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { {Dir: reflect.SelectSend, Chan: sub.channel}, } buffer := list.New() - defer buffer.Init() for { var chosen int From 1c549e2c2ed1201e095b3db6d3002d831ccfd43d Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 30 Mar 2021 15:13:47 +0200 Subject: [PATCH 5/7] rpc: remove Unsubscribe in websocket test and explain why it's not there The mockup server in this test can't handle the method call. --- rpc/websocket_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index 9b08ddebe04..4976853baf8 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -129,12 +129,15 @@ func TestClientWebsocketPing(t *testing.T) { if err != nil { t.Fatalf("client dial error: %v", err) } + defer client.Close() + resultChan := make(chan int) sub, err := client.EthSubscribe(ctx, resultChan, "foo") if err != nil { t.Fatalf("client subscribe error: %v", err) } - defer sub.Unsubscribe() + // Note: Unsubscribe is not called on this subscription because the mockup + // server can't handle the request. // Wait for the context's deadline to be reached before proceeding. // This is important for reproducing https://github.com/ethereum/go-ethereum/issues/19798 From a8169a081e1abb3e3b7280d7827e1acdd8dd6b58 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 30 Mar 2021 15:17:29 +0200 Subject: [PATCH 6/7] rpc: call unsubscribe on server when Unsubscribe called on client sub --- rpc/client_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++-- rpc/subscription.go | 13 ++++++--- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/rpc/client_test.go b/rpc/client_test.go index d95fb617940..4e05d896692 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -18,6 +18,7 @@ package rpc import ( "context" + "encoding/json" "fmt" "math/rand" "net" @@ -376,6 +377,67 @@ func TestClientCloseUnsubscribeRace(t *testing.T) { } } +// unsubscribeRecorder collects the subscription IDs of *_unsubscribe calls. +type unsubscribeRecorder struct { + ServerCodec + unsubscribes map[string]bool +} + +func (r *unsubscribeRecorder) readBatch() ([]*jsonrpcMessage, bool, error) { + if r.unsubscribes == nil { + r.unsubscribes = make(map[string]bool) + } + + msgs, batch, err := r.ServerCodec.readBatch() + for _, msg := range msgs { + if msg.isUnsubscribe() { + var params []string + if err := json.Unmarshal(msg.Params, ¶ms); err != nil { + panic("unsubscribe decode error: " + err.Error()) + } + r.unsubscribes[params[0]] = true + } + } + return msgs, batch, err +} + +// This checks that Client calls the _unsubscribe method on the server when Unsubscribe is +// called on a subscription. +func TestClientSubscriptionUnsubscribeServer(t *testing.T) { + t.Parallel() + + // Create the server. + srv := NewServer() + srv.RegisterName("nftest", new(notificationTestService)) + p1, p2 := net.Pipe() + recorder := &unsubscribeRecorder{ServerCodec: NewCodec(p1)} + go srv.ServeCodec(recorder, OptionMethodInvocation|OptionSubscriptions) + defer srv.Stop() + + // Create the client on the other end of the pipe. + client, _ := newClient(context.Background(), func(context.Context) (ServerCodec, error) { + return NewCodec(p2), nil + }) + defer client.Close() + + // Create the subscription. + ch := make(chan int) + sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", 1, 1) + if err != nil { + t.Fatal(err) + } + + // Unsubscribe and check that unsubscribe was called. + sub.Unsubscribe() + if !recorder.unsubscribes[sub.subid] { + t.Fatal("client did not call unsubscribe method") + } + err, open := <-sub.Err() + if open { + t.Fatal("subscription error channel not closed after unsubscribe") + } +} + // This checks that the subscribed channel can be closed after Unsubscribe. // It is the reproducer for https://github.com/ethereum/go-ethereum/issues/22322 func TestClientSubscriptionChannelClose(t *testing.T) { @@ -383,14 +445,13 @@ func TestClientSubscriptionChannelClose(t *testing.T) { var ( srv = NewServer() - service = new(notificationTestService) httpsrv = httptest.NewServer(srv.WebsocketHandler(nil)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() defer httpsrv.Close() - srv.RegisterName("nftest", service) + srv.RegisterName("nftest", new(notificationTestService)) client, _ := Dial(wsURL) for i := 0; i < 100; i++ { diff --git a/rpc/subscription.go b/rpc/subscription.go index 8d35466e42f..942e764e5d6 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -225,6 +225,9 @@ type ClientSubscription struct { unsubDone chan struct{} } +// This is the sentinel value sent on sub.quit when Unsubscribe is called. +var errUnsubscribed = errors.New("unsubscribed") + func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { sub := &ClientSubscription{ client: c, @@ -257,7 +260,7 @@ func (sub *ClientSubscription) Err() <-chan error { func (sub *ClientSubscription) Unsubscribe() { sub.errOnce.Do(func() { select { - case sub.quit <- nil: + case sub.quit <- errUnsubscribed: <-sub.unsubDone case <-sub.unsubDone: } @@ -303,8 +306,7 @@ func (sub *ClientSubscription) run() { if err != nil { if err == ErrClientQuit { // ErrClientQuit gets here when Client.Close is called. This is reported as a - // nil error because it's not an error, but we can't close sub.err here, only - // Unsubscribe can. + // nil error because it's not an error, but we can't close sub.err here. err = nil } sub.err <- err @@ -335,10 +337,13 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { switch chosen { case 0: // <-sub.quit - var err error if !recv.IsNil() { err = recv.Interface().(error) } + if err == errUnsubscribed { + // Exiting because Unsubscribe was called, unsubscribe on server. + return true, nil + } return false, err case 1: // <-sub.in From f5d11f819c547d5ef6eb0a2434bc802b1cea70d8 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 30 Mar 2021 15:33:38 +0200 Subject: [PATCH 7/7] rpc: shorten check for closed channel --- rpc/client_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rpc/client_test.go b/rpc/client_test.go index 4e05d896692..224eb0c5c82 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -432,8 +432,7 @@ func TestClientSubscriptionUnsubscribeServer(t *testing.T) { if !recorder.unsubscribes[sub.subid] { t.Fatal("client did not call unsubscribe method") } - err, open := <-sub.Err() - if open { + if _, open := <-sub.Err(); open { t.Fatal("subscription error channel not closed after unsubscribe") } }