main
   1#!/usr/bin/env python3
   2"""
   3Shared library for *arr (Sonarr, Radarr, Lidarr) automation scripts.
   4
   5Provides common functionality for API interaction, user confirmation,
   6and output formatting across all *arr stack scripts.
   7"""
   8
   9import subprocess
  10import sys
  11import time
  12from typing import Any, Dict, List, Optional
  13
  14import requests
  15
  16
  17class ArrClient:
  18    """Base client for *arr API interactions."""
  19
  20    def __init__(self, base_url: str, api_key: str):
  21        """
  22        Initialize the *arr API client.
  23
  24        Args:
  25            base_url: Base URL of the *arr service
  26                      (e.g., http://localhost:8989)
  27            api_key: API key for authentication
  28        """
  29        self.base_url = base_url.rstrip("/")
  30        self.api_key = api_key
  31        self.headers = {"X-Api-Key": api_key}
  32
  33    def get(
  34        self,
  35        endpoint: str,
  36        params: Optional[Dict[str, Any]] = None,
  37        max_retries: int = 3,
  38        retry_delay: float = 2.0,
  39    ) -> List[Dict[str, Any]] | Dict[str, Any]:
  40        """
  41        Make a GET request to the *arr API with retry logic.
  42
  43        Args:
  44            endpoint: API endpoint path (e.g., /api/v3/series)
  45            params: Optional query parameters
  46            max_retries: Maximum number of retry attempts
  47            retry_delay: Initial delay between retries (seconds)
  48
  49        Returns:
  50            JSON response data
  51
  52        Raises:
  53            SystemExit: If the request fails after all retries
  54        """
  55        url = f"{self.base_url}{endpoint}"
  56
  57        for attempt in range(max_retries):
  58            try:
  59                response = requests.get(
  60                    url, headers=self.headers, params=params, timeout=30
  61                )
  62                response.raise_for_status()
  63                return response.json()
  64            except requests.exceptions.HTTPError as e:
  65                status_code = e.response.status_code if e.response else None
  66
  67                # Retry on server errors (5xx) or rate limiting (429)
  68                if status_code in [429, 500, 502, 503, 504]:
  69                    if attempt < max_retries - 1:
  70                        wait_time = retry_delay * (2**attempt)
  71                        print(
  72                            f"  Server error ({status_code}), "
  73                            f"retrying in {wait_time}s... "
  74                            f"(attempt {attempt + 1}/{max_retries})"
  75                        )
  76                        time.sleep(wait_time)
  77                        continue
  78
  79                # Don't retry on client errors (4xx except 429)
  80                print(
  81                    f"Error fetching from {endpoint}: HTTP {status_code}",
  82                    file=sys.stderr,
  83                )
  84                if params:
  85                    print(f"  Params: {params}", file=sys.stderr)
  86                if e.response:
  87                    try:
  88                        error_detail = e.response.json()
  89                        print(f"  Detail: {error_detail}", file=sys.stderr)
  90                    except Exception:
  91                        print(
  92                            f"  Response: {e.response.text[:200]}",
  93                            file=sys.stderr,
  94                        )
  95                sys.exit(1)
  96            except requests.exceptions.Timeout:
  97                if attempt < max_retries - 1:
  98                    wait_time = retry_delay * (2**attempt)
  99                    print(
 100                        f"  Request timeout, retrying in {wait_time}s... "
 101                        f"(attempt {attempt + 1}/{max_retries})"
 102                    )
 103                    time.sleep(wait_time)
 104                    continue
 105                print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
 106                sys.exit(1)
 107            except requests.exceptions.RequestException as e:
 108                print(f"Error fetching from {endpoint}: {e}", file=sys.stderr)
 109                if params:
 110                    print(f"  Params: {params}", file=sys.stderr)
 111                sys.exit(1)
 112
 113        # Should not reach here, but just in case
 114        print(f"Error: Failed after {max_retries} attempts", file=sys.stderr)
 115        sys.exit(1)
 116
 117    def post(
 118        self,
 119        endpoint: str,
 120        payload: Dict[str, Any],
 121        max_retries: int = 3,
 122        retry_delay: float = 2.0,
 123    ) -> Dict[str, Any]:
 124        """
 125        Make a POST request to the *arr API with retry logic.
 126
 127        Args:
 128            endpoint: API endpoint path (e.g., /api/v3/command)
 129            payload: JSON payload to send
 130            max_retries: Maximum number of retry attempts
 131            retry_delay: Initial delay between retries (seconds)
 132
 133        Returns:
 134            JSON response data (empty dict on failure)
 135        """
 136        url = f"{self.base_url}{endpoint}"
 137        headers = {**self.headers, "Content-Type": "application/json"}
 138
 139        for attempt in range(max_retries):
 140            try:
 141                response = requests.post(
 142                    url, headers=headers, json=payload, timeout=30
 143                )
 144                response.raise_for_status()
 145                return response.json()
 146            except requests.exceptions.HTTPError as e:
 147                status_code = e.response.status_code if e.response else None
 148
 149                # Retry on server errors (5xx) or rate limiting (429)
 150                if status_code in [429, 500, 502, 503, 504]:
 151                    if attempt < max_retries - 1:
 152                        wait_time = retry_delay * (2**attempt)
 153                        print(
 154                            f"  Server error ({status_code}), "
 155                            f"retrying in {wait_time}s... "
 156                            f"(attempt {attempt + 1}/{max_retries})"
 157                        )
 158                        time.sleep(wait_time)
 159                        continue
 160
 161                # Better error reporting
 162                if status_code:
 163                    print(
 164                        f"Error posting to {endpoint}: HTTP {status_code}",
 165                        file=sys.stderr,
 166                    )
 167                else:
 168                    error_msg = (
 169                        f"Error posting to {endpoint}: "
 170                        f"{type(e).__name__} - {str(e)}"
 171                    )
 172                    print(error_msg, file=sys.stderr)
 173
 174                # Print payload for debugging
 175                print(f"  Payload: {payload}", file=sys.stderr)
 176
 177                # Always try to get response details
 178                if e.response is not None:
 179                    print(
 180                        f"  Response status: {e.response.status_code}",
 181                        file=sys.stderr,
 182                    )
 183                    print(
 184                        f"  Response headers: {dict(e.response.headers)}",
 185                        file=sys.stderr,
 186                    )
 187                    try:
 188                        error_detail = e.response.json()
 189                        print(
 190                            f"  Response JSON: {error_detail}", file=sys.stderr
 191                        )
 192                    except Exception:
 193                        print(
 194                            f"  Response text: {e.response.text}",
 195                            file=sys.stderr,
 196                        )
 197                else:
 198                    print("  No response object available", file=sys.stderr)
 199                return {}
 200            except requests.exceptions.Timeout:
 201                if attempt < max_retries - 1:
 202                    wait_time = retry_delay * (2**attempt)
 203                    print(
 204                        f"  Request timeout, retrying in {wait_time}s... "
 205                        f"(attempt {attempt + 1}/{max_retries})"
 206                    )
 207                    time.sleep(wait_time)
 208                    continue
 209                print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
 210                return {}
 211            except requests.exceptions.RequestException as e:
 212                print(f"Error posting to {endpoint}: {e}", file=sys.stderr)
 213                return {}
 214
 215        return {}
 216
 217    def put(
 218        self,
 219        endpoint: str,
 220        payload: Dict[str, Any],
 221        max_retries: int = 3,
 222        retry_delay: float = 2.0,
 223    ) -> Dict[str, Any]:
 224        """
 225        Make a PUT request to the *arr API with retry logic.
 226
 227        Args:
 228            endpoint: API endpoint path (e.g., /api/v3/series)
 229            payload: JSON payload to send
 230            max_retries: Maximum number of retry attempts
 231            retry_delay: Initial delay between retries (seconds)
 232
 233        Returns:
 234            JSON response data (empty dict on failure)
 235        """
 236        url = f"{self.base_url}{endpoint}"
 237        headers = {**self.headers, "Content-Type": "application/json"}
 238
 239        for attempt in range(max_retries):
 240            try:
 241                response = requests.put(
 242                    url, headers=headers, json=payload, timeout=30
 243                )
 244                response.raise_for_status()
 245                return response.json()
 246            except requests.exceptions.HTTPError as e:
 247                status_code = e.response.status_code if e.response else None
 248
 249                # Retry on server errors (5xx) or rate limiting (429)
 250                if status_code in [429, 500, 502, 503, 504]:
 251                    if attempt < max_retries - 1:
 252                        wait_time = retry_delay * (2**attempt)
 253                        print(
 254                            f"  Server error ({status_code}), "
 255                            f"retrying in {wait_time}s... "
 256                            f"(attempt {attempt + 1}/{max_retries})"
 257                        )
 258                        time.sleep(wait_time)
 259                        continue
 260
 261                print(
 262                    f"Error putting to {endpoint}: HTTP {status_code}",
 263                    file=sys.stderr,
 264                )
 265                if e.response:
 266                    try:
 267                        error_detail = e.response.json()
 268                        print(f"  Detail: {error_detail}", file=sys.stderr)
 269                    except Exception:
 270                        print(
 271                            f"  Response: {e.response.text[:200]}",
 272                            file=sys.stderr,
 273                        )
 274                return {}
 275            except requests.exceptions.Timeout:
 276                if attempt < max_retries - 1:
 277                    wait_time = retry_delay * (2**attempt)
 278                    print(
 279                        f"  Request timeout, retrying in {wait_time}s... "
 280                        f"(attempt {attempt + 1}/{max_retries})"
 281                    )
 282                    time.sleep(wait_time)
 283                    continue
 284                print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
 285                return {}
 286            except requests.exceptions.RequestException as e:
 287                print(f"Error putting to {endpoint}: {e}", file=sys.stderr)
 288                return {}
 289
 290        return {}
 291
 292
 293def ask_confirmation(prompt: str) -> bool:
 294    """
 295    Ask user for yes/no confirmation.
 296
 297    Args:
 298        prompt: Question to ask the user
 299
 300    Returns:
 301        True if user confirms (y/yes), False otherwise (n/no)
 302    """
 303    while True:
 304        response = input(f"{prompt} (y/n): ").lower().strip()
 305        if response in ["y", "yes"]:
 306            return True
 307        elif response in ["n", "no"]:
 308            return False
 309        else:
 310            print("Please answer 'y' or 'n'")
 311
 312
 313class CommandContext:
 314    """Context object for command execution with common options."""
 315
 316    def __init__(self, dry_run: bool = False, no_confirm: bool = False):
 317        """
 318        Initialize command context.
 319
 320        Args:
 321            dry_run: If True, show changes without applying them
 322            no_confirm: If True, skip interactive confirmations
 323        """
 324        self.dry_run = dry_run
 325        self.no_confirm = no_confirm
 326
 327
 328def print_separator(char: str = "=", width: int = 80) -> None:
 329    """Print a separator line."""
 330    print(char * width)
 331
 332
 333def print_section_header(title: str) -> None:
 334    """Print a section header with separators."""
 335    print("\n" + "=" * 80)
 336    print(title)
 337    print("=" * 80)
 338
 339
 340def print_item_list(
 341    items: List[str], prefix: str, max_display: int = 5
 342) -> None:
 343    """
 344    Print a list of items with optional truncation.
 345
 346    Args:
 347        items: List of item names to display
 348        prefix: Prefix message to show before the list
 349        max_display: Maximum number of items to show before truncating
 350    """
 351    if not items:
 352        return
 353
 354    count = len(items)
 355    print(f"\n{prefix} ({count} items):")
 356    for item in items[:max_display]:
 357        print(f"  - {item}")
 358    if len(items) > max_display:
 359        remaining = len(items) - max_display
 360        print(f"  ... and {remaining} more")
 361
 362
 363def get_confirmation_decision(ctx: CommandContext, prompt: str) -> bool:
 364    """
 365    Determine whether to proceed based on dry-run, no-confirm, or user input.
 366
 367    Args:
 368        ctx: Command context with dry_run and no_confirm flags
 369        prompt: Confirmation prompt to show user
 370
 371    Returns:
 372        True if should proceed, False otherwise
 373    """
 374    if ctx.dry_run:
 375        print("\n[DRY RUN] Skipping actual operation")
 376        return False
 377    elif ctx.no_confirm:
 378        print("\n[NO CONFIRM] Proceeding with operation...")
 379        return True
 380    else:
 381        return ask_confirmation(prompt)
 382
 383
 384def print_final_summary(
 385    total: int,
 386    processed: int,
 387    skipped: int,
 388    operation: str,
 389    queue_note: bool = True,
 390) -> None:
 391    """
 392    Print final summary of operations.
 393
 394    Args:
 395        total: Total items that needed processing
 396        processed: Number of items successfully processed
 397        skipped: Number of items skipped
 398        operation: Name of the operation (e.g., "Renamed", "Retagged")
 399        queue_note: Whether to show the queue check note
 400    """
 401    print_section_header("FINAL SUMMARY")
 402    print(f"\nItems processed: {total}")
 403    print(f"  - {operation}: {processed}")
 404    print(f"  - Skipped: {skipped}")
 405
 406    if processed > 0 and queue_note:
 407        print(
 408            f"\nNote: {operation} operations are queued. "
 409            "Check the service's queue for progress."
 410        )
 411
 412
 413def select_with_fzf(
 414    items: List[Dict[str, str]],
 415    display_format: str,
 416    multi: bool = True,
 417    enable_star_select: bool = False,
 418) -> List[str]:
 419    """
 420    Use fzf to interactively select items.
 421
 422    Args:
 423        items: List of dictionaries containing item data
 424        display_format: Format string for displaying items (e.g.,
 425                       "{name} ({owner}, {tracks_total} tracks)")
 426        multi: Allow multiple selection if True
 427        enable_star_select: Add Ctrl-S keybinding to select all ★ items
 428
 429    Returns:
 430        List of selected item IDs (empty list if cancelled)
 431    """
 432    if not items:
 433        return []
 434
 435    # Create lookup table: display text -> item id
 436    lookup = {}
 437    lines = []
 438    for item in items:
 439        display = display_format.format(**item)
 440        lines.append(display)
 441        lookup[display] = item.get("id")
 442
 443    # Prepare fzf input
 444    fzf_input = "\n".join(lines)
 445
 446    # Run fzf
 447    fzf_args = ["fzf", "--ansi", "--prompt=Select items: "]
 448    if multi:
 449        fzf_args.append("--multi")
 450
 451    # Add keybinding to select all starred items
 452    if enable_star_select:
 453        fzf_args.extend([
 454            "--bind", "ctrl-s:select-all+accept",
 455            "--header", "TAB: select | ENTER: confirm | Ctrl-S: select all ★ items"
 456        ])
 457
 458    try:
 459        result = subprocess.run(
 460            fzf_args,
 461            input=fzf_input,
 462            text=True,
 463            capture_output=True,
 464            check=True,
 465        )
 466        # Parse selected lines
 467        selected_lines = result.stdout.strip().split("\n")
 468        return [lookup[line] for line in selected_lines if line in lookup]
 469    except subprocess.CalledProcessError:
 470        # User cancelled or fzf not found
 471        return []
 472    except FileNotFoundError:
 473        print("Error: fzf not found. Please install fzf:", file=sys.stderr)
 474        print("  On NixOS: nix-env -iA nixpkgs.fzf", file=sys.stderr)
 475        print("  On other systems: see https://github.com/junegunn/fzf")
 476        sys.exit(1)
 477
 478
 479class JellyfinClient:
 480    """Client for Jellyfin API interactions."""
 481
 482    def __init__(
 483        self, base_url: str, api_token: str, user_id: str, debug: bool = False
 484    ):
 485        """
 486        Initialize the Jellyfin API client.
 487
 488        Args:
 489            base_url: Base URL of the Jellyfin service
 490                      (e.g., http://localhost:8096)
 491            api_token: API token for authentication
 492            user_id: User ID or username for playlist ownership
 493            debug: Enable debug output
 494        """
 495        self.base_url = base_url.rstrip("/")
 496        self.api_token = api_token
 497        self.debug = debug
 498        self.headers = {
 499            "Authorization": f'MediaBrowser Token="{api_token}"',
 500            "Content-Type": "application/json",
 501        }
 502
 503        # Resolve username to user ID if needed
 504        self.user_id = self._resolve_user_id(user_id)
 505
 506    def _resolve_user_id(self, user_identifier: str) -> str:
 507        """
 508        Resolve a username or user ID to a proper user ID.
 509
 510        If the identifier looks like a GUID, use it as-is.
 511        Otherwise, look up the user by username.
 512
 513        Args:
 514            user_identifier: Username or user ID
 515
 516        Returns:
 517            Resolved user ID (GUID)
 518        """
 519        # Check if it's already a GUID (basic check for 8-4-4-4-12 format)
 520        if len(user_identifier) == 32 or (
 521            len(user_identifier) == 36 and user_identifier.count("-") == 4
 522        ):
 523            return user_identifier
 524
 525        # Otherwise, look up by username
 526        try:
 527            response = self.get("/Users")
 528            if isinstance(response, list):
 529                for user in response:
 530                    user_name = user.get("Name", "").lower()
 531                    if user_name == user_identifier.lower():
 532                        return user.get("Id")
 533        except Exception:
 534            # If lookup fails, return the original identifier
 535            # and let subsequent API calls fail with a clearer error
 536            pass
 537
 538        # If not found, return original (might be unrecognized ID)
 539        return user_identifier
 540
 541    def get(
 542        self, endpoint: str, params: Optional[Dict[str, Any]] = None
 543    ) -> List[Dict[str, Any]] | Dict[str, Any]:
 544        """
 545        Make a GET request to the Jellyfin API.
 546
 547        Args:
 548            endpoint: API endpoint path (e.g., /Items)
 549            params: Optional query parameters
 550
 551        Returns:
 552            JSON response data
 553
 554        Raises:
 555            SystemExit: If the request fails
 556        """
 557        url = f"{self.base_url}{endpoint}"
 558        try:
 559            response = requests.get(
 560                url, headers=self.headers, params=params, timeout=30
 561            )
 562            response.raise_for_status()
 563            return response.json()
 564        except requests.exceptions.RequestException as e:
 565            print(
 566                f"Error fetching from Jellyfin {endpoint}: {e}",
 567                file=sys.stderr,
 568            )
 569            sys.exit(1)
 570
 571    def post(self, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
 572        """
 573        Make a POST request to the Jellyfin API.
 574
 575        Args:
 576            endpoint: API endpoint path (e.g., /Playlists)
 577            payload: JSON payload to send
 578
 579        Returns:
 580            JSON response data
 581
 582        Raises:
 583            SystemExit: If the request fails
 584        """
 585        url = f"{self.base_url}{endpoint}"
 586        try:
 587            response = requests.post(
 588                url, headers=self.headers, json=payload, timeout=30
 589            )
 590            response.raise_for_status()
 591            return response.json()
 592        except requests.exceptions.RequestException as e:
 593            print(
 594                f"Error posting to Jellyfin {endpoint}: {e}",
 595                file=sys.stderr,
 596            )
 597            if hasattr(e, "response") and e.response is not None:
 598                try:
 599                    error_detail = e.response.json()
 600                    print(f"  Detail: {error_detail}", file=sys.stderr)
 601                except Exception:
 602                    print(
 603                        f"  Response: {e.response.text[:200]}",
 604                        file=sys.stderr,
 605                    )
 606            sys.exit(1)
 607
 608    def search_tracks(
 609        self,
 610        query: str,
 611        limit: int = 50,
 612        artist_name: str = None,
 613        track_name: str = None,
 614    ) -> List[Dict[str, Any]]:
 615        """
 616        Search for tracks in Jellyfin library.
 617
 618        Args:
 619            query: Legacy search query string (for backward compatibility)
 620            limit: Maximum number of results
 621            artist_name: Artist name to search by (preferred)
 622            track_name: Track name to search by
 623
 624        Returns:
 625            List of track items
 626        """
 627        # Prefer searching by artist name when available
 628        if artist_name:
 629            # Find artist using NameStartsWith (more reliable)
 630            artist_words = artist_name.split()
 631            artist_first_word = (
 632                artist_words[0] if artist_words else artist_name
 633            )
 634
 635            artist_params = {
 636                "NameStartsWith": artist_first_word,
 637                "IncludeItemTypes": "MusicArtist",
 638                "Limit": 50,
 639                "Recursive": True,
 640            }
 641            artist_result = self.get(
 642                f"/Users/{self.user_id}/Items", params=artist_params
 643            )
 644
 645            artists = artist_result.get("Items", [])
 646
 647            if self.debug:
 648                print(
 649                    f"DEBUG: Searching for artist starting with "
 650                    f"'{artist_first_word}' - Found {len(artists)} artists"
 651                )
 652                if artists:
 653                    for idx, artist in enumerate(artists[:5], 1):
 654                        print(f"  {idx}. {artist.get('Name')}")
 655
 656            if artists:
 657                # Find exact or best match
 658                artist_name_lower = artist_name.lower()
 659                matched_artist = None
 660
 661                for artist in artists:
 662                    if artist.get("Name", "").lower() == artist_name_lower:
 663                        matched_artist = artist
 664                        break
 665
 666                # If no exact match, use first result
 667                if not matched_artist:
 668                    matched_artist = artists[0]
 669
 670                artist_id = matched_artist.get("Id")
 671
 672                if self.debug:
 673                    artist_name_str = matched_artist.get("Name")
 674                    print(
 675                        f"DEBUG: Using artist: {artist_name_str} "
 676                        f"(ID: {artist_id})"
 677                    )
 678
 679                track_params = {
 680                    "ArtistIds": artist_id,
 681                    "IncludeItemTypes": "Audio",
 682                    "Recursive": True,
 683                    "Limit": limit,
 684                    "Fields": (
 685                        "Artists,Album,AlbumArtist,AlbumArtists,ArtistItems"
 686                    ),
 687                }
 688                result = self.get(
 689                    f"/Users/{self.user_id}/Items", params=track_params
 690                )
 691                items = result.get("Items", [])
 692
 693                if self.debug:
 694                    print(f"DEBUG: Found {len(items)} tracks by this artist")
 695
 696                return items
 697
 698        # Fallback: search by track name
 699        search_term = track_name or query
 700        words = search_term.split()
 701        first_word = words[0] if words else search_term
 702
 703        params = {
 704            "NameStartsWith": first_word,
 705            "IncludeItemTypes": "Audio",
 706            "Recursive": True,
 707            "Limit": 200,  # Larger limit: filtering client-side
 708            "Fields": ("Artists,Album,AlbumArtist,AlbumArtists,ArtistItems"),
 709            "EnableUserData": False,  # Skip user data for speed
 710        }
 711
 712        result = self.get(f"/Users/{self.user_id}/Items", params=params)
 713
 714        items = result.get("Items", [])
 715
 716        # Enrich items with album artist data if track artists are missing
 717        for item in items:
 718            if not item.get("Artists") and item.get("AlbumId"):
 719                try:
 720                    album_id = item["AlbumId"]
 721                    album = self.get(
 722                        f"/Users/{self.user_id}/Items/{album_id}",
 723                        params={"Fields": "Artists,AlbumArtists"},
 724                    )
 725                    # Use album artists as track artists
 726                    item["Artists"] = album.get("AlbumArtists", [])
 727                    item["Album"] = album.get("Name", "")
 728                except Exception:
 729                    pass  # Continue even if album fetch fails
 730
 731        return items
 732
 733    def get_playlists(self) -> List[Dict[str, Any]]:
 734        """
 735        Get all playlists for the user.
 736
 737        Returns:
 738            List of playlist items
 739        """
 740        params = {
 741            "IncludeItemTypes": "Playlist",
 742            "Recursive": "true",
 743        }
 744        result = self.get(f"/Users/{self.user_id}/Items", params=params)
 745        return result.get("Items", [])
 746
 747    def create_playlist(
 748        self, name: str, item_ids: List[str], is_public: bool = False
 749    ) -> Dict[str, Any]:
 750        """
 751        Create a new playlist in Jellyfin.
 752
 753        Args:
 754            name: Playlist name
 755            item_ids: List of Jellyfin item IDs to add
 756            is_public: Whether the playlist is public
 757
 758        Returns:
 759            Created playlist data
 760        """
 761        payload = {
 762            "Name": name,
 763            "Ids": item_ids,
 764            "UserId": self.user_id,
 765            "IsPublic": is_public,
 766        }
 767        return self.post("/Playlists", payload)
 768
 769    def add_to_playlist(
 770        self, playlist_id: str, item_ids: List[str]
 771    ) -> Dict[str, Any]:
 772        """
 773        Add items to an existing playlist.
 774
 775        Args:
 776            playlist_id: Jellyfin playlist ID
 777            item_ids: List of item IDs to add
 778
 779        Returns:
 780            Response data (empty dict if no content)
 781        """
 782        params = {"ids": ",".join(item_ids), "userId": self.user_id}
 783        url = f"{self.base_url}/Playlists/{playlist_id}/Items"
 784        try:
 785            response = requests.post(
 786                url, headers=self.headers, params=params, timeout=30
 787            )
 788            response.raise_for_status()
 789
 790            # Handle empty responses (204 No Content)
 791            if response.status_code == 204 or not response.text:
 792                return {}
 793
 794            return response.json()
 795        except requests.exceptions.RequestException as e:
 796            print(
 797                f"Error adding items to playlist: {e}",
 798                file=sys.stderr,
 799            )
 800            return {}
 801
 802    def get_playlist_items(self, playlist_id: str) -> List[str]:
 803        """
 804        Get all item IDs in a playlist.
 805
 806        Args:
 807            playlist_id: Jellyfin playlist ID
 808
 809        Returns:
 810            List of item IDs in the playlist
 811        """
 812        params = {
 813            "ParentId": playlist_id,
 814            "Fields": "Id",
 815        }
 816        result = self.get(f"/Users/{self.user_id}/Items", params=params)
 817        items = result.get("Items", [])
 818        return [item.get("Id") for item in items if item.get("Id")]
 819
 820    def get_playlist_items_full(
 821        self,
 822        playlist_id: str,
 823        fields: Optional[List[str]] = None,
 824    ) -> List[Dict[str, Any]]:
 825        """
 826        Get all items in a playlist with full metadata.
 827
 828        Args:
 829            playlist_id: Jellyfin playlist ID
 830            fields: Additional fields to include in response
 831                   Defaults to ["Path", "MediaSources"]
 832
 833        Returns:
 834            List of items with full metadata
 835        """
 836        if fields is None:
 837            fields = ["Path", "MediaSources"]
 838
 839        params = {
 840            "ParentId": playlist_id,
 841            "Fields": ",".join(fields),
 842        }
 843
 844        result = self.get(f"/Users/{self.user_id}/Items", params=params)
 845        return result.get("Items", [])
 846
 847    def remove_from_playlist(
 848        self, playlist_id: str, item_ids: List[str]
 849    ) -> bool:
 850        """
 851        Remove specific items from a playlist.
 852
 853        Args:
 854            playlist_id: Jellyfin playlist ID
 855            item_ids: List of item IDs to remove
 856
 857        Returns:
 858            True if successful
 859        """
 860        if not item_ids:
 861            return True  # Nothing to remove
 862
 863        try:
 864            url = (
 865                f"{self.base_url}/Playlists/{playlist_id}/Items"
 866                f"?EntryIds={','.join(item_ids)}"
 867            )
 868            response = requests.delete(url, headers=self.headers, timeout=30)
 869            response.raise_for_status()
 870            return True
 871        except requests.exceptions.RequestException as e:
 872            print(
 873                f"Error removing items from playlist: {e}",
 874                file=sys.stderr,
 875            )
 876            return False
 877
 878    def clear_playlist(self, playlist_id: str) -> bool:
 879        """
 880        Remove all items from a playlist.
 881
 882        Args:
 883            playlist_id: Jellyfin playlist ID
 884
 885        Returns:
 886            True if successful
 887        """
 888        # Get current items
 889        item_ids = self.get_playlist_items(playlist_id)
 890
 891        if not item_ids:
 892            return True  # Already empty
 893
 894        # Remove all items using the dedicated method
 895        return self.remove_from_playlist(playlist_id, item_ids)
 896
 897    def get_favorites(
 898        self,
 899        include_types: Optional[List[str]] = None,
 900        fields: Optional[List[str]] = None,
 901    ) -> List[Dict[str, Any]]:
 902        """
 903        Get all favorite items for the user.
 904
 905        Args:
 906            include_types: List of item types to include
 907                          (e.g., ["Movie", "Series"])
 908                          Defaults to ["Movie", "Series"]
 909            fields: Additional fields to include in response
 910                   Defaults to ["Path", "MediaSources"]
 911
 912        Returns:
 913            List of favorite items
 914        """
 915        if include_types is None:
 916            include_types = ["Movie", "Series"]
 917        if fields is None:
 918            fields = ["Path", "MediaSources"]
 919
 920        params = {
 921            "IsFavorite": "true",
 922            "Recursive": "true",
 923            "IncludeItemTypes": ",".join(include_types),
 924            "Fields": ",".join(fields),
 925        }
 926
 927        result = self.get(f"/Users/{self.user_id}/Items", params=params)
 928        return result.get("Items", [])
 929
 930    def get_series_episodes(
 931        self,
 932        series_id: str,
 933        fields: Optional[List[str]] = None,
 934    ) -> List[Dict[str, Any]]:
 935        """
 936        Get all episodes for a series.
 937
 938        Args:
 939            series_id: Jellyfin series ID
 940            fields: Additional fields to include in response
 941                   Defaults to ["Path", "MediaSources"]
 942
 943        Returns:
 944            List of episode items
 945        """
 946        if fields is None:
 947            fields = ["Path", "MediaSources"]
 948
 949        params = {"Fields": ",".join(fields)}
 950
 951        result = self.get(f"/Shows/{series_id}/Episodes", params=params)
 952        return result.get("Items", [])
 953
 954    def get_movies(
 955        self,
 956        fields: Optional[List[str]] = None,
 957        sort_by: str = "SortName",
 958    ) -> List[Dict[str, Any]]:
 959        """
 960        Get all movies in the library.
 961
 962        Args:
 963            fields: Additional fields to include in response
 964            sort_by: Sort field (default: SortName)
 965
 966        Returns:
 967            List of movie items
 968        """
 969        if fields is None:
 970            fields = ["ProductionYear", "CommunityRating", "Genres"]
 971
 972        params = {
 973            "IncludeItemTypes": "Movie",
 974            "Recursive": "true",
 975            "Fields": ",".join(fields),
 976            "SortBy": sort_by,
 977        }
 978
 979        result = self.get(f"/Users/{self.user_id}/Items", params=params)
 980        return result.get("Items", [])
 981
 982    def get_series(
 983        self,
 984        fields: Optional[List[str]] = None,
 985        sort_by: str = "SortName",
 986    ) -> List[Dict[str, Any]]:
 987        """
 988        Get all series in the library.
 989
 990        Args:
 991            fields: Additional fields to include in response
 992            sort_by: Sort field (default: SortName)
 993
 994        Returns:
 995            List of series items
 996        """
 997        if fields is None:
 998            fields = ["ProductionYear", "CommunityRating", "Genres"]
 999
1000        params = {
1001            "IncludeItemTypes": "Series",
1002            "Recursive": "true",
1003            "Fields": ",".join(fields),
1004            "SortBy": sort_by,
1005        }
1006
1007        result = self.get(f"/Users/{self.user_id}/Items", params=params)
1008        return result.get("Items", [])
1009
1010    def get_items_by_type(
1011        self,
1012        item_types: List[str],
1013        fields: Optional[List[str]] = None,
1014        sort_by: str = "SortName",
1015    ) -> List[Dict[str, Any]]:
1016        """
1017        Get items of specified types from the library.
1018
1019        Args:
1020            item_types: List of item types (e.g., ["Movie", "Series"])
1021            fields: Additional fields to include in response
1022            sort_by: Sort field (default: SortName)
1023
1024        Returns:
1025            List of items
1026        """
1027        if fields is None:
1028            fields = ["ProductionYear", "CommunityRating", "Genres"]
1029
1030        params = {
1031            "IncludeItemTypes": ",".join(item_types),
1032            "Recursive": "true",
1033            "Fields": ",".join(fields),
1034            "SortBy": sort_by,
1035        }
1036
1037        result = self.get(f"/Users/{self.user_id}/Items", params=params)
1038        return result.get("Items", [])
1039
1040
1041class SpotifyClient:
1042    """Client for Spotify API interactions using client credentials flow."""
1043
1044    def __init__(self, client_id: str, client_secret: str):
1045        """
1046        Initialize the Spotify API client with client credentials.
1047
1048        This uses the client credentials flow which can access public
1049        playlists but not private user data.
1050
1051        Args:
1052            client_id: Spotify application client ID
1053            client_secret: Spotify application client secret
1054        """
1055        try:
1056            import spotipy
1057            from spotipy.oauth2 import SpotifyClientCredentials
1058        except ImportError:
1059            print(
1060                "Error: spotipy library not found. Install it with:",
1061                file=sys.stderr,
1062            )
1063            print("  pip install spotipy", file=sys.stderr)
1064            sys.exit(1)
1065
1066        # Use client credentials flow (no OAuth required)
1067        auth_manager = SpotifyClientCredentials(
1068            client_id=client_id, client_secret=client_secret
1069        )
1070        self.sp = spotipy.Spotify(auth_manager=auth_manager)
1071
1072    def get_playlist_tracks(self, playlist_id: str) -> List[Dict[str, Any]]:
1073        """
1074        Fetch all tracks from a Spotify playlist.
1075
1076        Args:
1077            playlist_id: Spotify playlist ID or URI
1078
1079        Returns:
1080            List of track information dictionaries
1081        """
1082        tracks = []
1083        results = self.sp.playlist_tracks(playlist_id)
1084
1085        while results:
1086            for item in results.get("items", []):
1087                if item and item.get("track"):
1088                    track = item["track"]
1089                    album_obj = track.get("album", {})
1090                    tracks.append(
1091                        {
1092                            "name": track.get("name"),
1093                            "artists": [
1094                                {
1095                                    "name": artist.get("name"),
1096                                    "id": artist.get("id"),
1097                                }
1098                                for artist in track.get("artists", [])
1099                            ],
1100                            "album": {
1101                                "name": album_obj.get("name"),
1102                                "id": album_obj.get("id"),
1103                                "artists": [
1104                                    {
1105                                        "name": artist.get("name"),
1106                                        "id": artist.get("id"),
1107                                    }
1108                                    for artist in album_obj.get("artists", [])
1109                                ],
1110                            },
1111                        }
1112                    )
1113
1114            # Handle pagination
1115            if results.get("next"):
1116                results = self.sp.next(results)
1117            else:
1118                results = None
1119
1120        return tracks
1121
1122    def get_playlist_info(self, playlist_id: str) -> Dict[str, Any]:
1123        """
1124        Get information about a Spotify playlist.
1125
1126        Args:
1127            playlist_id: Spotify playlist ID or URI
1128
1129        Returns:
1130            Playlist information dictionary
1131        """
1132        playlist = self.sp.playlist(playlist_id)
1133        return {
1134            "name": playlist.get("name"),
1135            "description": playlist.get("description"),
1136            "owner": playlist.get("owner", {}).get("display_name"),
1137            "tracks_total": playlist.get("tracks", {}).get("total", 0),
1138        }
1139
1140    def get_user_playlists(self, username: str) -> List[Dict[str, Any]]:
1141        """
1142        Fetch all public playlists for a specific user.
1143
1144        Args:
1145            username: Spotify username (user ID)
1146
1147        Returns:
1148            List of playlist information dictionaries
1149        """
1150        playlists = []
1151        try:
1152            results = self.sp.user_playlists(username)
1153
1154            while results:
1155                for item in results.get("items", []):
1156                    if item:
1157                        playlists.append(
1158                            {
1159                                "id": item.get("id"),
1160                                "name": item.get("name"),
1161                                "owner": item.get("owner", {}).get(
1162                                    "display_name"
1163                                ),
1164                                "tracks_total": item.get("tracks", {}).get(
1165                                    "total", 0
1166                                ),
1167                                "public": item.get("public", False),
1168                            }
1169                        )
1170
1171                # Handle pagination
1172                if results.get("next"):
1173                    results = self.sp.next(results)
1174                else:
1175                    results = None
1176
1177        except Exception as e:
1178            print(
1179                f"Error fetching playlists for user '{username}': {e}",
1180                file=sys.stderr,
1181            )
1182            print(
1183                "Make sure the username is correct and the user has "
1184                "public playlists.",
1185                file=sys.stderr,
1186            )
1187
1188        return playlists