Commit 6fdd06e5b238
Changed files (15)
dots
config
emacs
site-lisp
home
common
services
pkgs
systems
tools
usage-metrics
dots/config/emacs/site-lisp/usage-metrics.el
@@ -0,0 +1,1 @@
+../../../../tools/usage-metrics/usage-metrics.el
\ No newline at end of file
dots/config/emacs/init.el
@@ -2281,6 +2281,10 @@ parameter), remove all other windows so the capture buffer fills the frame."
:after org-capture
:demand t)
+;; Usage Metrics - dumps loaded features and command frequency for usage-metrics collector
+(use-package usage-metrics
+ :demand t)
+
;; Daily Plan - Jira/GitHub → org-mode scheduling
(use-package daily-plan
:commands (daily-plan-show daily-plan-inbox daily-plan-weekly
home/common/services/usage-metrics.nix
@@ -0,0 +1,47 @@
# Home-manager module: installs the usage-metrics tools and schedules the
# daily collectors as systemd user timers.
{
  pkgs,
  hostname,
  lib,
  ...
}:
let
  inherit (pkgs) usage-metrics;

  # The "shared" collection is only scheduled on this one host.
  isSharedHost = hostname == "okinawa";

  # Oneshot user service that runs `usage-collect <mode>`.
  mkCollectService = description: mode: {
    Unit.Description = description;
    Service = {
      Type = "oneshot";
      ExecStart = "${usage-metrics}/bin/usage-collect ${mode}";
    };
  };

  # Persistent calendar timer hooked into timers.target.
  mkCollectTimer = description: onCalendar: {
    Unit.Description = description;
    Timer = {
      OnCalendar = onCalendar;
      Persistent = true;
    };
    Install.WantedBy = [ "timers.target" ];
  };
in
{
  home.packages = [ usage-metrics ];

  systemd.user.services = {
    usage-collect-host = mkCollectService "Collect per-host usage metrics" "host";
    usage-collect-shared = lib.mkIf isSharedHost (
      mkCollectService "Collect shared usage metrics (pi sessions)" "shared"
    );
  };

  systemd.user.timers = {
    usage-collect-host = mkCollectTimer "Daily per-host usage metrics collection" "*-*-* 23:50:00";
    usage-collect-shared = lib.mkIf isSharedHost (
      mkCollectTimer "Daily shared usage metrics collection" "*-*-* 23:55:00"
    );
  };
}
pkgs/default.nix
@@ -29,6 +29,7 @@ in
audible-converter = pkgs.callPackage ./audible-converter { };
jellyfin-auto-collections = pkgs.callPackage ./jellyfin-auto-collections { };
jellyfin-favorites-sync = pkgs.callPackage ../tools/jellyfin-favorites-sync { };
+ usage-metrics = pkgs.callPackage ../tools/usage-metrics { };
jellyfin-manage-playlist = pkgs.callPackage ../tools/jellyfin-manage-playlist { };
music-playlist-dl = pkgs.callPackage ../tools/music-playlist-dl { };
readwise-reader = pkgs.callPackage ../tools/readwise-reader { };
systems/aix/home.nix
@@ -1,2 +1,5 @@
# Home-manager configuration for this host: only the usage-metrics collectors.
_: {
  imports = [ ../../home/common/services/usage-metrics.nix ];
}
systems/aomi/home.nix
@@ -1,6 +1,7 @@
{ pkgs, config, ... }:
{
imports = [
+ ../../home/common/services/usage-metrics.nix
../../home/common/dev/default.nix
../../home/common/dev/gh-news.nix
../../home/common/dev/github-notif-manager.nix
systems/carthage/home.nix
@@ -1,2 +1,5 @@
# Home-manager configuration for this host: only the usage-metrics collectors.
_: {
  imports = [ ../../home/common/services/usage-metrics.nix ];
}
systems/common/base/default.nix
@@ -30,6 +30,7 @@
};
environment.systemPackages = with pkgs; [
+ acct
binutils
curl
detach # For detached session management
@@ -82,4 +83,17 @@
);
services.fwupd.enable = true;
+
+ # Process accounting — logs every exec for usage-metrics tracking
+ systemd.services.acct = {
+ description = "GNU Process Accounting";
+ wantedBy = [ "multi-user.target" ];
+ serviceConfig = {
+ Type = "oneshot";
+ RemainAfterExit = true;
+ ExecStartPre = "${pkgs.coreutils}/bin/mkdir -p /var/log/account";
+ ExecStart = "${pkgs.acct}/bin/accton /var/log/account/pacct";
+ ExecStop = "${pkgs.acct}/bin/accton off";
+ };
+ };
}
systems/kyushu/home.nix
@@ -8,6 +8,7 @@ let
in
{
imports = [
+ ../../home/common/services/usage-metrics.nix
../../home/common/dev/containers.nix
../../home/common/dev/gh-news.nix
../../home/common/dev/github-notif-manager.nix
systems/okinawa/home.nix
@@ -15,6 +15,7 @@ let
in
{
imports = [
+ ../../home/common/services/usage-metrics.nix
../../home/common/dev/containers.nix
../../home/common/dev/gh-news.nix
../../home/common/dev/github-notif-manager.nix
tools/usage-metrics/default.nix
@@ -0,0 +1,35 @@
# Package for the usage-metrics scripts (usage-collect, usage-report) and the
# companion Emacs library.
{
  pkgs,
  lib,
  ...
}:
let
  interpreter = "${pkgs.python3}/bin/python3";
  # Extra tools put on PATH for usage-collect.
  runtimePath = lib.makeBinPath [ pkgs.acct ];
in
pkgs.stdenvNoCC.mkDerivation {
  pname = "usage-metrics";
  version = "0.1.0";

  src = ./.;

  nativeBuildInputs = [ pkgs.makeWrapper ];

  installPhase = ''
    mkdir -p $out/bin $out/share/emacs/site-lisp

    # The sources start with a `uv run --script` shebang; replace that first
    # line with the Nix-provided Python interpreter.
    for script in usage-collect usage-report; do
      sed '1s|.*|#!${interpreter}|' "$script" > "$out/bin/$script"
    done
    chmod +x $out/bin/usage-collect $out/bin/usage-report

    cp usage-metrics.el $out/share/emacs/site-lisp/

    wrapProgram $out/bin/usage-collect \
      --prefix PATH : ${runtimePath}
  '';

  meta = {
    description = "Lightweight usage metrics collection and reporting";
    mainProgram = "usage-collect";
  };
}
tools/usage-metrics/usage-collect
@@ -0,0 +1,551 @@
+#!/usr/bin/env -S uv run --script
+# /// script
+# requires-python = ">=3.11"
+# dependencies = []
+# ///
+"""
+usage-collect: Collect usage metrics from various sources.
+
+Modes:
+ host - Collect per-host data (shell, nix, emacs, services, custom tools)
+ shared - Collect shared data (pi sessions) — run on one host only
+ backfill - Process all historical data
+"""
+import argparse
+import json
+import os
+import re
+import socket
+import struct
+import subprocess
+import sys
+from collections import Counter, defaultdict
+from datetime import datetime, date, timedelta
+from pathlib import Path
+
+METRICS_DIR = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) / "usage-metrics"
+HOSTNAME = socket.gethostname()
+
+
+def parse_zsh_history(history_file: Path, target_date: date | None = None) -> dict:
+ """Parse zsh extended history format: `: timestamp:duration;command`"""
+ commands = Counter()
+ total = 0
+ pattern = re.compile(r"^: (\d+):\d+;(.+)")
+
+ if not history_file.exists():
+ return {"total_commands": 0, "unique_commands": 0, "top_commands": [], "all_commands": {}}
+
+ with open(history_file, "r", errors="replace") as f:
+ for line in f:
+ m = pattern.match(line)
+ if not m:
+ continue
+ ts, cmd = int(m.group(1)), m.group(2).strip()
+ cmd_date = date.fromtimestamp(ts)
+ if target_date and cmd_date != target_date:
+ continue
+ # Extract first word as command name
+ cmd_name = cmd.split()[0] if cmd else ""
+ # Strip path prefixes
+ cmd_name = cmd_name.rsplit("/", 1)[-1]
+ if cmd_name:
+ commands[cmd_name] += 1
+ total += 1
+
+ top = [{"cmd": c, "count": n} for c, n in commands.most_common(50)]
+ return {
+ "total_commands": total,
+ "unique_commands": len(commands),
+ "top_commands": top,
+ "all_commands": dict(commands),
+ }
+
+
+def parse_zsh_history_range(history_file: Path, start: date, end: date) -> dict[str, dict]:
+ """Parse zsh history for a date range, returning per-day data."""
+ daily: dict[str, Counter] = defaultdict(Counter)
+ daily_total: dict[str, int] = defaultdict(int)
+ pattern = re.compile(r"^: (\d+):\d+;(.+)")
+
+ if not history_file.exists():
+ return {}
+
+ with open(history_file, "r", errors="replace") as f:
+ for line in f:
+ m = pattern.match(line)
+ if not m:
+ continue
+ ts, cmd = int(m.group(1)), m.group(2).strip()
+ cmd_date = date.fromtimestamp(ts)
+ if cmd_date < start or cmd_date > end:
+ continue
+ cmd_name = cmd.split()[0] if cmd else ""
+ cmd_name = cmd_name.rsplit("/", 1)[-1]
+ if cmd_name:
+ daily[str(cmd_date)][cmd_name] += 1
+ daily_total[str(cmd_date)] += 1
+
+ result = {}
+ for d in daily:
+ cmds = daily[d]
+ top = [{"cmd": c, "count": n} for c, n in cmds.most_common(50)]
+ result[d] = {
+ "total_commands": daily_total[d],
+ "unique_commands": len(cmds),
+ "top_commands": top,
+ "all_commands": dict(cmds),
+ }
+ return result
+
+
+def parse_process_accounting(target_date: date | None = None) -> dict:
+ """Parse process accounting data using lastcomm."""
+ commands = Counter()
+ try:
+ result = subprocess.run(
+ ["lastcomm", "--forwards"],
+ capture_output=True, text=True, timeout=30,
+ )
+ if result.returncode != 0:
+ return {"total_execs": 0, "unique_binaries": 0, "top_binaries": [], "all_binaries": {}}
+
+ for line in result.stdout.splitlines():
+ parts = line.split()
+ if len(parts) < 4:
+ continue
+ cmd_name = parts[0]
+ # lastcomm format varies; date is typically at the end
+ # We'll collect everything if no target_date filtering needed
+ # For date filtering, we'd need to parse the date fields
+ commands[cmd_name] += 1
+ except (FileNotFoundError, subprocess.TimeoutExpired):
+ return {"total_execs": 0, "unique_binaries": 0, "top_binaries": [], "all_binaries": {}}
+
+ top = [{"cmd": c, "count": n} for c, n in commands.most_common(50)]
+ return {
+ "total_execs": sum(commands.values()),
+ "unique_binaries": len(commands),
+ "top_binaries": top,
+ "all_binaries": dict(commands),
+ }
+
+
def collect_nix_packages(shell_commands: dict, acct_commands: dict) -> dict:
    """Cross-reference installed bins against used commands.

    Installed binaries are the entries of ``/run/current-system/sw/bin`` and,
    when present, ``~/.nix-profile/bin``.

    Args:
        shell_commands: Shell summary; its ``all_commands`` keys are used names.
        acct_commands: Process-accounting summary; its ``all_binaries`` keys
            are used names as reported by the accounting log.

    Returns:
        Dict with ``total_bins``, ``used_count``, ``unused_count`` and the
        sorted ``used_bins`` / ``unused_bins`` lists.
    """
    # The kernel's command-name field holds at most 15 characters, so the
    # accounting log truncates longer binary names.
    comm_max = 15

    system_bins = set()
    bin_dirs = [Path("/run/current-system/sw/bin")]

    nix_profile = Path.home() / ".nix-profile/bin"
    if nix_profile.exists():
        bin_dirs.append(nix_profile)

    for d in bin_dirs:
        if d.exists():
            system_bins.update(f.name for f in d.iterdir() if f.is_file() or f.is_symlink())

    shell_used = set(shell_commands.get("all_commands", {}).keys())
    acct_used = set(acct_commands.get("all_binaries", {}).keys())

    def is_used(name: str) -> bool:
        if name in shell_used or name in acct_used:
            return True
        # Accounting sees the truncated name and, for wrapped programs, the
        # name of the wrapped executable (`.name-wrapped`). Two binaries that
        # share a 15-character prefix are both treated as used; that errs
        # toward not flagging something for pruning.
        aliases = {name[:comm_max], f".{name}-wrapped"[:comm_max]}
        return not acct_used.isdisjoint(aliases)

    used_bins = {name for name in system_bins if is_used(name)}
    unused_bins = system_bins - used_bins

    return {
        "total_bins": len(system_bins),
        "used_count": len(used_bins),
        "unused_count": len(unused_bins),
        "used_bins": sorted(used_bins),
        "unused_bins": sorted(unused_bins),
    }
+
+
def _parse_declared_emacs_packages(nix_text: str) -> list[str]:
    """Extract package names listed in an `epkgs: with epkgs; [ ... ]` expression.

    Only plain identifiers sitting directly in the list are returned.
    Parenthesised expressions and anything inside nested brackets are ignored.
    """
    declared = []
    in_list = False  # inside the expression that follows the `epkgs:` header
    opened = False   # the list's opening `[` has been seen
    depth = 0
    for line in nix_text.splitlines():
        # Ignore `#` comments so brackets in comments do not skew the depth.
        code = line.split("#", 1)[0].strip()
        delta = code.count("[") - code.count("]")

        if "epkgs:" in line and "with epkgs;" in line:
            in_list = True
            opened = "[" in code
            depth = delta
            if opened and depth <= 0:
                in_list = False  # the whole list sat on the header line
            continue
        if not in_list:
            continue

        depth += delta
        if not opened and "[" in code:
            opened = True
        if opened and depth <= 0:
            # Close only when the list's own bracket closes; a line with
            # balanced inner brackets leaves the depth unchanged.
            in_list = False
            continue
        if depth != 1 or not code or code.startswith("("):
            continue
        pkg = code.rstrip(",").strip()
        if pkg and re.match(r'^[a-zA-Z][a-zA-Z0-9_-]*$', pkg):
            declared.append(pkg)
    return declared


def collect_emacs_data() -> dict:
    """Read emacs dump file if available, compare with declared packages.

    Declared packages come from ``~/src/home/home/common/dev/emacs.nix``.
    Loaded features and command frequencies come from ``emacs-dump.json`` in
    the metrics directory. A missing or unreadable file contributes empty
    data instead of raising.

    Returns:
        Dict with ``declared_packages``, ``declared_count``,
        ``loaded_features``, ``loaded_count``, ``unused_packages``,
        ``unused_count`` and ``command_frequency`` (top 50 by count).
    """
    dump_file = METRICS_DIR / "emacs-dump.json"
    emacs_nix = Path.home() / "src/home/home/common/dev/emacs.nix"

    declared = []
    if emacs_nix.exists():
        declared = _parse_declared_emacs_packages(emacs_nix.read_text())

    loaded_features = []
    command_freq = {}
    if dump_file.exists():
        try:
            data = json.loads(dump_file.read_text())
        except (OSError, ValueError):
            data = {}
        if isinstance(data, dict):
            features = data.get("loaded_features", [])
            if isinstance(features, list):
                loaded_features = [f for f in features if isinstance(f, str)]
            freq = data.get("command_frequency", {})
            if isinstance(freq, dict):
                # Keep only numeric counts so the sort below cannot fail.
                command_freq = {k: v for k, v in freq.items() if isinstance(v, (int, float))}

    # Normalize: emacs package names use - but features may use _ or -
    loaded_set = {f.replace("_", "-") for f in loaded_features}
    unused = [p for p in declared if p.replace("_", "-") not in loaded_set]

    return {
        "declared_packages": declared,
        "declared_count": len(declared),
        "loaded_features": loaded_features,
        "loaded_count": len(loaded_features),
        "unused_packages": unused,
        "unused_count": len(unused),
        "command_frequency": dict(sorted(command_freq.items(), key=lambda x: -x[1])[:50]) if command_freq else {},
    }
+
+
def collect_services() -> dict:
    """Return the names of running systemd services and how many there are.

    Runs ``systemctl list-units`` restricted to running services and takes the
    first column of each row with the ``.service`` suffix removed. When
    ``systemctl`` is missing or times out, an empty snapshot is returned.
    """
    query = ["systemctl", "list-units", "--type=service", "--state=running", "--no-legend", "--no-pager"]
    try:
        proc = subprocess.run(query, capture_output=True, text=True, timeout=10)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return {"running": [], "total": 0}

    rows = (row.split() for row in proc.stdout.splitlines())
    names = sorted(fields[0].removesuffix(".service") for fields in rows if fields)
    return {"running": names, "total": len(names)}
+
+
def collect_custom_tools(shell_commands: dict, acct_commands: dict) -> dict:
    """Check which custom tools from pkgs/ are actually used.

    Tool names are the attribute names assigned with ``pkgs.callPackage`` in
    ``~/src/home/pkgs/default.nix``. A tool counts as used when its name is in
    the shell history or matches a process-accounting entry.

    Args:
        shell_commands: Shell summary; its ``all_commands`` keys are used names.
        acct_commands: Process-accounting summary; its ``all_binaries`` keys
            are used names as reported by the accounting log.

    Returns:
        Dict with ``defined``, ``used`` and ``unused`` lists, in the order the
        tools appear in the Nix file.
    """
    # The kernel's command-name field holds at most 15 characters, so the
    # accounting log truncates longer tool names.
    comm_max = 15

    pkgs_file = Path.home() / "src/home/pkgs/default.nix"
    tools = []

    if pkgs_file.exists():
        for line in pkgs_file.read_text().splitlines():
            m = re.match(r"\s+(\w[\w-]*)\s*=\s*pkgs\.callPackage", line)
            if m:
                tools.append(m.group(1))

    shell_used = set(shell_commands.get("all_commands", {}).keys())
    acct_used = set(acct_commands.get("all_binaries", {}).keys())

    def is_used(name: str) -> bool:
        if name in shell_used or name in acct_used:
            return True
        # Also match the truncated name and the `.name-wrapped` executable
        # that a wrapped program runs as.
        aliases = {name[:comm_max], f".{name}-wrapped"[:comm_max]}
        return not acct_used.isdisjoint(aliases)

    used_tools = [t for t in tools if is_used(t)]
    unused_tools = [t for t in tools if not is_used(t)]

    return {
        "defined": tools,
        "used": used_tools,
        "unused": unused_tools,
    }
+
+
+def collect_pi_sessions(target_date: date | None = None) -> dict:
+ """Parse pi session JSONL files for tool/skill/model usage."""
+ sessions_dir = Path.home() / ".local/share/ai-sync/pi-sessions"
+ if not sessions_dir.exists():
+ # Try alternate location
+ sessions_dir = Path.home() / ".pi/agent/sessions"
+
+ if not sessions_dir.exists():
+ return {"sessions_count": 0, "tools": {}, "skills_loaded": {}, "models_used": {}, "providers_used": {}}
+
+ tools = Counter()
+ skills = Counter()
+ models = Counter()
+ providers = Counter()
+ sessions_count = 0
+
+ watermark_file = METRICS_DIR / ".pi-watermark"
+ watermark = ""
+ if watermark_file.exists() and target_date is None:
+ watermark = watermark_file.read_text().strip()
+
+ latest_file = ""
+
+ for session_dir in sessions_dir.iterdir():
+ if not session_dir.is_dir():
+ continue
+ for jsonl_file in sorted(session_dir.iterdir()):
+ if not jsonl_file.name.endswith(".jsonl"):
+ continue
+
+ # Skip files older than watermark
+ if watermark and jsonl_file.name < watermark:
+ continue
+
+ if jsonl_file.name > latest_file:
+ latest_file = jsonl_file.name
+
+ # Check date from filename: YYYY-MM-DDTHH-MM-SS-...
+ try:
+ file_date = date.fromisoformat(jsonl_file.name[:10])
+ except ValueError:
+ continue
+
+ if target_date and file_date != target_date:
+ continue
+
+ sessions_count += 1
+
+ try:
+ content = jsonl_file.read_text(errors="replace")
+ for line in content.splitlines():
+ try:
+ entry = json.loads(line)
+ except json.JSONDecodeError:
+ continue
+
+ entry_type = entry.get("type", "")
+
+ if entry_type == "model_change":
+ model = entry.get("modelId", "")
+ provider = entry.get("provider", "")
+ if model:
+ models[model] += 1
+ if provider:
+ providers[provider] += 1
+
+ elif entry_type == "message":
+ msg = entry.get("message", {})
+ msg_content = msg.get("content", [])
+ if isinstance(msg_content, list):
+ for block in msg_content:
+ if not isinstance(block, dict):
+ continue
+ # Tool calls from assistant messages
+ if block.get("type") == "toolCall":
+ tool_name = block.get("name", "")
+ if tool_name:
+ tools[tool_name] += 1
+ # Check for skill reads in tool args
+ args = block.get("arguments", {})
+ if isinstance(args, dict):
+ for v in args.values():
+ if isinstance(v, str):
+ for s in re.findall(r'skills/([^/]+)/SKILL\.md', v):
+ if re.match(r'^[A-Za-z][A-Za-z0-9_-]*$', s):
+ skills[s] += 1
+ # Text blocks may reference skills
+ elif block.get("type") == "text":
+ text = block.get("text", "")
+ for s in re.findall(r'skills/([^/]+)/SKILL\.md', text):
+ if re.match(r'^[A-Za-z][A-Za-z0-9_-]*$', s):
+ skills[s] += 1
+ # toolResult messages also have toolName
+ if msg.get("role") == "toolResult":
+ tool_name = msg.get("toolName", "")
+ if tool_name:
+ tools[tool_name] += 1 # count result too for completeness
+ # Check tool result content for skill file reads
+ result_content = msg.get("content", [])
+ if isinstance(result_content, list):
+ for block in result_content:
+ if isinstance(block, dict):
+ text = block.get("text", "")
+ if isinstance(text, str):
+ for s in re.findall(r'skills/([^/]+)/SKILL\.md', text):
+ skills[s] += 1
+
+ except (OSError, IOError):
+ continue
+
+ # Update watermark
+ if latest_file and target_date is None:
+ watermark_file.parent.mkdir(parents=True, exist_ok=True)
+ watermark_file.write_text(latest_file)
+
+ # Find all declared skills
+ skills_dir = Path.home() / ".config/claude/skills"
+ all_skills = []
+ if skills_dir.exists():
+ all_skills = [d.name for d in skills_dir.iterdir() if d.is_dir() and (d / "SKILL.md").exists()]
+
+ never_used = [s for s in all_skills if s not in skills]
+
+ return {
+ "sessions_count": sessions_count,
+ "tools": dict(tools.most_common()),
+ "skills_loaded": dict(skills.most_common()),
+ "skills_never_used": sorted(never_used),
+ "models_used": dict(models.most_common()),
+ "providers_used": dict(providers.most_common()),
+ }
+
+
def collect_host(target_date: date) -> dict:
    """Assemble the per-host metrics document for one date.

    Shell history and process accounting are gathered first because the
    package and custom-tool sections are derived from them.
    """
    data_home = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
    shell_stats = parse_zsh_history(data_home / "zsh_history", target_date)
    acct_stats = parse_process_accounting(target_date)

    report = {"hostname": HOSTNAME, "date": str(target_date)}
    report["shell"] = shell_stats
    report["process_accounting"] = acct_stats
    report["nix_packages"] = collect_nix_packages(shell_stats, acct_stats)
    report["emacs"] = collect_emacs_data()
    report["services"] = collect_services()
    report["custom_tools"] = collect_custom_tools(shell_stats, acct_stats)
    return report
+
+
def collect_shared(target_date: date) -> dict:
    """Build the shared metrics document (pi session usage) for one date."""
    document = {"date": str(target_date)}
    document["pi"] = collect_pi_sessions(target_date)
    return document
+
+
def write_metrics(data: dict, subdir: str, filename: str):
    """Write metrics JSON to the appropriate directory.

    The JSON is written to a temporary sibling file and then renamed over the
    target, so a reader of the metrics directory never sees a half-written
    file.

    Args:
        data: JSON-serialisable document; values json cannot encode are
            converted with ``str``.
        subdir: Directory below the metrics root, created if missing.
        filename: Name of the output file inside ``subdir``.
    """
    out_dir = METRICS_DIR / subdir
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / filename
    # The temporary name does not end in ".json", so the report loaders skip it.
    tmp_file = out_dir / f".{filename}.tmp"
    tmp_file.write_text(json.dumps(data, indent=2, default=str))
    os.replace(tmp_file, out_file)
    print(f"Written: {out_file}")
+
+
def cmd_host(args):
    """Handle the `host` subcommand: collect and store this host's metrics."""
    day = args.date if args.date else date.today()
    write_metrics(collect_host(day), f"hosts/{HOSTNAME}", f"{day}.json")
+
+
def cmd_shared(args):
    """Handle the `shared` subcommand: collect and store the shared metrics."""
    day = args.date if args.date else date.today()
    write_metrics(collect_shared(day), "shared", f"{day}.json")
+
+
def cmd_backfill(args):
    """Backfill historical data from all sources.

    Host files are rebuilt from zsh history only; sections that need a live
    snapshot are stored as explanatory notes. Existing files are kept unless
    ``args.force`` is set. With ``args.shared``, one shared file is written
    per date that has pi session files. This runs even when the host has no
    zsh history.
    """
    history_file = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) / "zsh_history"

    # One pass over the history; the per-day keys give the covered date range.
    daily_shell = parse_zsh_history_range(history_file, date.min, date.max)

    if daily_shell:
        days = sorted(daily_shell)  # ISO date strings sort chronologically
        print(f"Backfilling host data from {days[0]} to {days[-1]}...")

        written = 0
        for day_str in days:
            out_file = METRICS_DIR / f"hosts/{HOSTNAME}" / f"{day_str}.json"
            if out_file.exists() and not args.force:
                continue

            data = {
                "hostname": HOSTNAME,
                "date": day_str,
                "shell": daily_shell[day_str],
                "process_accounting": {"note": "backfilled, no per-day acct data"},
                "nix_packages": {"note": "snapshot not available for historical dates"},
                "emacs": {"note": "snapshot not available for historical dates"},
                "services": {"note": "snapshot not available for historical dates"},
                "custom_tools": {"note": "snapshot not available for historical dates"},
            }
            write_metrics(data, f"hosts/{HOSTNAME}", f"{day_str}.json")
            written += 1

        print(f"Backfilled {written} days of host data.")
    else:
        print("No history data found for backfill.")

    # Backfill shared (pi sessions)
    if args.shared:
        print("Backfilling shared (pi sessions)...")
        sessions_dir = Path.home() / ".local/share/ai-sync/pi-sessions"
        if not sessions_dir.exists():
            sessions_dir = Path.home() / ".pi/agent/sessions"

        if sessions_dir.exists():
            # Collect the distinct dates encoded in the session file names.
            dates_seen = set()
            for session_dir in sessions_dir.iterdir():
                if not session_dir.is_dir():
                    continue
                for jsonl_file in session_dir.iterdir():
                    if jsonl_file.name.endswith(".jsonl"):
                        try:
                            dates_seen.add(date.fromisoformat(jsonl_file.name[:10]))
                        except ValueError:
                            continue

            # NOTE: collect_shared rescans every session file for each date.
            written = 0
            for d in sorted(dates_seen):
                out_file = METRICS_DIR / "shared" / f"{d}.json"
                if out_file.exists() and not args.force:
                    continue
                write_metrics(collect_shared(d), "shared", f"{d}.json")
                written += 1

            print(f"Backfilled {written} days of shared data.")
+
+
def parse_date(s: str) -> date:
    """Convert an ISO ``YYYY-MM-DD`` string into a ``date`` (argparse type hook)."""
    parsed = date.fromisoformat(s)
    return parsed
+
+
def main():
    """Command-line entry point: parse the subcommand and run its handler."""
    parser = argparse.ArgumentParser(description="Collect usage metrics")
    subcommands = parser.add_subparsers(dest="command", required=True)

    # `host` and `shared` take the same optional --date argument.
    dated = (
        ("host", "Collect per-host metrics", cmd_host),
        ("shared", "Collect shared metrics (pi sessions)", cmd_shared),
    )
    for name, help_text, handler in dated:
        sub_parser = subcommands.add_parser(name, help=help_text)
        sub_parser.add_argument("--date", type=parse_date, default=None, help="Date (YYYY-MM-DD), default today")
        sub_parser.set_defaults(func=handler)

    backfill = subcommands.add_parser("backfill", help="Backfill historical data")
    backfill.add_argument("--force", action="store_true", help="Overwrite existing files")
    backfill.add_argument("--shared", action="store_true", help="Also backfill shared/pi data")
    backfill.set_defaults(func=cmd_backfill)

    parsed = parser.parse_args()
    parsed.func(parsed)
+
+
+if __name__ == "__main__":
+ main()
tools/usage-metrics/usage-metrics.el
@@ -0,0 +1,97 @@
+;;; usage-metrics.el --- Dump Emacs usage data for usage-metrics collector -*- lexical-binding: t; -*-
+
+;;; Commentary:
+;; Periodically dumps loaded features and command frequency data to a JSON file
+;; that the usage-collect script reads.
+
+;;; Code:
+
(defvar usage-metrics-output-file
  (let ((data-home (or (getenv "XDG_DATA_HOME")
                       (expand-file-name ".local/share" (getenv "HOME")))))
    (expand-file-name "usage-metrics/emacs-dump.json" data-home))
  "File the Emacs usage dump is written to.
It lives under $XDG_DATA_HOME, or ~/.local/share when that is unset.")

(defvar usage-metrics-timer nil
  "Timer object driving the periodic dump, or nil when not running.")
+
(defun usage-metrics--get-loaded-features ()
  "Return the names of all currently loaded features as a list of strings."
  (let (names)
    (dolist (feature features (nreverse names))
      (push (symbol-name feature) names))))
+
(defvar keyfreq-table)

(defun usage-metrics--get-command-frequency ()
  "Return an alist of (COMMAND-NAME . COUNT) built from `keyfreq-table'.
Each table key is expected to be a cons whose cdr is the command symbol.
Counts for the same command are summed across keys.  The result is
sorted by count, highest first.  Return nil when `keyfreq-table' is not
bound to a hash table."
  ;; `keyfreq-table' is a variable, so it must be tested with `boundp';
  ;; an `fboundp' test on it never succeeds.
  (when (and (boundp 'keyfreq-table)
             (hash-table-p keyfreq-table))
    (let ((totals (make-hash-table :test #'equal))
          (result '()))
      (maphash
       (lambda (key count)
         (let ((cmd (cdr-safe key)))
           (when (and cmd (symbolp cmd))
             (let ((name (symbol-name cmd)))
               (puthash name (+ count (gethash name totals 0)) totals)))))
       keyfreq-table)
      (maphash (lambda (name count) (push (cons name count) result)) totals)
      ;; Sort by frequency descending
      (sort result (lambda (a b) (> (cdr a) (cdr b)))))))
+
(defun usage-metrics--get-package-list ()
  "Return a list of installed package names as strings.
Look at straight.el, then elpaca, then package.el, and use the first one
that appears to be present.  Return nil when none is available or when
querying the package manager signals an error, so that a dump running
from `kill-emacs-hook' is never interrupted."
  (condition-case nil
      (cond
       ;; straight.el
       ((and (fboundp 'straight--installed-packages)
             (boundp 'straight--recipe-cache)
             (hash-table-p (symbol-value 'straight--recipe-cache)))
        (let (names)
          ;; `format' accepts both symbol and string keys.
          (maphash (lambda (key _recipe) (push (format "%s" key) names))
                   (symbol-value 'straight--recipe-cache))
          (nreverse names)))
       ;; elpaca
       ;; NOTE(review): assumes `elpaca--queued' returns elements accepted
       ;; by `elpaca<-package' -- confirm against the elpaca version in use.
       ((and (fboundp 'elpaca--queued)
             (fboundp 'elpaca<-package))
        (mapcar (lambda (e) (format "%s" (elpaca<-package e)))
                (elpaca--queued)))
       ;; package.el
       ((bound-and-true-p package-activated-list)
        (mapcar #'symbol-name package-activated-list))
       (t '()))
    (error '())))
+
(defun usage-metrics-dump ()
  "Dump current Emacs usage data to `usage-metrics-output-file' as JSON.
The object has the keys \"timestamp\", \"loaded_features\",
\"installed_packages\" and \"command_frequency\" (at most the 200 most
frequent commands).  Errors are reported with `message' instead of being
signalled, because this function also runs from `kill-emacs-hook'."
  (interactive)
  (condition-case err
      (let* ((features-list (usage-metrics--get-loaded-features))
             (cmd-freq (usage-metrics--get-command-frequency))
             (packages (usage-metrics--get-package-list))
             (freq-table (let ((ht (make-hash-table :test 'equal)))
                           (dolist (pair (seq-take cmd-freq 200))
                             (puthash (car pair) (cdr pair) ht))
                           ht))
             ;; The JSON serializer requires alist keys to be symbols;
             ;; string keys make it signal `wrong-type-argument'.
             (data `((timestamp . ,(format-time-string "%Y-%m-%dT%H:%M:%S%z"))
                     (loaded_features . ,(vconcat features-list))
                     (installed_packages . ,(vconcat packages))
                     (command_frequency . ,freq-table)))
             (dir (file-name-directory usage-metrics-output-file)))
        (unless (file-directory-p dir)
          (make-directory dir t))
        (with-temp-file usage-metrics-output-file
          (json-insert data))
        (message "usage-metrics: dumped to %s" usage-metrics-output-file))
    (error
     (message "usage-metrics: dump failed: %s" (error-message-string err)))))
+
(defun usage-metrics-start ()
  "Begin dumping usage metrics periodically and again when Emacs exits.
The first dump happens after 5 minutes, later ones every 30 minutes."
  (interactive)
  ;; Clear any earlier timer and hook so repeated calls never stack them.
  (usage-metrics-stop)
  (let ((initial-delay 300)
        (repeat-interval 1800))
    (setq usage-metrics-timer
          (run-with-timer initial-delay repeat-interval #'usage-metrics-dump)))
  (add-hook 'kill-emacs-hook #'usage-metrics-dump)
  (message "usage-metrics: started"))
+
(defun usage-metrics-stop ()
  "Cancel the periodic dump timer and remove the exit-time dump hook."
  (interactive)
  (let ((active usage-metrics-timer))
    (when active
      (cancel-timer active)
      (setq usage-metrics-timer nil)))
  (remove-hook 'kill-emacs-hook #'usage-metrics-dump)
  (message "usage-metrics: stopped"))
+
+;; Auto-start when loaded
+(usage-metrics-start)
+
+(provide 'usage-metrics)
+;;; usage-metrics.el ends here
tools/usage-metrics/usage-report
@@ -0,0 +1,280 @@
+#!/usr/bin/env -S uv run --script
+# /// script
+# requires-python = ">=3.11"
+# dependencies = []
+# ///
+"""
+usage-report: Generate usage reports from collected metrics.
+
+Reads JSON files from ~/.local/share/usage-metrics/ and produces
+a markdown summary for awareness, pruning, and optimization.
+"""
+import argparse
+import json
+import os
+import sys
+from collections import Counter
+from datetime import date, timedelta
+from pathlib import Path
+
+
+METRICS_DIR = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) / "usage-metrics"
+
+
def _load_recent_json(directory: Path, cutoff: date) -> list[dict]:
    """Load the `<ISO-date>.json` objects in directory dated on or after cutoff.

    Files are returned oldest first. Files that are not named after a date,
    cannot be read or parsed, or do not contain a JSON object are skipped.
    """
    documents = []
    for f in sorted(directory.iterdir()):
        if not f.name.endswith(".json"):
            continue
        try:
            file_date = date.fromisoformat(f.stem)
        except ValueError:
            continue
        if file_date < cutoff:
            continue
        try:
            document = json.loads(f.read_text())
        except (json.JSONDecodeError, OSError):
            continue
        # The aggregation code calls .get() on every document, so anything
        # other than a JSON object is unusable.
        if isinstance(document, dict):
            documents.append(document)
    return documents


def load_host_data(days: int) -> dict[str, list[dict]]:
    """Load host metrics for the last N days, grouped by hostname.

    Args:
        days: Size of the look-back window, counted back from today.

    Returns:
        Mapping of hostname (the directory name under ``hosts/``) to that
        host's daily documents, oldest first. A host directory with no usable
        files maps to an empty list. ``{}`` when ``hosts/`` does not exist.
    """
    hosts_dir = METRICS_DIR / "hosts"
    if not hosts_dir.exists():
        return {}

    cutoff = date.today() - timedelta(days=days)
    return {
        host_dir.name: _load_recent_json(host_dir, cutoff)
        for host_dir in hosts_dir.iterdir()
        if host_dir.is_dir()
    }


def load_shared_data(days: int) -> list[dict]:
    """Load shared metrics for the last N days.

    Args:
        days: Size of the look-back window, counted back from today.

    Returns:
        The daily shared documents, oldest first. ``[]`` when ``shared/``
        does not exist.
    """
    shared_dir = METRICS_DIR / "shared"
    if not shared_dir.exists():
        return []

    cutoff = date.today() - timedelta(days=days)
    return _load_recent_json(shared_dir, cutoff)
+
+
def aggregate_commands(data_list: list[dict], key: str = "shell") -> Counter:
    """Sum per-command counts over several daily documents.

    For each day the ``key`` section is consulted. Its ``all_commands``
    mapping is used, or ``all_binaries`` when that is missing or empty.
    """
    tally = Counter()
    for day in data_list:
        section = day.get(key, {})
        counts = section.get("all_commands", {})
        if not counts:
            counts = section.get("all_binaries", {})
        if not isinstance(counts, dict):
            continue
        tally.update(counts)
    return tally
+
+
def aggregate_pi(shared_list: list[dict]) -> dict:
    """Combine the daily pi-session sections into totals for the period.

    A skill is reported as never used when some day listed it (as loaded or
    as never used) and it has no entry in the combined loaded-skill counts.
    """
    totals = {name: Counter() for name in ("tools", "skills_loaded", "models_used", "providers_used")}
    session_total = 0
    known_skills = set()

    for day in shared_list:
        section = day.get("pi", {})
        session_total += section.get("sessions_count", 0)
        for name, counter in totals.items():
            counter.update(section.get(name, {}))
        known_skills.update(section.get("skills_never_used", []))
        known_skills.update(section.get("skills_loaded", {}).keys())

    loaded = totals["skills_loaded"]
    return {
        "sessions": session_total,
        "tools": totals["tools"],
        "skills": loaded,
        "skills_never_used": sorted(known_skills - loaded.keys()),
        "models": totals["models_used"],
        "providers": totals["providers_used"],
    }
+
+
def format_counter(counter: Counter, limit: int = 20) -> str:
    """Format a counter as a readable list.

    One line is produced per entry, most common first, with the name, the
    count and a bar. Bars are one block per occurrence while the largest
    count is at most 50. Above that, all bars are scaled so the largest is 50
    blocks wide and smaller entries stay proportionate; every positive count
    keeps at least one block.

    Args:
        counter: Counts to display.
        limit: Maximum number of entries to show.

    Returns:
        The formatted lines joined with newlines and ending in a newline, or
        a "(no data)" line for an empty counter.
    """
    items = counter.most_common(limit)
    if not items:
        return " (no data)\n"
    max_name = max(len(name) for name, _ in items)
    max_bar = 50
    peak = max(count for _, count in items)
    # Scale only when needed so small counters keep one block per count.
    scale = peak / max_bar if peak > max_bar else 1
    lines = []
    for name, count in items:
        bar_len = min(max_bar, max(1, round(count / scale))) if count > 0 else 0
        bar = "█" * bar_len
        lines.append(f" {name:<{max_name}} {count:>5} {bar}")
    return "\n".join(lines) + "\n"
+
+
def _latest_snapshot(data_list: list[dict], section: str, marker: str) -> dict:
    """Return the newest day's `section` dict that contains `marker`, or {}.

    Backfilled days store only a note in the snapshot sections, so the newest
    file is not always the newest real snapshot.
    """
    for day in reversed(data_list):
        candidate = day.get(section, {})
        if isinstance(candidate, dict) and marker in candidate:
            return candidate
    return {}


def generate_report(days: int, format: str = "md") -> str:
    """Generate the full usage report.

    Args:
        days: Number of days of collected metrics to include.
        format: ``"json"`` returns the loaded documents re-exported as one
            JSON object; any other value returns the markdown report.

    Returns:
        The report text.
    """
    host_data = load_host_data(days)
    shared_data = load_shared_data(days)

    if format == "json":
        # Re-export as structured JSON for LLM consumption; no markdown needed.
        return json.dumps({
            "period_days": days,
            "generated": str(date.today()),
            "hosts": dict(host_data),
            "shared": shared_data,
        }, indent=2, default=str)

    lines = []
    lines.append(f"# Usage Report — last {days} days")
    lines.append(f"Generated: {date.today()}\n")

    # Per-host sections
    for hostname, data_list in sorted(host_data.items()):
        if not data_list:
            continue

        lines.append(f"## Host: {hostname} ({len(data_list)} days of data)\n")

        # Shell commands
        shell_cmds = aggregate_commands(data_list, "shell")
        total_cmds = sum(shell_cmds.values())
        lines.append("### Shell Commands")
        lines.append(f"Total: {total_cmds} commands, {len(shell_cmds)} unique\n")
        lines.append(format_counter(shell_cmds))

        # Process accounting
        acct_cmds = aggregate_commands(data_list, "process_accounting")
        if acct_cmds:
            lines.append("### Process Accounting (all exec'd binaries)")
            lines.append(f"Total: {sum(acct_cmds.values())} executions, {len(acct_cmds)} unique\n")
            lines.append(format_counter(acct_cmds))

        # Nix packages (from the most recent real snapshot)
        nix = _latest_snapshot(data_list, "nix_packages", "total_bins")
        if nix:
            lines.append("### Nix Packages")
            lines.append(f"Installed bins: {nix.get('total_bins', '?')}")
            lines.append(f"Used (today): {nix.get('used_count', '?')}")
            lines.append(f"Unused (today): {nix.get('unused_count', '?')}\n")

            # Cross-reference with historical shell usage for better pruning
            all_used = set(shell_cmds.keys()) | set(acct_cmds.keys())
            unused_bins = set(nix.get("unused_bins", [])) - all_used
            if unused_bins:
                lines.append(f"**Never used in {days} days** ({len(unused_bins)} bins):")
                # Show first 30
                for b in sorted(unused_bins)[:30]:
                    lines.append(f" - {b}")
                if len(unused_bins) > 30:
                    lines.append(f" ... and {len(unused_bins) - 30} more")
                lines.append("")

        # Emacs
        emacs = _latest_snapshot(data_list, "emacs", "declared_count")
        if emacs:
            lines.append("### Emacs Packages")
            lines.append(f"Declared: {emacs.get('declared_count', '?')}")
            lines.append(f"Loaded features: {emacs.get('loaded_count', '?')}")
            unused_pkgs = emacs.get("unused_packages", [])
            if unused_pkgs:
                lines.append(f"Potentially unused ({len(unused_pkgs)}):")
                for p in sorted(unused_pkgs):
                    lines.append(f" - {p}")
            lines.append("")

        # Custom tools
        custom = _latest_snapshot(data_list, "custom_tools", "defined")
        if custom:
            lines.append("### Custom Tools (from pkgs/)")
            lines.append(f"Defined: {len(custom.get('defined', []))}")
            used = custom.get("used", [])
            unused = custom.get("unused", [])
            if used:
                lines.append(f"Used: {', '.join(sorted(used))}")
            if unused:
                lines.append(f"Unused: {', '.join(sorted(unused))}")
            lines.append("")

        # Services
        services = _latest_snapshot(data_list, "services", "running")
        if services:
            lines.append("### System Services")
            lines.append(f"Running: {services.get('total', '?')} services")
            for s in services.get("running", []):
                lines.append(f" - {s}")
            lines.append("")

    # Shared / Pi section
    if shared_data:
        pi = aggregate_pi(shared_data)
        lines.append("## Pi Agent (across all hosts)\n")
        lines.append(f"Total sessions: {pi['sessions']}\n")

        lines.append("### Tools Used")
        lines.append(format_counter(pi["tools"]))

        lines.append("### Skills Loaded")
        lines.append(format_counter(pi["skills"]))

        if pi["skills_never_used"]:
            lines.append(f"### Skills Never Used ({len(pi['skills_never_used'])})")
            for s in pi["skills_never_used"]:
                lines.append(f" - {s}")
            lines.append("")

        lines.append("### Models Used")
        lines.append(format_counter(pi["models"]))

        lines.append("### Providers")
        lines.append(format_counter(pi["providers"]))

    return "\n".join(lines)
+
+
def main():
    """Command-line entry point: build the report and print or save it."""
    cli = argparse.ArgumentParser(description="Generate usage report")
    cli.add_argument("--days", type=int, default=30, help="Number of days to include (default: 30)")
    cli.add_argument("--format", choices=["md", "json"], default="md", help="Output format")
    cli.add_argument("--output", "-o", type=str, help="Write to file instead of stdout")
    options = cli.parse_args()

    text = generate_report(options.days, options.format)

    if not options.output:
        print(text)
        return
    Path(options.output).write_text(text)
    # The confirmation goes to stderr, not stdout.
    print(f"Report written to {options.output}", file=sys.stderr)
+
+
+if __name__ == "__main__":
+ main()
globals.nix
@@ -54,6 +54,10 @@ _: {
id = "ai-sync"; # unified AI agent storage (sessions, plans, learnings, research)
path = "/home/vincent/.local/share/ai-sync";
};
+ usage-metrics = {
+ id = "usage-metrics";
+ path = "/home/vincent/.local/share/usage-metrics";
+ };
paperless-media = {
id = "paperless-media";
path = "/neo/paperless/media";
@@ -241,6 +245,7 @@ _: {
wallpapers = { };
claude-sync = { };
ai-sync = { };
+ usage-metrics = { };
paperless-media = {
type = "receiveonly";
path = "/home/vincent/desktop/paperless-media";
@@ -286,6 +291,7 @@ _: {
wallpapers = { };
claude-sync = { };
ai-sync = { };
+ usage-metrics = { };
paperless-media = {
type = "receiveonly";
path = "/home/vincent/desktop/paperless-media";
@@ -534,6 +540,7 @@ _: {
wallpapers = { };
claude-sync = { };
ai-sync = { };
+ usage-metrics = { };
paperless-media = {
type = "receiveonly";
path = "/home/vincent/desktop/paperless-media";