nftable-migration
  1#!/usr/bin/env python3
  2"""
  3Shared library for *arr (Sonarr, Radarr, Lidarr) automation scripts.
  4
  5Provides common functionality for API interaction, user confirmation,
  6and output formatting across all *arr stack scripts.
  7"""
  8
  9import subprocess
 10import sys
 11import time
 12from typing import Any, Dict, List, Optional
 13
 14import requests
 15
 16
 17class ArrClient:
 18    """Base client for *arr API interactions."""
 19
 20    def __init__(self, base_url: str, api_key: str):
 21        """
 22        Initialize the *arr API client.
 23
 24        Args:
 25            base_url: Base URL of the *arr service
 26                      (e.g., http://localhost:8989)
 27            api_key: API key for authentication
 28        """
 29        self.base_url = base_url.rstrip("/")
 30        self.api_key = api_key
 31        self.headers = {"X-Api-Key": api_key}
 32
 33    def get(
 34        self,
 35        endpoint: str,
 36        params: Optional[Dict[str, Any]] = None,
 37        max_retries: int = 3,
 38        retry_delay: float = 2.0,
 39    ) -> List[Dict[str, Any]] | Dict[str, Any]:
 40        """
 41        Make a GET request to the *arr API with retry logic.
 42
 43        Args:
 44            endpoint: API endpoint path (e.g., /api/v3/series)
 45            params: Optional query parameters
 46            max_retries: Maximum number of retry attempts
 47            retry_delay: Initial delay between retries (seconds)
 48
 49        Returns:
 50            JSON response data
 51
 52        Raises:
 53            SystemExit: If the request fails after all retries
 54        """
 55        url = f"{self.base_url}{endpoint}"
 56
 57        for attempt in range(max_retries):
 58            try:
 59                response = requests.get(
 60                    url, headers=self.headers, params=params, timeout=30
 61                )
 62                response.raise_for_status()
 63                return response.json()
 64            except requests.exceptions.HTTPError as e:
 65                status_code = e.response.status_code if e.response else None
 66
 67                # Retry on server errors (5xx) or rate limiting (429)
 68                if status_code in [429, 500, 502, 503, 504]:
 69                    if attempt < max_retries - 1:
 70                        wait_time = retry_delay * (2**attempt)
 71                        print(
 72                            f"  Server error ({status_code}), "
 73                            f"retrying in {wait_time}s... "
 74                            f"(attempt {attempt + 1}/{max_retries})"
 75                        )
 76                        time.sleep(wait_time)
 77                        continue
 78
 79                # Don't retry on client errors (4xx except 429)
 80                print(
 81                    f"Error fetching from {endpoint}: HTTP {status_code}",
 82                    file=sys.stderr,
 83                )
 84                if params:
 85                    print(f"  Params: {params}", file=sys.stderr)
 86                if e.response:
 87                    try:
 88                        error_detail = e.response.json()
 89                        print(f"  Detail: {error_detail}", file=sys.stderr)
 90                    except Exception:
 91                        print(
 92                            f"  Response: {e.response.text[:200]}",
 93                            file=sys.stderr,
 94                        )
 95                sys.exit(1)
 96            except requests.exceptions.Timeout:
 97                if attempt < max_retries - 1:
 98                    wait_time = retry_delay * (2**attempt)
 99                    print(
100                        f"  Request timeout, retrying in {wait_time}s... "
101                        f"(attempt {attempt + 1}/{max_retries})"
102                    )
103                    time.sleep(wait_time)
104                    continue
105                print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
106                sys.exit(1)
107            except requests.exceptions.RequestException as e:
108                print(f"Error fetching from {endpoint}: {e}", file=sys.stderr)
109                if params:
110                    print(f"  Params: {params}", file=sys.stderr)
111                sys.exit(1)
112
113        # Should not reach here, but just in case
114        print(
115            f"Error: Failed after {max_retries} attempts", file=sys.stderr
116        )
117        sys.exit(1)
118
119    def post(
120        self,
121        endpoint: str,
122        payload: Dict[str, Any],
123        max_retries: int = 3,
124        retry_delay: float = 2.0,
125    ) -> Dict[str, Any]:
126        """
127        Make a POST request to the *arr API with retry logic.
128
129        Args:
130            endpoint: API endpoint path (e.g., /api/v3/command)
131            payload: JSON payload to send
132            max_retries: Maximum number of retry attempts
133            retry_delay: Initial delay between retries (seconds)
134
135        Returns:
136            JSON response data (empty dict on failure)
137        """
138        url = f"{self.base_url}{endpoint}"
139        headers = {**self.headers, "Content-Type": "application/json"}
140
141        for attempt in range(max_retries):
142            try:
143                response = requests.post(
144                    url, headers=headers, json=payload, timeout=30
145                )
146                response.raise_for_status()
147                return response.json()
148            except requests.exceptions.HTTPError as e:
149                status_code = e.response.status_code if e.response else None
150
151                # Retry on server errors (5xx) or rate limiting (429)
152                if status_code in [429, 500, 502, 503, 504]:
153                    if attempt < max_retries - 1:
154                        wait_time = retry_delay * (2**attempt)
155                        print(
156                            f"  Server error ({status_code}), "
157                            f"retrying in {wait_time}s... "
158                            f"(attempt {attempt + 1}/{max_retries})"
159                        )
160                        time.sleep(wait_time)
161                        continue
162
163                print(
164                    f"Error posting to {endpoint}: HTTP {status_code}",
165                    file=sys.stderr,
166                )
167                if e.response:
168                    try:
169                        error_detail = e.response.json()
170                        print(f"  Detail: {error_detail}", file=sys.stderr)
171                    except Exception:
172                        print(
173                            f"  Response: {e.response.text[:200]}",
174                            file=sys.stderr,
175                        )
176                return {}
177            except requests.exceptions.Timeout:
178                if attempt < max_retries - 1:
179                    wait_time = retry_delay * (2**attempt)
180                    print(
181                        f"  Request timeout, retrying in {wait_time}s... "
182                        f"(attempt {attempt + 1}/{max_retries})"
183                    )
184                    time.sleep(wait_time)
185                    continue
186                print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
187                return {}
188            except requests.exceptions.RequestException as e:
189                print(f"Error posting to {endpoint}: {e}", file=sys.stderr)
190                return {}
191
192        return {}
193
194
195def ask_confirmation(prompt: str) -> bool:
196    """
197    Ask user for yes/no confirmation.
198
199    Args:
200        prompt: Question to ask the user
201
202    Returns:
203        True if user confirms (y/yes), False otherwise (n/no)
204    """
205    while True:
206        response = input(f"{prompt} (y/n): ").lower().strip()
207        if response in ["y", "yes"]:
208            return True
209        elif response in ["n", "no"]:
210            return False
211        else:
212            print("Please answer 'y' or 'n'")
213
214
215class CommandContext:
216    """Context object for command execution with common options."""
217
218    def __init__(self, dry_run: bool = False, no_confirm: bool = False):
219        """
220        Initialize command context.
221
222        Args:
223            dry_run: If True, show changes without applying them
224            no_confirm: If True, skip interactive confirmations
225        """
226        self.dry_run = dry_run
227        self.no_confirm = no_confirm
228
229
230def print_separator(char: str = "=", width: int = 80) -> None:
231    """Print a separator line."""
232    print(char * width)
233
234
235def print_section_header(title: str) -> None:
236    """Print a section header with separators."""
237    print("\n" + "=" * 80)
238    print(title)
239    print("=" * 80)
240
241
242def print_item_list(
243    items: List[str], prefix: str, max_display: int = 5
244) -> None:
245    """
246    Print a list of items with optional truncation.
247
248    Args:
249        items: List of item names to display
250        prefix: Prefix message to show before the list
251        max_display: Maximum number of items to show before truncating
252    """
253    if not items:
254        return
255
256    count = len(items)
257    print(f"\n{prefix} ({count} items):")
258    for item in items[:max_display]:
259        print(f"  - {item}")
260    if len(items) > max_display:
261        remaining = len(items) - max_display
262        print(f"  ... and {remaining} more")
263
264
265def get_confirmation_decision(
266    ctx: CommandContext, prompt: str
267) -> bool:
268    """
269    Determine whether to proceed based on dry-run, no-confirm, or user input.
270
271    Args:
272        ctx: Command context with dry_run and no_confirm flags
273        prompt: Confirmation prompt to show user
274
275    Returns:
276        True if should proceed, False otherwise
277    """
278    if ctx.dry_run:
279        print("\n[DRY RUN] Skipping actual operation")
280        return False
281    elif ctx.no_confirm:
282        print("\n[NO CONFIRM] Proceeding with operation...")
283        return True
284    else:
285        return ask_confirmation(prompt)
286
287
288def print_final_summary(
289    total: int,
290    processed: int,
291    skipped: int,
292    operation: str,
293    queue_note: bool = True,
294) -> None:
295    """
296    Print final summary of operations.
297
298    Args:
299        total: Total items that needed processing
300        processed: Number of items successfully processed
301        skipped: Number of items skipped
302        operation: Name of the operation (e.g., "Renamed", "Retagged")
303        queue_note: Whether to show the queue check note
304    """
305    print_section_header("FINAL SUMMARY")
306    print(f"\nItems processed: {total}")
307    print(f"  - {operation}: {processed}")
308    print(f"  - Skipped: {skipped}")
309
310    if processed > 0 and queue_note:
311        print(
312            f"\nNote: {operation} operations are queued. "
313            "Check the service's queue for progress."
314        )
315
316
317def select_with_fzf(
318    items: List[Dict[str, str]], display_format: str, multi: bool = True
319) -> List[str]:
320    """
321    Use fzf to interactively select items.
322
323    Args:
324        items: List of dictionaries containing item data
325        display_format: Format string for displaying items (e.g.,
326                       "{name} ({owner}, {tracks_total} tracks)")
327        multi: Allow multiple selection if True
328
329    Returns:
330        List of selected item IDs (empty list if cancelled)
331    """
332    if not items:
333        return []
334
335    # Create lookup table: display text -> item id
336    lookup = {}
337    lines = []
338    for item in items:
339        display = display_format.format(**item)
340        lines.append(display)
341        lookup[display] = item.get("id")
342
343    # Prepare fzf input
344    fzf_input = "\n".join(lines)
345
346    # Run fzf
347    fzf_args = ["fzf", "--ansi", "--prompt=Select playlists: "]
348    if multi:
349        fzf_args.append("--multi")
350
351    try:
352        result = subprocess.run(
353            fzf_args,
354            input=fzf_input,
355            text=True,
356            capture_output=True,
357            check=True,
358        )
359        # Parse selected lines
360        selected_lines = result.stdout.strip().split("\n")
361        return [lookup[line] for line in selected_lines if line in lookup]
362    except subprocess.CalledProcessError:
363        # User cancelled or fzf not found
364        return []
365    except FileNotFoundError:
366        print(
367            "Error: fzf not found. Please install fzf:", file=sys.stderr
368        )
369        print(
370            "  On NixOS: nix-env -iA nixpkgs.fzf", file=sys.stderr
371        )
372        print("  On other systems: see https://github.com/junegunn/fzf")
373        sys.exit(1)
374
375
376class SpotifyClient:
377    """Client for Spotify API interactions using client credentials flow."""
378
379    def __init__(self, client_id: str, client_secret: str):
380        """
381        Initialize the Spotify API client with client credentials.
382
383        This uses the client credentials flow which can access public
384        playlists but not private user data.
385
386        Args:
387            client_id: Spotify application client ID
388            client_secret: Spotify application client secret
389        """
390        try:
391            import spotipy
392            from spotipy.oauth2 import SpotifyClientCredentials
393        except ImportError:
394            print(
395                "Error: spotipy library not found. Install it with:",
396                file=sys.stderr,
397            )
398            print("  pip install spotipy", file=sys.stderr)
399            sys.exit(1)
400
401        # Use client credentials flow (no OAuth required)
402        auth_manager = SpotifyClientCredentials(
403            client_id=client_id, client_secret=client_secret
404        )
405        self.sp = spotipy.Spotify(auth_manager=auth_manager)
406
407    def get_playlist_tracks(self, playlist_id: str) -> List[Dict[str, Any]]:
408        """
409        Fetch all tracks from a Spotify playlist.
410
411        Args:
412            playlist_id: Spotify playlist ID or URI
413
414        Returns:
415            List of track information dictionaries
416        """
417        tracks = []
418        results = self.sp.playlist_tracks(playlist_id)
419
420        while results:
421            for item in results.get("items", []):
422                if item and item.get("track"):
423                    track = item["track"]
424                    tracks.append(
425                        {
426                            "name": track.get("name"),
427                            "artists": [
428                                {
429                                    "name": artist.get("name"),
430                                    "id": artist.get("id"),
431                                }
432                                for artist in track.get("artists", [])
433                            ],
434                            "album": track.get("album", {}).get("name"),
435                            "album_id": track.get("album", {}).get("id"),
436                        }
437                    )
438
439            # Handle pagination
440            if results.get("next"):
441                results = self.sp.next(results)
442            else:
443                results = None
444
445        return tracks
446
447    def get_playlist_info(self, playlist_id: str) -> Dict[str, Any]:
448        """
449        Get information about a Spotify playlist.
450
451        Args:
452            playlist_id: Spotify playlist ID or URI
453
454        Returns:
455            Playlist information dictionary
456        """
457        playlist = self.sp.playlist(playlist_id)
458        return {
459            "name": playlist.get("name"),
460            "description": playlist.get("description"),
461            "owner": playlist.get("owner", {}).get("display_name"),
462            "tracks_total": playlist.get("tracks", {}).get("total", 0),
463        }
464
465    def get_user_playlists(self, username: str) -> List[Dict[str, Any]]:
466        """
467        Fetch all public playlists for a specific user.
468
469        Args:
470            username: Spotify username (user ID)
471
472        Returns:
473            List of playlist information dictionaries
474        """
475        playlists = []
476        try:
477            results = self.sp.user_playlists(username)
478
479            while results:
480                for item in results.get("items", []):
481                    if item:
482                        playlists.append(
483                            {
484                                "id": item.get("id"),
485                                "name": item.get("name"),
486                                "owner": item.get("owner", {}).get(
487                                    "display_name"
488                                ),
489                                "tracks_total": item.get("tracks", {}).get(
490                                    "total", 0
491                                ),
492                                "public": item.get("public", False),
493                            }
494                        )
495
496                # Handle pagination
497                if results.get("next"):
498                    results = self.sp.next(results)
499                else:
500                    results = None
501
502        except Exception as e:
503            print(
504                f"Error fetching playlists for user '{username}': {e}",
505                file=sys.stderr,
506            )
507            print(
508                "Make sure the username is correct and the user has "
509                "public playlists.",
510                file=sys.stderr,
511            )
512
513        return playlists