Skip to content
This repository was archived by the owner on Oct 11, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
3dc71ba
Remove old algorithm for order sharing via GossipSub
albrow Jan 28, 2020
12e94a5
WIP initial implementation
albrow Jan 29, 2020
835fec3
Improve test for p2p/ordersync
albrow Jan 29, 2020
1b4218b
Implement ordersync.Provider in core package
albrow Jan 29, 2020
ba6591e
Make new core tests serial
albrow Jan 30, 2020
43fbeb6
Wait for nodes to exit in core_test
albrow Jan 31, 2020
cc1dc49
Try running new core tests without the race detector
albrow Feb 4, 2020
20b230b
Only run core serial tests in CI
albrow Feb 4, 2020
b95db9d
Add more log spam
albrow Feb 4, 2020
4743c61
Log spam about number of orders requested/provided
albrow Feb 4, 2020
20d7fa7
More logging
albrow Feb 4, 2020
7c57e00
log when orders are invalid
albrow Feb 4, 2020
2dd98d5
Add additional sleep statement
albrow Feb 4, 2020
1004272
Change timing for test
albrow Feb 4, 2020
616de9d
Remove some logs
albrow Feb 4, 2020
52e22d3
Increase ETH RPC rate limiting limits for tests
albrow Feb 4, 2020
5663d67
Re-enable all tests
albrow Feb 4, 2020
e5611ec
Remove more old logs
albrow Feb 4, 2020
f799e3d
Move ordersync to core package; create subprotocols
albrow Feb 5, 2020
48c90f9
Use BlockchainLifecycle in new core tests
albrow Feb 6, 2020
1569ba3
Update core.go with new ordersync logic
albrow Feb 6, 2020
ab315c1
Fix some bugs and get integration tests passing
albrow Feb 7, 2020
76cb7a2
make core_test more robust
albrow Feb 7, 2020
7d29d5f
Add timeout for ordersync requests and responses
albrow Feb 7, 2020
46d0e20
Fix failing tests
albrow Feb 10, 2020
ef1a6a7
Wait for event to be sent before returning from ValidateAndStoreValid…
albrow Feb 12, 2020
4ce1cf6
Increase test timeouts
albrow Feb 13, 2020
e9c23d3
Add manual delay in browser integraiton tests
albrow Feb 13, 2020
d2fd249
Add note about timing hack in browser integration tests
albrow Feb 13, 2020
ca01b9a
Remove old files
albrow Feb 13, 2020
fa3c492
Add a lot more comments
albrow Feb 13, 2020
05165cd
Break out of for loop in GetOrders when minPeers reached
albrow Feb 13, 2020
e356d86
Add appropriate peer score events
albrow Feb 14, 2020
bce828b
Add rate limiting
albrow Feb 14, 2020
1bdd00a
Remove old constants
albrow Feb 14, 2020
57fce7d
Update core/ordersync/ordersync.go
albrow Feb 14, 2020
9f575b3
Update core/ordersync_subprotocols.go
albrow Feb 14, 2020
51f9477
Update core/ordersync_subprotocols.go
albrow Feb 14, 2020
748c7bd
Add missing newline
albrow Feb 14, 2020
c7c36a0
Merge branch 'feature/better-old-order-sharing' of github.com:0xProje…
albrow Feb 14, 2020
91b334b
Fix assertion in core_test.go
albrow Feb 14, 2020
06a6d2f
Rename some methods of ordersync.Subprotocol
albrow Feb 15, 2020
1c73f2d
Increase minPeers to 5
albrow Feb 15, 2020
fdfa934
Take filters into account in ordersync protocol
albrow Feb 18, 2020
b743b5d
Update some comments
albrow Feb 18, 2020
067e94b
Add log message when receiving valid orders via ordersync
albrow Feb 18, 2020
4c8b0b6
Update core/ordersync_subprotocols.go
albrow Feb 19, 2020
1b3f820
Use same log message for receiving new order from peer
albrow Feb 19, 2020
ab2ba8a
Update core/ordersync_subprotocols.go
albrow Feb 19, 2020
ae4b65b
Return errors in waitForResponse and waitForRequest
albrow Feb 19, 2020
bc38f92
Merge branch 'feature/better-old-order-sharing' of github.com:0xProje…
albrow Feb 19, 2020
5f77523
Add note to changelog
albrow Feb 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ test-all: test-go test-wasm-node test-wasm-browser
.PHONY: test-go
test-go: test-go-parallel test-go-serial test-browser-conversion


Copy link
Contributor

@jalextowle jalextowle Feb 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this purely a stylistic change? Just wondering if this is one of those weird make syntax elements that actually matter a lot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is a carryover from conventions we used at a previous job. I like to put two spaces between Make tasks. It's subjective and probably not worth enforcing. Do you think I should just undo this commit and not make any formatting-only changes to the Makefile?

Copy link
Contributor

@jalextowle jalextowle Feb 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, I'm totally fine with this change and am actually a fan of the style. I just wanted to make sure that I understood it properly. I think that some formatting-only changes are bad because they mess up git --blame, but a blank line isn't going to cause a bug.

.PHONY: test-go-parallel
test-go-parallel:
go test ./... -race -timeout 30s


.PHONY: test-go-serial
test-go-serial:
go test ./zeroex/ordervalidator ./zeroex/orderwatch ./core -race -timeout 90s -p=1 --serial
Expand All @@ -54,7 +56,7 @@ test-browser-integration:

.PHONY: test-browser-conversion
test-browser-conversion:
go test ./browser/go/conversion-test -timeout 120s --enable-browser-conversion-tests -run BrowserConversions
go test ./browser/go/conversion-test -timeout 185s --enable-browser-conversion-tests -run BrowserConversions

.PHONY: test-wasm-node
test-wasm-node:
Expand Down
2 changes: 1 addition & 1 deletion browser/go/conversion-test/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestBrowserConversions(t *testing.T) {

// Declare a context that will be used for all child processes, servers, and
// other goroutines.
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
ctx, _ = chromedp.NewContext(ctx, chromedp.WithErrorf(t.Errorf))
defer cancel()

Expand Down
65 changes: 56 additions & 9 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/core/ordersync"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/encoding"
"github.com/0xProject/0x-mesh/ethereum"
Expand Down Expand Up @@ -61,6 +62,10 @@ const (
// logStatsInterval is how often to log stats for this node.
logStatsInterval = 5 * time.Minute
version = "development"
// ordersyncMinPeers is the minimum amount of peers to receive orders from
// before considering the ordersync process finished.
ordersyncMinPeers = 5
paginationSubprotocolPerPage = 500
)

// Note(albrow): The Config type is currently copied to browser/ts/index.ts. We
Expand Down Expand Up @@ -193,8 +198,8 @@ type App struct {
idToSnapshotInfo map[string]snapshotInfo
ethRPCRateLimiter ratelimit.RateLimiter
ethRPCClient ethrpcclient.Client
orderSelector *orderSelector
db *meshdb.MeshDB
ordersyncService *ordersync.Service

// started is closed to signal that the App has been started. Some methods
// will block until after the App is started.
Expand Down Expand Up @@ -348,11 +353,6 @@ func New(config Config) (*App, error) {

// Initialize remaining fields.
snapshotExpirationWatcher := expirationwatch.New()
orderSelector := &orderSelector{
topic: orderFilter.Topic(),
nextOffset: 0,
db: meshDB,
}

app := &App{
started: make(chan struct{}),
Expand All @@ -366,7 +366,6 @@ func New(config Config) (*App, error) {
orderFilter: orderFilter,
snapshotExpirationWatcher: snapshotExpirationWatcher,
idToSnapshotInfo: map[string]snapshotInfo{},
orderSelector: orderSelector,
ethRPCRateLimiter: ethRPCRateLimiter,
ethRPCClient: ethClient,
db: meshDB,
Expand Down Expand Up @@ -476,6 +475,9 @@ func (app *App) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing app.db")
}()
<-innerCtx.Done()
app.db.Close()
}()
Expand All @@ -485,13 +487,19 @@ func (app *App) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing eth RPC rate limiter")
}()
ethRPCRateLimiterErrChan <- app.ethRPCRateLimiter.Start(innerCtx, rateLimiterCheckpointInterval)
}()

// Set up the snapshot expiration watcher pruning logic
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing snapshot expiration watcher")
}()
ticker := time.NewTicker(expirationPollingInterval)
for {
select {
Expand All @@ -513,6 +521,9 @@ func (app *App) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing order watcher")
}()
log.Info("starting order watcher")
orderWatcherErrChan <- app.orderWatcher.Watch(innerCtx)
}()
Expand All @@ -528,6 +539,9 @@ func (app *App) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing block watcher")
}()
log.Info("starting block watcher")
blockWatcherErrChan <- app.blockWatcher.Watch(innerCtx)
}()
Expand Down Expand Up @@ -580,11 +594,31 @@ func (app *App) Start(ctx context.Context) error {
return err
}

// Register and start ordersync service.
ordersyncSubprotocols := []ordersync.Subprotocol{
NewFilteredPaginationSubprotocol(app, paginationSubprotocolPerPage),
}
app.ordersyncService = ordersync.New(innerCtx, app.node, ordersyncSubprotocols)
Copy link
Contributor

@jalextowle jalextowle Feb 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact that we are calling New in a separate thread than we are calling GetOrders without synchronization makes the case that I describe in the other file (it is the comment that describes an example heuristic) more likely. It's entirely possible that this peer will be hit with a GetOrders request from a new peer before it has actually finished bootstrapping it's database using the ordersync protocol.

I think that an even more impactful change than the heuristic I described would be for New to take a done channel that will be sent a signal whenever the GetOrders function has finished executing. Prior to receiving the signal, HandleStream would reject with a "Database not initialized" warning.

Thoughts?

Copy link
Contributor Author

@albrow albrow Feb 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downside to this approach is that for networks or topics that don't have minPeers peers, the ordersync protocol would fail and no one would be able to get any existing orders. In a general sense we have no way to guarantee that you are done syncing orders and should no longer be considered "new". I think it's better to leave the implementation as is. It is not very statistically likely that every peer you dial will be "new", especially if we increase minPeers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I'm still somewhat concerned about a future in which there are a lot of browser nodes that are spinning up and down quickly and are being used as ordersync providers, but I think we can cross that bridge when we come to it considering the downsides that you mention.

orderSyncErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing ordersync service")
}()
if err := app.ordersyncService.GetOrders(innerCtx, ordersyncMinPeers); err != nil {
orderSyncErrChan <- err
}
}()

// Start the p2p node.
p2pErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing p2p node")
}()
addrs := app.node.Multiaddrs()
log.WithFields(map[string]interface{}{
"addresses": addrs,
Expand All @@ -594,6 +628,9 @@ func (app *App) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing new addrs checker")
}()
app.periodicallyCheckForNewAddrs(innerCtx, addrs)
}()

Expand All @@ -604,6 +641,9 @@ func (app *App) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Debug("closing periodic stats logger")
}()
app.periodicallyLogStats(innerCtx)
}()

Expand All @@ -628,14 +668,20 @@ func (app *App) Start(ctx context.Context) error {
return err
}
case err := <-blockWatcherErrChan:
log.WithError(err).Error("block watcher exited with error")
if err != nil {
log.WithError(err).Error("block watcher exited with error")
cancel()
return err
}
case err := <-ethRPCRateLimiterErrChan:
log.WithError(err).Error("ETH JSON-RPC ratelimiter exited with error")
if err != nil {
log.WithError(err).Error("ETH JSON-RPC ratelimiter exited with error")
cancel()
return err
}
case err := <-orderSyncErrChan:
if err != nil {
log.WithError(err).Error("ordersync service exited with error")
cancel()
return err
}
Expand All @@ -644,6 +690,7 @@ func (app *App) Start(ctx context.Context) error {
// Wait for all goroutines to exit. If we reached here it means we are done
// and there are no errors.
wg.Wait()
log.Debug("app successfully closed")
return nil
}

Expand Down
Loading