Skip to content
This repository was archived by the owner on Oct 11, 2024. It is now read-only.

Commit 0a3f365

Browse files
committed
Fix failing tests
1 parent 87692b0 commit 0a3f365

File tree

5 files changed

+68
-17
lines changed

5 files changed

+68
-17
lines changed

core/core.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,9 @@ func (app *App) Start(ctx context.Context) error {
476476
wg.Add(1)
477477
go func() {
478478
defer wg.Done()
479+
defer func() {
480+
log.Debug("closing app.db")
481+
}()
479482
<-innerCtx.Done()
480483
app.db.Close()
481484
}()
@@ -485,13 +488,19 @@ func (app *App) Start(ctx context.Context) error {
485488
wg.Add(1)
486489
go func() {
487490
defer wg.Done()
491+
defer func() {
492+
log.Debug("closing eth RPC rate limiter")
493+
}()
488494
ethRPCRateLimiterErrChan <- app.ethRPCRateLimiter.Start(innerCtx, rateLimiterCheckpointInterval)
489495
}()
490496

491497
// Set up the snapshot expiration watcher pruning logic
492498
wg.Add(1)
493499
go func() {
494500
defer wg.Done()
501+
defer func() {
502+
log.Debug("closing snapshot expiration watcher")
503+
}()
495504
ticker := time.NewTicker(expirationPollingInterval)
496505
for {
497506
select {
@@ -513,6 +522,9 @@ func (app *App) Start(ctx context.Context) error {
513522
wg.Add(1)
514523
go func() {
515524
defer wg.Done()
525+
defer func() {
526+
log.Debug("closing order watcher")
527+
}()
516528
log.Info("starting order watcher")
517529
orderWatcherErrChan <- app.orderWatcher.Watch(innerCtx)
518530
}()
@@ -528,6 +540,9 @@ func (app *App) Start(ctx context.Context) error {
528540
wg.Add(1)
529541
go func() {
530542
defer wg.Done()
543+
defer func() {
544+
log.Debug("closing block watcher")
545+
}()
531546
log.Info("starting block watcher")
532547
blockWatcherErrChan <- app.blockWatcher.Watch(innerCtx)
533548
}()
@@ -589,6 +604,9 @@ func (app *App) Start(ctx context.Context) error {
589604
wg.Add(1)
590605
go func() {
591606
defer wg.Done()
607+
defer func() {
608+
log.Debug("closing ordersync service")
609+
}()
592610
if err := app.ordersyncService.GetOrders(innerCtx, ordersyncMinPeers); err != nil {
593611
orderSyncErrChan <- err
594612
}
@@ -599,6 +617,9 @@ func (app *App) Start(ctx context.Context) error {
599617
wg.Add(1)
600618
go func() {
601619
defer wg.Done()
620+
defer func() {
621+
log.Debug("closing p2p node")
622+
}()
602623
addrs := app.node.Multiaddrs()
603624
log.WithFields(map[string]interface{}{
604625
"addresses": addrs,
@@ -608,6 +629,9 @@ func (app *App) Start(ctx context.Context) error {
608629
wg.Add(1)
609630
go func() {
610631
defer wg.Done()
632+
defer func() {
633+
log.Debug("closing new addrs checker")
634+
}()
611635
app.periodicallyCheckForNewAddrs(innerCtx, addrs)
612636
}()
613637

@@ -618,6 +642,9 @@ func (app *App) Start(ctx context.Context) error {
618642
wg.Add(1)
619643
go func() {
620644
defer wg.Done()
645+
defer func() {
646+
log.Debug("closing periodic stats logger")
647+
}()
621648
app.periodicallyLogStats(innerCtx)
622649
}()
623650

@@ -664,6 +691,7 @@ func (app *App) Start(ctx context.Context) error {
664691
// Wait for all goroutines to exit. If we reached here it means we are done
665692
// and there are no errors.
666693
wg.Wait()
694+
log.Debug("app successfully closed")
667695
return nil
668696
}
669697

core/core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestEthereumChainDetection(t *testing.T) {
5353
func newTestApp(t *testing.T) *App {
5454
dataDir := "/tmp/test_node/" + uuid.New().String()
5555
config := Config{
56-
Verbosity: 6,
56+
Verbosity: 2,
5757
DataDir: dataDir,
5858
P2PTCPPort: 0,
5959
P2PWebSocketsPort: 0,

core/ordersync/ordersync.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"math/rand"
9+
"sync"
910
"time"
1011

1112
"github.com/0xProject/0x-mesh/p2p"
@@ -23,7 +24,7 @@ const (
2324
TypeResponse = "Response"
2425
)
2526

26-
const requestResponseTimeout = 15 * time.Second
27+
const requestResponseTimeout = 30 * time.Second
2728

2829
var (
2930
// retryBackoff defines how long to wait before trying again if we didn't get
@@ -33,6 +34,7 @@ var (
3334
Max: 1 * time.Minute, // Longest back-off length
3435
Factor: 2, // Factor to multiple each successive back-off
3536
}
37+
backoffMut = &sync.Mutex{}
3638
)
3739

3840
var (
@@ -105,8 +107,8 @@ func (s *Service) SupportedSubprotocols() []string {
105107

106108
type Subprotocol interface {
107109
Name() string
108-
GetOrders(*Request) (*Response, error)
109-
HandleOrders(*Response) (*Request, error)
110+
GetOrders(context.Context, *Request) (*Response, error)
111+
HandleOrders(context.Context, *Response) (*Request, error)
110112
ParseRequestMetadata(metadata json.RawMessage) (interface{}, error)
111113
ParseResponseMetadata(metadata json.RawMessage) (interface{}, error)
112114
}
@@ -141,6 +143,9 @@ func (s *Service) GetMatchingSubprotocol(rawReq *rawRequest) (Subprotocol, error
141143
}
142144

143145
func (s *Service) HandleStream(stream network.Stream) {
146+
log.WithFields(log.Fields{
147+
"requester": stream.Conn().RemotePeer().Pretty(),
148+
}).Trace("handling ordersync stream")
144149
defer func() {
145150
_ = stream.Close()
146151
}()
@@ -151,6 +156,9 @@ func (s *Service) HandleStream(stream network.Stream) {
151156
log.WithError(err).Warn("waitForRequest returned error")
152157
return
153158
}
159+
log.WithFields(log.Fields{
160+
"requester": stream.Conn().RemotePeer().Pretty(),
161+
}).Trace("received ordersync request")
154162
if rawReq.Type != TypeRequest {
155163
log.WithField("gotType", rawReq.Type).Warn("wrong type for Request")
156164
return
@@ -160,7 +168,7 @@ func (s *Service) HandleStream(stream network.Stream) {
160168
log.WithError(err).Warn("GetMatchingSubprotocol returned error")
161169
return
162170
}
163-
res, err := handleRequestWithSubprotocol(subprotocol, rawReq)
171+
res, err := handleRequestWithSubprotocol(s.ctx, subprotocol, rawReq)
164172
if err != nil {
165173
log.WithError(err).Warn("subprotocol returned error")
166174
return
@@ -182,6 +190,7 @@ func (s *Service) HandleStream(stream network.Stream) {
182190
"error": err.Error(),
183191
"requester": remotePeerID.Pretty(),
184192
}).Warn("could not encode ordersync response")
193+
return
185194
}
186195
if res.Complete {
187196
return
@@ -200,7 +209,6 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
200209
}
201210

202211
// TODO(albrow): Do this for loop partly in parallel.
203-
// TODO(albrow): Add a timeout when waiting for a response.
204212
currentNeighbors := s.node.Neighbors()
205213
shufflePeers(currentNeighbors)
206214
for _, peerID := range currentNeighbors {
@@ -233,24 +241,31 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
233241
}
234242
}
235243

244+
backoffMut.Lock()
236245
delayBeforeNextRetry := retryBackoff.Duration()
246+
backoffMut.Unlock()
237247
log.WithFields(log.Fields{
238248
"delayBeforeNextRetry": delayBeforeNextRetry.String(),
239249
"minPeers": minPeers,
240250
"successfullySyncedPeers": len(successfullySyncedPeers),
241251
}).Debug("ordersync could not get orders from enough peers (trying again soon)")
242-
time.Sleep(delayBeforeNextRetry)
252+
select {
253+
case <-ctx.Done():
254+
return ctx.Err()
255+
case <-time.After(delayBeforeNextRetry):
256+
continue
257+
}
243258
}
244259

245260
return nil
246261
}
247262

248-
func handleRequestWithSubprotocol(subprotocol Subprotocol, rawReq *rawRequest) (*Response, error) {
263+
func handleRequestWithSubprotocol(ctx context.Context, subprotocol Subprotocol, rawReq *rawRequest) (*Response, error) {
249264
req, err := parseRequestWithSubprotocol(subprotocol, rawReq)
250265
if err != nil {
251266
return nil, err
252267
}
253-
return subprotocol.GetOrders(req)
268+
return subprotocol.GetOrders(ctx, req)
254269
}
255270

256271
func parseRequestWithSubprotocol(subprotocol Subprotocol, rawReq *rawRequest) (*Request, error) {
@@ -316,10 +331,12 @@ func (s *Service) getOrdersFromPeer(ctx context.Context, providerID peer.ID) err
316331
if err := json.NewEncoder(stream).Encode(rawReq); err != nil {
317332
return err
318333
}
334+
319335
rawRes, err := waitForResponse(ctx, stream)
320336
if err != nil {
321337
return err
322338
}
339+
323340
subprotocol, found := s.subprotocols[rawRes.Subprotocol]
324341
if !found {
325342
return fmt.Errorf("unsupported subprotocol: %s", subprotocol)
@@ -329,7 +346,8 @@ func (s *Service) getOrdersFromPeer(ctx context.Context, providerID peer.ID) err
329346
if err != nil {
330347
return err
331348
}
332-
nextReq, err = subprotocol.HandleOrders(res)
349+
350+
nextReq, err = subprotocol.HandleOrders(ctx, res)
333351
if err != nil {
334352
return err
335353
}
@@ -356,10 +374,10 @@ func waitForRequest(parentCtx context.Context, stream network.Stream) (*rawReque
356374
"error": err.Error(),
357375
"requester": stream.Conn().RemotePeer().Pretty(),
358376
}).Warn("could not encode ordersync request")
359-
reqChan <- &rawReq
360377
// TODO(albrow): Handle peer scores somewhere else?
361378
// s.host.ConnManager().UpsertTag(remotePeerID, scoreTag, func(current int) int { return current + inavlidMessageScoreDiff })
362379
}
380+
reqChan <- &rawReq
363381
}()
364382

365383
select {
@@ -385,10 +403,10 @@ func waitForResponse(parentCtx context.Context, stream network.Stream) (*rawResp
385403
"error": err.Error(),
386404
"provider": stream.Conn().RemotePeer().Pretty(),
387405
}).Warn("could not encode ordersync response")
388-
resChan <- &rawRes
389406
// TODO(albrow): Handle peer scores somewhere else?
390407
// s.host.ConnManager().UpsertTag(remotePeerID, scoreTag, func(current int) int { return current + inavlidMessageScoreDiff })
391408
}
409+
resChan <- &rawRes
392410
}()
393411

394412
select {

core/ordersync_subprotocols.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (p *FilteredPaginationSubProtocol) Name() string {
3838
return "/pagination-with-filter/version/0"
3939
}
4040

41-
func (p *FilteredPaginationSubProtocol) GetOrders(req *ordersync.Request) (*ordersync.Response, error) {
41+
func (p *FilteredPaginationSubProtocol) GetOrders(ctx context.Context, req *ordersync.Request) (*ordersync.Response, error) {
4242
var metadata *FilteredPaginationRequestMetadata
4343
if req.Metadata == nil {
4444
// Default metadata for the first request.
@@ -73,17 +73,16 @@ func (p *FilteredPaginationSubProtocol) GetOrders(req *ordersync.Request) (*orde
7373
}, nil
7474
}
7575

76-
func (p *FilteredPaginationSubProtocol) HandleOrders(res *ordersync.Response) (*ordersync.Request, error) {
76+
func (p *FilteredPaginationSubProtocol) HandleOrders(ctx context.Context, res *ordersync.Response) (*ordersync.Request, error) {
7777
if res.Metadata == nil {
7878
return nil, errors.New("FilteredPaginationSubProtocol received response with nil metadata")
7979
}
8080
metadata, ok := res.Metadata.(*FilteredPaginationResponseMetadata)
8181
if !ok {
8282
return nil, fmt.Errorf("FilteredPaginationSubProtocol received response with wrong metadata type (got %T)", res.Metadata)
8383
}
84-
// TODO(albrow): Pass in context to ValidateAndStoreValidOrders
8584
// TODO(albrow): Check that this order matches our current filter/topic
86-
_, err := p.app.orderWatcher.ValidateAndStoreValidOrders(context.Background(), res.Orders, false, p.app.chainID)
85+
_, err := p.app.orderWatcher.ValidateAndStoreValidOrders(ctx, res.Orders, false, p.app.chainID)
8786
if err != nil {
8887
return nil, err
8988
}

zeroex/orderwatch/order_watcher.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1417,7 +1417,13 @@ func (w *Watcher) ValidateAndStoreValidOrders(ctx context.Context, orders []*zer
14171417
allOrderEvents = append(allOrderEvents, orderEvents...)
14181418
}
14191419

1420-
w.orderFeed.Send(allOrderEvents)
1420+
if len(allOrderEvents) > 0 {
1421+
// NOTE(albrow): Send can block if the subscriber(s) are slow. Blocking here can cause problems when Mesh is
1422+
// shutting down, so to prevent that, we call Send in a goroutine.
1423+
go func() {
1424+
w.orderFeed.Send(allOrderEvents)
1425+
}()
1426+
}
14211427

14221428
return results, nil
14231429
}

0 commit comments

Comments
 (0)