1+ from __future__ import annotations
12import sys
23from collections .abc import Mapping
34from functools import wraps
@@ -62,11 +63,10 @@ class CeleryIntegration(Integration):
6263
6364 def __init__ (
6465 self ,
65- propagate_traces = True ,
66- monitor_beat_tasks = False ,
67- exclude_beat_tasks = None ,
68- ):
69- # type: (bool, bool, Optional[List[str]]) -> None
66+ propagate_traces : bool = True ,
67+ monitor_beat_tasks : bool = False ,
68+ exclude_beat_tasks : Optional [List [str ]] = None ,
69+ ) -> None :
7070 self .propagate_traces = propagate_traces
7171 self .monitor_beat_tasks = monitor_beat_tasks
7272 self .exclude_beat_tasks = exclude_beat_tasks
@@ -76,8 +76,7 @@ def __init__(
7676 _setup_celery_beat_signals (monitor_beat_tasks )
7777
7878 @staticmethod
79- def setup_once ():
80- # type: () -> None
79+ def setup_once () -> None :
8180 _check_minimum_version (CeleryIntegration , CELERY_VERSION )
8281
8382 _patch_build_tracer ()
@@ -97,16 +96,14 @@ def setup_once():
9796 ignore_logger ("celery.redirected" )
9897
9998
100- def _set_status (status ):
101- # type: (str) -> None
99+ def _set_status (status : str ) -> None :
102100 with capture_internal_exceptions ():
103101 span = sentry_sdk .get_current_span ()
104102 if span is not None :
105103 span .set_status (status )
106104
107105
108- def _capture_exception (task , exc_info ):
109- # type: (Any, ExcInfo) -> None
106+ def _capture_exception (task : Any , exc_info : ExcInfo ) -> None :
110107 client = sentry_sdk .get_client ()
111108 if client .get_integration (CeleryIntegration ) is None :
112109 return
@@ -129,10 +126,10 @@ def _capture_exception(task, exc_info):
129126 sentry_sdk .capture_event (event , hint = hint )
130127
131128
132- def _make_event_processor (task , uuid , args , kwargs , request = None ):
133- # type: ( Any, Any, Any, Any, Optional[Any]) -> EventProcessor
134- def event_processor ( event , hint ) :
135- # type: ( Event, Hint) -> Optional[Event]
129+ def _make_event_processor (
130+ task : Any , uuid : Any , args : Any , kwargs : Any , request : Optional [Any ] = None
131+ ) -> EventProcessor :
132+ def event_processor ( event : Event , hint : Hint ) -> Optional [Event ]:
136133
137134 with capture_internal_exceptions ():
138135 tags = event .setdefault ("tags" , {})
@@ -158,8 +155,9 @@ def event_processor(event, hint):
158155 return event_processor
159156
160157
161- def _update_celery_task_headers (original_headers , span , monitor_beat_tasks ):
162- # type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any]
158+ def _update_celery_task_headers (
159+ original_headers : dict [str , Any ], span : Optional [Span ], monitor_beat_tasks : bool
160+ ) -> dict [str , Any ]:
163161 """
164162 Updates the headers of the Celery task with the tracing information
165163 and eventually Sentry Crons monitoring information for beat tasks.
@@ -233,20 +231,16 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
233231
234232
235233class NoOpMgr :
236- def __enter__ (self ):
237- # type: () -> None
234+ def __enter__ (self ) -> None :
238235 return None
239236
240- def __exit__ (self , exc_type , exc_value , traceback ):
241- # type: (Any, Any, Any) -> None
237+ def __exit__ (self , exc_type : Any , exc_value : Any , traceback : Any ) -> None :
242238 return None
243239
244240
245- def _wrap_task_run (f ):
246- # type: (F) -> F
241+ def _wrap_task_run (f : F ) -> F :
247242 @wraps (f )
248- def apply_async (* args , ** kwargs ):
249- # type: (*Any, **Any) -> Any
243+ def apply_async (* args : Any , ** kwargs : Any ) -> Any :
250244 # Note: kwargs can contain headers=None, so no setdefault!
251245 # Unsure which backend though.
252246 integration = sentry_sdk .get_client ().get_integration (CeleryIntegration )
@@ -262,15 +256,15 @@ def apply_async(*args, **kwargs):
262256 return f (* args , ** kwargs )
263257
264258 if isinstance (args [0 ], Task ):
265- task_name = args [0 ].name # type: str
259+ task_name : str = args [0 ].name
266260 elif len (args ) > 1 and isinstance (args [1 ], str ):
267261 task_name = args [1 ]
268262 else :
269263 task_name = "<unknown Celery task>"
270264
271265 task_started_from_beat = sentry_sdk .get_isolation_scope ()._name == "celery-beat"
272266
273- span_mgr = (
267+ span_mgr : Union [ Span , NoOpMgr ] = (
274268 sentry_sdk .start_span (
275269 op = OP .QUEUE_SUBMIT_CELERY ,
276270 name = task_name ,
@@ -279,7 +273,7 @@ def apply_async(*args, **kwargs):
279273 )
280274 if not task_started_from_beat
281275 else NoOpMgr ()
282- ) # type: Union[Span, NoOpMgr]
276+ )
283277
284278 with span_mgr as span :
285279 kwargs ["headers" ] = _update_celery_task_headers (
@@ -290,8 +284,7 @@ def apply_async(*args, **kwargs):
290284 return apply_async # type: ignore
291285
292286
293- def _wrap_tracer (task , f ):
294- # type: (Any, F) -> F
287+ def _wrap_tracer (task : Any , f : F ) -> F :
295288
296289 # Need to wrap tracer for pushing the scope before prerun is sent, and
297290 # popping it after postrun is sent.
@@ -301,8 +294,7 @@ def _wrap_tracer(task, f):
301294 # crashes.
302295 @wraps (f )
303296 @ensure_integration_enabled (CeleryIntegration , f )
304- def _inner (* args , ** kwargs ):
305- # type: (*Any, **Any) -> Any
297+ def _inner (* args : Any , ** kwargs : Any ) -> Any :
306298 with isolation_scope () as scope :
307299 scope ._name = "celery"
308300 scope .clear_breadcrumbs ()
@@ -333,8 +325,7 @@ def _inner(*args, **kwargs):
333325 return _inner # type: ignore
334326
335327
336- def _set_messaging_destination_name (task , span ):
337- # type: (Any, Span) -> None
328+ def _set_messaging_destination_name (task : Any , span : Span ) -> None :
338329 """Set "messaging.destination.name" tag for span"""
339330 with capture_internal_exceptions ():
340331 delivery_info = task .request .delivery_info
@@ -346,8 +337,7 @@ def _set_messaging_destination_name(task, span):
346337 span .set_attribute (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
347338
348339
349- def _wrap_task_call (task , f ):
350- # type: (Any, F) -> F
340+ def _wrap_task_call (task : Any , f : F ) -> F :
351341
352342 # Need to wrap task call because the exception is caught before we get to
353343 # see it. Also celery's reported stacktrace is untrustworthy.
@@ -358,8 +348,7 @@ def _wrap_task_call(task, f):
358348 # to add @functools.wraps(f) here.
359349 # https://github.com/getsentry/sentry-python/issues/421
360350 @ensure_integration_enabled (CeleryIntegration , f )
361- def _inner (* args , ** kwargs ):
362- # type: (*Any, **Any) -> Any
351+ def _inner (* args : Any , ** kwargs : Any ) -> Any :
363352 try :
364353 with sentry_sdk .start_span (
365354 op = OP .QUEUE_PROCESS ,
@@ -409,14 +398,12 @@ def _inner(*args, **kwargs):
409398 return _inner # type: ignore
410399
411400
412- def _patch_build_tracer ():
413- # type: () -> None
401+ def _patch_build_tracer () -> None :
414402 import celery .app .trace as trace # type: ignore
415403
416404 original_build_tracer = trace .build_tracer
417405
418- def sentry_build_tracer (name , task , * args , ** kwargs ):
419- # type: (Any, Any, *Any, **Any) -> Any
406+ def sentry_build_tracer (name : Any , task : Any , * args : Any , ** kwargs : Any ) -> Any :
420407 if not getattr (task , "_sentry_is_patched" , False ):
421408 # determine whether Celery will use __call__ or run and patch
422409 # accordingly
@@ -435,29 +422,25 @@ def sentry_build_tracer(name, task, *args, **kwargs):
435422 trace .build_tracer = sentry_build_tracer
436423
437424
438- def _patch_task_apply_async ():
439- # type: () -> None
425+ def _patch_task_apply_async () -> None :
440426 Task .apply_async = _wrap_task_run (Task .apply_async )
441427
442428
443- def _patch_celery_send_task ():
444- # type: () -> None
429+ def _patch_celery_send_task () -> None :
445430 from celery import Celery
446431
447432 Celery .send_task = _wrap_task_run (Celery .send_task )
448433
449434
450- def _patch_worker_exit ():
451- # type: () -> None
435+ def _patch_worker_exit () -> None :
452436
453437 # Need to flush queue before worker shutdown because a crashing worker will
454438 # call os._exit
455439 from billiard .pool import Worker # type: ignore
456440
457441 original_workloop = Worker .workloop
458442
459- def sentry_workloop (* args , ** kwargs ):
460- # type: (*Any, **Any) -> Any
443+ def sentry_workloop (* args : Any , ** kwargs : Any ) -> Any :
461444 try :
462445 return original_workloop (* args , ** kwargs )
463446 finally :
@@ -471,13 +454,11 @@ def sentry_workloop(*args, **kwargs):
471454 Worker .workloop = sentry_workloop
472455
473456
474- def _patch_producer_publish ():
475- # type: () -> None
457+ def _patch_producer_publish () -> None :
476458 original_publish = Producer .publish
477459
478460 @ensure_integration_enabled (CeleryIntegration , original_publish )
479- def sentry_publish (self , * args , ** kwargs ):
480- # type: (Producer, *Any, **Any) -> Any
461+ def sentry_publish (self : Producer , * args : Any , ** kwargs : Any ) -> Any :
481462 kwargs_headers = kwargs .get ("headers" , {})
482463 if not isinstance (kwargs_headers , Mapping ):
483464 # Ensure kwargs_headers is a Mapping, so we can safely call get().
@@ -521,8 +502,7 @@ def sentry_publish(self, *args, **kwargs):
521502 Producer .publish = sentry_publish
522503
523504
524- def _prepopulate_attributes (task , args , kwargs ):
525- # type: (Any, *Any, **Any) -> dict[str, str]
505+ def _prepopulate_attributes (task : Any , args : Any , kwargs : Any ) -> dict [str , str ]:
526506 attributes = {
527507 "celery.job.task" : task .name ,
528508 }
0 commit comments