|
11 | 11 | from typing import Dict, Iterable, List, Optional, Set, Tuple |
12 | 12 |
|
13 | 13 | from .clickhouse_client_helper import CHCliFactory |
| 14 | +from .utils import RetryWithBackoff |
14 | 15 |
|
15 | 16 |
|
16 | 17 | @dataclass |
@@ -160,45 +161,47 @@ def _fetch_workflow_data(self): |
160 | 161 | wf.workflow_name, push_dedup.timestamp DESC, wf.head_sha, wf.name |
161 | 162 | """ |
162 | 163 |
|
163 | | - result = CHCliFactory().client.query( |
164 | | - query, |
165 | | - parameters={ |
166 | | - "workflow_names": self.workflow_names, |
167 | | - "lookback_time": lookback_time, |
168 | | - }, |
169 | | - ) |
170 | | - |
171 | | - # Group by workflow and commit SHA |
172 | | - workflow_commits_data = {} |
173 | | - for row in result.result_rows: |
174 | | - ( |
175 | | - workflow_name, |
176 | | - head_sha, |
177 | | - name, |
178 | | - conclusion, |
179 | | - status, |
180 | | - classification_rule, |
181 | | - created_at, |
182 | | - ) = row |
183 | | - |
184 | | - if workflow_name not in workflow_commits_data: |
185 | | - workflow_commits_data[workflow_name] = {} |
186 | | - |
187 | | - if head_sha not in workflow_commits_data[workflow_name]: |
188 | | - workflow_commits_data[workflow_name][head_sha] = CommitJobs( |
189 | | - head_sha=head_sha, created_at=created_at, jobs=[] |
| 164 | + for attempt in RetryWithBackoff(): |
| 165 | + with attempt: |
| 166 | + result = CHCliFactory().client.query( |
| 167 | + query, |
| 168 | + parameters={ |
| 169 | + "workflow_names": self.workflow_names, |
| 170 | + "lookback_time": lookback_time, |
| 171 | + }, |
190 | 172 | ) |
191 | 173 |
|
192 | | - workflow_commits_data[workflow_name][head_sha].jobs.append( |
193 | | - JobResult( |
194 | | - head_sha=head_sha, |
195 | | - name=name, |
196 | | - conclusion=conclusion, |
197 | | - status=status, |
198 | | - classification_rule=classification_rule or "", |
199 | | - workflow_created_at=created_at, |
200 | | - ) |
201 | | - ) |
| 174 | + # Group by workflow and commit SHA |
| 175 | + workflow_commits_data = {} |
| 176 | + for row in result.result_rows: |
| 177 | + ( |
| 178 | + workflow_name, |
| 179 | + head_sha, |
| 180 | + name, |
| 181 | + conclusion, |
| 182 | + status, |
| 183 | + classification_rule, |
| 184 | + created_at, |
| 185 | + ) = row |
| 186 | + |
| 187 | + if workflow_name not in workflow_commits_data: |
| 188 | + workflow_commits_data[workflow_name] = {} |
| 189 | + |
| 190 | + if head_sha not in workflow_commits_data[workflow_name]: |
| 191 | + workflow_commits_data[workflow_name][head_sha] = CommitJobs( |
| 192 | + head_sha=head_sha, created_at=created_at, jobs=[] |
| 193 | + ) |
| 194 | + |
| 195 | + workflow_commits_data[workflow_name][head_sha].jobs.append( |
| 196 | + JobResult( |
| 197 | + head_sha=head_sha, |
| 198 | + name=name, |
| 199 | + conclusion=conclusion, |
| 200 | + status=status, |
| 201 | + classification_rule=classification_rule or "", |
| 202 | + workflow_created_at=created_at, |
| 203 | + ) |
| 204 | + ) |
202 | 205 |
|
203 | 206 | # Sort and cache results per workflow |
204 | 207 | for workflow_name, commits_data in workflow_commits_data.items(): |
@@ -230,14 +233,16 @@ def _fetch_commit_history(self): |
230 | 233 | ORDER BY timestamp DESC |
231 | 234 | """ |
232 | 235 |
|
233 | | - result = CHCliFactory().client.query( |
234 | | - query, parameters={"lookback_time": lookback_time} |
235 | | - ) |
| 236 | + for attempt in RetryWithBackoff(): |
| 237 | + with attempt: |
| 238 | + result = CHCliFactory().client.query( |
| 239 | + query, parameters={"lookback_time": lookback_time} |
| 240 | + ) |
236 | 241 |
|
237 | | - return [ |
238 | | - {"sha": row[0], "message": row[1], "timestamp": row[2]} |
239 | | - for row in result.result_rows |
240 | | - ] |
| 242 | + return [ |
| 243 | + {"sha": row[0], "message": row[1], "timestamp": row[2]} |
| 244 | + for row in result.result_rows |
| 245 | + ] |
241 | 246 |
|
242 | 247 | def _find_last_commit_with_job( |
243 | 248 | self, commits: Iterable[CommitJobs], job_name: str |
@@ -472,18 +477,20 @@ def _fetch_single_commit_jobs( |
472 | 477 | we = "workflow_dispatch" if restarted_only else "workflow_dispatch" |
473 | 478 | # Note: for non-restarted we exclude workflow_dispatch via != in WHERE above |
474 | 479 |
|
475 | | - result = CHCliFactory().client.query( |
476 | | - query, |
477 | | - parameters={ |
478 | | - "workflow_name": workflow_name, |
479 | | - "head_sha": head_sha, |
480 | | - "we": we, |
481 | | - "hb": hb, |
482 | | - "lookback_time": lookback_time, |
483 | | - }, |
484 | | - ) |
| 480 | + for attempt in RetryWithBackoff(): |
| 481 | + with attempt: |
| 482 | + result = CHCliFactory().client.query( |
| 483 | + query, |
| 484 | + parameters={ |
| 485 | + "workflow_name": workflow_name, |
| 486 | + "head_sha": head_sha, |
| 487 | + "we": we, |
| 488 | + "hb": hb, |
| 489 | + "lookback_time": lookback_time, |
| 490 | + }, |
| 491 | + ) |
485 | 492 |
|
486 | | - rows = list(result.result_rows) |
| 493 | + rows = list(result.result_rows) |
487 | 494 | if not rows: |
488 | 495 | return None |
489 | 496 |
|
@@ -644,24 +651,27 @@ def extract_revert_categories_batch(self, messages: List[str]) -> Dict[str, str] |
644 | 651 | FROM issue_comment |
645 | 652 | WHERE id IN {comment_ids:Array(Int64)} |
646 | 653 | """ |
647 | | - result = CHCliFactory().client.query( |
648 | | - query, parameters={"comment_ids": comment_ids} |
649 | | - ) |
650 | 654 |
|
651 | | - for row in result.result_rows: |
652 | | - comment_id, body = row |
653 | | - # Look for -c flag in comment body |
654 | | - match = re.search(r"-c\s+(\w+)", body) |
655 | | - if match: |
656 | | - category = match.group(1).lower() |
657 | | - if category in [ |
658 | | - "nosignal", |
659 | | - "ignoredsignal", |
660 | | - "landrace", |
661 | | - "weird", |
662 | | - "ghfirst", |
663 | | - ]: |
664 | | - comment_id_to_category[comment_id] = category |
| 655 | + for attempt in RetryWithBackoff(): |
| 656 | + with attempt: |
| 657 | + result = CHCliFactory().client.query( |
| 658 | + query, parameters={"comment_ids": comment_ids} |
| 659 | + ) |
| 660 | + |
| 661 | + for row in result.result_rows: |
| 662 | + comment_id, body = row |
| 663 | + # Look for -c flag in comment body |
| 664 | + match = re.search(r"-c\s+(\w+)", body) |
| 665 | + if match: |
| 666 | + category = match.group(1).lower() |
| 667 | + if category in [ |
| 668 | + "nosignal", |
| 669 | + "ignoredsignal", |
| 670 | + "landrace", |
| 671 | + "weird", |
| 672 | + "ghfirst", |
| 673 | + ]: |
| 674 | + comment_id_to_category[comment_id] = category |
665 | 675 | except Exception: |
666 | 676 | # If query fails, continue without error |
667 | 677 | pass |
|
0 commit comments