main
  1#!/usr/bin/env -S uv run --script
  2# /// script
  3# requires-python = ">=3.11"
  4# dependencies = []
  5# ///
  6"""
  7usage-collect: Collect usage metrics from various sources.
  8
  9Modes:
 10  host    - Collect per-host data (shell, nix, emacs, services, custom tools)
 11  shared  - Collect shared data (pi sessions) — run on one host only
 12  backfill - Process all historical data
 13"""
 14import argparse
 15import json
 16import os
 17import re
 18import socket
 19import struct
 20import subprocess
 21import sys
 22from collections import Counter, defaultdict
 23from datetime import datetime, date, timedelta
 24from pathlib import Path
 25
# Root directory for every metrics file this script writes:
# $XDG_DATA_HOME/usage-metrics, falling back to ~/.local/share/usage-metrics
# when XDG_DATA_HOME is not set.
METRICS_DIR = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) / "usage-metrics"
# Name of the machine running the script (socket.gethostname()); used for the
# per-host output directory and the "hostname" field of host reports.
HOSTNAME = socket.gethostname()
 28
 29
 30def parse_zsh_history(history_file: Path, target_date: date | None = None) -> dict:
 31    """Parse zsh extended history format: `: timestamp:duration;command`"""
 32    commands = Counter()
 33    total = 0
 34    pattern = re.compile(r"^: (\d+):\d+;(.+)")
 35
 36    if not history_file.exists():
 37        return {"total_commands": 0, "unique_commands": 0, "top_commands": [], "all_commands": {}}
 38
 39    with open(history_file, "r", errors="replace") as f:
 40        for line in f:
 41            m = pattern.match(line)
 42            if not m:
 43                continue
 44            ts, cmd = int(m.group(1)), m.group(2).strip()
 45            cmd_date = date.fromtimestamp(ts)
 46            if target_date and cmd_date != target_date:
 47                continue
 48            # Extract first word as command name
 49            cmd_name = cmd.split()[0] if cmd else ""
 50            # Strip path prefixes
 51            cmd_name = cmd_name.rsplit("/", 1)[-1]
 52            if cmd_name:
 53                commands[cmd_name] += 1
 54                total += 1
 55
 56    top = [{"cmd": c, "count": n} for c, n in commands.most_common(50)]
 57    return {
 58        "total_commands": total,
 59        "unique_commands": len(commands),
 60        "top_commands": top,
 61        "all_commands": dict(commands),
 62    }
 63
 64
 65def parse_zsh_history_range(history_file: Path, start: date, end: date) -> dict[str, dict]:
 66    """Parse zsh history for a date range, returning per-day data."""
 67    daily: dict[str, Counter] = defaultdict(Counter)
 68    daily_total: dict[str, int] = defaultdict(int)
 69    pattern = re.compile(r"^: (\d+):\d+;(.+)")
 70
 71    if not history_file.exists():
 72        return {}
 73
 74    with open(history_file, "r", errors="replace") as f:
 75        for line in f:
 76            m = pattern.match(line)
 77            if not m:
 78                continue
 79            ts, cmd = int(m.group(1)), m.group(2).strip()
 80            cmd_date = date.fromtimestamp(ts)
 81            if cmd_date < start or cmd_date > end:
 82                continue
 83            cmd_name = cmd.split()[0] if cmd else ""
 84            cmd_name = cmd_name.rsplit("/", 1)[-1]
 85            if cmd_name:
 86                daily[str(cmd_date)][cmd_name] += 1
 87                daily_total[str(cmd_date)] += 1
 88
 89    result = {}
 90    for d in daily:
 91        cmds = daily[d]
 92        top = [{"cmd": c, "count": n} for c, n in cmds.most_common(50)]
 93        result[d] = {
 94            "total_commands": daily_total[d],
 95            "unique_commands": len(cmds),
 96            "top_commands": top,
 97            "all_commands": dict(cmds),
 98        }
 99    return result
100
101
102def parse_process_accounting(target_date: date | None = None) -> dict:
103    """Parse process accounting data using lastcomm."""
104    commands = Counter()
105    try:
106        result = subprocess.run(
107            ["lastcomm", "--forwards"],
108            capture_output=True, text=True, timeout=30,
109        )
110        if result.returncode != 0:
111            return {"total_execs": 0, "unique_binaries": 0, "top_binaries": [], "all_binaries": {}}
112
113        for line in result.stdout.splitlines():
114            parts = line.split()
115            if len(parts) < 4:
116                continue
117            cmd_name = parts[0]
118            # lastcomm format varies; date is typically at the end
119            # We'll collect everything if no target_date filtering needed
120            # For date filtering, we'd need to parse the date fields
121            commands[cmd_name] += 1
122    except (FileNotFoundError, subprocess.TimeoutExpired):
123        return {"total_execs": 0, "unique_binaries": 0, "top_binaries": [], "all_binaries": {}}
124
125    top = [{"cmd": c, "count": n} for c, n in commands.most_common(50)]
126    return {
127        "total_execs": sum(commands.values()),
128        "unique_binaries": len(commands),
129        "top_binaries": top,
130        "all_binaries": dict(commands),
131    }
132
133
def collect_nix_packages(shell_commands: dict, acct_commands: dict) -> dict:
    """Report which installed binaries appear among the used commands.

    Installed binaries are the file or symlink names found in
    ``/run/current-system/sw/bin`` and ``~/.nix-profile/bin`` (each only if it
    exists). Used commands are the keys of ``shell_commands["all_commands"]``
    and ``acct_commands["all_binaries"]``.

    Returns:
        A dict with ``total_bins``, ``used_count``, ``unused_count`` and the
        sorted name lists ``used_bins`` and ``unused_bins``.
    """
    installed = set()
    for bin_dir in (Path("/run/current-system/sw/bin"), Path.home() / ".nix-profile/bin"):
        if not bin_dir.exists():
            continue
        for entry in bin_dir.iterdir():
            if entry.is_file() or entry.is_symlink():
                installed.add(entry.name)

    # Names seen in shell history or in process accounting.
    seen = shell_commands.get("all_commands", {}).keys() | acct_commands.get("all_binaries", {}).keys()

    hit = sorted(installed & seen)
    miss = sorted(installed - seen)
    return {
        "total_bins": len(installed),
        "used_count": len(hit),
        "unused_count": len(miss),
        "used_bins": hit,
        "unused_bins": miss,
    }
161
162
def collect_emacs_data() -> dict:
    """Compare Emacs packages declared in ``emacs.nix`` with the Emacs dump.

    Declared packages are read from ``~/src/home/home/common/dev/emacs.nix``:
    after a line containing both ``epkgs:`` and ``with epkgs;``, each
    following line holding a single identifier is taken as a package name
    until the list's closing ``]``. Loaded features and command frequencies
    come from ``emacs-dump.json`` in the metrics directory. A missing,
    unreadable or malformed dump is treated as "nothing loaded".

    Returns:
        A dict with ``declared_packages``, ``declared_count``,
        ``loaded_features``, ``loaded_count``, ``unused_packages``,
        ``unused_count`` and ``command_frequency`` (the 50 most frequent
        commands, highest first).
    """
    dump_file = METRICS_DIR / "emacs-dump.json"
    emacs_nix = Path.home() / "src/home/home/common/dev/emacs.nix"
    pkg_name = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]*$')

    # Parse declared packages from emacs.nix
    declared = []
    if emacs_nix.exists():
        in_epkgs = False
        bracket_depth = 0
        for line in emacs_nix.read_text().splitlines():
            stripped = line.strip()
            # Any line containing "epkgs: with epkgs;" also contains both
            # shorter markers, so this single test covers both spellings.
            if "epkgs:" in line and "with epkgs;" in line:
                in_epkgs = True
                bracket_depth = 0
                continue
            if in_epkgs:
                bracket_depth += stripped.count("[") - stripped.count("]")
                if bracket_depth <= 0 and "]" in stripped:
                    in_epkgs = False
                    continue
                # Skip comments, empty lines and parenthesised expressions.
                if stripped and not stripped.startswith("#") and not stripped.startswith("("):
                    pkg = stripped.rstrip(",").strip()
                    # Only lines that are exactly one identifier count.
                    if pkg and pkg_name.match(pkg):
                        declared.append(pkg)

    # Read the emacs dump, tolerating anything that is not the expected shape.
    loaded_features = []
    command_freq = {}
    if dump_file.exists():
        try:
            data = json.loads(dump_file.read_text())
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            data = None
        if isinstance(data, dict):
            features = data.get("loaded_features", [])
            if isinstance(features, list):
                loaded_features = [f for f in features if isinstance(f, str)]
            freq = data.get("command_frequency", {})
            if isinstance(freq, dict):
                # Non-numeric counts cannot be ranked and would break the sort.
                command_freq = {k: v for k, v in freq.items() if isinstance(v, (int, float))}

    # Normalize: emacs package names use - but features may use _ or -
    loaded_set = {f.replace("_", "-") for f in loaded_features}
    unused = [p for p in declared if p.replace("_", "-") not in loaded_set]

    return {
        "declared_packages": declared,
        "declared_count": len(declared),
        "loaded_features": loaded_features,
        "loaded_count": len(loaded_features),
        "unused_packages": unused,
        "unused_count": len(unused),
        "command_frequency": dict(sorted(command_freq.items(), key=lambda x: -x[1])[:50]) if command_freq else {},
    }
215
216
def collect_services() -> dict:
    """List the systemd services currently in the running state.

    Returns:
        ``{"running": [...], "total": n}`` where ``running`` is the sorted
        list of unit names with the ``.service`` suffix removed. Both are
        empty/zero when ``systemctl`` is missing or times out.
    """
    argv = [
        "systemctl", "list-units", "--type=service", "--state=running",
        "--no-legend", "--no-pager",
    ]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=10)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return {"running": [], "total": 0}

    # The unit name is the first column of every non-blank output line.
    rows = (row.split() for row in proc.stdout.splitlines())
    units = sorted(fields[0].removesuffix(".service") for fields in rows if fields)
    return {"running": units, "total": len(units)}
232
233
def collect_custom_tools(shell_commands: dict, acct_commands: dict) -> dict:
    """Split the tools defined in ``pkgs/default.nix`` into used and unused.

    A tool is any indented ``<name> = pkgs.callPackage`` assignment found in
    ``~/src/home/pkgs/default.nix``. It counts as used when its name is a key
    of ``shell_commands["all_commands"]`` or ``acct_commands["all_binaries"]``.

    Returns:
        A dict with ``defined``, ``used`` and ``unused`` name lists, each in
        the order the tools appear in the file.
    """
    assignment = re.compile(r"\s+(\w[\w-]*)\s*=\s*pkgs\.callPackage")
    source = Path.home() / "src/home/pkgs/default.nix"

    defined = []
    if source.exists():
        hits = (assignment.match(row) for row in source.read_text().splitlines())
        defined = [hit.group(1) for hit in hits if hit]

    seen = shell_commands.get("all_commands", {}).keys() | acct_commands.get("all_binaries", {}).keys()

    used_tools, unused_tools = [], []
    for tool in defined:
        bucket = used_tools if tool in seen else unused_tools
        bucket.append(tool)

    return {
        "defined": defined,
        "used": used_tools,
        "unused": unused_tools,
    }
256
257
def _skill_names_in(text, validate: bool = True) -> list:
    """Return skill names referenced as ``skills/<name>/SKILL.md`` in *text*.

    Non-string input yields an empty list. With ``validate`` set, only names
    made of a letter followed by letters, digits, ``_`` or ``-`` are kept.
    """
    if not isinstance(text, str):
        return []
    names = re.findall(r'skills/([^/]+)/SKILL\.md', text)
    if validate:
        names = [n for n in names if re.match(r'^[A-Za-z][A-Za-z0-9_-]*$', n)]
    return names


def _tally_pi_entry(entry, tools: Counter, skills: Counter, models: Counter, providers: Counter) -> None:
    """Update the counters from one decoded session-log entry.

    Entries that are not dicts, or whose ``message`` is not a dict, are
    ignored so that one malformed line cannot abort the whole scan.
    """
    if not isinstance(entry, dict):
        return
    entry_type = entry.get("type", "")

    if entry_type == "model_change":
        model = entry.get("modelId", "")
        provider = entry.get("provider", "")
        if model and isinstance(model, str):
            models[model] += 1
        if provider and isinstance(provider, str):
            providers[provider] += 1
        return

    if entry_type != "message":
        return
    msg = entry.get("message", {})
    if not isinstance(msg, dict):
        return

    content = msg.get("content", [])
    blocks = [b for b in content if isinstance(b, dict)] if isinstance(content, list) else []

    for block in blocks:
        block_type = block.get("type")
        if block_type == "toolCall":
            # Tool calls from assistant messages
            tool_name = block.get("name", "")
            if tool_name and isinstance(tool_name, str):
                tools[tool_name] += 1
            # A skill read shows up as a SKILL.md path in a string argument.
            args = block.get("arguments", {})
            if isinstance(args, dict):
                for value in args.values():
                    skills.update(_skill_names_in(value))
        elif block_type == "text":
            # Text blocks may reference skills
            skills.update(_skill_names_in(block.get("text", "")))

    if msg.get("role") == "toolResult":
        # The result is counted in addition to the call, for completeness.
        tool_name = msg.get("toolName", "")
        if tool_name and isinstance(tool_name, str):
            tools[tool_name] += 1
        # NOTE(review): text blocks of a toolResult were already scanned in
        # the loop above, so a skill path in them is counted twice. Kept as
        # is to leave existing counts unchanged - confirm this is intended.
        for block in blocks:
            skills.update(_skill_names_in(block.get("text", ""), validate=False))


def collect_pi_sessions(target_date: date | None = None) -> dict:
    """Parse pi session JSONL files for tool/skill/model usage.

    Sessions are read from ``~/.local/share/ai-sync/pi-sessions`` or, if that
    does not exist, ``~/.pi/agent/sessions``. Each sub-directory is scanned
    for ``*.jsonl`` files whose name starts with an ISO date (``YYYY-MM-DD``).

    Args:
        target_date: When given, only files dated that day are processed and
            the watermark file is neither read nor written. When ``None``,
            files whose name sorts before the stored watermark are skipped
            and the watermark is advanced to the greatest file name seen.

    Returns:
        A dict with ``sessions_count``, ``tools``, ``skills_loaded``,
        ``skills_never_used``, ``models_used`` and ``providers_used``. The
        same keys are present when no sessions directory exists. Skill
        counts are restricted to skills that have a ``SKILL.md`` under
        ``~/.config/claude/skills``.
    """
    sessions_dir = Path.home() / ".local/share/ai-sync/pi-sessions"
    if not sessions_dir.exists():
        # Try alternate location
        sessions_dir = Path.home() / ".pi/agent/sessions"

    tools = Counter()
    skills = Counter()
    models = Counter()
    providers = Counter()
    sessions_count = 0

    watermark_file = METRICS_DIR / ".pi-watermark"
    watermark = ""
    if watermark_file.exists() and target_date is None:
        watermark = watermark_file.read_text().strip()

    latest_file = ""

    if sessions_dir.exists():
        for session_dir in sessions_dir.iterdir():
            if not session_dir.is_dir():
                continue
            for jsonl_file in sorted(session_dir.iterdir()):
                name = jsonl_file.name
                if not name.endswith(".jsonl"):
                    continue

                # Skip files older than watermark
                if watermark and name < watermark:
                    continue

                if name > latest_file:
                    latest_file = name

                # Check date from filename: YYYY-MM-DDTHH-MM-SS-...
                try:
                    file_date = date.fromisoformat(name[:10])
                except ValueError:
                    continue

                if target_date is not None and file_date != target_date:
                    continue

                sessions_count += 1

                try:
                    content = jsonl_file.read_text(errors="replace")
                except OSError:
                    continue

                for line in content.splitlines():
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    _tally_pi_entry(entry, tools, skills, models, providers)

    # Update watermark
    if latest_file and target_date is None:
        watermark_file.parent.mkdir(parents=True, exist_ok=True)
        watermark_file.write_text(latest_file)

    # Find all declared skills
    skills_dir = Path.home() / ".config/claude/skills"
    all_skills = []
    if skills_dir.exists():
        all_skills = [d.name for d in skills_dir.iterdir() if d.is_dir() and (d / "SKILL.md").exists()]

    # Filter skills to only known skill names
    all_skills_set = set(all_skills)
    skills = Counter({k: v for k, v in skills.items() if k in all_skills_set})

    never_used = [s for s in all_skills if s not in skills]

    return {
        "sessions_count": sessions_count,
        "tools": dict(tools.most_common()),
        "skills_loaded": dict(skills.most_common()),
        "skills_never_used": sorted(never_used),
        "models_used": dict(models.most_common()),
        "providers_used": dict(providers.most_common()),
    }
393
394
def collect_host(target_date: date) -> dict:
    """Assemble the full per-host report for ``target_date``.

    Shell history is read from ``$XDG_DATA_HOME/zsh_history`` (default
    ``~/.local/share/zsh_history``). The shell and process-accounting results
    are also passed to the nix-package and custom-tool collectors.

    Returns:
        A dict with ``hostname``, ``date`` and one section per collector:
        ``shell``, ``process_accounting``, ``nix_packages``, ``emacs``,
        ``services`` and ``custom_tools``.
    """
    data_home = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))

    shell_usage = parse_zsh_history(data_home / "zsh_history", target_date)
    acct_usage = parse_process_accounting(target_date)

    report = {
        "hostname": HOSTNAME,
        "date": str(target_date),
        "shell": shell_usage,
        "process_accounting": acct_usage,
    }
    report["nix_packages"] = collect_nix_packages(shell_usage, acct_usage)
    report["emacs"] = collect_emacs_data()
    report["services"] = collect_services()
    report["custom_tools"] = collect_custom_tools(shell_usage, acct_usage)
    return report
416
417
def collect_shared(target_date: date) -> dict:
    """Build the shared report for ``target_date``.

    Returns:
        A dict with the ISO ``date`` string and the pi-session statistics
        for that day under ``pi``.
    """
    pi_stats = collect_pi_sessions(target_date)
    report = {"date": str(target_date)}
    report["pi"] = pi_stats
    return report
425
426
def write_metrics(data: dict, subdir: str, filename: str):
    """Serialize ``data`` as indented JSON into ``METRICS_DIR/subdir/filename``.

    The target directory is created if needed. Values that JSON cannot encode
    natively are written via ``str()``. The written path is printed.
    """
    target_dir = METRICS_DIR / subdir
    target_dir.mkdir(parents=True, exist_ok=True)

    destination = target_dir / filename
    serialized = json.dumps(data, indent=2, default=str)
    destination.write_text(serialized)

    print(f"Written: {destination}")
434
435
def cmd_host(args):
    """Handle the ``host`` subcommand: collect and write one day of host data.

    Uses ``args.date`` when set, otherwise today's date.
    """
    target = args.date if args.date else date.today()
    write_metrics(collect_host(target), f"hosts/{HOSTNAME}", f"{target}.json")
440
441
def cmd_shared(args):
    """Handle the ``shared`` subcommand: collect and write one day of shared data.

    Uses ``args.date`` when set, otherwise today's date.
    """
    target = args.date if args.date else date.today()
    write_metrics(collect_shared(target), "shared", f"{target}.json")
446
447
def cmd_backfill(args):
    """Handle the ``backfill`` subcommand: write per-day files for past data.

    Host data: every day present in the zsh history gets a host file holding
    that day's shell statistics. The other sections carry a note, because no
    historical snapshot exists for them. Shared data (only with
    ``args.shared``): every date found in pi session file names gets a
    shared file.

    Existing output files are left untouched unless ``args.force`` is set.
    The shared backfill runs even when there is no shell history.
    """
    history_file = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) / "zsh_history"

    # Parse the whole history in a single pass; date.min/date.max make the
    # range filter accept every entry.
    daily_shell = parse_zsh_history_range(history_file, date.min, date.max)

    if daily_shell:
        # Keys are ISO date strings, so plain string ordering is date ordering.
        print(f"Backfilling host data from {min(daily_shell)} to {max(daily_shell)}...")
        written = 0
        for day_str, shell_data in sorted(daily_shell.items()):
            out_file = METRICS_DIR / f"hosts/{HOSTNAME}" / f"{day_str}.json"
            if out_file.exists() and not args.force:
                continue

            data = {
                "hostname": HOSTNAME,
                "date": day_str,
                "shell": shell_data,
                "process_accounting": {"note": "backfilled, no per-day acct data"},
                "nix_packages": {"note": "snapshot not available for historical dates"},
                "emacs": {"note": "snapshot not available for historical dates"},
                "services": {"note": "snapshot not available for historical dates"},
                "custom_tools": {"note": "snapshot not available for historical dates"},
            }
            write_metrics(data, f"hosts/{HOSTNAME}", f"{day_str}.json")
            written += 1

        skipped = len(daily_shell) - written
        print(f"Backfilled {written} days of host data ({skipped} skipped, already present).")
    else:
        print("No history data found for backfill.")

    # Backfill shared (pi sessions)
    if args.shared:
        print("Backfilling shared (pi sessions)...")
        sessions_dir = Path.home() / ".local/share/ai-sync/pi-sessions"
        if not sessions_dir.exists():
            sessions_dir = Path.home() / ".pi/agent/sessions"

        if sessions_dir.exists():
            # Collect the distinct dates encoded in the session file names.
            dates_seen = set()
            for session_dir in sessions_dir.iterdir():
                if not session_dir.is_dir():
                    continue
                for jsonl_file in session_dir.iterdir():
                    if not jsonl_file.name.endswith(".jsonl"):
                        continue
                    try:
                        dates_seen.add(date.fromisoformat(jsonl_file.name[:10]))
                    except ValueError:
                        continue

            written = 0
            for d in sorted(dates_seen):
                out_file = METRICS_DIR / "shared" / f"{d}.json"
                if out_file.exists() and not args.force:
                    continue
                data = collect_shared(d)
                write_metrics(data, "shared", f"{d}.json")
                written += 1

            skipped = len(dates_seen) - written
            print(f"Backfilled {written} days of shared data ({skipped} skipped, already present).")
527
528
def parse_date(s: str) -> date:
    """Convert an ISO-format date string (``YYYY-MM-DD``) to a ``date``.

    Raises:
        ValueError: If ``s`` is not a valid ISO date.
    """
    parsed = date.fromisoformat(s)
    return parsed
531
532
def main():
    """Parse the command line and dispatch to the chosen subcommand handler.

    Subcommands: ``host`` and ``shared`` (each with an optional ``--date``),
    and ``backfill`` (with ``--force`` and ``--shared`` flags).
    """
    parser = argparse.ArgumentParser(description="Collect usage metrics")
    subcommands = parser.add_subparsers(dest="command", required=True)

    # The two single-day subcommands differ only in name, help and handler.
    single_day = (
        ("host", "Collect per-host metrics", cmd_host),
        ("shared", "Collect shared metrics (pi sessions)", cmd_shared),
    )
    for name, summary, handler in single_day:
        day_parser = subcommands.add_parser(name, help=summary)
        day_parser.add_argument("--date", type=parse_date, default=None, help="Date (YYYY-MM-DD), default today")
        day_parser.set_defaults(func=handler)

    backfill_parser = subcommands.add_parser("backfill", help="Backfill historical data")
    backfill_parser.add_argument("--force", action="store_true", help="Overwrite existing files")
    backfill_parser.add_argument("--shared", action="store_true", help="Also backfill shared/pi data")
    backfill_parser.set_defaults(func=cmd_backfill)

    parsed = parser.parse_args()
    parsed.func(parsed)
552
553
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()