Skip to content

Commit 263ad79

Browse files
dm-2RainbowDashy
authored andcommitted
Merge pull request github#1141 from shaohk/fix-two-phase-commit-lost-data_v2
fix(lost data in mysql two-phase commit): lost data in mysql two-phas…
1 parent 0f1ad00 commit 263ad79

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

go/logic/applier.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,28 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
424424
return err
425425
}
426426

427-
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
427+
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy.
428+
// Before read min/max, write a changelog state into the ghc table to avoid lost data in mysql two-phase commit.
429+
/*
430+
Detail description of the lost data in mysql two-phase commit issue by @Fanduzi:
431+
When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC,
432+
if an INSERT statement is being committed but blocks due to an unmet ack count,
433+
the data inserted by the transaction is not visible to ReadMigrationRangeValues,
434+
so the copy of the existing data in the table does not include the new row inserted by the transaction.
435+
However, the binlog event for the transaction is already written to the binlog,
436+
so the addDMLEventsListener only captures the binlog event after the transaction,
437+
and thus the transaction's binlog event is not captured, resulting in data loss.
438+
439+
If write a changelog into ghc table before ReadMigrationRangeValues, and the transaction commit blocks
440+
because the ack is not met, then the changelog will not be able to write, so the ReadMigrationRangeValues
441+
will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the
442+
newly inserted data, thus Avoiding data loss due to the above problem.
443+
*/
428444
func (this *Applier) ReadMigrationRangeValues() error {
445+
if _, err := this.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil {
446+
return err
447+
}
448+
429449
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
430450
return err
431451
}

go/logic/migrator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type ChangelogState string
2626
const (
2727
GhostTableMigrated ChangelogState = "GhostTableMigrated"
2828
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
29+
ReadMigrationRangeValues = "ReadMigrationRangeValues"
2930
)
3031

3132
func ReadChangelogState(s string) ChangelogState {
@@ -236,6 +237,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
236237
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
237238
}()
238239
}
240+
case ReadMigrationRangeValues:
241+
// no-op event
239242
default:
240243
{
241244
return fmt.Errorf("Unknown changelog state: %+v", changelogState)

0 commit comments

Comments
 (0)