flake-update-20260505
  1#!/usr/bin/env python3
  2from __future__ import annotations
  3
  4import argparse
  5import json
  6import re
  7import subprocess
  8import sys
  9from pathlib import Path
 10from shutil import which
 11from typing import Any, Iterable, Sequence
 12
 13FAILURE_CONCLUSIONS = {
 14    "failure",
 15    "cancelled",
 16    "timed_out",
 17    "action_required",
 18}
 19
 20FAILURE_STATES = {
 21    "failure",
 22    "error",
 23    "cancelled",
 24    "timed_out",
 25    "action_required",
 26}
 27
 28FAILURE_BUCKETS = {"fail"}
 29
 30FAILURE_MARKERS = (
 31    "error",
 32    "fail",
 33    "failed",
 34    "traceback",
 35    "exception",
 36    "assert",
 37    "panic",
 38    "fatal",
 39    "timeout",
 40    "segmentation fault",
 41)
 42
 43DEFAULT_MAX_LINES = 160
 44DEFAULT_CONTEXT_LINES = 30
 45PENDING_LOG_MARKERS = (
 46    "still in progress",
 47    "log will be available when it is complete",
 48)
 49
 50
 51class GhResult:
 52    def __init__(self, returncode: int, stdout: str, stderr: str):
 53        self.returncode = returncode
 54        self.stdout = stdout
 55        self.stderr = stderr
 56
 57
 58def run_gh_command(args: Sequence[str], cwd: Path) -> GhResult:
 59    process = subprocess.run(
 60        ["gh", *args],
 61        cwd=cwd,
 62        text=True,
 63        capture_output=True,
 64    )
 65    return GhResult(process.returncode, process.stdout, process.stderr)
 66
 67
 68def run_gh_command_raw(args: Sequence[str], cwd: Path) -> tuple[int, bytes, str]:
 69    process = subprocess.run(
 70        ["gh", *args],
 71        cwd=cwd,
 72        capture_output=True,
 73    )
 74    stderr = process.stderr.decode(errors="replace")
 75    return process.returncode, process.stdout, stderr
 76
 77
 78def parse_args() -> argparse.Namespace:
 79    parser = argparse.ArgumentParser(
 80        description=(
 81            "Inspect failing GitHub PR checks, fetch GitHub Actions logs, and extract a "
 82            "failure snippet."
 83        ),
 84        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
 85    )
 86    parser.add_argument(
 87        "--repo", default=".", help="Path inside the target Git repository."
 88    )
 89    parser.add_argument(
 90        "--pr", default=None, help="PR number or URL (defaults to current branch PR)."
 91    )
 92    parser.add_argument("--max-lines", type=int, default=DEFAULT_MAX_LINES)
 93    parser.add_argument("--context", type=int, default=DEFAULT_CONTEXT_LINES)
 94    parser.add_argument(
 95        "--json", action="store_true", help="Emit JSON instead of text output."
 96    )
 97    return parser.parse_args()
 98
 99
def main() -> int:
    """Entry point: report failing checks of a PR and return an exit code.

    Returns:
        0 when the PR has no failing checks; 1 when a setup step fails (no Git
        repository, ``gh`` unavailable, PR or checks could not be fetched) or
        when at least one failing check was found and reported.
    """
    args = parse_args()
    repo_root = find_git_root(Path(args.repo))
    if repo_root is None:
        print("Error: not inside a Git repository.", file=sys.stderr)
        return 1

    if not ensure_gh_available(repo_root):
        return 1

    pr_value = resolve_pr(args.pr, repo_root)
    if pr_value is None:
        return 1

    checks = fetch_checks(pr_value, repo_root)
    if checks is None:
        return 1

    failing = [c for c in checks if is_failing(c)]
    if not failing:
        if args.json:
            # Keep stdout machine-readable in JSON mode: same shape as the
            # failing case, with an empty result list.
            print(json.dumps({"pr": pr_value, "results": []}, indent=2))
        else:
            print(f"PR #{pr_value}: no failing checks detected.")
        return 0

    # Clamp user-supplied limits so downstream slicing always gets >= 1.
    max_lines = max(1, args.max_lines)
    context = max(1, args.context)
    results = [
        analyze_check(
            check,
            repo_root=repo_root,
            max_lines=max_lines,
            context=context,
        )
        for check in failing
    ]

    if args.json:
        print(json.dumps({"pr": pr_value, "results": results}, indent=2))
    else:
        render_results(pr_value, results)

    # Failing checks exist, so signal that through the exit status.
    return 1
140
141
142def find_git_root(start: Path) -> Path | None:
143    result = subprocess.run(
144        ["git", "rev-parse", "--show-toplevel"],
145        cwd=start,
146        text=True,
147        capture_output=True,
148    )
149    if result.returncode != 0:
150        return None
151    return Path(result.stdout.strip())
152
153
def ensure_gh_available(repo_root: Path) -> bool:
    """Check that ``gh`` is on PATH and that ``gh auth status`` succeeds.

    Prints a diagnostic to stderr and returns ``False`` on either failure.
    """
    if which("gh") is None:
        print("Error: gh is not installed or not on PATH.", file=sys.stderr)
        return False

    auth = run_gh_command(["auth", "status"], cwd=repo_root)
    if auth.returncode != 0:
        # Prefer whatever gh itself reported; fall back to a generic message.
        detail = (auth.stderr or auth.stdout or "").strip()
        print(detail or "Error: gh not authenticated.", file=sys.stderr)
        return False
    return True
164
165
166def resolve_pr(pr_value: str | None, repo_root: Path) -> str | None:
167    if pr_value:
168        return pr_value
169    result = run_gh_command(["pr", "view", "--json", "number"], cwd=repo_root)
170    if result.returncode != 0:
171        message = (result.stderr or result.stdout or "").strip()
172        print(message or "Error: unable to resolve PR.", file=sys.stderr)
173        return None
174    try:
175        data = json.loads(result.stdout or "{}")
176    except json.JSONDecodeError:
177        print("Error: unable to parse PR JSON.", file=sys.stderr)
178        return None
179    number = data.get("number")
180    if not number:
181        print("Error: no PR number found.", file=sys.stderr)
182        return None
183    return str(number)
184
185
def fetch_checks(pr_value: str, repo_root: Path) -> list[dict[str, Any]] | None:
    """Fetch the checks of a PR as a list of JSON objects.

    A first ``gh pr checks --json`` call asks for a preferred field set. If it
    fails and the error text lists "Available fields:", the call is retried
    with the subset of fallback fields that gh advertises. Any failure prints
    a message to stderr and yields ``None``.
    """

    def query(fields: Sequence[str]) -> GhResult:
        # Single place that builds the `gh pr checks` invocation.
        return run_gh_command(
            ["pr", "checks", pr_value, "--json", ",".join(fields)],
            cwd=repo_root,
        )

    response = query(
        ("name", "state", "conclusion", "detailsUrl", "startedAt", "completedAt")
    )
    if response.returncode != 0:
        combined = "\n".join(
            part for part in (response.stderr, response.stdout) if part
        ).strip()
        advertised = parse_available_fields(combined)
        if not advertised:
            print(combined or "Error: gh pr checks failed.", file=sys.stderr)
            return None

        fallback_order = (
            "name",
            "state",
            "bucket",
            "link",
            "startedAt",
            "completedAt",
            "workflow",
        )
        usable = [name for name in fallback_order if name in advertised]
        if not usable:
            print(
                "Error: no usable fields available for gh pr checks.",
                file=sys.stderr,
            )
            return None

        response = query(usable)
        if response.returncode != 0:
            detail = (response.stderr or response.stdout or "").strip()
            print(detail or "Error: gh pr checks failed.", file=sys.stderr)
            return None

    try:
        payload = json.loads(response.stdout or "[]")
    except json.JSONDecodeError:
        print("Error: unable to parse checks JSON.", file=sys.stderr)
        return None

    if isinstance(payload, list):
        return payload
    print("Error: unexpected checks JSON shape.", file=sys.stderr)
    return None
241
242
def is_failing(check: dict[str, Any]) -> bool:
    """Tell whether a check entry represents a failure.

    The entry fails when its normalized ``conclusion``, its ``state`` (or
    ``status`` when ``state`` is empty), or its ``bucket`` is in the matching
    failure set.
    """
    probes = (
        (check.get("conclusion"), FAILURE_CONCLUSIONS),
        (check.get("state") or check.get("status"), FAILURE_STATES),
        (check.get("bucket"), FAILURE_BUCKETS),
    )
    return any(normalize_field(raw) in failing for raw, failing in probes)
252
253
def analyze_check(
    check: dict[str, Any],
    repo_root: Path,
    max_lines: int,
    context: int,
) -> dict[str, Any]:
    """Build a report dictionary for one failing check.

    The report always carries ``name``, ``detailsUrl``, ``runId``, ``jobId``
    and ``status``. ``status`` is one of ``external`` (no run id in the URL),
    ``log_pending``, ``log_unavailable`` or ``ok``; the ``ok`` case adds
    ``run``, ``logSnippet`` and ``logTail``.
    """
    details_url = check.get("detailsUrl") or check.get("link") or ""
    run_id = extract_run_id(details_url)
    job_id = extract_job_id(details_url)
    report: dict[str, Any] = {
        "name": check.get("name", ""),
        "detailsUrl": details_url,
        "runId": run_id,
        "jobId": job_id,
    }

    if run_id is None:
        report.update(
            status="external",
            note="No GitHub Actions run id detected in detailsUrl.",
        )
        return report

    run_info = fetch_run_metadata(run_id, repo_root)
    log_text, log_error, log_status = fetch_check_log(
        run_id=run_id,
        job_id=job_id,
        repo_root=repo_root,
    )

    pending = log_status == "pending"
    if pending or log_error:
        if pending:
            report["status"] = "log_pending"
            report["note"] = log_error or "Logs are not available yet."
        else:
            report["status"] = "log_unavailable"
            report["error"] = log_error
        # Run metadata is attached only when it could actually be fetched.
        if run_info:
            report["run"] = run_info
        return report

    report.update(
        status="ok",
        run=run_info or {},
        logSnippet=extract_failure_snippet(
            log_text, max_lines=max_lines, context=context
        ),
        logTail=tail_lines(log_text, max_lines),
    )
    return report
302
303
304def extract_run_id(url: str) -> str | None:
305    if not url:
306        return None
307    for pattern in (r"/actions/runs/(\d+)", r"/runs/(\d+)"):
308        match = re.search(pattern, url)
309        if match:
310            return match.group(1)
311    return None
312
313
314def extract_job_id(url: str) -> str | None:
315    if not url:
316        return None
317    match = re.search(r"/actions/runs/\d+/job/(\d+)", url)
318    if match:
319        return match.group(1)
320    match = re.search(r"/job/(\d+)", url)
321    if match:
322        return match.group(1)
323    return None
324
325
def fetch_run_metadata(run_id: str, repo_root: Path) -> dict[str, Any] | None:
    """Fetch selected fields of a run via ``gh run view --json``.

    Returns the decoded JSON object, or ``None`` when the command fails, the
    output is not valid JSON, or the decoded value is not a dictionary.
    """
    wanted = (
        "conclusion",
        "status",
        "workflowName",
        "name",
        "event",
        "headBranch",
        "headSha",
        "url",
    )
    reply = run_gh_command(
        ["run", "view", run_id, "--json", ",".join(wanted)], cwd=repo_root
    )
    if reply.returncode != 0:
        return None
    try:
        payload = json.loads(reply.stdout or "{}")
    except json.JSONDecodeError:
        return None
    return payload if isinstance(payload, dict) else None
349
350
def fetch_check_log(
    run_id: str,
    job_id: str | None,
    repo_root: Path,
) -> tuple[str, str, str]:
    """Fetch the log for a check, falling back to the job log while pending.

    Returns ``(log_text, error_message, status)`` where status is ``"ok"``,
    ``"pending"`` or ``"error"``. The job log is only attempted when the run
    log reports a "pending" message and a job id is known.
    """
    run_log, run_error = fetch_run_log(run_id, repo_root)
    if not run_error:
        return run_log, "", "ok"

    # A run-level failure that is not "still pending" is final.
    if not is_log_pending_message(run_error):
        return "", run_error, "error"

    if not job_id:
        return "", run_error, "pending"

    job_log, job_error = fetch_job_log(job_id, repo_root)
    if job_log:
        return job_log, "", "ok"
    if not job_error:
        # Empty job log without an error: keep reporting the run-level message.
        return "", run_error, "pending"

    job_status = "pending" if is_log_pending_message(job_error) else "error"
    return "", job_error, job_status
374
375
def fetch_run_log(run_id: str, repo_root: Path) -> tuple[str, str]:
    """Return ``(log_text, error)`` from ``gh run view <run_id> --log``.

    Exactly one element is non-empty: the log on success, the error otherwise.
    """
    outcome = run_gh_command(["run", "view", run_id, "--log"], cwd=repo_root)
    if outcome.returncode == 0:
        return outcome.stdout, ""
    detail = (outcome.stderr or outcome.stdout or "").strip()
    return "", detail or "gh run view failed"
382
383
def fetch_job_log(job_id: str, repo_root: Path) -> tuple[str, str]:
    """Return ``(log_text, error)`` for one job via ``gh api``.

    The log is requested from ``/repos/<slug>/actions/jobs/<job_id>/logs``.
    A payload that looks like a zip archive is reported as an error rather
    than decoded.
    """
    slug = fetch_repo_slug(repo_root)
    if not slug:
        return "", "Error: unable to resolve repository name for job logs."

    status, raw_log, err_text = run_gh_command_raw(
        ["api", f"/repos/{slug}/actions/jobs/{job_id}/logs"], cwd=repo_root
    )
    if status != 0:
        detail = (err_text or raw_log.decode(errors="replace")).strip()
        return "", detail or "gh api job logs failed"
    if is_zip_payload(raw_log):
        return "", "Job logs returned a zip archive; unable to parse."
    return raw_log.decode(errors="replace"), ""
398
399
def fetch_repo_slug(repo_root: Path) -> str | None:
    """Return the ``nameWithOwner`` value reported by ``gh repo view``.

    ``None`` is returned when the command fails, its output is not valid JSON,
    or the field is missing or empty.
    """
    reply = run_gh_command(["repo", "view", "--json", "nameWithOwner"], cwd=repo_root)
    if reply.returncode != 0:
        return None
    try:
        payload = json.loads(reply.stdout or "{}")
    except json.JSONDecodeError:
        return None
    slug = payload.get("nameWithOwner")
    return str(slug) if slug else None
412
413
def normalize_field(value: Any) -> str:
    """Return *value* as a stripped, lower-cased string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).strip().lower()
418
419
def parse_available_fields(message: str) -> list[str]:
    """Extract the field names listed after an "Available fields:" line.

    Every non-blank line following the first line containing the marker is
    returned stripped; lines that contain the marker themselves are skipped.
    An empty list means the marker was not present.
    """
    marker = "Available fields:"
    rows = message.splitlines()
    header_at = next((pos for pos, row in enumerate(rows) if marker in row), None)
    if header_at is None:
        return []
    stripped = (row.strip() for row in rows[header_at + 1:] if marker not in row)
    return [name for name in stripped if name]
436
437
def is_log_pending_message(message: str) -> bool:
    """Report whether *message* contains a pending-log marker (case-insensitive)."""
    haystack = message.lower()
    for marker in PENDING_LOG_MARKERS:
        if marker in haystack:
            return True
    return False
441
442
def is_zip_payload(payload: bytes) -> bool:
    """Report whether *payload* begins with the two bytes ``PK``."""
    return payload[:2] == b"PK"
445
446
def extract_failure_snippet(log_text: str, max_lines: int, context: int) -> str:
    """Return the part of *log_text* surrounding the detected failure line.

    Args:
        log_text: Full log text.
        max_lines: Upper bound on the number of lines returned; values <= 0
            yield an empty string.
        context: Number of lines kept on each side of the failure line;
            negative values are treated as 0.

    Returns:
        Up to *max_lines* lines joined with newlines. When no failure line is
        found, the last *max_lines* lines of the log are returned instead.
        When the context window has to be trimmed, the failure line is always
        kept inside the result.
    """
    if max_lines <= 0:
        # Without this guard `lines[-0:]` would return the whole log.
        return ""

    lines = log_text.splitlines()
    if not lines:
        return ""

    marker_index = find_failure_index(lines)
    if marker_index is None:
        return "\n".join(lines[-max_lines:])

    context = max(0, context)
    start = max(0, marker_index - context)
    # Slice ends are exclusive: +1 keeps `context` lines after the marker too.
    end = min(len(lines), marker_index + context + 1)

    if end - start > max_lines:
        # Trim around the marker rather than keeping only the tail, which
        # could cut the failure line itself out of the snippet.
        lead = min(marker_index - start, (max_lines - 1) // 2)
        end = min(end, marker_index - lead + max_lines)
        # If the trailing side ran short, give the spare room to the lead.
        start = max(start, end - max_lines)

    return "\n".join(lines[start:end])
462
463
def find_failure_index(lines: Sequence[str]) -> int | None:
    """Return the index of the last line containing a failure marker.

    Matching is a case-insensitive substring test against ``FAILURE_MARKERS``;
    ``None`` means no line matched.
    """
    last = len(lines) - 1
    for distance, line in enumerate(reversed(lines)):
        text = line.lower()
        if any(marker in text for marker in FAILURE_MARKERS):
            # `distance` counts from the end; convert back to a forward index.
            return last - distance
    return None
470
471
def tail_lines(text: str, max_lines: int) -> str:
    """Return the last *max_lines* lines of *text*, joined with newlines.

    A non-positive *max_lines* yields an empty string.
    """
    if max_lines <= 0:
        return ""
    rows = text.splitlines()
    first_kept = max(0, len(rows) - max_lines)
    return "\n".join(rows[first_kept:])
477
478
def render_results(pr_number: str, results: Iterable[dict[str, Any]]) -> None:
    """Print a human-readable report of the analyzed checks to stdout.

    Each result is preceded by a 60-dash separator and a final separator
    closes the report. A result carrying ``error`` prints that error and no
    snippet section.
    """
    entries = list(results)
    separator = "-" * 60
    optional_headers = (
        ("Details", "detailsUrl"),
        ("Run ID", "runId"),
        ("Job ID", "jobId"),
    )

    print(f"PR #{pr_number}: {len(entries)} failing checks analyzed.")
    for entry in entries:
        print(separator)
        print(f"Check: {entry.get('name', '')}")
        # These header lines are shown only when the value is present.
        for label, key in optional_headers:
            value = entry.get(key)
            if value:
                print(f"{label}: {value}")
        print(f"Status: {entry.get('status', 'unknown')}")

        run_meta = entry.get("run", {})
        if run_meta:
            branch = run_meta.get("headBranch", "")
            sha = (run_meta.get("headSha") or "")[:12]
            workflow = run_meta.get("workflowName") or run_meta.get("name") or ""
            conclusion = run_meta.get("conclusion") or run_meta.get("status") or ""
            print(f"Workflow: {workflow} ({conclusion})")
            if branch or sha:
                print(f"Branch/SHA: {branch} {sha}")
            run_url = run_meta.get("url")
            if run_url:
                print(f"Run URL: {run_url}")

        note = entry.get("note")
        if note:
            print(f"Note: {note}")

        error = entry.get("error")
        if error:
            print(f"Error fetching logs: {error}")
            continue

        snippet = entry.get("logSnippet") or ""
        if not snippet:
            print("No snippet available.")
            continue
        print("Failure snippet:")
        print(indent_block(snippet, prefix="  "))
    print(separator)
522
523
def indent_block(text: str, prefix: str = "  ") -> str:
    """Prepend *prefix* to every line of *text* and rejoin with newlines."""
    indented = [prefix + row for row in text.splitlines()]
    return "\n".join(indented)
526
527
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())