Skip to content

Add basic tests to migrator #1168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 118 additions & 91 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package logic

import (
"context"
"errors"
"fmt"
"io"
"math"
Expand All @@ -22,6 +23,10 @@ import (
"github.com/github/gh-ost/go/sql"
)

var (
ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.")
)

type ChangelogState string

const (
Expand Down Expand Up @@ -223,28 +228,22 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
case Migrated, ReadMigrationRangeValues:
// no-op event
case GhostTableMigrated:
{
this.ghostTableMigrated <- true
}
this.ghostTableMigrated <- true
case AllEventsUpToLockProcessed:
{
var applyEventFunc tableWriteFunc = func() error {
this.allEventsUpToLockProcessed <- changelogStateString
return nil
}
// at this point we know all events up to lock have been read from the streamer,
// because the streamer works sequentially. So those events are either already handled,
// or have event functions in applyEventsQueue.
// So as not to create a potential deadlock, we write this func to applyEventsQueue
// asynchronously, understanding it doesn't really matter.
go func() {
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
}()
var applyEventFunc tableWriteFunc = func() error {
this.allEventsUpToLockProcessed <- changelogStateString
return nil
}
// at this point we know all events up to lock have been read from the streamer,
// because the streamer works sequentially. So those events are either already handled,
// or have event functions in applyEventsQueue.
// So as not to create a potential deadlock, we write this func to applyEventsQueue
// asynchronously, understanding it doesn't really matter.
go func() {
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
}()
default:
{
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
this.migrationContext.Log.Infof("Handled changelog state %s", changelogState)
return nil
Expand All @@ -268,13 +267,13 @@ func (this *Migrator) listenOnPanicAbort() {
this.migrationContext.Log.Fatale(err)
}

// validateStatement validates the `alter` statement meets criteria.
// validateAlterStatement validates the `alter` statement meets criteria.
// At this time this means:
// - column renames are approved
// - no table rename allowed
func (this *Migrator) validateStatement() (err error) {
func (this *Migrator) validateAlterStatement() (err error) {
if this.parser.IsRenameTable() {
return fmt.Errorf("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.")
return ErrMigratorUnsupportedRenameAlter
}
if this.parser.HasNonTrivialRenames() && !this.migrationContext.SkipRenamedColumns {
this.migrationContext.ColumnRenameMap = this.parser.GetNonTrivialRenames()
Expand Down Expand Up @@ -352,7 +351,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
return err
}
if err := this.validateStatement(); err != nil {
if err := this.validateAlterStatement(); err != nil {
return err
}

Expand Down Expand Up @@ -903,72 +902,49 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
}
}

// printStatus prints the progress status, and optionally additionally detailed
// dump of configuration.
// `rule` indicates the type of output expected.
// By default the status is written to standard output, but other writers can
// be used as well.
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
if rule == NoPrintStatusRule {
return
}
writers = append(writers, os.Stdout)

elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// Done copying rows. The totalRowsCopied value is the de-facto number of rows,
// and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied
}
var progressPct float64
if rowsEstimate == 0 {
progressPct = 100.0
} else {
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
}
// we take the opportunity to update migration context with progressPct
this.migrationContext.SetProgressPct(progressPct)
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
if rule == ForcePrintStatusAndHintRule {
shouldPrintMigrationStatusHint = true
}
if rule == ForcePrintStatusOnlyRule {
shouldPrintMigrationStatusHint = false
}
if shouldPrintMigrationStatusHint {
this.printMigrationStatusHint(writers...)
// getProgressPercent returns an estimate of migration progess as a percent.
func (this *Migrator) getProgressPercent(rowsEstimate int64) (progressPct float64) {
progressPct = 100.0
if rowsEstimate > 0 {
progressPct *= float64(this.migrationContext.GetTotalRowsCopied()) / float64(rowsEstimate)
}
return progressPct
}

var etaSeconds float64 = math.MaxFloat64
var etaDuration = time.Duration(base.ETAUnknown)
// getMigrationETA returns the estimated duration of the migration
func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration time.Duration) {
duration = time.Duration(base.ETAUnknown)
progressPct := this.getProgressPercent(rowsEstimate)
if progressPct >= 100.0 {
etaDuration = 0
duration = 0
} else if progressPct >= 0.1 {
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration = time.Duration(etaSeconds) * time.Second
duration = time.Duration(etaSeconds) * time.Second
} else {
etaDuration = 0
duration = 0
}
}
this.migrationContext.SetETADuration(etaDuration)
var eta string
switch etaDuration {

switch duration {
case 0:
eta = "due"
case time.Duration(base.ETAUnknown):
eta = "N/A"
default:
eta = base.PrettifyDurationOutput(etaDuration)
eta = base.PrettifyDurationOutput(duration)
}

state := "migrating"
return eta, duration
}

// getMigrationStateAndETA returns the state and eta of the migration.
func (this *Migrator) getMigrationStateAndETA(rowsEstimate int64) (state, eta string, etaDuration time.Duration) {
eta, etaDuration = this.getMigrationETA(rowsEstimate)
state = "migrating"
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
state = "counting rows"
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
Expand All @@ -977,27 +953,78 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
state = fmt.Sprintf("throttled, %s", throttleReason)
}
return state, eta, etaDuration
}

var shouldPrintStatus bool
if rule == HeuristicPrintStatusRule {
if elapsedSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 180 {
shouldPrintStatus = (elapsedSeconds%5 == 0)
} else if elapsedSeconds <= 180 {
shouldPrintStatus = (elapsedSeconds%5 == 0)
} else if this.migrationContext.TimeSincePointOfInterest().Seconds() <= 60 {
shouldPrintStatus = (elapsedSeconds%5 == 0)
} else {
shouldPrintStatus = (elapsedSeconds%30 == 0)
}
// shouldPrintStatus returns true when the migrator is due to print status info.
func (this *Migrator) shouldPrintStatus(rule PrintStatusRule, elapsedSeconds int64, etaDuration time.Duration) (shouldPrint bool) {
if rule != HeuristicPrintStatusRule {
return true
}

etaSeconds := etaDuration.Seconds()
if elapsedSeconds <= 60 {
shouldPrint = true
} else if etaSeconds <= 60 {
shouldPrint = true
} else if etaSeconds <= 180 {
shouldPrint = (elapsedSeconds%5 == 0)
} else if elapsedSeconds <= 180 {
shouldPrint = (elapsedSeconds%5 == 0)
} else if this.migrationContext.TimeSincePointOfInterest().Seconds() <= 60 {
shouldPrint = (elapsedSeconds%5 == 0)
} else {
// Not heuristic
shouldPrintStatus = true
shouldPrint = (elapsedSeconds%30 == 0)
}

return shouldPrint
}

// shouldPrintMigrationStatus returns true when the migrator is due to print the migration status hint
func (this *Migrator) shouldPrintMigrationStatusHint(rule PrintStatusRule, elapsedSeconds int64) (shouldPrint bool) {
if elapsedSeconds%600 == 0 {
shouldPrint = true
} else if rule == ForcePrintStatusAndHintRule {
shouldPrint = true
}
return shouldPrint
}

// printStatus prints the progress status, and optionally additionally detailed
// dump of configuration.
// `rule` indicates the type of output expected.
// By default the status is written to standard output, but other writers can
// be used as well.
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
if rule == NoPrintStatusRule {
return
}
writers = append(writers, os.Stdout)

elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// Done copying rows. The totalRowsCopied value is the de-facto number of rows,
// and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied
}

// we take the opportunity to update migration context with progressPct
progressPct := this.getProgressPercent(rowsEstimate)
this.migrationContext.SetProgressPct(progressPct)

// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
if this.shouldPrintMigrationStatusHint(rule, elapsedSeconds) {
this.printMigrationStatusHint(writers...)
}
if !shouldPrintStatus {

// Get state + ETA
state, eta, etaDuration := this.getMigrationStateAndETA(rowsEstimate)
this.migrationContext.SetETADuration(etaDuration)

if !this.shouldPrintStatus(rule, elapsedSeconds, etaDuration) {
return
}

Expand All @@ -1016,7 +1043,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
)
this.applier.WriteChangelog(
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
status,
state,
)
w := io.MultiWriter(writers...)
fmt.Fprintln(w, status)
Expand Down
Loading