5353logger = logging .getLogger (__name__ )
5454
5555
56+ ONE_TIME_KEY_UPLOAD = "one_time_key_upload_lock"
57+
58+
5659class E2eKeysHandler :
5760 def __init__ (self , hs : "HomeServer" ):
5861 self .config = hs .config
@@ -62,6 +65,7 @@ def __init__(self, hs: "HomeServer"):
6265 self ._appservice_handler = hs .get_application_service_handler ()
6366 self .is_mine = hs .is_mine
6467 self .clock = hs .get_clock ()
68+ self ._worker_lock_handler = hs .get_worker_locks_handler ()
6569
6670 federation_registry = hs .get_federation_registry ()
6771
@@ -855,45 +859,53 @@ async def upload_keys_for_user(
855859 async def _upload_one_time_keys_for_user (
856860 self , user_id : str , device_id : str , time_now : int , one_time_keys : JsonDict
857861 ) -> None :
858- logger .info (
859- "Adding one_time_keys %r for device %r for user %r at %d" ,
860- one_time_keys .keys (),
861- device_id ,
862- user_id ,
863- time_now ,
864- )
862+ # We take out a lock so that we don't have to worry about a client
863+ # sending duplicate requests.
864+ lock_key = f"{ user_id } _{ device_id } "
865+ async with self ._worker_lock_handler .acquire_lock (
866+ ONE_TIME_KEY_UPLOAD , lock_key
867+ ):
868+ logger .info (
869+ "Adding one_time_keys %r for device %r for user %r at %d" ,
870+ one_time_keys .keys (),
871+ device_id ,
872+ user_id ,
873+ time_now ,
874+ )
865875
866- # make a list of (alg, id, key) tuples
867- key_list = []
868- for key_id , key_obj in one_time_keys .items ():
869- algorithm , key_id = key_id .split (":" )
870- key_list .append ((algorithm , key_id , key_obj ))
876+ # make a list of (alg, id, key) tuples
877+ key_list = []
878+ for key_id , key_obj in one_time_keys .items ():
879+ algorithm , key_id = key_id .split (":" )
880+ key_list .append ((algorithm , key_id , key_obj ))
871881
872- # First we check if we have already persisted any of the keys.
873- existing_key_map = await self .store .get_e2e_one_time_keys (
874- user_id , device_id , [k_id for _ , k_id , _ in key_list ]
875- )
882+ # First we check if we have already persisted any of the keys.
883+ existing_key_map = await self .store .get_e2e_one_time_keys (
884+ user_id , device_id , [k_id for _ , k_id , _ in key_list ]
885+ )
876886
877- new_keys = [] # Keys that we need to insert. (alg, id, json) tuples.
878- for algorithm , key_id , key in key_list :
879- ex_json = existing_key_map .get ((algorithm , key_id ), None )
880- if ex_json :
881- if not _one_time_keys_match (ex_json , key ):
882- raise SynapseError (
883- 400 ,
884- (
885- "One time key %s:%s already exists. "
886- "Old key: %s; new key: %r"
887+ new_keys = [] # Keys that we need to insert. (alg, id, json) tuples.
888+ for algorithm , key_id , key in key_list :
889+ ex_json = existing_key_map .get ((algorithm , key_id ), None )
890+ if ex_json :
891+ if not _one_time_keys_match (ex_json , key ):
892+ raise SynapseError (
893+ 400 ,
894+ (
895+ "One time key %s:%s already exists. "
896+ "Old key: %s; new key: %r"
897+ )
898+ % (algorithm , key_id , ex_json , key ),
887899 )
888- % (algorithm , key_id , ex_json , key ),
900+ else :
901+ new_keys .append (
902+ (algorithm , key_id , encode_canonical_json (key ).decode ("ascii" ))
889903 )
890- else :
891- new_keys .append (
892- (algorithm , key_id , encode_canonical_json (key ).decode ("ascii" ))
893- )
894904
895- log_kv ({"message" : "Inserting new one_time_keys." , "keys" : new_keys })
896- await self .store .add_e2e_one_time_keys (user_id , device_id , time_now , new_keys )
905+ log_kv ({"message" : "Inserting new one_time_keys." , "keys" : new_keys })
906+ await self .store .add_e2e_one_time_keys (
907+ user_id , device_id , time_now , new_keys
908+ )
897909
898910 async def upload_signing_keys_for_user (
899911 self , user_id : str , keys : JsonDict
0 commit comments