Skip to content

Commit 8d9761d

Browse files
authored
Merge pull request #1141 from shaohk/fix-two-phase-commit-lost-data_v2
fix(lost data in mysql two-phase commit): lost data in mysql two-phas…
2 parents f0209e8 + b80b6e7 commit 8d9761d

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

go/logic/applier.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,8 +421,28 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
421421
return err
422422
}
423423

424-
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
424+
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy.
425+
// Before read min/max, write a changelog state into the ghc table to avoid lost data in mysql two-phase commit.
426+
/*
427+
Detail description of the lost data in mysql two-phase commit issue by @Fanduzi:
428+
When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC,
429+
if an INSERT statement is being committed but blocks due to an unmet ack count,
430+
the data inserted by the transaction is not visible to ReadMigrationRangeValues,
431+
so the copy of the existing data in the table does not include the new row inserted by the transaction.
432+
However, the binlog event for the transaction is already written to the binlog,
433+
so the addDMLEventsListener only captures the binlog event after the transaction,
434+
and thus the transaction's binlog event is not captured, resulting in data loss.
435+
436+
If write a changelog into ghc table before ReadMigrationRangeValues, and the transaction commit blocks
437+
because the ack is not met, then the changelog will not be able to write, so the ReadMigrationRangeValues
438+
will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the
439+
newly inserted data, thus Avoiding data loss due to the above problem.
440+
*/
425441
func (this *Applier) ReadMigrationRangeValues() error {
442+
if _, err := this.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil {
443+
return err
444+
}
445+
426446
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
427447
return err
428448
}

go/logic/migrator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type ChangelogState string
2626
const (
2727
GhostTableMigrated ChangelogState = "GhostTableMigrated"
2828
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
29+
ReadMigrationRangeValues = "ReadMigrationRangeValues"
2930
)
3031

3132
func ReadChangelogState(s string) ChangelogState {
@@ -234,6 +235,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
234235
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
235236
}()
236237
}
238+
case ReadMigrationRangeValues:
239+
// no-op event
237240
default:
238241
{
239242
return fmt.Errorf("Unknown changelog state: %+v", changelogState)

0 commit comments

Comments
 (0)