Skip to content

Commit a560088

Browse files
author
Mark Haines
committed
Start implementing incremental initial sync
1 parent 4365130 commit a560088

File tree

3 files changed

+241
-34
lines changed

3 files changed

+241
-34
lines changed

synapse/events/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def serialize_event(e, time_now_ms, client_event=True, strip_ids=False):
137137
d.pop("depth", None)
138138
d.pop("unsigned", None)
139139
d.pop("origin", None)
140+
d.pop("prev_state", None)
140141

141142
if strip_ids:
142143
d.pop("room_id", None)

synapse/handlers/sync.py

Lines changed: 208 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,18 @@
3737
])
3838

3939

40-
RoomSyncResult = collections.namedtuple("RoomSyncResult", [
40+
class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
4141
"room_id",
4242
"limited",
4343
"published",
44-
"events", # dict of event
44+
"events",
4545
"state",
4646
"prev_batch",
47-
])
47+
])):
48+
__slots__ = []
49+
50+
def __nonzero__(self):
51+
return bool(self.events or self.state)
4852

4953

5054
class SyncResult(collections.namedtuple("SyncResult", [
@@ -56,7 +60,9 @@ class SyncResult(collections.namedtuple("SyncResult", [
5660
__slots__ = []
5761

5862
def __nonzero__(self):
59-
return self.private_user_data or self.public_user_data or self.rooms
63+
return bool(
64+
self.private_user_data or self.public_user_data or self.rooms
65+
)
6066

6167

6268
class SyncHandler(BaseHandler):
@@ -67,7 +73,13 @@ def __init__(self, hs):
6773
self.clock = hs.get_clock()
6874

6975
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
70-
if timeout == 0:
76+
"""Get the sync for a client if we have new data for it now. Otherwise
77+
wait for new data to arrive on the server. If the timeout expires, then
78+
return an empty sync result.
79+
Returns:
80+
A Deferred SyncResult.
81+
"""
82+
if timeout == 0 or since_token is None:
7183
return self.current_sync_for_user(sync_config, since_token)
7284
else:
7385
def current_sync_callback(since_token):
@@ -79,13 +91,25 @@ def current_sync_callback(since_token):
7991
)
8092

8193
def current_sync_for_user(self, sync_config, since_token=None):
94+
"""Get the sync for client needed to match what the server has now.
95+
Returns:
96+
A Deferred SyncResult.
97+
"""
8298
if since_token is None:
8399
return self.initial_sync(sync_config)
84100
else:
85-
return self.incremental_sync(sync_config)
101+
if sync_config.gap:
102+
return self.incremental_sync_with_gap(sync_config, since_token)
103+
else:
104+
#TODO(mjark): Handle gapless sync
105+
pass
86106

87107
@defer.inlineCallbacks
88108
def initial_sync(self, sync_config):
109+
"""Get a sync for a client which is starting without any state
110+
Returns:
111+
A Deferred SyncResult.
112+
"""
89113
if sync_config.sort == "timeline,desc":
90114
# TODO(mjark): Handle going through events in reverse order?.
91115
# What does "most recent events" mean when applying the limits mean
@@ -114,25 +138,86 @@ def initial_sync(self, sync_config):
114138

115139
rooms = []
116140
for event in room_list:
117-
#TODO (mjark): Apply the event filter in sync_config.
118-
recent_events, token = yield self.store.get_recent_events_for_room(
119-
event.room_id,
120-
limit=sync_config.limit,
121-
end_token=now_token.room_key,
122-
)
123-
prev_batch_token = now_token.copy_and_replace("room_key", token[0])
124-
current_state_events = yield self.state_handler.get_current_state(
125-
event.room_id
141+
room_sync = yield self.initial_sync_for_room(
142+
event.room_id, sync_config, now_token, published_room_ids
126143
)
144+
rooms.append(room_sync)
145+
146+
defer.returnValue(SyncResult(
147+
public_user_data=presence,
148+
private_user_data=[],
149+
rooms=rooms,
150+
next_batch=now_token,
151+
))
152+
153+
@defer.inlineCallbacks
def initial_sync_for_room(self, room_id, sync_config, now_token,
                          published_room_ids):
    """Sync a room for a client which is starting without any state.

    Fetches the most recent events in the room up to ``now_token`` and the
    room's current state, and packages them as a RoomSyncResult.

    Args:
        room_id: ID of the room to sync.
        sync_config: Sync configuration; only ``limit`` is read here and it
            is passed through as the maximum number of recent events.
        now_token: Stream token to sync up to. Its ``room_key`` bounds the
            event query and it is the base for the ``prev_batch`` token.
        published_room_ids: Container of room IDs; membership of
            ``room_id`` in it sets the ``published`` flag of the result.
    Returns:
        A Deferred RoomSyncResult.
    """
    recent_events, token = yield self.store.get_recent_events_for_room(
        room_id,
        limit=sync_config.limit,
        end_token=now_token.room_key,
    )
    # NOTE(review): token[0] is presumably the position of the start of the
    # returned window of events -- confirm against the storage layer.
    prev_batch_token = now_token.copy_and_replace("room_key", token[0])
    current_state_events = yield self.state_handler.get_current_state(
        room_id
    )

    defer.returnValue(RoomSyncResult(
        room_id=room_id,
        published=room_id in published_room_ids,
        events=recent_events,
        prev_batch=prev_batch_token,
        state=current_state_events,
        # An initial sync is always reported as limited.
        limited=True,
    ))

# Backwards-compatible alias: the method was originally defined under this
# misspelled name, while callers use the correctly spelled one.
intial_sync_for_room = initial_sync_for_room
178+
179+
180+
@defer.inlineCallbacks
181+
def incremental_sync_with_gap(self, sync_config, since_token):
182+
""" Get the incremental delta needed to bring the client up to
183+
date with the server.
184+
Returns:
185+
A Deferred SyncResult.
186+
"""
187+
if sync_config.sort == "timeline,desc":
188+
# TODO(mjark): Handle going through events in reverse order?.
189+
# What does "most recent events" mean when applying the limits mean
190+
# in this case?
191+
raise NotImplementedError()
192+
193+
now_token = yield self.event_sources.get_current_token()
194+
195+
presence_stream = self.event_sources.sources["presence"]
196+
pagination_config = PaginationConfig(
197+
from_token=since_token, to_token=now_token
198+
)
199+
presence, _ = yield presence_stream.get_pagination_rows(
200+
user=sync_config.user,
201+
pagination_config=pagination_config.get_source_config("presence"),
202+
key=None
203+
)
204+
room_list = yield self.store.get_rooms_for_user_where_membership_is(
205+
user_id=sync_config.user.to_string(),
206+
membership_list=[Membership.INVITE, Membership.JOIN]
207+
)
208+
209+
# TODO (mjark): Does public mean "published"?
210+
published_rooms = yield self.store.get_rooms(is_public=True)
211+
published_room_ids = set(r["room_id"] for r in published_rooms)
127212

128-
rooms.append(RoomSyncResult(
129-
room_id=event.room_id,
130-
published=event.room_id in published_room_ids,
131-
events=recent_events,
132-
prev_batch=prev_batch_token,
133-
state=current_state_events,
134-
limited=True,
135-
))
213+
rooms = []
214+
for event in room_list:
215+
room_sync = yield self.incremental_sync_with_gap_for_room(
216+
event.room_id, sync_config, since_token, now_token,
217+
published_room_ids
218+
)
219+
if room_sync:
220+
rooms.append(room_sync)
136221

137222
defer.returnValue(SyncResult(
138223
public_user_data=presence,
@@ -143,5 +228,103 @@ def initial_sync(self, sync_config):
143228

144229

145230
@defer.inlineCallbacks
146-
def incremental_sync(self, sync_config):
147-
pass
231+
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
                                       since_token, now_token,
                                       published_room_ids):
    """Build the incremental delta that brings the client up to date for
    a single room: the most recent events between the two tokens plus the
    state events the client has not seen yet.

    Args:
        room_id: ID of the room to sync.
        sync_config: Sync configuration; only ``limit`` is read here.
        since_token: Token of the client's previous sync.
        now_token: Token marking the point to sync up to.
        published_room_ids: Container of room IDs used to set the
            ``published`` flag of the result.
    Returns:
        A Deferred RoomSyncResult
    """
    # TODO(mjark): Check if they have joined the room between
    # the previous sync and this one.
    # TODO(mjark): Apply the event filter in sync_config
    # TODO(mjark): Check for redactions we might have missed.
    # TODO(mjark): Typing notifications.

    # Ask for one event more than the limit so that we can tell whether
    # there were more events than we are allowed to return.
    recents, token = yield self.store.get_recent_events_for_room(
        room_id,
        limit=sync_config.limit + 1,
        from_token=since_token.room_key,
        end_token=now_token.room_key,
    )

    logging.debug("Recents %r", recents)

    limited = len(recents) > sync_config.limit
    if limited:
        # Drop the extra event that was only fetched to detect the gap.
        # NOTE(review): token[0] below still comes from the untrimmed
        # result -- confirm prev_batch lines up with the trimmed list.
        recents = recents[1:]

    prev_batch_token = now_token.copy_and_replace("room_key", token[0])

    # TODO(mjark): This seems racy since this isn't being passed a
    # token to indicate what point in the stream this is
    state_now = yield self.state_handler.get_current_state(
        room_id
    )

    state_then = yield self.get_state_at_previous_sync(
        room_id, since_token=since_token
    )

    state_events_delta = yield self.compute_state_delta(
        since_token=since_token,
        previous_state=state_then,
        current_state=state_now,
    )

    room_sync = RoomSyncResult(
        room_id=room_id,
        published=room_id in published_room_ids,
        events=recents,
        prev_batch=prev_batch_token,
        state=state_events_delta,
        limited=limited,
    )

    logging.debug("Room sync: %r", room_sync)

    defer.returnValue(room_sync)
290+
291+
@defer.inlineCallbacks
def get_state_at_previous_sync(self, room_id, since_token):
    """Get the room state at the previous sync the client made.

    Looks up the single most recent event in the room at or before
    ``since_token`` and uses the state from that event's computed context.
    If that event is itself a state event it is included in the result,
    ahead of the context's state.

    Args:
        room_id: ID of the room to inspect.
        since_token: Token of the client's previous sync; its ``room_key``
            bounds the event lookup.
    Returns:
        A Deferred list of Events (empty if the room had no events at
        that point).
    """
    last_events, _ = yield self.store.get_recent_events_for_room(
        room_id, end_token=since_token.room_key, limit=1,
    )

    state = []
    if last_events:
        last_event = last_events[0]
        last_context = yield self.state_handler.compute_event_context(
            last_event
        )
        # list() so that concatenation also works when values() returns a
        # view rather than a list, and so we always hand back a list.
        context_state = list(last_context.current_state.values())
        if last_event.is_state():
            state = [last_event] + context_state
        else:
            state = context_state

    defer.returnValue(state)
313+
314+
315+
def compute_state_delta(self, since_token, previous_state, current_state):
    """Work out the difference in state between the current state and the
    state the client got when it last performed a sync.

    An event is part of the delta when its event_id does not appear among
    the events of ``previous_state``. ``since_token`` is currently unused.

    Returns:
        A list of events, in the order they appear in ``current_state``.
    """
    # TODO(mjark) Check if the state events were received by the server
    # after the previous sync, since we need to include those state
    # updates even if they occurred logically before the previous event.
    # TODO(mjark) Check for new redactions in the state events.
    already_seen_ids = set(old.event_id for old in previous_state)
    return [
        candidate for candidate in current_state
        if candidate.event_id not in already_seen_ids
    ]

synapse/storage/stream.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,17 +265,38 @@ def f(txn):
265265
return self.runInteraction("paginate_room_events", f)
266266

267267
def get_recent_events_for_room(self, room_id, limit, end_token,
268-
with_feedback=False):
268+
with_feedback=False, from_token=None):
269269
# TODO (erikj): Handle compressed feedback
270270

271-
sql = (
272-
"SELECT stream_ordering, topological_ordering, event_id FROM events "
273-
"WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
274-
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
275-
)
271+
end_token = _StreamToken.parse_stream_token(end_token)
276272

277-
def f(txn):
278-
txn.execute(sql, (room_id, end_token, limit,))
273+
if from_token is None:
274+
sql = (
275+
"SELECT stream_ordering, topological_ordering, event_id"
276+
" FROM events"
277+
" WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
278+
" ORDER BY topological_ordering DESC, stream_ordering DESC"
279+
" LIMIT ?"
280+
)
281+
else:
282+
from_token = _StreamToken.parse_stream_token(from_token)
283+
sql = (
284+
"SELECT stream_ordering, topological_ordering, event_id"
285+
" FROM events"
286+
" WHERE room_id = ? AND stream_ordering > ?"
287+
" AND stream_ordering <= ? AND outlier = 0"
288+
" ORDER BY topological_ordering DESC, stream_ordering DESC"
289+
" LIMIT ?"
290+
)
291+
292+
293+
def get_recent_events_for_room_txn(txn):
294+
if from_token is None:
295+
txn.execute(sql, (room_id, end_token.stream, limit,))
296+
else:
297+
txn.execute(sql, (
298+
room_id, from_token.stream, end_token.stream, limit
299+
))
279300

280301
rows = self.cursor_to_dict(txn)
281302

@@ -303,7 +324,9 @@ def f(txn):
303324

304325
return events, token
305326

306-
return self.runInteraction("get_recent_events_for_room", f)
327+
return self.runInteraction(
328+
"get_recent_events_for_room", get_recent_events_for_room_txn
329+
)
307330

308331
def get_room_events_max_id(self):
309332
return self.runInteraction(

0 commit comments

Comments
 (0)