Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions synapse/replication/slave/storage/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore

Expand All @@ -25,9 +24,6 @@ def get_max_push_rules_stream_id(self):
return self._push_rules_stream_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
# We assert this for the benefit of mypy
assert isinstance(self._push_rules_stream_id_gen, SlavedIdTracker)

if stream_name == PushRulesStream.NAME:
self._push_rules_stream_id_gen.advance(instance_name, token)
for row in rows:
Expand Down
11 changes: 7 additions & 4 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdTracker,
StreamIdGenerator,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
Expand Down Expand Up @@ -82,9 +85,9 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)

if hs.config.worker.worker_app is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha.

It feels a bit icky, but I take your point that it's no worse than the situation we have today.

Is the pain here that we end up checking hs.config.worker.worker_app midway through? It seems odd to have to do so when the store is called a PushRuleWorkerStore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more to do with what's in process_replication_rowss - we would have to check that the id_gens we update are SlavedIdTrackers or MultiWriterIdGenerators otherwise.

There are really 3 interfaces:
IdStatus(?): get_current_token(), get_current_token_for_writer(), for getting the current stream id
IdTracker(IdStatus): advance(), for instances that get notified about writes
IdGenerator(IdStatus): get_next(), get_next_mult(), for instances that do the writes

StreamIdGenerator is used by instances that perform writes, with the expectation that they are the only writer.
MultiWriterIdGenerator is both an IdGenerator and IdTracker but requires Postgres, which is why StreamIdGenerator exists.

I didn't want to create a third abstract class. And get_current_token_for_writer was already out of place on StreamIdGenerator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for writing that out---this is gold dust! Would it be worth sticking that as a module-level comment or docstring somewhere?

Copy link
Contributor Author

@squahtx squahtx Nov 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took a while to understand all the id trackers/generators well enough to write docstrings. Let me know what you think.

self._push_rules_stream_id_gen: Union[
StreamIdGenerator, SlavedIdTracker
] = StreamIdGenerator(db_conn, "push_rules_stream", "stream_id")
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn, "push_rules_stream", "stream_id"
)
else:
self._push_rules_stream_id_gen = SlavedIdTracker(
db_conn, "push_rules_stream", "stream_id"
Expand Down