Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,3 +918,24 @@ func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) {
}
b.cfg.Logger.Infof("kill last connection id %d", id)
}

// GetSyncedGTIDSet returns the current GTID set.
func (b *BinlogSyncer) GetSyncedGTIDSet() GTIDSet {
b.m.RLock()
defer b.m.RUnlock()
return b.currGset
Copy link
Collaborator

@lance6716 lance6716 Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I see the function name is "synced" GTID set, but b.currGset is not "synchronized to downsteam" GTID set, it just a "synchronized to BinlogSyncer" one. Maybe the function comment can describe it as "the GTID set that BinlogSyncer is fully fetched".
  2. However, "fully fetched" does not mean the caller of BinlogSyncer can see it, because GetEvent is FIFO and there are backlog event in BinlogStreamer.ch. I think GetSyncedGTIDSet is meaningless to caller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your feedback!

  1. Could you clarify or provide documentation that explains what you mean by "synchronized to downstream" in this context?
  2. My goal is to pause syncing, fully flush the stream, and then retrieve the resulting GTID set from the BinlogSyncer. In a scenario where there is no backlog of events, wouldn't GetSyncedGTIDSet serve a useful purpose?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I support your use case is reading the binlog events and synchronize them to elsewhere, like save in files, replicate to another database or message queue. I want to mention the "Synced" in new function name GetSyncedGTIDSet does not mean those things.
  2. What do you mean by pause syncing? Close the BinlogSyncer or stop writing in upstream MySQL? I think some events may stuck in-flight in the network between MySQL and BinlogSyncer. So you can't make sure it's flushed. A better way is check events from GetEvent and record the GTID related event manually.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Yes, but "Synced" is used relative to the BinlogSyncer alone, as it's a function on the BinlogSyncer, right? From its perspective, what events is it aware of? Do you have a suggestion for a name that you think is more correct?
  2. I'd like to call Close() then StartSync(...). Are you suggesting skipping over BinlogSyncer in general and using BinlogStreamer directly to make sure I have a consistent stream of events?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I think your usage can not ensure correctness.

I'd like to call Close() then StartSync(...)

How do you use the BinlogStreamer? I assume that you have a goroutine reading binlog events from GetEvent and process them. And your usage is

t1: the application has processed many binlog events.
t2: somehow you want to close the BinlogStreamer and restart it later, so record the GTID set using the new getter
t3: now close the BinlogStreamer and do some stuff
t4: start BinlogStreamer from the GTID set and the application continues.

There's a problem in above workflow. There are residue binlogevents in BinlogStreamer.ch that are skipped, because they have not been processed in the time <= t1 and the GTID set get in t2 is larger than them.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While looking through this, we noticed that BinlogSyncer.parseEvent should close before acking when needStop and needACK are both true. Should I open a separate issue for that?

Yes, thanks for finding this problem. When needStop is true, the event is not sent to s.ch so BinlogSyncer should not ACK

We want to ACK after writing to disk.
We want the ability to retrieve the full GTID set that has been written to disk.

I still recommend you type cast the events get from event, err := streamer.GetEvent(ctx) and get the GTID set from some events like XIDEvent. The event is really the stuff the caller has written to disk so the GTID set has the same meaning. The new getter does not have the same meaning considering BinlogSyncer has inner channel buffer, it's error-prone to have extra action to match the two GTID set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the MySQL documentation:

The replica acknowledges receipt of a transaction's events only after they have been written to its relay log and flushed to disk.

Is achieving parity with the MySQL protocol a goal here? The asynchronous ordering of these two events in this library appears to violate that behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a requirement for MySQL relay log, not this package. If you want to implement the behaviour like MySQL relay log, more adjustment should be made.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right okay. We have that requirement so I'm going to close this PR and see how we can implement that invariant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you, by the way, for your attention and responsiveness

}

// IsRunning returns whether the BinlogSyncer is running.
func (b *BinlogSyncer) IsRunning() bool {
b.m.RLock()
defer b.m.RUnlock()
return b.running
}

// IsSemiSyncEnabled returns true if semi-synchronous replication is enabled.
func (b *BinlogSyncer) IsSemiSyncEnabled() bool {
b.m.RLock()
defer b.m.RUnlock()
return b.cfg.SemiSyncEnabled
}