Skip to content

Commit 4fcdc09

Browse files
authored
Merge pull request #26 from RiveryIO/fix/eitam/safe_data_handling
Fix/eitam/safe data handling
2 parents 4c4eab5 + 051d6b6 commit 4fcdc09

File tree

3 files changed

+100
-127
lines changed

3 files changed

+100
-127
lines changed

canal/canal.go

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -436,27 +436,29 @@ func isSafeIdentifier(s string) bool {
436436

437437
func (c *Canal) GenerateCharsetQuery() (string, error) {
438438
query := `
439-
SELECT
440-
c.ORDINAL_POSITION,
441-
CASE
442-
WHEN c.CHARACTER_SET_NAME IS NOT NULL THEN c.CHARACTER_SET_NAME
443-
WHEN c.DATA_TYPE IN ('binary','varbinary','tinyblob','blob','mediumblob','longblob') THEN col.CHARACTER_SET_NAME
444-
END AS CHARACTER_SET_NAME,
445-
c.COLUMN_NAME
446-
FROM
447-
information_schema.COLUMNS c
448-
LEFT JOIN information_schema.TABLES t
449-
ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME
450-
LEFT JOIN information_schema.COLLATIONS col
451-
ON col.COLLATION_NAME = t.TABLE_COLLATION
452-
WHERE
453-
c.TABLE_SCHEMA = ?
454-
AND c.TABLE_NAME = ?
455-
AND (c.CHARACTER_SET_NAME IS NOT NULL OR c.DATA_TYPE IN ('binary','varbinary','tinyblob','blob','mediumblob','longblob'));
456-
`
439+
SELECT
440+
c.ORDINAL_POSITION,
441+
CASE
442+
WHEN c.CHARACTER_SET_NAME IS NOT NULL THEN c.CHARACTER_SET_NAME
443+
WHEN c.DATA_TYPE IN ('binary','varbinary','tinyblob','blob','mediumblob','longblob') THEN col.CHARACTER_SET_NAME
444+
ELSE col.CHARACTER_SET_NAME
445+
END AS CHARACTER_SET_NAME,
446+
c.COLUMN_NAME
447+
FROM
448+
information_schema.COLUMNS c
449+
LEFT JOIN information_schema.TABLES t
450+
ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME
451+
LEFT JOIN information_schema.COLLATIONS col
452+
ON col.COLLATION_NAME = t.TABLE_COLLATION
453+
WHERE
454+
c.TABLE_SCHEMA = ?
455+
AND c.TABLE_NAME = ?
456+
AND (c.CHARACTER_SET_NAME IS NOT NULL
457+
OR c.DATA_TYPE IN ('binary','varbinary','tinyblob','blob','mediumblob','longblob')
458+
OR c.DATA_TYPE IN ('varchar','char','text','tinytext','mediumtext','longtext'));
459+
`
457460

458461
return query, nil
459-
460462
}
461463

462464
func (c *Canal) setColumnsCharsetFromRows(tableRegex string, rows *sql.Rows) error {
@@ -507,20 +509,19 @@ func (c *Canal) GetColumnsCharsets() error {
507509
if err != nil {
508510
return fmt.Errorf("failed to generate charset query: %w", err)
509511
}
510-
511512
rows, err := db.QueryContext(c.ctx, query, dbName, tableName)
512513
if err != nil {
513514
return fmt.Errorf("error occurred while executing query: %s on db: %s on table: %s. error: %v",
514515
query, dbName, tableName, errors.Trace(err))
515516
}
516-
517517
// Ensure rows are closed after processing
518518
func() {
519519
defer rows.Close()
520520
if err := c.setColumnsCharsetFromRows(tableRegex, rows); err != nil {
521521
panic(fmt.Errorf("failed to set charset from rows: %w", err))
522522
}
523523
}()
524+
524525
}
525526

526527
return nil

replication/row_event.go

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,66 +1363,24 @@ func supportsSmartQuotes(enc encoding.Encoding) bool {
13631363
return false
13641364
}
13651365

1366-
func replaceUnsupportedCharacters(data []byte, length int) []byte {
1367-
if len(data) == 0 {
1368-
return data
1369-
}
1370-
1371-
var content []byte
1372-
var prefix []byte
1373-
var contentLength int
1374-
var prefixLen int
1375-
1376-
if length > 255 {
1377-
// 2-byte length prefix (LittleEndian)
1378-
prefixLen = 2
1379-
contentLength = int(binary.LittleEndian.Uint16(data[:2]))
1380-
if contentLength > len(data)-prefixLen {
1381-
contentLength = len(data) - prefixLen
1382-
}
1383-
content = data[prefixLen : prefixLen+contentLength]
1384-
} else {
1385-
// 1-byte length prefix
1386-
prefixLen = 1
1387-
contentLength = int(data[0])
1388-
if contentLength > len(data)-prefixLen {
1389-
contentLength = len(data) - prefixLen
1390-
}
1391-
content = data[prefixLen : prefixLen+contentLength]
1392-
}
1393-
1394-
// Replace unsupported characters
1395-
content = normalizeSmartQuotes(content)
1396-
1397-
// Rebuild prefix with new length
1398-
if prefixLen == 2 {
1399-
prefix = make([]byte, 2)
1400-
binary.LittleEndian.PutUint16(prefix, uint16(len(content)))
1401-
} else {
1402-
prefix = []byte{byte(len(content))}
1403-
}
1404-
1405-
return append(prefix, content...)
1406-
}
1407-
14081366
func decodeStringWithEncoder(data []byte, length int, enc encoding.Encoding) (v string, n int) {
1409-
// Define the Latin1 decoder
14101367
decoder := enc.NewDecoder()
1411-
if !supportsSmartQuotes(enc) {
1412-
data = replaceUnsupportedCharacters(data, length)
1413-
}
14141368

14151369
if length < 256 {
1416-
// If the length is smaller than 256, extract the length from the first byte
14171370
length = int(data[0])
14181371
n = length + 1
14191372
decodedBytes, _, _ := transform.Bytes(decoder, data[1:n])
1373+
if !supportsSmartQuotes(enc) {
1374+
decodedBytes = normalizeSmartQuotes(decodedBytes)
1375+
}
14201376
v = string(decodedBytes)
14211377
} else {
1422-
// If the length is larger, extract it using LittleEndian
14231378
length = int(binary.LittleEndian.Uint16(data[0:]))
14241379
n = length + 2
14251380
decodedBytes, _, _ := transform.Bytes(decoder, data[2:n])
1381+
if !supportsSmartQuotes(enc) {
1382+
decodedBytes = normalizeSmartQuotes(decodedBytes)
1383+
}
14261384
v = string(decodedBytes)
14271385
}
14281386

replication/row_event_test.go

Lines changed: 72 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,44 +1363,44 @@ func TestDecodeStringLatin1(t *testing.T) {
13631363
wantRead: 6,
13641364
},
13651365
{
1366-
name: "Short Latin1 string with ‘",
1367-
input: append(
1368-
append([]byte{7}, []byte{0xe2, 'f', 'h', 0xe9}...),
1369-
[]byte("‘")...),
1370-
length: 5,
1371-
wantStr: "âfhé'",
1366+
name: "Short Latin1 string with UTF-8 smart quote '",
1367+
input: append([]byte{7}, append(
1368+
[]byte{0xe2, 'f', 'h', 0xe9},
1369+
[]byte{0xE2, 0x80, 0x99}...)...), // UTF-8 encoding of ’ (U+2019 right single quotation mark, 3 bytes)
1370+
length: 255,
13721371
charset: "latin1",
1373-
wantRead: 6,
1372+
wantStr: "âfhéâ ", // UTF-8 smart quote decoded as Latin-1 becomes "â "
1373+
wantRead: 8,
13741374
},
13751375
{
1376-
name: "Short Latin1 string with ’",
1377-
input: append(
1378-
append([]byte{7}, []byte{0xe2, 'f', 'h', 0xe9}...),
1379-
[]byte("’")...),
1380-
length: 5,
1381-
wantStr: "âfhé'",
1376+
name: "Short Latin1 string with UTF-8 smart quote '",
1377+
input: append([]byte{7}, append(
1378+
[]byte{0xe2, 'f', 'h', 0xe9},
1379+
[]byte{0xE2, 0x80, 0x98}...)...), // UTF-8 encoding of ‘ (U+2018 left single quotation mark, 3 bytes)
1380+
length: 255,
13821381
charset: "latin1",
1383-
wantRead: 6,
1382+
wantStr: "âfhéâ ", // UTF-8 smart quote decoded as Latin-1
1383+
wantRead: 8,
13841384
},
13851385
{
1386-
name: "Short Latin1 string with ”",
1387-
input: append(
1388-
append([]byte{7}, []byte{0xe2, 'f', 'h', 0xe9}...),
1389-
[]byte("”")...),
1390-
length: 5,
1391-
wantStr: "âfhé\"",
1386+
name: "Short Latin1 string with UTF-8 smart quote ",
1387+
input: append([]byte{7}, append(
1388+
[]byte{0xe2, 'f', 'h', 0xe9},
1389+
[]byte{0xE2, 0x80, 0x9C}...)...), // UTF-8 encoding of “ (U+201C left double quotation mark, 3 bytes)
1390+
length: 255,
13921391
charset: "latin1",
1393-
wantRead: 6,
1392+
wantStr: "âfhéâ ", // UTF-8 smart quote decoded as Latin-1
1393+
wantRead: 8,
13941394
},
13951395
{
1396-
name: "Short Latin1 string with “",
1397-
input: append(
1398-
append([]byte{7}, []byte{0xe2, 'f', 'h', 0xe9}...),
1399-
[]byte("“")...),
1400-
length: 5,
1401-
wantStr: "âfhé\"",
1396+
name: "Short Latin1 string with UTF-8 smart quote ",
1397+
input: append([]byte{7}, append(
1398+
[]byte{0xe2, 'f', 'h', 0xe9},
1399+
[]byte{0xE2, 0x80, 0x9D}...)...), // UTF-8 encoding of ” (U+201D right double quotation mark, 3 bytes)
1400+
length: 255,
14021401
charset: "latin1",
1403-
wantRead: 6,
1402+
wantStr: "âfhéâ ", // UTF-8 smart quote decoded as Latin-1
1403+
wantRead: 8,
14041404
},
14051405
{
14061406
name: "Invalid UTF-8 valid Latin1",
@@ -1414,7 +1414,8 @@ func TestDecodeStringLatin1(t *testing.T) {
14141414
name: "Latin1 with null byte",
14151415
input: append([]byte{4}, []byte{'A', 0x00, 'B', 'C'}...), // A\0BC
14161416
length: 4,
1417-
wantStr: "A\u0000BC",
1417+
charset: "latin1",
1418+
wantStr: "A BC", // null byte becomes space after sanitization
14181419
wantRead: 5,
14191420
},
14201421
{
@@ -1445,54 +1446,67 @@ func TestDecodeStringLatin1(t *testing.T) {
14451446
name: "Long string (>255, 2-byte length)",
14461447
input: func() []byte {
14471448
buf := new(bytes.Buffer)
1448-
err := binary.Write(buf, binary.LittleEndian, uint16(6))
1449-
if err != nil {
1450-
return nil
1451-
}
1452-
buf.Write([]byte{0xe2, 'f', 'g', 'h', 0xe9}) // 'âfghé'
1449+
binary.Write(buf, binary.LittleEndian, uint16(6))
1450+
buf.Write([]byte{0xe2, 'f', 'g', 'h', 0xe9, 0x00}) // 'âfghé\0'
14531451
return buf.Bytes()
14541452
}(),
14551453
length: 300,
14561454
charset: "latin1",
1457-
wantStr: "âfghé",
1458-
wantRead: 7,
1455+
wantStr: "âfghé ", // null byte becomes space
1456+
wantRead: 8,
14591457
},
14601458
{
1461-
name: "Term Date and Retro Term Policy",
1462-
input: append([]byte{byte(len("‘30 day term date’"))}, []byte("‘30 day term date’")...),
1463-
length: len("‘30 day term date’"),
1464-
wantStr: "'30 day term date'",
1459+
name: "UTF-8 smart quotes in text",
1460+
input: append([]byte{22}, append(
1461+
[]byte{0xE2, 0x80, 0x98}, // 3 bytes
1462+
append([]byte("30 day term date"), // 16 bytes
1463+
[]byte{0xE2, 0x80, 0x99}...)...)...), // 3 bytes
1464+
length: 255,
14651465
charset: "latin1",
1466-
wantRead: 19, // Include the prepended length byte
1466+
wantStr: "â 30 day term dateâ ",
1467+
wantRead: 23,
14671468
},
14681469
{
1469-
name: "Term Date and Retro Term Policy",
1470-
input: append([]byte{byte(len("“30 day term date”"))}, []byte("“30 day term date”")...),
1471-
length: len("“30 day term date”"),
1472-
wantStr: "\"30 day term date\"",
1470+
name: "UTF-8 double quotes in text",
1471+
input: append([]byte{22}, append(
1472+
[]byte{0xE2, 0x80, 0x9C},
1473+
append([]byte("30 day term date"), // 16 bytes
1474+
[]byte{0xE2, 0x80, 0x9D}...)...)...), // 3 bytes
1475+
length: 255,
14731476
charset: "latin1",
1474-
wantRead: 19, // Include the prepended length byte
1477+
wantStr: "â 30 day term dateâ ",
1478+
wantRead: 23, // 1 (length byte) + 22 (content)
14751479
},
1476-
14771480
{
1478-
name: "UTF-8 followed by Latin1",
1479-
input: func() []byte {
1480-
data := []byte{ // Hello' âfghé
1481-
'H', 'e', 'l', 'l', 'o', ' ', 0x27, ' ', 0xe2, 'f', 'g', 'h', 0xe9,
1482-
}
1483-
return append([]byte{13}, data...) // Prepend length byte
1484-
}(),
1485-
length: 12,
1486-
wantStr: "Hello ' âfghé",
1481+
name: "UTF-8 followed by Latin1",
1482+
input: append([]byte{13}, []byte{'H', 'e', 'l', 'l', 'o', ' ', '\'', ' ', 0xe2, 'f', 'g', 'h', 0xe9}...),
1483+
length: 255,
14871484
charset: "latin1",
1485+
wantStr: "Hello ' âfghé",
14881486
wantRead: 14,
14891487
},
14901488
{
1491-
name: "UTF-8 with Latin1 byte after UTF-8 valid chars",
1492-
input: append([]byte{7}, []byte{0xe2, ' ', 'H', 'e', 'l', 'l', 'o'}...), // 'Hello â'
1489+
name: "Latin1 byte followed by ASCII",
1490+
input: append([]byte{7}, []byte{0xe2, ' ', 'H', 'e', 'l', 'l', 'o'}...),
14931491
length: 7,
1492+
charset: "latin1",
14941493
wantStr: "â Hello",
1494+
wantRead: 8,
1495+
},
1496+
{
1497+
name: "Windows-1252 single-byte smart quote (0x92)",
1498+
input: append([]byte{5}, []byte{'J', 'o', 'h', 'n', 0x92}...), // "John’" in Windows-1252, where 0x92 encodes ’ (U+2019)
1499+
length: 5,
1500+
charset: "latin1",
1501+
wantStr: "John ", // 0x92 is control character in Latin-1, becomes space
1502+
wantRead: 6,
1503+
},
1504+
{
1505+
name: "Windows-1252 double quotes (0x93, 0x94)",
1506+
input: append([]byte{7}, []byte{0x93, 'H', 'e', 'l', 'l', 'o', 0x94}...),
1507+
length: 7,
14951508
charset: "latin1",
1509+
wantStr: " Hello ", // 0x93 and 0x94 are control chars in Latin-1
14961510
wantRead: 8,
14971511
},
14981512
}

0 commit comments

Comments
 (0)