Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 20 additions & 39 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,38 @@ def _load_current_id(
class AbstractStreamIdGenerator(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_next(self) -> AsyncContextManager[int]:
"""
Usage:
async with stream_id_gen.get_next() as stream_id:
# ... persist event ...
"""
raise NotImplementedError()

@abc.abstractmethod
def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]:
"""
Usage:
async with stream_id_gen.get_next(n) as stream_ids:
# ... persist events ...
"""
raise NotImplementedError()

@abc.abstractmethod
def get_current_token(self) -> int:
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.

Returns:
The maximum stream id.
"""
raise NotImplementedError()

@abc.abstractmethod
def get_current_token_for_writer(self, instance_name: str) -> int:
"""Returns the position of the given writer.

For streams with single writers this is equivalent to `get_current_token`.
"""
raise NotImplementedError()


Expand Down Expand Up @@ -158,11 +178,6 @@ def __init__(
self._unfinished_ids: OrderedDict[int, int] = OrderedDict()

def get_next(self) -> AsyncContextManager[int]:
"""
Usage:
async with stream_id_gen.get_next() as stream_id:
# ... persist event ...
"""
Comment on lines -161 to -165
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue that these bits of docs shouldn't be removed, i'll note them in the next few comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They've been moved into AbstractStreamId{Generator,Tracker}. I was under the impression that IDEs would pick it up automatically. inspect.getdoc is noted to do the right thing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, interesting, though i'd like a confirmation from a synapse member on this change, because im not 100% sure myself

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll be closing the other comments, apologies if it came over as overwhelming, I simply meant to highlight all places where this issued occurred without much duplication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a problem!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems utterly reasonable.

There's also a class IdGenerator. I guess that a StreamIdGenerator is more specific and that the two aren't directly related?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't feeling up for making sense of IdGenerator and unifying it with the stream id generators. They do look vaguely similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out IdGenerator is far simpler than StreamIdGenerator. The former only issues IDs while the latter has to track when they are committed.

with self._lock:
self._current += self._step
next_id = self._current
Expand All @@ -180,11 +195,6 @@ def manager() -> Generator[int, None, None]:
return _AsyncCtxManagerWrapper(manager())

def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]:
"""
Usage:
async with stream_id_gen.get_next(n) as stream_ids:
# ... persist events ...
"""
with self._lock:
next_ids = range(
self._current + self._step,
Expand All @@ -208,24 +218,13 @@ def manager() -> Generator[Sequence[int], None, None]:
return _AsyncCtxManagerWrapper(manager())

def get_current_token(self) -> int:
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.

Returns:
The maximum stream id.
"""
with self._lock:
if self._unfinished_ids:
return next(iter(self._unfinished_ids)) - self._step

return self._current

def get_current_token_for_writer(self, instance_name: str) -> int:
"""Returns the position of the given writer.

For streams with single writers this is equivalent to
`get_current_token`.
"""
return self.get_current_token()


Expand Down Expand Up @@ -475,12 +474,6 @@ def _load_next_mult_id_txn(self, txn: Cursor, n: int) -> List[int]:
return stream_ids

def get_next(self) -> AsyncContextManager[int]:
"""
Usage:
async with stream_id_gen.get_next() as stream_id:
# ... persist event ...
"""

# If we have a list of instances that are allowed to write to this
# stream, make sure we're in it.
if self._writers and self._instance_name not in self._writers:
Expand All @@ -492,12 +485,6 @@ def get_next(self) -> AsyncContextManager[int]:
return cast(AsyncContextManager[int], _MultiWriterCtxManager(self))

def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]:
"""
Usage:
async with stream_id_gen.get_next_mult(5) as stream_ids:
# ... persist events ...
"""

# If we have a list of instances that are allowed to write to this
# stream, make sure we're in it.
if self._writers and self._instance_name not in self._writers:
Expand Down Expand Up @@ -597,15 +584,9 @@ def _mark_id_as_finished(self, next_id: int) -> None:
self._add_persisted_position(next_id)

def get_current_token(self) -> int:
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
"""

return self.get_persisted_upto_position()

def get_current_token_for_writer(self, instance_name: str) -> int:
"""Returns the position of the given writer."""

# If we don't have an entry for the given instance name, we assume it's a
# new writer.
#
Expand Down