main
1#!/usr/bin/env -S uv run --script
2# /// script
3# requires-python = ">=3.11"
4# dependencies = []
5# ///
6"""
7usage-collect: Collect usage metrics from various sources.
8
9Modes:
10 host - Collect per-host data (shell, nix, emacs, services, custom tools)
11 shared - Collect shared data (pi sessions) — run on one host only
12 backfill - Process all historical data
13"""
14import argparse
15import json
16import os
17import re
18import socket
19import struct
20import subprocess
21import sys
22from collections import Counter, defaultdict
23from datetime import datetime, date, timedelta
24from pathlib import Path
25
# Root directory for every metrics file this script reads or writes.
# The XDG Base Directory spec treats an empty $XDG_DATA_HOME like an unset
# one, so `or` (rather than a .get() default) picks the fallback in both
# cases and keeps the path from silently becoming relative to the CWD.
METRICS_DIR = (
    Path(os.environ.get("XDG_DATA_HOME") or Path.home() / ".local/share")
    / "usage-metrics"
)
# Name of the machine running the script; used to key per-host output.
HOSTNAME = socket.gethostname()
28
29
30def parse_zsh_history(history_file: Path, target_date: date | None = None) -> dict:
31 """Parse zsh extended history format: `: timestamp:duration;command`"""
32 commands = Counter()
33 total = 0
34 pattern = re.compile(r"^: (\d+):\d+;(.+)")
35
36 if not history_file.exists():
37 return {"total_commands": 0, "unique_commands": 0, "top_commands": [], "all_commands": {}}
38
39 with open(history_file, "r", errors="replace") as f:
40 for line in f:
41 m = pattern.match(line)
42 if not m:
43 continue
44 ts, cmd = int(m.group(1)), m.group(2).strip()
45 cmd_date = date.fromtimestamp(ts)
46 if target_date and cmd_date != target_date:
47 continue
48 # Extract first word as command name
49 cmd_name = cmd.split()[0] if cmd else ""
50 # Strip path prefixes
51 cmd_name = cmd_name.rsplit("/", 1)[-1]
52 if cmd_name:
53 commands[cmd_name] += 1
54 total += 1
55
56 top = [{"cmd": c, "count": n} for c, n in commands.most_common(50)]
57 return {
58 "total_commands": total,
59 "unique_commands": len(commands),
60 "top_commands": top,
61 "all_commands": dict(commands),
62 }
63
64
def parse_zsh_history_range(history_file: Path, start: date, end: date) -> dict[str, dict]:
    """Summarise zsh history per day for an inclusive date range.

    Entries are expected in the form ``: <epoch>:<duration>;<command>``.
    Lines that do not match, and entries whose timestamp cannot be
    converted to a calendar date, are ignored.

    Args:
        history_file: Path of the history file. A missing file yields ``{}``.
        start: First local date to include.
        end: Last local date to include.

    Returns:
        A dict keyed by ISO date string. Each value has ``total_commands``,
        ``unique_commands``, ``top_commands`` (at most 50 records) and
        ``all_commands``. Days without any counted command are absent.
    """
    daily: dict[str, Counter] = defaultdict(Counter)
    daily_total: dict[str, int] = defaultdict(int)
    pattern = re.compile(r"^: (\d+):\d+;(.+)")

    if not history_file.exists():
        return {}

    with open(history_file, "r", errors="replace") as f:
        for line in f:
            m = pattern.match(line)
            if not m:
                continue
            try:
                cmd_date = date.fromtimestamp(int(m.group(1)))
            except (OverflowError, OSError, ValueError):
                # A corrupt or absurd timestamp should cost one entry,
                # not abort the whole backfill.
                continue
            if not start <= cmd_date <= end:
                continue
            words = m.group(2).split()
            if not words:
                continue
            # First word is the command; drop any directory prefix.
            cmd_name = words[0].rsplit("/", 1)[-1]
            if cmd_name:
                day_key = str(cmd_date)
                daily[day_key][cmd_name] += 1
                daily_total[day_key] += 1

    return {
        day_key: {
            "total_commands": daily_total[day_key],
            "unique_commands": len(cmds),
            "top_commands": [{"cmd": c, "count": n} for c, n in cmds.most_common(50)],
            "all_commands": dict(cmds),
        }
        for day_key, cmds in daily.items()
    }
100
101
102def parse_process_accounting(target_date: date | None = None) -> dict:
103 """Parse process accounting data using lastcomm."""
104 commands = Counter()
105 try:
106 result = subprocess.run(
107 ["lastcomm", "--forwards"],
108 capture_output=True, text=True, timeout=30,
109 )
110 if result.returncode != 0:
111 return {"total_execs": 0, "unique_binaries": 0, "top_binaries": [], "all_binaries": {}}
112
113 for line in result.stdout.splitlines():
114 parts = line.split()
115 if len(parts) < 4:
116 continue
117 cmd_name = parts[0]
118 # lastcomm format varies; date is typically at the end
119 # We'll collect everything if no target_date filtering needed
120 # For date filtering, we'd need to parse the date fields
121 commands[cmd_name] += 1
122 except (FileNotFoundError, subprocess.TimeoutExpired):
123 return {"total_execs": 0, "unique_binaries": 0, "top_binaries": [], "all_binaries": {}}
124
125 top = [{"cmd": c, "count": n} for c, n in commands.most_common(50)]
126 return {
127 "total_execs": sum(commands.values()),
128 "unique_binaries": len(commands),
129 "top_binaries": top,
130 "all_binaries": dict(commands),
131 }
132
133
def collect_nix_packages(shell_commands: dict, acct_commands: dict) -> dict:
    """Compare binaries installed in the Nix bin directories with observed usage.

    Args:
        shell_commands: Shell summary; its ``all_commands`` mapping supplies
            command names seen in shell history.
        acct_commands: Process-accounting summary; its ``all_binaries``
            mapping supplies executed binary names.

    Returns:
        A dict with ``total_bins``, ``used_count``, ``unused_count`` and the
        sorted name lists ``used_bins`` and ``unused_bins``.
    """
    candidates = [Path("/run/current-system/sw/bin")]
    profile_bin = Path.home() / ".nix-profile/bin"
    if profile_bin.exists():
        candidates.append(profile_bin)

    installed: set[str] = set()
    for directory in candidates:
        if not directory.exists():
            continue
        for entry in directory.iterdir():
            # Symlinks are kept even when is_file() is false for them.
            if entry.is_file() or entry.is_symlink():
                installed.add(entry.name)

    # Anything seen in either shell history or process accounting counts as used.
    invoked = {
        *shell_commands.get("all_commands", {}).keys(),
        *acct_commands.get("all_binaries", {}).keys(),
    }

    hits = sorted(installed & invoked)
    idle = sorted(installed - invoked)
    return {
        "total_bins": len(installed),
        "used_count": len(hits),
        "unused_count": len(idle),
        "used_bins": hits,
        "unused_bins": idle,
    }
161
162
def _declared_emacs_packages(nix_file: Path) -> list[str]:
    """Return package names listed after an ``epkgs: with epkgs;`` line."""
    declared: list[str] = []
    if not nix_file.exists():
        return declared

    # Package identifiers: a letter followed by letters, digits, "_" or "-".
    # This also rejects blank lines, "#" comments and "(" expressions.
    name_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]*$')
    in_epkgs = False
    bracket_depth = 0
    for line in nix_file.read_text().splitlines():
        stripped = line.strip()
        if "epkgs:" in line and "with epkgs;" in line:
            # Brackets on the header line itself are not counted; depth
            # tracking starts with the following line.
            in_epkgs = True
            bracket_depth = 0
            continue
        if not in_epkgs:
            continue
        bracket_depth += stripped.count("[") - stripped.count("]")
        if bracket_depth <= 0 and "]" in stripped:
            in_epkgs = False
            continue
        pkg = stripped.rstrip(",").strip()
        if name_re.match(pkg):
            declared.append(pkg)
    return declared


def _load_emacs_dump(dump_file: Path) -> tuple[list[str], dict]:
    """Read ``loaded_features`` and ``command_frequency`` from the dump file.

    Anything unreadable, undecodable or of an unexpected type is treated as
    absent so that a malformed dump cannot abort metric collection.
    """
    if not dump_file.exists():
        return [], {}
    try:
        data = json.loads(dump_file.read_text())
    except (OSError, ValueError):
        # ValueError covers both invalid JSON and undecodable bytes.
        return [], {}
    if not isinstance(data, dict):
        return [], {}

    features = data.get("loaded_features", [])
    frequency = data.get("command_frequency", {})
    loaded = [f for f in features if isinstance(f, str)] if isinstance(features, list) else []
    counts = (
        {k: v for k, v in frequency.items() if isinstance(v, (int, float))}
        if isinstance(frequency, dict)
        else {}
    )
    return loaded, counts


def collect_emacs_data() -> dict:
    """Compare Emacs packages declared in ``emacs.nix`` with loaded features.

    Declared packages come from ``~/src/home/home/common/dev/emacs.nix``;
    loaded features and command counts come from ``emacs-dump.json`` under
    ``METRICS_DIR``. Either file may be missing.

    Returns:
        A dict with ``declared_packages``, ``declared_count``,
        ``loaded_features``, ``loaded_count``, ``unused_packages``,
        ``unused_count`` and ``command_frequency`` (the 50 highest counts,
        in descending order).
    """
    declared = _declared_emacs_packages(Path.home() / "src/home/home/common/dev/emacs.nix")
    loaded_features, command_freq = _load_emacs_dump(METRICS_DIR / "emacs-dump.json")

    # Package names use "-" while feature names may use "_" or "-", so both
    # sides are normalised to "-" before comparing.
    loaded_set = {f.replace("_", "-") for f in loaded_features}
    unused = [p for p in declared if p.replace("_", "-") not in loaded_set]

    top_commands = dict(sorted(command_freq.items(), key=lambda item: -item[1])[:50])

    return {
        "declared_packages": declared,
        "declared_count": len(declared),
        "loaded_features": loaded_features,
        "loaded_count": len(loaded_features),
        "unused_packages": unused,
        "unused_count": len(unused),
        "command_frequency": top_commands,
    }
215
216
def collect_services() -> dict:
    """Return the names of the systemd services currently reported as running.

    Returns:
        ``{"running": [...], "total": n}`` with names sorted and the
        ``.service`` suffix removed. Both are empty/zero when ``systemctl``
        is not installed or does not answer within 10 seconds.
    """
    query = [
        "systemctl", "list-units", "--type=service", "--state=running",
        "--no-legend", "--no-pager",
    ]
    try:
        proc = subprocess.run(query, capture_output=True, text=True, timeout=10)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return {"running": [], "total": 0}

    # The unit name is the first whitespace-separated column of each row.
    rows = map(str.split, proc.stdout.splitlines())
    names = sorted(columns[0].removesuffix(".service") for columns in rows if columns)
    return {"running": names, "total": len(names)}
232
233
def collect_custom_tools(shell_commands: dict, acct_commands: dict) -> dict:
    """Report which tools defined in ``~/src/home/pkgs/default.nix`` were used.

    A tool is any indented ``<name> = pkgs.callPackage`` assignment in that
    file. It counts as used when its name appears in the shell summary's
    ``all_commands`` or the accounting summary's ``all_binaries``.

    Returns:
        A dict with the lists ``defined``, ``used`` and ``unused``, each in
        file order.
    """
    assignment = re.compile(r"\s+(\w[\w-]*)\s*=\s*pkgs\.callPackage")
    nix_file = Path.home() / "src/home/pkgs/default.nix"

    declared = []
    if nix_file.exists():
        hits = (assignment.match(row) for row in nix_file.read_text().splitlines())
        declared = [hit.group(1) for hit in hits if hit]

    seen = set(shell_commands.get("all_commands", {}).keys())
    seen |= set(acct_commands.get("all_binaries", {}).keys())

    report = {"defined": declared, "used": [], "unused": []}
    for tool in declared:
        report["used" if tool in seen else "unused"].append(tool)
    return report
256
257
258def collect_pi_sessions(target_date: date | None = None) -> dict:
259 """Parse pi session JSONL files for tool/skill/model usage."""
260 sessions_dir = Path.home() / ".local/share/ai-sync/pi-sessions"
261 if not sessions_dir.exists():
262 # Try alternate location
263 sessions_dir = Path.home() / ".pi/agent/sessions"
264
265 if not sessions_dir.exists():
266 return {"sessions_count": 0, "tools": {}, "skills_loaded": {}, "models_used": {}, "providers_used": {}}
267
268 tools = Counter()
269 skills = Counter()
270 models = Counter()
271 providers = Counter()
272 sessions_count = 0
273
274 watermark_file = METRICS_DIR / ".pi-watermark"
275 watermark = ""
276 if watermark_file.exists() and target_date is None:
277 watermark = watermark_file.read_text().strip()
278
279 latest_file = ""
280
281 for session_dir in sessions_dir.iterdir():
282 if not session_dir.is_dir():
283 continue
284 for jsonl_file in sorted(session_dir.iterdir()):
285 if not jsonl_file.name.endswith(".jsonl"):
286 continue
287
288 # Skip files older than watermark
289 if watermark and jsonl_file.name < watermark:
290 continue
291
292 if jsonl_file.name > latest_file:
293 latest_file = jsonl_file.name
294
295 # Check date from filename: YYYY-MM-DDTHH-MM-SS-...
296 try:
297 file_date = date.fromisoformat(jsonl_file.name[:10])
298 except ValueError:
299 continue
300
301 if target_date and file_date != target_date:
302 continue
303
304 sessions_count += 1
305
306 try:
307 content = jsonl_file.read_text(errors="replace")
308 for line in content.splitlines():
309 try:
310 entry = json.loads(line)
311 except json.JSONDecodeError:
312 continue
313
314 entry_type = entry.get("type", "")
315
316 if entry_type == "model_change":
317 model = entry.get("modelId", "")
318 provider = entry.get("provider", "")
319 if model:
320 models[model] += 1
321 if provider:
322 providers[provider] += 1
323
324 elif entry_type == "message":
325 msg = entry.get("message", {})
326 msg_content = msg.get("content", [])
327 if isinstance(msg_content, list):
328 for block in msg_content:
329 if not isinstance(block, dict):
330 continue
331 # Tool calls from assistant messages
332 if block.get("type") == "toolCall":
333 tool_name = block.get("name", "")
334 if tool_name:
335 tools[tool_name] += 1
336 # Check for skill reads in tool args
337 args = block.get("arguments", {})
338 if isinstance(args, dict):
339 for v in args.values():
340 if isinstance(v, str):
341 for s in re.findall(r'skills/([^/]+)/SKILL\.md', v):
342 if re.match(r'^[A-Za-z][A-Za-z0-9_-]*$', s):
343 skills[s] += 1
344 # Text blocks may reference skills
345 elif block.get("type") == "text":
346 text = block.get("text", "")
347 for s in re.findall(r'skills/([^/]+)/SKILL\.md', text):
348 if re.match(r'^[A-Za-z][A-Za-z0-9_-]*$', s):
349 skills[s] += 1
350 # toolResult messages also have toolName
351 if msg.get("role") == "toolResult":
352 tool_name = msg.get("toolName", "")
353 if tool_name:
354 tools[tool_name] += 1 # count result too for completeness
355 # Check tool result content for skill file reads
356 result_content = msg.get("content", [])
357 if isinstance(result_content, list):
358 for block in result_content:
359 if isinstance(block, dict):
360 text = block.get("text", "")
361 if isinstance(text, str):
362 for s in re.findall(r'skills/([^/]+)/SKILL\.md', text):
363 skills[s] += 1
364
365 except (OSError, IOError):
366 continue
367
368 # Update watermark
369 if latest_file and target_date is None:
370 watermark_file.parent.mkdir(parents=True, exist_ok=True)
371 watermark_file.write_text(latest_file)
372
373 # Find all declared skills
374 skills_dir = Path.home() / ".config/claude/skills"
375 all_skills = []
376 if skills_dir.exists():
377 all_skills = [d.name for d in skills_dir.iterdir() if d.is_dir() and (d / "SKILL.md").exists()]
378
379 # Filter skills to only known skill names
380 all_skills_set = set(all_skills)
381 skills = Counter({k: v for k, v in skills.items() if k in all_skills_set})
382
383 never_used = [s for s in all_skills if s not in skills]
384
385 return {
386 "sessions_count": sessions_count,
387 "tools": dict(tools.most_common()),
388 "skills_loaded": dict(skills.most_common()),
389 "skills_never_used": sorted(never_used),
390 "models_used": dict(models.most_common()),
391 "providers_used": dict(providers.most_common()),
392 }
393
394
def collect_host(target_date: date) -> dict:
    """Assemble the per-host metrics record for one day.

    Shell history is read from ``zsh_history`` under ``$XDG_DATA_HOME``
    (default ``~/.local/share``). The shell and process-accounting
    summaries are then reused by the Nix and custom-tool collectors.

    Args:
        target_date: Day to collect; passed to the shell-history and
            process-accounting parsers and written as the ``date`` field.

    Returns:
        A dict with ``hostname``, ``date``, ``shell``,
        ``process_accounting``, ``nix_packages``, ``emacs``, ``services``
        and ``custom_tools``.
    """
    data_home = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))

    shell_usage = parse_zsh_history(data_home / "zsh_history", target_date)
    acct_usage = parse_process_accounting(target_date)

    record = {
        "hostname": HOSTNAME,
        "date": str(target_date),
        "shell": shell_usage,
        "process_accounting": acct_usage,
    }
    record["nix_packages"] = collect_nix_packages(shell_usage, acct_usage)
    record["emacs"] = collect_emacs_data()
    record["services"] = collect_services()
    record["custom_tools"] = collect_custom_tools(shell_usage, acct_usage)
    return record
416
417
def collect_shared(target_date: date) -> dict:
    """Build the shared metrics record for one day.

    Returns:
        A dict with ``date`` (ISO string) and ``pi`` (the pi-session
        summary restricted to ``target_date``).
    """
    pi_summary = collect_pi_sessions(target_date)
    return dict(date=str(target_date), pi=pi_summary)
425
426
def write_metrics(data: dict, subdir: str, filename: str):
    """Serialise *data* as indented JSON to ``METRICS_DIR/<subdir>/<filename>``.

    The target directory is created if needed and the written path is
    printed. Values JSON cannot encode natively are stringified.
    """
    target_dir = METRICS_DIR / subdir
    target_dir.mkdir(parents=True, exist_ok=True)

    destination = target_dir / filename
    payload = json.dumps(data, indent=2, default=str)
    destination.write_text(payload)
    print(f"Written: {destination}")
434
435
def cmd_host(args):
    """Handle the ``host`` subcommand: collect and store per-host metrics.

    Uses ``args.date`` when set, otherwise today's date. Output goes to
    ``hosts/<hostname>/<date>.json``.
    """
    target = args.date if args.date else date.today()
    write_metrics(collect_host(target), f"hosts/{HOSTNAME}", f"{target}.json")
440
441
def cmd_shared(args):
    """Handle the ``shared`` subcommand: collect and store shared metrics.

    Uses ``args.date`` when set, otherwise today's date. Output goes to
    ``shared/<date>.json``.
    """
    target = args.date if args.date else date.today()
    write_metrics(collect_shared(target), "shared", f"{target}.json")
446
447
def _history_date_span(history_file: Path) -> tuple[date, date] | None:
    """Return the (earliest, latest) local dates in a zsh history file.

    Returns ``None`` when the file is missing or has no usable entry.
    """
    if not history_file.exists():
        return None

    stamp = re.compile(r"^: (\d+):\d+;")
    first: date | None = None
    last: date | None = None
    with open(history_file, "r", errors="replace") as f:
        for line in f:
            m = stamp.match(line)
            if not m:
                continue
            try:
                day = date.fromtimestamp(int(m.group(1)))
            except (OverflowError, OSError, ValueError):
                # Ignore corrupt timestamps instead of letting one bad
                # line define (or crash) the whole range.
                continue
            first = day if first is None else min(first, day)
            last = day if last is None else max(last, day)

    if first is None or last is None:
        return None
    return first, last


def _backfill_host_days(force: bool) -> None:
    """Write one host metrics file per day found in the zsh history."""
    history_file = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) / "zsh_history"

    span = _history_date_span(history_file)
    if span is None:
        print("No history data found for backfill.")
        return

    start, end = span
    print(f"Backfilling host data from {start} to {end}...")

    # Parse all history at once for efficiency
    daily_shell = parse_zsh_history_range(history_file, start, end)

    host_subdir = f"hosts/{HOSTNAME}"
    for day_str, shell_data in sorted(daily_shell.items()):
        out_file = METRICS_DIR / host_subdir / f"{day_str}.json"
        if out_file.exists() and not force:
            continue

        # Only shell history can be reconstructed per day; the other
        # sections are point-in-time snapshots and get a placeholder note.
        data = {
            "hostname": HOSTNAME,
            "date": day_str,
            "shell": shell_data,
            "process_accounting": {"note": "backfilled, no per-day acct data"},
            "nix_packages": {"note": "snapshot not available for historical dates"},
            "emacs": {"note": "snapshot not available for historical dates"},
            "services": {"note": "snapshot not available for historical dates"},
            "custom_tools": {"note": "snapshot not available for historical dates"},
        }
        write_metrics(data, host_subdir, f"{day_str}.json")

    print(f"Backfilled {len(daily_shell)} days of host data.")


def _backfill_shared_days(force: bool) -> None:
    """Write one shared metrics file per date that has a pi session log."""
    print("Backfilling shared (pi sessions)...")
    sessions_dir = Path.home() / ".local/share/ai-sync/pi-sessions"
    if not sessions_dir.exists():
        sessions_dir = Path.home() / ".pi/agent/sessions"
    if not sessions_dir.exists():
        return

    # Dates come from the first ten characters of each session file name.
    dates_seen: set[date] = set()
    for session_dir in sessions_dir.iterdir():
        if not session_dir.is_dir():
            continue
        for jsonl_file in session_dir.iterdir():
            if not jsonl_file.name.endswith(".jsonl"):
                continue
            try:
                dates_seen.add(date.fromisoformat(jsonl_file.name[:10]))
            except ValueError:
                continue

    for d in sorted(dates_seen):
        out_file = METRICS_DIR / "shared" / f"{d}.json"
        if out_file.exists() and not force:
            continue
        write_metrics(collect_shared(d), "shared", f"{d}.json")

    print(f"Backfilled {len(dates_seen)} days of shared data.")


def cmd_backfill(args):
    """Handle the ``backfill`` subcommand.

    Host data is rebuilt from zsh history for every day it covers. Existing
    output files are kept unless ``args.force`` is set. With ``args.shared``
    the pi-session data is backfilled as well; this happens even when no
    zsh history is available.
    """
    _backfill_host_days(args.force)

    # Backfill shared (pi sessions)
    if args.shared:
        _backfill_shared_days(args.force)
527
528
def parse_date(s: str) -> date:
    """Convert an ISO-format date string into a ``date``.

    Used as the argparse ``type=`` converter for ``--date``.

    Raises:
        ValueError: If *s* is not a valid ISO date.
    """
    parsed: date = date.fromisoformat(s)
    return parsed
531
532
def main():
    """Parse the command line and dispatch to the chosen subcommand handler.

    Subcommands: ``host`` and ``shared`` (each with an optional ``--date``)
    and ``backfill`` (with ``--force`` and ``--shared`` flags).
    """
    parser = argparse.ArgumentParser(description="Collect usage metrics")
    commands = parser.add_subparsers(dest="command", required=True)

    # The two single-day modes share the same --date option.
    dated_modes = (
        ("host", "Collect per-host metrics", cmd_host),
        ("shared", "Collect shared metrics (pi sessions)", cmd_shared),
    )
    for mode, summary, handler in dated_modes:
        mode_parser = commands.add_parser(mode, help=summary)
        mode_parser.add_argument(
            "--date",
            type=parse_date,
            default=None,
            help="Date (YYYY-MM-DD), default today",
        )
        mode_parser.set_defaults(func=handler)

    backfill = commands.add_parser("backfill", help="Backfill historical data")
    backfill.add_argument("--force", action="store_true", help="Overwrite existing files")
    backfill.add_argument("--shared", action="store_true", help="Also backfill shared/pi data")
    backfill.set_defaults(func=cmd_backfill)

    namespace = parser.parse_args()
    namespace.func(namespace)


if __name__ == "__main__":
    main()