11# -*- coding: utf-8 -*-
2- # Copyright 2015-2020 The Matrix.org Foundation C.I.C.
2+ # Copyright 2015 OpenMarket Ltd
3+ # Copyright 2018 New Vector Ltd
34#
45# Licensed under the Apache License, Version 2.0 (the "License");
56# you may not use this file except in compliance with the License.
1415# limitations under the License.
1516
1617import logging
17- from typing import Dict , Tuple
1818
19- import attr
2019from canonicaljson import json
2120
2221from twisted .internet import defer
3736]
3837
3938
40- @attr .s
41- class EventPushSummary :
42- """Summary of pending event push actions for a given user in a given room."""
43-
44- unread_count = attr .ib (type = int )
45- stream_ordering = attr .ib (type = int )
46- old_user_id = attr .ib (type = str )
47- notif_count = attr .ib (type = int )
48-
49-
5039def _serialize_action (actions , is_highlight ):
5140 """Custom serializer for actions. This allows us to "compress" common actions.
5241
@@ -133,50 +122,32 @@ def _get_unread_counts_by_receipt_txn(
133122
134123 def _get_unread_counts_by_pos_txn (self , txn , room_id , user_id , stream_ordering ):
135124
136- # First get number of actions, grouped on whether the action notifies.
125+ # First get number of notifications.
126+ # We don't need to put a notif=1 clause as all rows always have
127+ # notif=1
137128 sql = (
138- "SELECT count(*), notif "
129+ "SELECT count(*)"
139130 " FROM event_push_actions ea"
140131 " WHERE"
141132 " user_id = ?"
142133 " AND room_id = ?"
143134 " AND stream_ordering > ?"
144- " GROUP BY notif"
145135 )
146- txn .execute (sql , (user_id , room_id , stream_ordering ))
147- rows = txn .fetchall ()
148136
149- # We should get a maximum number of two rows: one for notif = 0, which is the
150- # number of actions that contribute to the unread_count but not to the
151- # notify_count, and one for notif = 1, which is the number of actions that
152- # contribute to both counters. If one or both rows don't appear, then the
153- # value for the matching counter should be 0.
154- unread_count = 0
155- notify_count = 0
156- for row in rows :
157- # We always increment unread_count because actions that notify also
158- # contribute to it.
159- unread_count += row [0 ]
160- if row [1 ] == 1 :
161- notify_count = row [0 ]
162- elif row [1 ] != 0 :
163- logger .warning (
164- "Unexpected value %d for column 'notif' in table"
165- " 'event_push_actions'" ,
166- row [1 ],
167- )
137+ txn .execute (sql , (user_id , room_id , stream_ordering ))
138+ row = txn .fetchone ()
139+ notify_count = row [0 ] if row else 0
168140
169141 txn .execute (
170142 """
171- SELECT notif_count, unread_count FROM event_push_summary
143+ SELECT notif_count FROM event_push_summary
172144 WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
173145 """ ,
174146 (room_id , user_id , stream_ordering ),
175147 )
176148 rows = txn .fetchall ()
177149 if rows :
178150 notify_count += rows [0 ][0 ]
179- unread_count += rows [0 ][1 ]
180151
181152 # Now get the number of highlights
182153 sql = (
@@ -193,11 +164,7 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
193164 row = txn .fetchone ()
194165 highlight_count = row [0 ] if row else 0
195166
196- return {
197- "unread_count" : unread_count ,
198- "notify_count" : notify_count ,
199- "highlight_count" : highlight_count ,
200- }
167+ return {"notify_count" : notify_count , "highlight_count" : highlight_count }
201168
202169 @defer .inlineCallbacks
203170 def get_push_action_users_in_range (self , min_stream_ordering , max_stream_ordering ):
@@ -255,7 +222,6 @@ def get_after_receipt(txn):
255222 " AND ep.user_id = ?"
256223 " AND ep.stream_ordering > ?"
257224 " AND ep.stream_ordering <= ?"
258- " AND ep.notif = 1"
259225 " ORDER BY ep.stream_ordering ASC LIMIT ?"
260226 )
261227 args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -284,7 +250,6 @@ def get_no_receipt(txn):
284250 " AND ep.user_id = ?"
285251 " AND ep.stream_ordering > ?"
286252 " AND ep.stream_ordering <= ?"
287- " AND ep.notif = 1"
288253 " ORDER BY ep.stream_ordering ASC LIMIT ?"
289254 )
290255 args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -357,7 +322,6 @@ def get_after_receipt(txn):
357322 " AND ep.user_id = ?"
358323 " AND ep.stream_ordering > ?"
359324 " AND ep.stream_ordering <= ?"
360- " AND ep.notif = 1"
361325 " ORDER BY ep.stream_ordering DESC LIMIT ?"
362326 )
363327 args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -386,7 +350,6 @@ def get_no_receipt(txn):
386350 " AND ep.user_id = ?"
387351 " AND ep.stream_ordering > ?"
388352 " AND ep.stream_ordering <= ?"
389- " AND ep.notif = 1"
390353 " ORDER BY ep.stream_ordering DESC LIMIT ?"
391354 )
392355 args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -436,7 +399,7 @@ def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
436399 def _get_if_maybe_push_in_range_for_user_txn (txn ):
437400 sql = """
438401 SELECT 1 FROM event_push_actions
439- WHERE user_id = ? AND stream_ordering > ? AND notif = 1
402+ WHERE user_id = ? AND stream_ordering > ?
440403 LIMIT 1
441404 """
442405
@@ -465,15 +428,14 @@ def add_push_actions_to_staging(self, event_id, user_id_actions):
465428 return
466429
467430 # This is a helper function for generating the necessary tuple that
468- # can be used to insert into the `event_push_actions_staging` table.
431+ # can be used to inert into the `event_push_actions_staging` table.
469432 def _gen_entry (user_id , actions ):
470433 is_highlight = 1 if _action_has_highlight (actions ) else 0
471- notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1
472434 return (
473435 event_id , # event_id column
474436 user_id , # user_id column
475437 _serialize_action (actions , is_highlight ), # actions column
476- notif , # notif column
438+ 1 , # notif column
477439 is_highlight , # highlight column
478440 )
479441
@@ -855,51 +817,24 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
855817 # Calculate the new counts that should be upserted into event_push_summary
856818 sql = """
857819 SELECT user_id, room_id,
858- coalesce(old.%s , 0) + upd.cnt ,
820+ coalesce(old.notif_count , 0) + upd.notif_count ,
859821 upd.stream_ordering,
860822 old.user_id
861823 FROM (
862- SELECT user_id, room_id, count(*) as cnt ,
824+ SELECT user_id, room_id, count(*) as notif_count ,
863825 max(stream_ordering) as stream_ordering
864826 FROM event_push_actions
865827 WHERE ? <= stream_ordering AND stream_ordering < ?
866828 AND highlight = 0
867- %s
868829 GROUP BY user_id, room_id
869830 ) AS upd
870831 LEFT JOIN event_push_summary AS old USING (user_id, room_id)
871832 """
872833
873- # First get the count of unread messages.
874- txn .execute (
875- sql % ("unread_count" , "" ),
876- (old_rotate_stream_ordering , rotate_to_stream_ordering ),
877- )
878-
879- # We need to merge both lists into a single object because we might not have the
880- # same amount of rows in each of them. In this case we use a dict indexed on the
881- # user ID and room ID to make it easier to populate.
882- summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
883- for row in txn :
884- summaries [(row [0 ], row [1 ])] = EventPushSummary (
885- unread_count = row [2 ],
886- stream_ordering = row [3 ],
887- old_user_id = row [4 ],
888- notif_count = 0 ,
889- )
890-
891- # Then get the count of notifications.
892- txn .execute (
893- sql % ("notif_count" , "AND notif = 1" ),
894- (old_rotate_stream_ordering , rotate_to_stream_ordering ),
895- )
896-
897- # notif_rows is populated based on a subset of the query used to populate
898- # unread_rows, so we can be sure that there will be no KeyError here.
899- for row in txn :
900- summaries [(row [0 ], row [1 ])].notif_count = row [2 ]
834+ txn .execute (sql , (old_rotate_stream_ordering , rotate_to_stream_ordering ))
835+ rows = txn .fetchall ()
901836
902- logger .info ("Rotating notifications, handling %d rows" , len (summaries ))
837+ logger .info ("Rotating notifications, handling %d rows" , len (rows ))
903838
904839 # If the `old.user_id` above is NULL then we know there isn't already an
905840 # entry in the table, so we simply insert it. Otherwise we update the
@@ -909,34 +844,22 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
909844 table = "event_push_summary" ,
910845 values = [
911846 {
912- "user_id" : user_id ,
913- "room_id" : room_id ,
914- "notif_count" : summary .notif_count ,
915- "unread_count" : summary .unread_count ,
916- "stream_ordering" : summary .stream_ordering ,
847+ "user_id" : row [0 ],
848+ "room_id" : row [1 ],
849+ "notif_count" : row [2 ],
850+ "stream_ordering" : row [3 ],
917851 }
918- for (( user_id , room_id ), summary ) in summaries . items ()
919- if summary . old_user_id is None
852+ for row in rows
853+ if row [ 4 ] is None
920854 ],
921855 )
922856
923857 txn .executemany (
924858 """
925- UPDATE event_push_summary
926- SET notif_count = ?, unread_count = ?, stream_ordering = ?
859+ UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
927860 WHERE user_id = ? AND room_id = ?
928861 """ ,
929- (
930- (
931- summary .notif_count ,
932- summary .unread_count ,
933- summary .stream_ordering ,
934- user_id ,
935- room_id ,
936- )
937- for ((user_id , room_id ), summary ) in summaries .items ()
938- if summary .old_user_id is not None
939- ),
862+ ((row [2 ], row [3 ], row [0 ], row [1 ]) for row in rows if row [4 ] is not None ),
940863 )
941864
942865 txn .execute (
0 commit comments