Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 174 additions & 16 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,33 @@ func (b *Backuper) prepareRestoreMapping(objectMapping []string, objectType stri
if objectType == "database" {
b.cfg.General.RestoreDatabaseMapping[splitByColon[0]] = splitByColon[1]
} else {
b.cfg.General.RestoreTableMapping[splitByColon[0]] = splitByColon[1]
sourceKey := strings.TrimSpace(splitByColon[0])
targetValue := strings.TrimSpace(splitByColon[1])
originalSource := sourceKey
originalTarget := targetValue

if strings.Contains(sourceKey, ".") {
parts := strings.Split(sourceKey, ".")
if len(parts) == 2 {
sourceKey = parts[1]
log.Debug().Msgf("restore-table-mapping: extracted table name '%s' from '%s'", sourceKey, originalSource)
} else if len(parts) > 2 {
log.Warn().Msgf("restore-table-mapping: source '%s' has multiple dots, using as-is", originalSource)
}
}

if strings.Contains(targetValue, ".") {
parts := strings.Split(targetValue, ".")
if len(parts) == 2 {
targetValue = parts[1]
log.Debug().Msgf("restore-table-mapping: extracted table name '%s' from '%s'", targetValue, originalTarget)
} else if len(parts) > 2 {
log.Warn().Msgf("restore-table-mapping: target '%s' has multiple dots, using as-is", originalTarget)
}
}

b.cfg.General.RestoreTableMapping[sourceKey] = targetValue
log.Info().Msgf("restore-table-mapping: '%s' -> '%s'", sourceKey, targetValue)
}
}
}
Expand Down Expand Up @@ -1845,25 +1871,78 @@ func (b *Backuper) restoreDataEmbedded(ctx context.Context, backupName string, d
}

func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, tablePattern string, tablesForRestore ListOfTables, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, skipProjections []string, replicatedCopyToDetached bool) error {
originalTablePattern := tablePattern
log.Info().Msgf("restoreDataRegular: START - tablePattern='%s' RestoreTableMapping=%+v", tablePattern, b.cfg.General.RestoreTableMapping)

if len(b.cfg.General.RestoreDatabaseMapping) > 0 {
tablePattern = b.changeTablePatternFromRestoreMapping(tablePattern, "database")
log.Debug().Msgf("restoreDataRegular: database mapping updated pattern from '%s' to '%s'", originalTablePattern, tablePattern)
}
// https://github.com/Altinity/clickhouse-backup/issues/937
if len(b.cfg.General.RestoreTableMapping) > 0 {
beforeTableMapping := tablePattern
tablePattern = b.changeTablePatternFromRestoreMapping(tablePattern, "table")
log.Info().Msgf("restoreDataRegular: table mapping updated pattern from '%s' to '%s'", beforeTableMapping, tablePattern)
} else {
log.Warn().Msgf("restoreDataRegular: RestoreTableMapping is EMPTY! Cannot apply table mapping.")
}

if err := b.applyMacrosToObjectDiskPath(ctx); err != nil {
return err
}

chTables, err := b.ch.GetTables(ctx, tablePattern)
for _, tbl := range tablesForRestore {
log.Info().Msgf("restoreDataRegular: candidate table metadata '%s.%s'", tbl.Database, tbl.Table)
}

mergePatterns := func(existing, extra string) string {
if extra == "" {
return existing
}
seen := make(map[string]struct{})
result := make([]string, 0)
add := func(part string) {
part = strings.TrimSpace(part)
if part == "" {
return
}
if _, ok := seen[part]; ok {
return
}
seen[part] = struct{}{}
result = append(result, part)
}
for _, item := range strings.Split(existing, ",") {
add(item)
}
for _, item := range strings.Split(extra, ",") {
add(item)
}
return strings.Join(result, ",")
}

lookupPattern := originalTablePattern
if len(b.cfg.General.RestoreDatabaseMapping) > 0 {
lookupPattern = mergePatterns(lookupPattern, b.changeTablePatternFromRestoreMapping(originalTablePattern, "database"))
}
if len(b.cfg.General.RestoreTableMapping) > 0 {
lookupPattern = mergePatterns(lookupPattern, b.changeTablePatternFromRestoreMapping(originalTablePattern, "table"))
}
if lookupPattern == "" {
lookupPattern = tablePattern
}

log.Info().Msgf("restoreDataRegular: calling GetTables with pattern '%s'", lookupPattern)
chTables, err := b.ch.GetTables(ctx, lookupPattern)
if err != nil {
return err
}
for _, systemTable := range chTables {
log.Info().Msgf("restoreDataRegular: system table available '%s.%s'", systemTable.Database, systemTable.Name)
}
dstTablesMap := b.prepareDstTablesMap(chTables)

missingTables := b.checkMissingTables(tablesForRestore, chTables)
missingTables := b.checkMissingTables(ctx, tablesForRestore, chTables)
if len(missingTables) > 0 {
return fmt.Errorf("%s is not created. Restore schema first or create missing tables manually", strings.Join(missingTables, ", "))
}
Expand Down Expand Up @@ -2202,30 +2281,83 @@ func (b *Backuper) findObjectDiskPartRecursive(ctx context.Context, backup metad
return "", "", fmt.Errorf("part %s have required flag in %s, but not found in %s", part.Name, backup.BackupName, backup.RequiredBackup)
}

func (b *Backuper) checkMissingTables(tablesForRestore ListOfTables, chTables []clickhouse.Table) []string {
func (b *Backuper) checkMissingTables(ctx context.Context, tablesForRestore ListOfTables, chTables []clickhouse.Table) []string {
var missingTables []string

type tableKey struct {
Database string
Table string
}
existing := make(map[tableKey]struct{}, len(chTables))
for _, chTable := range chTables {
existing[tableKey{Database: chTable.Database, Table: chTable.Name}] = struct{}{}
}

for _, table := range tablesForRestore {
dstDatabase := table.Database
dstTable := table.Table
if len(b.cfg.General.RestoreDatabaseMapping) > 0 {
if targetDB, isMapped := b.cfg.General.RestoreDatabaseMapping[table.Database]; isMapped {
dstDatabase = targetDB
originalDB := table.Database
originalTable := table.Table

targetDB := originalDB
if mappedDB, ok := b.cfg.General.RestoreDatabaseMapping[originalDB]; ok && mappedDB != "" {
targetDB = mappedDB
}

targetTable := originalTable
tableMapped := false
if mappedTable, ok := b.cfg.General.RestoreTableMapping[originalTable]; ok && mappedTable != "" {
targetTable = mappedTable
tableMapped = true
}

candidatesSet := map[tableKey]struct{}{}
addCandidate := func(db, tbl string) {
if db == "" || tbl == "" {
return
}
candidatesSet[tableKey{Database: db, Table: tbl}] = struct{}{}
}
if len(b.cfg.General.RestoreTableMapping) > 0 {
if targetTable, isMapped := b.cfg.General.RestoreTableMapping[table.Table]; isMapped {
dstTable = targetTable

addCandidate(targetDB, targetTable)
addCandidate(originalDB, originalTable)

for srcTable, dstTable := range b.cfg.General.RestoreTableMapping {
if dstTable == originalTable {
addCandidate(targetDB, srcTable)
addCandidate(originalDB, srcTable)
}
if dstTable == targetTable {
addCandidate(targetDB, srcTable)
addCandidate(originalDB, srcTable)
}
}

found := false
for _, chTable := range chTables {
if (dstDatabase == chTable.Database) && (dstTable == chTable.Name) {
candidateLoop:
for cand := range candidatesSet {
if _, ok := existing[cand]; ok {
found = true
break
}
pattern := fmt.Sprintf("%s.%s", cand.Database, cand.Table)
tables, err := b.ch.GetTables(ctx, pattern)
if err != nil {
log.Warn().Msgf("checkMissingTables: can't get tables by pattern '%s': %v", pattern, err)
continue
}
for _, t := range tables {
existing[tableKey{Database: t.Database, Table: t.Name}] = struct{}{}
if t.Database == cand.Database && t.Name == cand.Table {
found = true
break candidateLoop
}
}
}
if !found {
missingTables = append(missingTables, fmt.Sprintf("'%s.%s'", dstDatabase, table.Table))
displayName := fmt.Sprintf("'%s.%s'", targetDB, targetTable)
if tableMapped || targetDB != originalDB || targetTable != originalTable {
displayName = fmt.Sprintf("%s (mapped from '%s.%s')", displayName, originalDB, originalTable)
}
missingTables = append(missingTables, displayName)
}
}
return missingTables
Expand All @@ -2248,10 +2380,36 @@ func (b *Backuper) changeTablePatternFromRestoreMapping(tablePattern, objType st
case "database":
mapping = b.cfg.General.RestoreDatabaseMapping
case "table":
mapping = b.cfg.General.RestoreDatabaseMapping
mapping = b.cfg.General.RestoreTableMapping
default:
return ""
}

if objType == "table" {
for sourceTable, targetTable := range mapping {
if tablePattern != "" {
sourceTablePattern := fmt.Sprintf(`(^|,)([^,]*\.)?%s($|,)`, regexp.QuoteMeta(sourceTable))
sourceTableRE := regexp.MustCompile(sourceTablePattern)

if sourceTableRE.MatchString(tablePattern) {
tablePattern = sourceTableRE.ReplaceAllStringFunc(tablePattern, func(match string) string {
submatch := sourceTableRE.FindStringSubmatch(match)
prefix := submatch[1]
dbPart := submatch[2]
suffix := submatch[3]

result := prefix + dbPart + targetTable + suffix
log.Debug().Msgf("changeTablePatternFromRestoreMapping: matched='%s' prefix='%s' dbPart='%s' suffix='%s' result='%s'", match, prefix, dbPart, suffix, result)
return result
})
log.Info().Msgf("changeTablePatternFromRestoreMapping: updated pattern from contains '%s' to '%s'", sourceTable, tablePattern)
}
}
}
return tablePattern
}

// Original logic for database mapping
for sourceObj, targetObj := range mapping {
if tablePattern != "" {
sourceObjRE := regexp.MustCompile(fmt.Sprintf("(^%s.*)|(,%s.*)", sourceObj, sourceObj))
Expand Down
Loading
Loading