-
Notifications
You must be signed in to change notification settings - Fork 15
Parallel Diff #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parallel Diff #62
Conversation
|
|
||
| type mockBlocks struct { | ||
| data map[cid.Cid]block.Block | ||
| dataMu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
required for accessing the blockstore in parallel (as is done by ParallelDiff), the race detector throws a fit otherwise.
| return &mockBlocks{make(map[cid.Cid]block.Block), 0, 0} | ||
| type mockBlocksOpt func(m *mockBlocks) | ||
|
|
||
| func WithGetDelay(d time.Duration) mockBlocksOpt { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used for (poorly) simulating a disk read
| After: "bar", | ||
| } | ||
|
|
||
| ec.assertExpectation(t, cs[0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've moved these assertions into diffAndAssertLength, here and elsewhere
| grp, ctx := errgroup.WithContext(ctx) | ||
| out := make(chan *Change) | ||
| parallelDiffNode(ctx, prevCtx, curCtx, prevAmt.node, curAmt.node, 0, grp, out) | ||
|
|
||
| var changes []*Change | ||
| done := make(chan struct{}, 1) | ||
| go func() { | ||
| for change := range out { | ||
| changes = append(changes, change) | ||
| } | ||
| done <- struct{}{} | ||
| }() | ||
|
|
||
| if err := grp.Wait(); err != nil { | ||
| close(out) | ||
| return nil, err | ||
| } | ||
| close(out) | ||
| <-done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's probably a nice pattern for this, but it works for now.
| } | ||
|
|
||
| func parallelDiffNode(ctx context.Context, prevCtx, curCtx *nodeContext, prev, cur *node, offset uint64, grp *errgroup.Group, outCh chan *Change) { | ||
| grp.Go(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parallelism here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks very nice. I made some suggestions for making the test names a bit more regular.
I noticed that the speedup for parallelism isn't that great when using larger bitwidths. At width 2 we see a 5x speedup, but at width 18 they are the same speed and parallel may even be a bit slower.
I did a bit of a hack and added an atomic count of the number of goroutines started. For the largest diff at bitwidth 2 we use 1716 to get the 5x speedup. However from bitwidth 16 upwards we only start a single goroutine. That suggests that the test AMT is not large enough to warrant the parallelism.
Filecoin actors use bitwidths 5 and 6 which see a 2-3x speedup in this code which is great.
Include some timing results in the PR description or final commit.
| } | ||
|
|
||
| func (mb *mockBlocks) Get(c cid.Cid) (block.Block, error) { | ||
| time.Sleep(mb.delay) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skip the sleep if mb.delay == 0
| Before: "foo" + strconv.Itoa(i+1), | ||
| After: "", | ||
| for multiplier := 1; multiplier < 7; multiplier++ { | ||
| t.Run(fmt.Sprintf("Multiplier %d", multiplier), func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove this and change the name of the tests in diffAndAssertLength to include the number of changes
| } | ||
| var serial time.Duration | ||
| var parallel time.Duration | ||
| t.Run("assert serial diff", func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename this to something like fmt.Sprintf("w%d-l%d-serial", a.bitWidth, expectedLength) to give a subtest name of w3-l8000-serial.
|
|
||
| return cs | ||
| }) | ||
| t.Logf("Serial Diff took %s\tsize\t%d\tbitwidth %d", serial.Round(time.Millisecond), expectedLength, a.bitWidth) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move these logs into the appropriate subtest
Codecov Report
@@ Coverage Diff @@
## master #62 +/- ##
==========================================
+ Coverage 60.02% 61.98% +1.96%
==========================================
Files 8 9 +1
Lines 973 1231 +258
==========================================
+ Hits 584 763 +179
- Misses 265 319 +54
- Partials 124 149 +25
Continue to review full report at Codecov.
|
|
in favor of #67 |
Add new method
ParallelDiffthat performs diffing in parallel. Reuses existing tests.