Skip to content
This repository was archived by the owner on Oct 11, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
3dc71ba
Remove old algorithm for order sharing via GossipSub
albrow Jan 28, 2020
12e94a5
WIP initial implementation
albrow Jan 29, 2020
835fec3
Improve test for p2p/ordersync
albrow Jan 29, 2020
1b4218b
Implement ordersync.Provider in core package
albrow Jan 29, 2020
ba6591e
Make new core tests serial
albrow Jan 30, 2020
43fbeb6
Wait for nodes to exit in core_test
albrow Jan 31, 2020
cc1dc49
Try running new core tests without the race detector
albrow Feb 4, 2020
20b230b
Only run core serial tests in CI
albrow Feb 4, 2020
b95db9d
Add more log spam
albrow Feb 4, 2020
4743c61
Log spam about number of orders requested/provided
albrow Feb 4, 2020
20d7fa7
More logging
albrow Feb 4, 2020
7c57e00
log when orders are invalid
albrow Feb 4, 2020
2dd98d5
Add additional sleep statement
albrow Feb 4, 2020
1004272
Change timing for test
albrow Feb 4, 2020
616de9d
Remove some logs
albrow Feb 4, 2020
52e22d3
Increase ETH RPC rate limiting limits for tests
albrow Feb 4, 2020
5663d67
Re-enable all tests
albrow Feb 4, 2020
e5611ec
Remove more old logs
albrow Feb 4, 2020
f799e3d
Move ordersync to core package; create subprotocols
albrow Feb 5, 2020
48c90f9
Use BlockchainLifecycle in new core tests
albrow Feb 6, 2020
1569ba3
Update core.go with new ordersync logic
albrow Feb 6, 2020
ab315c1
Fix some bugs and get integration tests passing
albrow Feb 7, 2020
76cb7a2
make core_test more robust
albrow Feb 7, 2020
7d29d5f
Add timeout for ordersync requests and responses
albrow Feb 7, 2020
46d0e20
Fix failing tests
albrow Feb 10, 2020
ef1a6a7
Wait for event to be sent before returning from ValidateAndStoreValid…
albrow Feb 12, 2020
4ce1cf6
Increase test timeouts
albrow Feb 13, 2020
e9c23d3
Add manual delay in browser integraiton tests
albrow Feb 13, 2020
d2fd249
Add note about timing hack in browser integration tests
albrow Feb 13, 2020
ca01b9a
Remove old files
albrow Feb 13, 2020
fa3c492
Add a lot more comments
albrow Feb 13, 2020
05165cd
Break out of for loop in GetOrders when minPeers reached
albrow Feb 13, 2020
e356d86
Add appropriate peer score events
albrow Feb 14, 2020
bce828b
Add rate limiting
albrow Feb 14, 2020
1bdd00a
Remove old constants
albrow Feb 14, 2020
57fce7d
Update core/ordersync/ordersync.go
albrow Feb 14, 2020
9f575b3
Update core/ordersync_subprotocols.go
albrow Feb 14, 2020
51f9477
Update core/ordersync_subprotocols.go
albrow Feb 14, 2020
748c7bd
Add missing newline
albrow Feb 14, 2020
c7c36a0
Merge branch 'feature/better-old-order-sharing' of github.com:0xProje…
albrow Feb 14, 2020
91b334b
Fix assertion in core_test.go
albrow Feb 14, 2020
06a6d2f
Rename some methods of ordersync.Subprotocol
albrow Feb 15, 2020
1c73f2d
Increase minPeers to 5
albrow Feb 15, 2020
fdfa934
Take filters into account in ordersync protocol
albrow Feb 18, 2020
b743b5d
Update some comments
albrow Feb 18, 2020
067e94b
Add log message when receiving valid orders via ordersync
albrow Feb 18, 2020
4c8b0b6
Update core/ordersync_subprotocols.go
albrow Feb 19, 2020
1b3f820
Use same log message for receiving new order from peer
albrow Feb 19, 2020
ab2ba8a
Update core/ordersync_subprotocols.go
albrow Feb 19, 2020
ae4b65b
Return errors in waitForResponse and waitForRequest
albrow Feb 19, 2020
bc38f92
Merge branch 'feature/better-old-order-sharing' of github.com:0xProje…
albrow Feb 19, 2020
5f77523
Add note to changelog
albrow Feb 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions core/ordersync/ordersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ const (
// Request represents a high-level ordersync request. It abstracts away some
// of the details of subprotocol negotiation and encoding/decoding.
type Request struct {
Metadata interface{} `json:"metadata"`
RequesterID peer.ID `json:"requesterID"`
Metadata interface{} `json:"metadata"`
}

// rawRequest contains all the details we need at the lowest level to encode/decode
Expand All @@ -93,9 +94,10 @@ type rawRequest struct {
// Response represents a high-level ordersync response. It abstracts away some
// of the details of subprotocol negotiation and encoding/decoding.
type Response struct {
Orders []*zeroex.SignedOrder `json:"orders"`
Complete bool `json:"complete"`
Metadata interface{} `json:"metadata"`
ProviderID peer.ID `json:"providerID"`
Orders []*zeroex.SignedOrder `json:"orders"`
Complete bool `json:"complete"`
Metadata interface{} `json:"metadata"`
}

// rawResponse contains all the details we need at the lowest level to encode/decode
Expand Down Expand Up @@ -206,7 +208,7 @@ func (s *Service) HandleStream(stream network.Stream) {
defer func() {
_ = stream.Close()
}()
remotePeerID := stream.Conn().RemotePeer()
requesterID := stream.Conn().RemotePeer()

for {
if err := s.requestRateLimiter.Wait(s.ctx); err != nil {
Expand All @@ -225,16 +227,16 @@ func (s *Service) HandleStream(stream network.Stream) {
}).Trace("received ordersync request")
if rawReq.Type != TypeRequest {
log.WithField("gotType", rawReq.Type).Warn("wrong type for Request")
s.handlePeerScoreEvent(remotePeerID, psInvalidMessage)
s.handlePeerScoreEvent(requesterID, psInvalidMessage)
return
}
subprotocol, err := s.GetMatchingSubprotocol(rawReq)
if err != nil {
log.WithError(err).Warn("GetMatchingSubprotocol returned error")
s.handlePeerScoreEvent(remotePeerID, psSubprotocolNegotiationFailed)
s.handlePeerScoreEvent(requesterID, psSubprotocolNegotiationFailed)
return
}
res, err := handleRequestWithSubprotocol(s.ctx, subprotocol, rawReq)
res, err := handleRequestWithSubprotocol(s.ctx, subprotocol, requesterID, rawReq)
if err != nil {
log.WithError(err).Warn("subprotocol returned error")
return
Expand All @@ -244,7 +246,7 @@ func (s *Service) HandleStream(stream network.Stream) {
log.WithError(err).Error("could not encode raw metadata")
return
}
s.handlePeerScoreEvent(remotePeerID, psValidMessage)
s.handlePeerScoreEvent(requesterID, psValidMessage)
rawRes := rawResponse{
Type: TypeResponse,
Subprotocol: subprotocol.Name(),
Expand All @@ -255,9 +257,9 @@ func (s *Service) HandleStream(stream network.Stream) {
if err := json.NewEncoder(stream).Encode(rawRes); err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
"requester": remotePeerID.Pretty(),
"requester": requesterID.Pretty(),
}).Warn("could not encode ordersync response")
s.handlePeerScoreEvent(remotePeerID, psUnexpectedDisconnect)
s.handlePeerScoreEvent(requesterID, psUnexpectedDisconnect)
return
}
if res.Complete {
Expand Down Expand Up @@ -336,33 +338,35 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
return nil
}

func handleRequestWithSubprotocol(ctx context.Context, subprotocol Subprotocol, rawReq *rawRequest) (*Response, error) {
req, err := parseRequestWithSubprotocol(subprotocol, rawReq)
func handleRequestWithSubprotocol(ctx context.Context, subprotocol Subprotocol, requesterID peer.ID, rawReq *rawRequest) (*Response, error) {
req, err := parseRequestWithSubprotocol(subprotocol, requesterID, rawReq)
if err != nil {
return nil, err
}
return subprotocol.HandleOrderSyncRequest(ctx, req)
}

func parseRequestWithSubprotocol(subprotocol Subprotocol, rawReq *rawRequest) (*Request, error) {
func parseRequestWithSubprotocol(subprotocol Subprotocol, requesterID peer.ID, rawReq *rawRequest) (*Request, error) {
metadata, err := subprotocol.ParseRequestMetadata(rawReq.Metadata)
if err != nil {
return nil, err
}
return &Request{
Metadata: metadata,
RequesterID: requesterID,
Metadata: metadata,
}, nil
}

func parseResponseWithSubprotocol(subprotocol Subprotocol, rawRes *rawResponse) (*Response, error) {
func parseResponseWithSubprotocol(subprotocol Subprotocol, providerID peer.ID, rawRes *rawResponse) (*Response, error) {
metadata, err := subprotocol.ParseResponseMetadata(rawRes.Metadata)
if err != nil {
return nil, err
}
return &Response{
Orders: rawRes.Orders,
Complete: rawRes.Complete,
Metadata: metadata,
ProviderID: providerID,
Orders: rawRes.Orders,
Complete: rawRes.Complete,
Metadata: metadata,
}, nil
}

Expand Down Expand Up @@ -422,7 +426,7 @@ func (s *Service) getOrdersFromPeer(ctx context.Context, providerID peer.ID) err
return fmt.Errorf("unsupported subprotocol: %s", subprotocol)
}
selectedSubprotocol = subprotocol
res, err := parseResponseWithSubprotocol(subprotocol, rawRes)
res, err := parseResponseWithSubprotocol(subprotocol, providerID, rawRes)
if err != nil {
s.handlePeerScoreEvent(providerID, psInvalidMessage)
return err
Expand Down
66 changes: 48 additions & 18 deletions core/ordersync_subprotocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/0xProject/0x-mesh/core/ordersync"
"github.com/0xProject/0x-mesh/orderfilter"
"github.com/0xProject/0x-mesh/zeroex"
)

Expand All @@ -17,16 +18,18 @@ var _ ordersync.Subprotocol = (*FilteredPaginationSubProtocol)(nil)
// paginating through them. It involves sending multiple requests until pagination is
// finished and all orders have been returned.
type FilteredPaginationSubProtocol struct {
app *App
perPage int
app *App
orderFilter *orderfilter.Filter
perPage int
}

// NewFilteredPaginationSubprotocol creates and returns a new FilteredPaginationSubprotocol
// which will respond with perPage orders for each individual request/response.
func NewFilteredPaginationSubprotocol(app *App, perPage int) *FilteredPaginationSubProtocol {
return &FilteredPaginationSubProtocol{
app: app,
perPage: perPage,
app: app,
orderFilter: app.orderFilter,
perPage: perPage,
}
}

Expand Down Expand Up @@ -68,22 +71,40 @@ func (p *FilteredPaginationSubProtocol) HandleOrderSyncRequest(ctx context.Conte
return nil, fmt.Errorf("FilteredPaginationSubProtocol received request with wrong metadata type (got %T)", req.Metadata)
}
}

ordersResp, err := p.app.GetOrders(metadata.Page, p.perPage, metadata.SnapshotID)
if err != nil {
return nil, err
}
orders := make([]*zeroex.SignedOrder, len(ordersResp.OrdersInfos))
for i, orderInfo := range ordersResp.OrdersInfos {
orders[i] = orderInfo.SignedOrder
// It's possible that none of the orders in the current page match the filter.
// We don't want to respond with zero orders, so keep iterating until we find
// at least some orders that match the filter.
filteredOrders := []*zeroex.SignedOrder{}
var snapshotID string
var currentPage int
for currentPage = metadata.Page; len(filteredOrders) == 0; currentPage += 1 {
// Get the orders for this page.
ordersResp, err := p.app.GetOrders(currentPage, p.perPage, metadata.SnapshotID)
if err != nil {
return nil, err
}
snapshotID = ordersResp.SnapshotID
if len(ordersResp.OrdersInfos) == 0 {
// No more orders left.
break
}
// Filter the orders for this page. If none of them match the filter, we continue
// on to the next page.
for _, orderInfo := range ordersResp.OrdersInfos {
if matches, err := p.orderFilter.MatchOrder(orderInfo.SignedOrder); err != nil {
return nil, err
} else if matches {
filteredOrders = append(filteredOrders, orderInfo.SignedOrder)
}
}
}
// TODO(albrow): Filter orders

return &ordersync.Response{
Orders: orders,
Complete: len(orders) == 0,
Orders: filteredOrders,
Complete: len(filteredOrders) == 0,
Metadata: &FilteredPaginationResponseMetadata{
Page: metadata.Page,
SnapshotID: ordersResp.SnapshotID,
Page: currentPage,
SnapshotID: snapshotID,
},
}, nil
}
Expand All @@ -99,7 +120,16 @@ func (p *FilteredPaginationSubProtocol) HandleOrderSyncResponse(ctx context.Cont
if !ok {
return nil, fmt.Errorf("FilteredPaginationSubProtocol received response with wrong metadata type (got %T)", res.Metadata)
}
// TODO(albrow): Check that this order matches our current filter/topic
filteredOrders := []*zeroex.SignedOrder{}
for _, order := range res.Orders {
if matches, err := p.orderFilter.MatchOrder(order); err != nil {
return nil, err
} else if matches {
filteredOrders = append(filteredOrders, order)
} else if !matches {
p.app.handlePeerScoreEvent(res.ProviderID, psReceivedOrderDoesNotMatchFilter)
}
}
_, err := p.app.orderWatcher.ValidateAndStoreValidOrders(ctx, res.Orders, false, p.app.chainID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_, err := p.app.orderWatcher.ValidateAndStoreValidOrders(ctx, res.Orders, false, p.app.chainID)
_, err := p.app.orderWatcher.ValidateAndStoreValidOrders(ctx, filteredOrders, false, p.app.chainID)

if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions core/peer_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
psInvalidMessage peerScoreEvent = iota
psValidMessage
psOrderStored
psReceivedOrderDoesNotMatchFilter
)

func (app *App) handlePeerScoreEvent(id peer.ID, event peerScoreEvent) {
Expand All @@ -26,6 +27,8 @@ func (app *App) handlePeerScoreEvent(id peer.ID, event peerScoreEvent) {
app.node.SetPeerScore(id, "valid-message", 5)
case psOrderStored:
app.node.SetPeerScore(id, "order-stored", 10)
case psReceivedOrderDoesNotMatchFilter:
app.node.SetPeerScore(id, "received-order-does-not-match-filter", -10)
default:
log.WithField("event", event).Error("unknown peerScoreEvent")
}
Expand Down