|
45 | 45 | from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string |
46 | 46 | from synapse.http.site import SynapseRequest |
47 | 47 | from synapse.logging.opentracing import trace_with_opname |
48 | | -from synapse.types import JsonDict, Requester, StreamToken |
| 48 | +from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken |
49 | 49 | from synapse.util import json_decoder |
50 | 50 |
|
51 | 51 | from ._base import client_patterns, set_timeline_upper_limit |
@@ -238,9 +238,52 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: |
238 | 238 | time_now, sync_result, requester, filter_collection |
239 | 239 | ) |
240 | 240 |
|
| 241 | + if ( |
| 242 | + since_token is not None |
| 243 | + and sync_result.next_batch is not None |
| 244 | + and self._sync_token_went_backwards( |
| 245 | + since_token.room_key, sync_result.next_batch.room_key |
| 246 | + ) |
| 247 | + ): |
| 248 | + since_str = await since_token.to_string(self.store) |
| 249 | + next_str = await sync_result.next_batch.to_string(self.store) |
| 250 | + logger.warning( |
| 251 | + "sync token went backwards! from %s to %s", since_str, next_str |
| 252 | + ) |
| 253 | + |
241 | 254 | logger.debug("Event formatting complete") |
242 | 255 | return 200, response_content |
243 | 256 |
|
| 257 | + def _sync_token_went_backwards( |
| 258 | + self, since: RoomStreamToken, next: RoomStreamToken |
| 259 | + ) -> bool: |
| 260 | + """ |
| 261 | + Returns true if and only if the given token went backwards. |
| 262 | + """ |
| 263 | + if next.stream < since.stream: |
| 264 | + return True |
| 265 | + |
| 266 | + if since.instance_map: |
| 267 | + for instance, pos in since.instance_map.items(): |
| 268 | + if ( |
| 269 | + next.instance_map |
| 270 | + and next.instance_map.get(instance, next.stream) < pos |
| 271 | + ): |
| 272 | + return True |
| 273 | + elif next.stream < pos: |
| 274 | + return True |
| 275 | + |
| 276 | + if next.instance_map: |
| 277 | + for instance, pos in next.instance_map.items(): |
| 278 | + if since.instance_map and pos < since.instance_map.get( |
| 279 | + instance, since.stream |
| 280 | + ): |
| 281 | + return True |
| 282 | + elif pos < since.stream: |
| 283 | + return True |
| 284 | + |
| 285 | + return False |
| 286 | + |
244 | 287 | @trace_with_opname("sync.encode_response") |
245 | 288 | async def encode_response( |
246 | 289 | self, |
|
0 commit comments