Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 0a60bbf

Browse files
committed
Finish appservice txn storage impl and tests.
1 parent 1ead1ca commit 0a60bbf

File tree

3 files changed

+139
-16
lines changed

3 files changed

+139
-16
lines changed

synapse/storage/appservice.py

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -429,15 +429,7 @@ def _create_appservice_txn(self, txn, service, events):
429429
# The highest id may be the last one sent (in which case it is last_txn)
430430
# or it may be the highest in the txns list (which are waiting to be/are
431431
# being sent)
432-
result = txn.execute(
433-
"SELECT last_txn FROM application_services_state WHERE as_id=?",
434-
(service.id,)
435-
)
436-
last_txn_id = result.fetchone()
437-
if last_txn_id is None: # no row exists
438-
last_txn_id = 0
439-
else:
440-
last_txn_id = int(last_txn_id[0]) # select 'last_txn' col
432+
last_txn_id = self._get_last_txn(txn, service.id)
441433

442434
result = txn.execute(
443435
"SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
@@ -467,12 +459,43 @@ def complete_appservice_txn(self, txn_id, service):
467459
service(ApplicationService): The application service which was sent
468460
this transaction.
469461
Returns:
470-
A Deferred which resolves to True if this transaction was completed
462+
A Deferred which resolves if this transaction was stored
471463
successfully.
472464
"""
473-
# TODO: Set current txn_id for AS to 'txn_id'
474-
# TODO: Delete txn contents
475-
pass
465+
return self.runInteraction(
466+
"complete_appservice_txn",
467+
self._complete_appservice_txn,
468+
txn_id, service
469+
)
470+
471+
def _complete_appservice_txn(self, txn, txn_id, service):
472+
txn_id = int(txn_id)
473+
474+
# Debugging query: Make sure the txn being completed is EXACTLY +1 from
475+
# what was there before. If it isn't, we've got problems (e.g. the AS
476+
# has probably missed some events), so whine loudly but still continue,
477+
# since it shouldn't fail completion of the transaction.
478+
last_txn_id = self._get_last_txn(txn, service.id)
479+
if (last_txn_id + 1) != txn_id:
480+
logger.error(
481+
"appservice: Completing a transaction which has an ID > 1 from "
482+
"the last ID sent to this AS. We've either dropped events or "
483+
"sent it to the AS out of order. FIX ME. last_txn=%s "
484+
"completing_txn=%s service_id=%s", last_txn_id, txn_id,
485+
service.id
486+
)
487+
488+
# Set current txn_id for AS to 'txn_id'
489+
self._simple_upsert_txn(
490+
txn, "application_services_state", dict(as_id=service.id),
491+
dict(last_txn=txn_id)
492+
)
493+
494+
# Delete txn contents
495+
self._simple_delete_txn(
496+
txn, "application_services_txns",
497+
dict(txn_id=txn_id, as_id=service.id)
498+
)
476499

477500
def get_oldest_unsent_txn(self, service):
478501
"""Get the oldest transaction which has not been sent for this
@@ -484,6 +507,38 @@ def get_oldest_unsent_txn(self, service):
484507
A Deferred which resolves to an AppServiceTransaction or
485508
None.
486509
"""
487-
# TODO: Monotonically increasing txn ids, so just select the smallest
510+
return self.runInteraction(
511+
"get_oldest_unsent_appservice_txn",
512+
self._get_oldest_unsent_txn,
513+
service
514+
)
515+
516+
def _get_oldest_unsent_txn(self, txn, service):
517+
# Monotonically increasing txn ids, so just select the smallest
488518
# one in the txns table (we delete them when they are sent)
489-
pass
519+
result = txn.execute(
520+
"SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
521+
(service.id,)
522+
)
523+
entry = self.cursor_to_dict(result)[0]
524+
525+
if not entry or entry["txn_id"] is None:
526+
# the min(txn_id) part will force a row, so entry may not be None
527+
return None
528+
529+
return AppServiceTransaction(
530+
service=service, id=entry["txn_id"], events=json.loads(
531+
entry["content"]
532+
)
533+
)
534+
535+
def _get_last_txn(self, txn, service_id):
536+
result = txn.execute(
537+
"SELECT last_txn FROM application_services_state WHERE as_id=?",
538+
(service_id,)
539+
)
540+
last_txn_id = result.fetchone()
541+
if last_txn_id is None: # no row exists
542+
return 0
543+
else:
544+
return int(last_txn_id[0]) # select 'last_txn' col

synapse/storage/schema/delta/15/appservice_txns.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
CREATE TABLE IF NOT EXISTS application_services_state(
1717
as_id INTEGER PRIMARY KEY,
18-
state TEXT NOT NULL,
18+
state TEXT,
1919
last_txn TEXT,
2020
FOREIGN KEY(as_id) REFERENCES application_services(id)
2121
);

tests/storage/test_appservice.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,74 @@ def test_create_appservice_txn_up_fuzzing(self):
304304
self.assertEquals(txn.events, events)
305305
self.assertEquals(txn.service, service)
306306

307+
@defer.inlineCallbacks
308+
def test_complete_appservice_txn_first_txn(self):
309+
service = Mock(id=self.as_list[0]["id"])
310+
events = [{"foo": "bar"}]
311+
txn_id = 1
312+
313+
yield self._insert_txn(service.id, txn_id, events)
314+
yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
315+
316+
res = yield self.db_pool.runQuery(
317+
"SELECT last_txn FROM application_services_state WHERE as_id=?",
318+
(service.id,)
319+
)
320+
self.assertEquals(1, len(res))
321+
self.assertEquals(str(txn_id), res[0][0])
322+
323+
res = yield self.db_pool.runQuery(
324+
"SELECT * FROM application_services_txns WHERE txn_id=?",
325+
(txn_id,)
326+
)
327+
self.assertEquals(0, len(res))
328+
329+
@defer.inlineCallbacks
330+
def test_complete_appservice_txn_existing_in_state_table(self):
331+
service = Mock(id=self.as_list[0]["id"])
332+
events = [{"foo": "bar"}]
333+
txn_id = 5
334+
yield self._set_last_txn(service.id, 4)
335+
yield self._insert_txn(service.id, txn_id, events)
336+
yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
337+
338+
res = yield self.db_pool.runQuery(
339+
"SELECT last_txn, state FROM application_services_state WHERE "
340+
"as_id=?",
341+
(service.id,)
342+
)
343+
self.assertEquals(1, len(res))
344+
self.assertEquals(str(txn_id), res[0][0])
345+
self.assertEquals(ApplicationServiceState.UP, res[0][1])
346+
347+
res = yield self.db_pool.runQuery(
348+
"SELECT * FROM application_services_txns WHERE txn_id=?",
349+
(txn_id,)
350+
)
351+
self.assertEquals(0, len(res))
352+
353+
@defer.inlineCallbacks
354+
def test_get_oldest_unsent_txn_none(self):
355+
service = Mock(id=self.as_list[0]["id"])
356+
357+
txn = yield self.store.get_oldest_unsent_txn(service)
358+
self.assertEquals(None, txn)
359+
360+
@defer.inlineCallbacks
361+
def test_get_oldest_unsent_txn(self):
362+
service = Mock(id=self.as_list[0]["id"])
363+
events = [{"type": "nothing"}, {"type": "here"}]
364+
365+
yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"})
366+
yield self._insert_txn(service.id, 10, events)
367+
yield self._insert_txn(service.id, 11, [{"foo":"bar"}])
368+
yield self._insert_txn(service.id, 12, [{"argh":"bargh"}])
369+
370+
txn = yield self.store.get_oldest_unsent_txn(service)
371+
self.assertEquals(service, txn.service)
372+
self.assertEquals(10, txn.id)
373+
self.assertEquals(events, txn.events)
374+
307375
@defer.inlineCallbacks
308376
def test_get_appservices_by_state_single(self):
309377
yield self._set_state(

0 commit comments

Comments
 (0)