main
1#!/usr/bin/env python3
2from __future__ import annotations
3
4import argparse
5import json
6import re
7import subprocess
8import sys
9from pathlib import Path
10from shutil import which
11from typing import Any, Iterable, Sequence
12
# Values of a check's "conclusion" field that count as a failed check.
FAILURE_CONCLUSIONS = {"failure", "cancelled", "timed_out", "action_required"}

# Values of a check's "state"/"status" field that count as a failed check:
# every failing conclusion plus "error".
FAILURE_STATES = FAILURE_CONCLUSIONS | {"error"}

# Values of a check's "bucket" field that count as a failed check.
FAILURE_BUCKETS = {"fail"}

# Lower-case substrings that mark a log line as failure-related.
FAILURE_MARKERS = (
    "error", "fail", "failed", "traceback", "exception",
    "assert", "panic", "fatal", "timeout", "segmentation fault",
)

DEFAULT_MAX_LINES = 160  # default for --max-lines
DEFAULT_CONTEXT_LINES = 30  # default for --context

# Lower-case substrings of a gh error message that mean the log is not
# available yet rather than permanently missing.
PENDING_LOG_MARKERS = (
    "still in progress",
    "log will be available when it is complete",
)
49
50
class GhResult:
    """Outcome of one ``gh`` invocation: exit code plus captured stdout and stderr."""

    def __init__(self, returncode: int, stdout: str, stderr: str):
        self.returncode, self.stdout, self.stderr = returncode, stdout, stderr
56
57
def run_gh_command(args: Sequence[str], cwd: Path) -> GhResult:
    """Run ``gh`` with ``args`` inside ``cwd`` and capture its output as text.

    Args:
        args: Arguments passed to ``gh`` (the executable name is prepended).
        cwd: Working directory for the child process.

    Returns:
        A ``GhResult`` holding the exit code and the decoded stdout/stderr.
    """
    process = subprocess.run(
        ["gh", *args],
        cwd=cwd,
        text=True,
        capture_output=True,
        # Output (CI logs in particular) can contain arbitrary bytes. Strict
        # decoding with the locale encoding would raise UnicodeDecodeError, so
        # decode as UTF-8 and replace undecodable bytes, the same policy
        # run_gh_command_raw applies to stderr.
        encoding="utf-8",
        errors="replace",
    )
    return GhResult(process.returncode, process.stdout, process.stderr)
66
67
def run_gh_command_raw(args: Sequence[str], cwd: Path) -> tuple[int, bytes, str]:
    """Run ``gh`` with ``args`` inside ``cwd`` and keep stdout as raw bytes.

    Returns:
        A ``(returncode, stdout_bytes, stderr_text)`` tuple. Only stderr is
        decoded; undecodable bytes in it are replaced.
    """
    completed = subprocess.run(["gh", *args], cwd=cwd, capture_output=True)
    return (
        completed.returncode,
        completed.stdout,
        completed.stderr.decode(errors="replace"),
    )
76
77
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        The parsed namespace with ``repo``, ``pr``, ``max_lines``, ``context``
        and ``json`` attributes.
    """
    summary = (
        "Inspect failing GitHub PR checks, fetch GitHub Actions logs, and extract a "
        "failure snippet."
    )
    cli = argparse.ArgumentParser(
        description=summary,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    cli.add_argument("--repo", default=".", help="Path inside the target Git repository.")
    cli.add_argument(
        "--pr",
        default=None,
        help="PR number or URL (defaults to current branch PR).",
    )
    cli.add_argument("--max-lines", type=int, default=DEFAULT_MAX_LINES)
    cli.add_argument("--context", type=int, default=DEFAULT_CONTEXT_LINES)
    cli.add_argument("--json", action="store_true", help="Emit JSON instead of text output.")
    return cli.parse_args()
98
99
def main() -> int:
    """Report the failing checks of a pull request.

    Resolves the repository root, verifies ``gh`` is usable, resolves the PR,
    fetches its checks and analyzes every failing one. The report is printed
    as JSON when ``--json`` is given and as text otherwise.

    Returns:
        0 when the PR has no failing checks; 1 when failing checks were found
        or when the repository, ``gh``, the PR, or its checks could not be
        resolved.
    """
    args = parse_args()
    repo_root = find_git_root(Path(args.repo))
    if repo_root is None:
        print("Error: not inside a Git repository.", file=sys.stderr)
        return 1

    if not ensure_gh_available(repo_root):
        return 1

    pr_value = resolve_pr(args.pr, repo_root)
    if pr_value is None:
        return 1

    checks = fetch_checks(pr_value, repo_root)
    if checks is None:
        return 1

    failing = [check for check in checks if is_failing(check)]
    if not failing:
        if args.json:
            # Keep stdout machine-readable in --json mode: same document shape
            # as the failing case, with an empty result list.
            print(json.dumps({"pr": pr_value, "results": []}, indent=2))
        else:
            print(f"PR #{pr_value}: no failing checks detected.")
        return 0

    # Non-positive limits would produce empty or unbounded snippets; clamp once.
    max_lines = max(1, args.max_lines)
    context = max(1, args.context)
    results = [
        analyze_check(
            check,
            repo_root=repo_root,
            max_lines=max_lines,
            context=context,
        )
        for check in failing
    ]

    if args.json:
        print(json.dumps({"pr": pr_value, "results": results}, indent=2))
    else:
        render_results(pr_value, results)

    # Failing checks exist, so signal that through the exit status.
    return 1
140
141
def find_git_root(start: Path) -> Path | None:
    """Return the top-level directory of the Git work tree containing ``start``.

    Args:
        start: Directory used as the working directory for
            ``git rev-parse --show-toplevel``.

    Returns:
        The repository root, or None when ``start`` is not inside a Git
        repository, cannot be used as a working directory, or the ``git``
        executable cannot be started.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            cwd=start,
            text=True,
            capture_output=True,
        )
    except OSError:
        # Raised when start does not exist or is not a directory, or when git
        # itself is missing; report "no repository" instead of a traceback.
        return None
    if result.returncode != 0:
        return None
    return Path(result.stdout.strip())
152
153
def ensure_gh_available(repo_root: Path) -> bool:
    """Check that ``gh`` is on PATH and that ``gh auth status`` succeeds.

    Prints a diagnostic to stderr and returns False when either check fails.
    """
    if which("gh") is None:
        print("Error: gh is not installed or not on PATH.", file=sys.stderr)
        return False

    auth = run_gh_command(["auth", "status"], cwd=repo_root)
    if auth.returncode != 0:
        # Prefer gh's own explanation; fall back to a generic message.
        detail = (auth.stderr or auth.stdout or "").strip()
        print(detail or "Error: gh not authenticated.", file=sys.stderr)
        return False
    return True
164
165
def resolve_pr(pr_value: str | None, repo_root: Path) -> str | None:
    """Return the PR to inspect.

    An explicit, non-empty ``pr_value`` is returned unchanged. Otherwise
    ``gh pr view --json number`` is run in ``repo_root`` and its number is
    returned as a string. On any failure a message is printed to stderr and
    None is returned.
    """
    if pr_value:
        return pr_value

    lookup = run_gh_command(["pr", "view", "--json", "number"], cwd=repo_root)
    if lookup.returncode != 0:
        detail = (lookup.stderr or lookup.stdout or "").strip()
        print(detail or "Error: unable to resolve PR.", file=sys.stderr)
        return None

    try:
        payload = json.loads(lookup.stdout or "{}")
    except json.JSONDecodeError:
        print("Error: unable to parse PR JSON.", file=sys.stderr)
        return None

    pr_number = payload.get("number")
    if pr_number:
        return str(pr_number)
    print("Error: no PR number found.", file=sys.stderr)
    return None
184
185
def fetch_checks(pr_value: str, repo_root: Path) -> list[dict[str, Any]] | None:
    """Fetch the checks of a PR as a list of JSON objects.

    A first ``gh pr checks --json`` query asks for a preferred field set. If
    it fails and gh's output lists "Available fields:", the query is retried
    with the fallback fields that gh reports as available.

    Returns:
        The decoded list, or None (after printing a message to stderr) when
        gh fails, no fallback field is usable, or the output is not a JSON
        list.
    """

    def query(fields: Sequence[str]) -> GhResult:
        return run_gh_command(
            ["pr", "checks", pr_value, "--json", ",".join(fields)],
            cwd=repo_root,
        )

    response = query(
        ("name", "state", "conclusion", "detailsUrl", "startedAt", "completedAt")
    )
    if response.returncode != 0:
        combined = "\n".join(
            part for part in (response.stderr, response.stdout) if part
        ).strip()
        supported = parse_available_fields(combined)
        if not supported:
            # Nothing to retry with: surface gh's own message.
            print(combined or "Error: gh pr checks failed.", file=sys.stderr)
            return None

        candidates = (
            "name",
            "state",
            "bucket",
            "link",
            "startedAt",
            "completedAt",
            "workflow",
        )
        usable = [name for name in candidates if name in supported]
        if not usable:
            print(
                "Error: no usable fields available for gh pr checks.",
                file=sys.stderr,
            )
            return None

        response = query(usable)
        if response.returncode != 0:
            failure = (response.stderr or response.stdout or "").strip()
            print(failure or "Error: gh pr checks failed.", file=sys.stderr)
            return None

    try:
        payload = json.loads(response.stdout or "[]")
    except json.JSONDecodeError:
        print("Error: unable to parse checks JSON.", file=sys.stderr)
        return None

    if isinstance(payload, list):
        return payload
    print("Error: unexpected checks JSON shape.", file=sys.stderr)
    return None
241
242
def is_failing(check: dict[str, Any]) -> bool:
    """Report whether a check entry describes a failed check.

    A check is failing when its normalized ``conclusion``, its normalized
    ``state`` (or ``status`` when ``state`` is empty or missing), or its
    normalized ``bucket`` is in the corresponding failure set.
    """
    probes = (
        (check.get("conclusion"), FAILURE_CONCLUSIONS),
        (check.get("state") or check.get("status"), FAILURE_STATES),
        (check.get("bucket"), FAILURE_BUCKETS),
    )
    return any(normalize_field(value) in failing for value, failing in probes)
252
253
def analyze_check(
    check: dict[str, Any],
    repo_root: Path,
    max_lines: int,
    context: int,
) -> dict[str, Any]:
    """Build a report entry for one failing check.

    The run and job ids are parsed from the check's ``detailsUrl`` (or
    ``link``). The entry's ``status`` is one of:

    * ``"external"`` - no run id could be parsed from the URL;
    * ``"log_pending"`` - the log is not available yet;
    * ``"log_unavailable"`` - fetching the log failed;
    * ``"ok"`` - ``logSnippet`` and ``logTail`` were extracted.
    """
    details_url = check.get("detailsUrl") or check.get("link") or ""
    run_id = extract_run_id(details_url)
    job_id = extract_job_id(details_url)
    report: dict[str, Any] = {
        "name": check.get("name", ""),
        "detailsUrl": details_url,
        "runId": run_id,
        "jobId": job_id,
    }

    if run_id is None:
        report.update(
            status="external",
            note="No GitHub Actions run id detected in detailsUrl.",
        )
        return report

    run_info = fetch_run_metadata(run_id, repo_root)
    log_text, log_error, log_status = fetch_check_log(
        run_id=run_id,
        job_id=job_id,
        repo_root=repo_root,
    )

    if log_status == "pending" or log_error:
        if log_status == "pending":
            report["status"] = "log_pending"
            report["note"] = log_error or "Logs are not available yet."
        else:
            report["status"] = "log_unavailable"
            report["error"] = log_error
        # Run metadata is attached only when it could be fetched.
        if run_info:
            report["run"] = run_info
        return report

    snippet = extract_failure_snippet(log_text, max_lines=max_lines, context=context)
    report["status"] = "ok"
    report["run"] = run_info or {}
    report["logSnippet"] = snippet
    report["logTail"] = tail_lines(log_text, max_lines)
    return report
302
303
def extract_run_id(url: str) -> str | None:
    """Return the digits following ``/actions/runs/`` or, failing that, ``/runs/``.

    Returns None for an empty URL or when neither pattern occurs.
    """
    if not url:
        return None
    # The more specific pattern takes priority over the generic one.
    hit = re.search(r"/actions/runs/(\d+)", url) or re.search(r"/runs/(\d+)", url)
    return hit.group(1) if hit else None
312
313
def extract_job_id(url: str) -> str | None:
    """Return the digits following ``/job/`` in ``url``.

    A ``/job/`` segment directly under ``/actions/runs/<id>`` is preferred
    over any other ``/job/`` segment. Returns None for an empty URL or when
    no job segment occurs.
    """
    if not url:
        return None
    for pattern in (r"/actions/runs/\d+/job/(\d+)", r"/job/(\d+)"):
        found = re.search(pattern, url)
        if found is not None:
            return found.group(1)
    return None
324
325
def fetch_run_metadata(run_id: str, repo_root: Path) -> dict[str, Any] | None:
    """Fetch selected fields of a run through ``gh run view --json``.

    Returns:
        The decoded JSON object, or None when gh fails or its output is not a
        JSON object. No message is printed on failure.
    """
    wanted = ",".join(
        (
            "conclusion",
            "status",
            "workflowName",
            "name",
            "event",
            "headBranch",
            "headSha",
            "url",
        )
    )
    response = run_gh_command(["run", "view", run_id, "--json", wanted], cwd=repo_root)
    if response.returncode != 0:
        return None
    try:
        parsed = json.loads(response.stdout or "{}")
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None
349
350
def fetch_check_log(
    run_id: str,
    job_id: str | None,
    repo_root: Path,
) -> tuple[str, str, str]:
    """Fetch the log for a check, falling back from the run log to the job log.

    The job log is tried only when the run log is reported as pending and a
    job id is known.

    Returns:
        ``(log_text, error_message, status)`` where ``status`` is ``"ok"``,
        ``"pending"`` or ``"error"``. ``log_text`` is empty unless the status
        is ``"ok"``; ``error_message`` is empty when it is.
    """
    run_log, run_error = fetch_run_log(run_id, repo_root)
    if not run_error:
        return run_log, "", "ok"

    if not is_log_pending_message(run_error):
        return "", run_error, "error"
    if not job_id:
        return "", run_error, "pending"

    job_log, job_error = fetch_job_log(job_id, repo_root)
    if job_log:
        return job_log, "", "ok"
    if not job_error:
        # Empty job log without an error: keep reporting the run-level message.
        return "", run_error, "pending"
    status = "pending" if is_log_pending_message(job_error) else "error"
    return "", job_error, status
374
375
def fetch_run_log(run_id: str, repo_root: Path) -> tuple[str, str]:
    """Fetch a run's log through ``gh run view --log``.

    Returns:
        ``(log_text, "")`` on success, otherwise ``("", error_message)``.
    """
    outcome = run_gh_command(["run", "view", run_id, "--log"], cwd=repo_root)
    if outcome.returncode == 0:
        return outcome.stdout, ""
    detail = (outcome.stderr or outcome.stdout or "").strip()
    return "", detail or "gh run view failed"
382
383
def fetch_job_log(job_id: str, repo_root: Path) -> tuple[str, str]:
    """Fetch a single job's log through ``gh api``.

    Returns:
        ``(log_text, "")`` on success, otherwise ``("", error_message)``. A
        payload recognized as a zip archive is reported as an error.
    """
    slug = fetch_repo_slug(repo_root)
    if not slug:
        return "", "Error: unable to resolve repository name for job logs."

    status, raw_out, err_text = run_gh_command_raw(
        ["api", f"/repos/{slug}/actions/jobs/{job_id}/logs"],
        cwd=repo_root,
    )
    if status != 0:
        detail = (err_text or raw_out.decode(errors="replace")).strip()
        return "", detail or "gh api job logs failed"
    if is_zip_payload(raw_out):
        return "", "Job logs returned a zip archive; unable to parse."
    return raw_out.decode(errors="replace"), ""
398
399
def fetch_repo_slug(repo_root: Path) -> str | None:
    """Return the repository's ``nameWithOwner`` as reported by ``gh repo view``.

    Returns None when gh fails, its output is not valid JSON, or the field is
    missing or empty.
    """
    response = run_gh_command(
        ["repo", "view", "--json", "nameWithOwner"], cwd=repo_root
    )
    if response.returncode != 0:
        return None
    try:
        payload = json.loads(response.stdout or "{}")
    except json.JSONDecodeError:
        return None
    slug = payload.get("nameWithOwner")
    return str(slug) if slug else None
412
413
def normalize_field(value: Any) -> str:
    """Return ``value`` as a stripped, lower-cased string; None becomes ""."""
    return "" if value is None else str(value).strip().lower()
418
419
def parse_available_fields(message: str) -> list[str]:
    """Extract the field names listed after an "Available fields:" line.

    Every non-blank line after the first line containing the header is
    returned stripped, except lines that contain the header again. Returns an
    empty list when the header does not occur.
    """
    header = "Available fields:"
    rows = message.splitlines()
    header_at = next((i for i, row in enumerate(rows) if header in row), None)
    if header_at is None:
        return []
    stripped = (row.strip() for row in rows[header_at + 1:] if header not in row)
    return [name for name in stripped if name]
436
437
def is_log_pending_message(message: str) -> bool:
    """Report whether ``message`` contains one of the pending-log markers.

    The comparison is case-insensitive on the message side.
    """
    haystack = message.lower()
    for marker in PENDING_LOG_MARKERS:
        if marker in haystack:
            return True
    return False
441
442
def is_zip_payload(payload: bytes) -> bool:
    """Report whether ``payload`` begins with a ZIP archive signature.

    The full four-byte signatures are compared. The two-letter ``PK`` prefix
    alone would also match plain-text output that happens to start with those
    letters.
    """
    zip_signatures = (
        b"PK\x03\x04",  # local file header
        b"PK\x05\x06",  # end of central directory (empty archive)
        b"PK\x07\x08",  # spanned-archive marker
    )
    return payload.startswith(zip_signatures)
445
446
def extract_failure_snippet(log_text: str, max_lines: int, context: int) -> str:
    """Return the part of a log around its failure marker line.

    The marker line is chosen by ``find_failure_index``. The snippet starts up
    to ``context`` lines before the marker and ends before line
    ``marker + context``; the marker line itself is always included. When no
    marker is found, the last ``max_lines`` lines of the log are returned.

    Args:
        log_text: Full log text.
        max_lines: Upper bound on the number of lines returned. Zero or less
            yields an empty string, matching ``tail_lines``.
        context: Lines of context around the marker; negative values are
            treated as zero.

    Returns:
        The selected lines joined with newlines, or "" for an empty log.
    """
    if max_lines <= 0:
        # lines[-0:] would select the whole log rather than nothing.
        return ""

    lines = log_text.splitlines()
    if not lines:
        return ""

    marker_index = find_failure_index(lines)
    if marker_index is None:
        return "\n".join(lines[-max_lines:])

    context = max(context, 0)
    start = max(0, marker_index - context)
    # Guarantee the marker line is inside the window even when context is 0.
    end = min(len(lines), marker_index + max(context, 1))

    if end - start > max_lines:
        # Keep the tail of the window, unless that would cut off the marker
        # line itself; in that case start the snippet at the marker.
        start = min(end - max_lines, marker_index)
        end = start + max_lines
    return "\n".join(lines[start:end])
462
463
def find_failure_index(lines: Sequence[str]) -> int | None:
    """Return the index of the last line containing a failure marker.

    Lines are lower-cased before being compared against ``FAILURE_MARKERS``.
    Returns None when no line matches.
    """
    total = len(lines)
    # Walk backwards so the first hit is the last matching line.
    for offset, text in enumerate(reversed(lines), start=1):
        folded = text.lower()
        if any(marker in folded for marker in FAILURE_MARKERS):
            return total - offset
    return None
470
471
def tail_lines(text: str, max_lines: int) -> str:
    """Return the last ``max_lines`` lines of ``text`` joined with newlines.

    A non-positive ``max_lines`` yields an empty string.
    """
    if max_lines > 0:
        return "\n".join(text.splitlines()[-max_lines:])
    return ""
477
478
def render_results(pr_number: str, results: Iterable[dict[str, Any]]) -> None:
    """Print a plain-text report of the analyzed checks to stdout.

    Each entry is preceded by a rule of dashes and a closing rule follows the
    last entry. For an entry carrying an ``error`` the log snippet section is
    skipped.
    """
    entries = list(results)
    rule = "-" * 60
    print(f"PR #{pr_number}: {len(entries)} failing checks analyzed.")

    for entry in entries:
        print(rule)
        print(f"Check: {entry.get('name', '')}")
        # Optional identifiers are printed only when present and non-empty.
        for label, key in (
            ("Details", "detailsUrl"),
            ("Run ID", "runId"),
            ("Job ID", "jobId"),
        ):
            value = entry.get(key)
            if value:
                print(f"{label}: {value}")
        print(f"Status: {entry.get('status', 'unknown')}")

        run_info = entry.get("run", {})
        if run_info:
            branch = run_info.get("headBranch", "")
            short_sha = (run_info.get("headSha") or "")[:12]
            workflow = run_info.get("workflowName") or run_info.get("name") or ""
            outcome = run_info.get("conclusion") or run_info.get("status") or ""
            print(f"Workflow: {workflow} ({outcome})")
            if branch or short_sha:
                print(f"Branch/SHA: {branch} {short_sha}")
            run_url = run_info.get("url")
            if run_url:
                print(f"Run URL: {run_url}")

        note = entry.get("note")
        if note:
            print(f"Note: {note}")

        error = entry.get("error")
        if error:
            print(f"Error fetching logs: {error}")
            continue

        snippet = entry.get("logSnippet") or ""
        if not snippet:
            print("No snippet available.")
            continue
        print("Failure snippet:")
        print(indent_block(snippet, prefix=" "))

    print(rule)
522
523
def indent_block(text: str, prefix: str = " ") -> str:
    """Return ``text`` with ``prefix`` prepended to each of its lines.

    Lines are obtained with ``str.splitlines`` and re-joined with newlines,
    so a trailing newline in ``text`` is not preserved.
    """
    indented = [f"{prefix}{row}" for row in text.splitlines()]
    return "\n".join(indented)
526
527
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    sys.exit(main())