@@ -23,6 +23,7 @@ import (
2323 "code.gitea.io/gitea/modules/graceful"
2424 "code.gitea.io/gitea/modules/hostmatcher"
2525 "code.gitea.io/gitea/modules/log"
26+ "code.gitea.io/gitea/modules/process"
2627 "code.gitea.io/gitea/modules/proxy"
2728 "code.gitea.io/gitea/modules/queue"
2829 "code.gitea.io/gitea/modules/setting"
@@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
4344 return
4445 }
4546 // There was a panic whilst delivering a hook...
46- log .Error ("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\n Stacktrace: %s" , t .ID , w .URL , err , log .Stack (2 ))
47+ log .Error ("PANIC whilst trying to deliver webhook task [%d] to webhook %s Panic: %v\n Stacktrace: %s" , t .ID , w .URL , err , log .Stack (2 ))
4748 }()
4849
4950 t .IsDelivered = true
@@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
5253
5354 switch w .HTTPMethod {
5455 case "" :
55- log .Info ("HTTP Method for webhook %d empty, setting to POST as default" , t . ID )
56+ log .Info ("HTTP Method for webhook %s empty, setting to POST as default" , w . URL )
5657 fallthrough
5758 case http .MethodPost :
5859 switch w .ContentType {
@@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
7879 case http .MethodGet :
7980 u , err := url .Parse (w .URL )
8081 if err != nil {
81- return err
82+ return fmt . Errorf ( "unable to deliver webhook task[%d] as cannot parse webhook url %s: %w" , t . ID , w . URL , err )
8283 }
8384 vals := u .Query ()
8485 vals ["payload" ] = []string {t .PayloadContent }
8586 u .RawQuery = vals .Encode ()
8687 req , err = http .NewRequest ("GET" , u .String (), nil )
8788 if err != nil {
88- return err
89+ return fmt . Errorf ( "unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w" , t . ID , w . URL , err )
8990 }
9091 case http .MethodPut :
9192 switch w .Type {
@@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
9798 url := fmt .Sprintf ("%s/%s" , w .URL , url .PathEscape (txnID ))
9899 req , err = http .NewRequest ("PUT" , url , strings .NewReader (t .PayloadContent ))
99100 if err != nil {
100- return err
101+ return fmt . Errorf ( "unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w" , t . ID , w . URL , err )
101102 }
102103 default :
103- return fmt .Errorf ("invalid http method for webhook: [%d] % v" , t .ID , w .HTTPMethod )
104+ return fmt .Errorf ("invalid http method for webhook task [%d] in webhook %s: % v" , t .ID , w . URL , w .HTTPMethod )
104105 }
105106 default :
106- return fmt .Errorf ("invalid http method for webhook: [%d] % v" , t .ID , w .HTTPMethod )
107+ return fmt .Errorf ("invalid http method for webhook task [%d] in webhook %s: % v" , t .ID , w . URL , w .HTTPMethod )
107108 }
108109
109110 var signatureSHA1 string
@@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
159160 Headers : map [string ]string {},
160161 }
161162
163+ // OK We're now ready to attempt to deliver the task - we must double check that it
164+ // has not been delivered in the meantime
165+ updated , err := webhook_model .MarkTaskDelivered (ctx , t )
166+ if err != nil {
167+ log .Error ("MarkTaskDelivered[%d]: %v" , t .ID , err )
168+ return fmt .Errorf ("unable to mark task[%d] delivered in the db: %w" , t .ID , err )
169+ }
170+ if ! updated {
171+ // This webhook task has already been attempted to be delivered or is in the process of being delivered
172+ log .Trace ("Webhook Task[%d] already delivered" , t .ID )
173+ return nil
174+ }
175+
176+ // All code from this point will update the hook task
162177 defer func () {
163178 t .Delivered = time .Now ().UnixNano ()
164179 if t .IsSucceed {
@@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
190205 }
191206
192207 if ! w .IsActive {
208+ log .Trace ("Webhook %s in Webhook Task[%d] is not active" , w .URL , t .ID )
193209 return nil
194210 }
195211
196212 resp , err := webhookHTTPClient .Do (req .WithContext (ctx ))
197213 if err != nil {
198214 t .ResponseInfo .Body = fmt .Sprintf ("Delivery: %v" , err )
199- return err
215+ return fmt . Errorf ( "unable to deliver webhook task[%d] in %s due to error in http client: %w" , t . ID , w . URL , err )
200216 }
201217 defer resp .Body .Close ()
202218
@@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
210226 p , err := io .ReadAll (resp .Body )
211227 if err != nil {
212228 t .ResponseInfo .Body = fmt .Sprintf ("read body: %s" , err )
213- return err
229+ return fmt . Errorf ( "unable to deliver webhook task[%d] in %s as unable to read response body: %w" , t . ID , w . URL , err )
214230 }
215231 t .ResponseInfo .Body = string (p )
216232 return nil
@@ -272,17 +288,37 @@ func Init() error {
272288 }
273289 go graceful .GetManager ().RunWithShutdownFns (hookQueue .Run )
274290
275- tasks , err := webhook_model .FindUndeliveredHookTasks (graceful .GetManager ().HammerContext ())
276- if err != nil {
277- log .Error ("FindUndeliveredHookTasks failed: %v" , err )
278- return err
279- }
291+ go graceful .GetManager ().RunWithShutdownContext (populateWebhookSendingQueue )
292+
293+ return nil
294+ }
295+
296+ func populateWebhookSendingQueue (ctx context.Context ) {
297+ ctx , _ , finished := process .GetManager ().AddContext (ctx , "Webhook: Populate sending queue" )
298+ defer finished ()
280299
281- for _ , task := range tasks {
282- if err := enqueueHookTask (task ); err != nil {
283- log .Error ("enqueueHookTask failed: %v" , err )
300+ lowerID := int64 (0 )
301+ for {
302+ taskIDs , err := webhook_model .FindUndeliveredHookTaskIDs (ctx , lowerID )
303+ if err != nil {
304+ log .Error ("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v" , err )
305+ return
306+ }
307+ if len (taskIDs ) == 0 {
308+ return
309+ }
310+ lowerID = taskIDs [len (taskIDs )- 1 ]
311+
312+ for _ , taskID := range taskIDs {
313+ select {
314+ case <- ctx .Done ():
315+ log .Warn ("Shutdown before Webhook Sending queue finishing being populated" )
316+ return
317+ default :
318+ }
319+ if err := enqueueHookTask (taskID ); err != nil {
320+ log .Error ("Unable to push HookTask[%d] to the Webhook Sending queue: %v" , taskID , err )
321+ }
284322 }
285323 }
286-
287- return nil
288324}
0 commit comments