@@ -28,31 +28,24 @@ type GoMySQLReader struct {
28
28
LastAppliedRowsEventHint mysql.BinlogCoordinates
29
29
}
30
30
31
- func NewGoMySQLReader (migrationContext * base.MigrationContext ) (binlogReader * GoMySQLReader , err error ) {
32
- binlogReader = & GoMySQLReader {
31
+ func NewGoMySQLReader (migrationContext * base.MigrationContext ) * GoMySQLReader {
32
+ connectionConfig := migrationContext .InspectorConnectionConfig
33
+ return & GoMySQLReader {
33
34
migrationContext : migrationContext ,
34
- connectionConfig : migrationContext . InspectorConnectionConfig ,
35
+ connectionConfig : connectionConfig ,
35
36
currentCoordinates : mysql.BinlogCoordinates {},
36
37
currentCoordinatesMutex : & sync.Mutex {},
37
- binlogSyncer : nil ,
38
- binlogStreamer : nil ,
38
+ binlogSyncer : replication .NewBinlogSyncer (replication.BinlogSyncerConfig {
39
+ ServerID : uint32 (migrationContext .ReplicaServerId ),
40
+ Flavor : gomysql .MySQLFlavor ,
41
+ Host : connectionConfig .Key .Hostname ,
42
+ Port : uint16 (connectionConfig .Key .Port ),
43
+ User : connectionConfig .User ,
44
+ Password : connectionConfig .Password ,
45
+ TLSConfig : connectionConfig .TLSConfig (),
46
+ UseDecimal : true ,
47
+ }),
39
48
}
40
-
41
- serverId := uint32 (migrationContext .ReplicaServerId )
42
-
43
- binlogSyncerConfig := replication.BinlogSyncerConfig {
44
- ServerID : serverId ,
45
- Flavor : "mysql" ,
46
- Host : binlogReader .connectionConfig .Key .Hostname ,
47
- Port : uint16 (binlogReader .connectionConfig .Key .Port ),
48
- User : binlogReader .connectionConfig .User ,
49
- Password : binlogReader .connectionConfig .Password ,
50
- TLSConfig : binlogReader .connectionConfig .TLSConfig (),
51
- UseDecimal : true ,
52
- }
53
- binlogReader .binlogSyncer = replication .NewBinlogSyncer (binlogSyncerConfig )
54
-
55
- return binlogReader , err
56
49
}
57
50
58
51
// ConnectBinlogStreamer
@@ -145,15 +138,17 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
145
138
defer this .currentCoordinatesMutex .Unlock ()
146
139
this .currentCoordinates .LogPos = int64 (ev .Header .LogPos )
147
140
}()
148
- if rotateEvent , ok := ev .Event .(* replication.RotateEvent ); ok {
141
+
142
+ switch binlogEvent := ev .Event .(type ) {
143
+ case * replication.RotateEvent :
149
144
func () {
150
145
this .currentCoordinatesMutex .Lock ()
151
146
defer this .currentCoordinatesMutex .Unlock ()
152
- this .currentCoordinates .LogFile = string (rotateEvent .NextLogName )
147
+ this .currentCoordinates .LogFile = string (binlogEvent .NextLogName )
153
148
}()
154
- this .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , this .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), rotateEvent .NextLogName )
155
- } else if rowsEvent , ok := ev . Event .( * replication.RowsEvent ); ok {
156
- if err := this .handleRowsEvent (ev , rowsEvent , entriesChannel ); err != nil {
149
+ this .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , this .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), binlogEvent .NextLogName )
150
+ case * replication.RowsEvent :
151
+ if err := this .handleRowsEvent (ev , binlogEvent , entriesChannel ); err != nil {
157
152
return err
158
153
}
159
154
}
0 commit comments