2323from typing_extensions import ParamSpec
2424
2525from twisted .internet import defer , task
26+ from twisted .internet .defer import Deferred
2627from twisted .internet .interfaces import IDelayedCall
2728from twisted .internet .task import LoopingCall
2829
@@ -46,6 +47,8 @@ class Clock:
4647
4748 async def sleep (self , seconds : float ) -> None :
4849 d : defer .Deferred [float ] = defer .Deferred ()
50+ # Start task in the `sentinel` logcontext, to avoid leaking the current context
51+ # into the reactor once it finishes.
4952 with context .PreserveLoggingContext ():
5053 self ._reactor .callLater (seconds , d .callback , seconds )
5154 await d
@@ -74,8 +77,9 @@ def looping_call(
7477 this functionality thanks to this function being a thin wrapper around
7578 `twisted.internet.task.LoopingCall`.
7679
77- Note that the function will be called with no logcontext, so if it is anything
78- other than trivial, you probably want to wrap it in run_as_background_process.
80+ Note that the function will be called with generic `looping_call` logcontext, so
81+ if it is anything other than a trivial task, you probably want to wrap it in
82+ `run_as_background_process` to give it more specific label and track metrics.
7983
8084 Args:
8185 f: The function to call repeatedly.
@@ -97,8 +101,9 @@ def looping_call_now(
97101 As with `looping_call`: subsequent calls are not scheduled until after the
98102 the Awaitable returned by a previous call has finished.
99103
100- Also as with `looping_call`: the function is called with no logcontext and
101- you probably want to wrap it in `run_as_background_process`.
104+ Note that the function will be called with generic `looping_call` logcontext, so
105+ if it is anything other than a trivial task, you probably want to wrap it in
106+ `run_as_background_process` to give it more specific label and track metrics.
102107
103108 Args:
104109 f: The function to call repeatedly.
@@ -117,9 +122,43 @@ def _looping_call_common(
117122 ** kwargs : P .kwargs ,
118123 ) -> LoopingCall :
119124 """Common functionality for `looping_call` and `looping_call_now`"""
120- call = task .LoopingCall (f , * args , ** kwargs )
125+
126+ def wrapped_f (* args : P .args , ** kwargs : P .kwargs ) -> Deferred :
127+ assert context .current_context () is context .SENTINEL_CONTEXT , (
128+ "Expected `looping_call` callback from the reactor to start with the sentinel logcontext "
129+ f"but saw { context .current_context ()} . In other words, another task shouldn't have "
130+ "leaked their logcontext to us."
131+ )
132+
133+ # Because this is a callback from the reactor, we will be using the
134+ # `sentinel` log context at this point. We want the function to log with
135+ # some logcontext as we want to know which server the logs came from.
136+ #
137+ # We use `PreserveLoggingContext` to prevent our new `looping_call`
138+ # logcontext from finishing as soon as we exit this function, in case `f`
139+ # returns an awaitable/deferred which would continue running and may try to
140+ # restore the `loop_call` context when it's done (because it's trying to
141+ # adhere to the Synapse logcontext rules.)
142+ #
143+ # This also ensures that we return to the `sentinel` context when we exit
144+ # this function and yield control back to the reactor to avoid leaking the
145+ # current logcontext to the reactor (which would then get picked up and
146+ # associated with the next thing the reactor does)
147+ with context .PreserveLoggingContext (context .LoggingContext ("looping_call" )):
148+ # We use `run_in_background` to reset the logcontext after `f` (or the
149+ # awaitable returned by `f`) completes to avoid leaking the current
150+ # logcontext to the reactor
151+ return context .run_in_background (f , * args , ** kwargs )
152+
153+ call = task .LoopingCall (wrapped_f , * args , ** kwargs )
121154 call .clock = self ._reactor
122- d = call .start (msec / 1000.0 , now = now )
155+ # If `now=true`, the function will be called here immediately so we need to be
156+ # in the sentinel context now.
157+ #
158+ # We want to start the task in the `sentinel` logcontext, to avoid leaking the
159+ # current context into the reactor after the function finishes.
160+ with context .PreserveLoggingContext ():
161+ d = call .start (msec / 1000.0 , now = now )
123162 d .addErrback (log_failure , "Looping call died" , consumeErrors = False )
124163 return call
125164
@@ -128,8 +167,9 @@ def call_later(
128167 ) -> IDelayedCall :
129168 """Call something later
130169
131- Note that the function will be called with no logcontext, so if it is anything
132- other than trivial, you probably want to wrap it in run_as_background_process.
170+ Note that the function will be called with generic `call_later` logcontext, so
171+ if it is anything other than a trivial task, you probably want to wrap it in
172+ `run_as_background_process` to give it more specific label and track metrics.
133173
134174 Args:
135175 delay: How long to wait in seconds.
@@ -139,11 +179,33 @@ def call_later(
139179 """
140180
141181 def wrapped_callback (* args : Any , ** kwargs : Any ) -> None :
142- with context .PreserveLoggingContext ():
143- callback (* args , ** kwargs )
182+ assert context .current_context () is context .SENTINEL_CONTEXT , (
183+ "Expected `call_later` callback from the reactor to start with the sentinel logcontext "
184+ f"but saw { context .current_context ()} . In other words, another task shouldn't have "
185+ "leaked their logcontext to us."
186+ )
144187
145- with context .PreserveLoggingContext ():
146- return self ._reactor .callLater (delay , wrapped_callback , * args , ** kwargs )
188+ # Because this is a callback from the reactor, we will be using the
189+ # `sentinel` log context at this point. We want the function to log with
190+ # some logcontext as we want to know which server the logs came from.
191+ #
192+ # We use `PreserveLoggingContext` to prevent our new `call_later`
193+ # logcontext from finishing as soon as we exit this function, in case `f`
194+ # returns an awaitable/deferred which would continue running and may try to
195+ # restore the `loop_call` context when it's done (because it's trying to
196+ # adhere to the Synapse logcontext rules.)
197+ #
198+ # This also ensures that we return to the `sentinel` context when we exit
199+ # this function and yield control back to the reactor to avoid leaking the
200+ # current logcontext to the reactor (which would then get picked up and
201+ # associated with the next thing the reactor does)
202+ with context .PreserveLoggingContext (context .LoggingContext ("call_later" )):
203+ # We use `run_in_background` to reset the logcontext after `f` (or the
204+ # awaitable returned by `f`) completes to avoid leaking the current
205+ # logcontext to the reactor
206+ context .run_in_background (callback , * args , ** kwargs )
207+
208+ return self ._reactor .callLater (delay , wrapped_callback , * args , ** kwargs )
147209
148210 def cancel_call_later (self , timer : IDelayedCall , ignore_errs : bool = False ) -> None :
149211 try :
0 commit comments