main
1#!/usr/bin/env python3
2"""
3Shared library for *arr (Sonarr, Radarr, Lidarr) automation scripts.
4
5Provides common functionality for API interaction, user confirmation,
6and output formatting across all *arr stack scripts.
7"""
8
9import subprocess
10import sys
11import time
12from typing import Any, Dict, List, Optional
13
14import requests
15
16
17class ArrClient:
18 """Base client for *arr API interactions."""
19
20 def __init__(self, base_url: str, api_key: str):
21 """
22 Initialize the *arr API client.
23
24 Args:
25 base_url: Base URL of the *arr service
26 (e.g., http://localhost:8989)
27 api_key: API key for authentication
28 """
29 self.base_url = base_url.rstrip("/")
30 self.api_key = api_key
31 self.headers = {"X-Api-Key": api_key}
32
33 def get(
34 self,
35 endpoint: str,
36 params: Optional[Dict[str, Any]] = None,
37 max_retries: int = 3,
38 retry_delay: float = 2.0,
39 ) -> List[Dict[str, Any]] | Dict[str, Any]:
40 """
41 Make a GET request to the *arr API with retry logic.
42
43 Args:
44 endpoint: API endpoint path (e.g., /api/v3/series)
45 params: Optional query parameters
46 max_retries: Maximum number of retry attempts
47 retry_delay: Initial delay between retries (seconds)
48
49 Returns:
50 JSON response data
51
52 Raises:
53 SystemExit: If the request fails after all retries
54 """
55 url = f"{self.base_url}{endpoint}"
56
57 for attempt in range(max_retries):
58 try:
59 response = requests.get(
60 url, headers=self.headers, params=params, timeout=30
61 )
62 response.raise_for_status()
63 return response.json()
64 except requests.exceptions.HTTPError as e:
65 status_code = e.response.status_code if e.response else None
66
67 # Retry on server errors (5xx) or rate limiting (429)
68 if status_code in [429, 500, 502, 503, 504]:
69 if attempt < max_retries - 1:
70 wait_time = retry_delay * (2**attempt)
71 print(
72 f" Server error ({status_code}), "
73 f"retrying in {wait_time}s... "
74 f"(attempt {attempt + 1}/{max_retries})"
75 )
76 time.sleep(wait_time)
77 continue
78
79 # Don't retry on client errors (4xx except 429)
80 print(
81 f"Error fetching from {endpoint}: HTTP {status_code}",
82 file=sys.stderr,
83 )
84 if params:
85 print(f" Params: {params}", file=sys.stderr)
86 if e.response:
87 try:
88 error_detail = e.response.json()
89 print(f" Detail: {error_detail}", file=sys.stderr)
90 except Exception:
91 print(
92 f" Response: {e.response.text[:200]}",
93 file=sys.stderr,
94 )
95 sys.exit(1)
96 except requests.exceptions.Timeout:
97 if attempt < max_retries - 1:
98 wait_time = retry_delay * (2**attempt)
99 print(
100 f" Request timeout, retrying in {wait_time}s... "
101 f"(attempt {attempt + 1}/{max_retries})"
102 )
103 time.sleep(wait_time)
104 continue
105 print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
106 sys.exit(1)
107 except requests.exceptions.RequestException as e:
108 print(f"Error fetching from {endpoint}: {e}", file=sys.stderr)
109 if params:
110 print(f" Params: {params}", file=sys.stderr)
111 sys.exit(1)
112
113 # Should not reach here, but just in case
114 print(f"Error: Failed after {max_retries} attempts", file=sys.stderr)
115 sys.exit(1)
116
117 def post(
118 self,
119 endpoint: str,
120 payload: Dict[str, Any],
121 max_retries: int = 3,
122 retry_delay: float = 2.0,
123 ) -> Dict[str, Any]:
124 """
125 Make a POST request to the *arr API with retry logic.
126
127 Args:
128 endpoint: API endpoint path (e.g., /api/v3/command)
129 payload: JSON payload to send
130 max_retries: Maximum number of retry attempts
131 retry_delay: Initial delay between retries (seconds)
132
133 Returns:
134 JSON response data (empty dict on failure)
135 """
136 url = f"{self.base_url}{endpoint}"
137 headers = {**self.headers, "Content-Type": "application/json"}
138
139 for attempt in range(max_retries):
140 try:
141 response = requests.post(
142 url, headers=headers, json=payload, timeout=30
143 )
144 response.raise_for_status()
145 return response.json()
146 except requests.exceptions.HTTPError as e:
147 status_code = e.response.status_code if e.response else None
148
149 # Retry on server errors (5xx) or rate limiting (429)
150 if status_code in [429, 500, 502, 503, 504]:
151 if attempt < max_retries - 1:
152 wait_time = retry_delay * (2**attempt)
153 print(
154 f" Server error ({status_code}), "
155 f"retrying in {wait_time}s... "
156 f"(attempt {attempt + 1}/{max_retries})"
157 )
158 time.sleep(wait_time)
159 continue
160
161 # Better error reporting
162 if status_code:
163 print(
164 f"Error posting to {endpoint}: HTTP {status_code}",
165 file=sys.stderr,
166 )
167 else:
168 error_msg = (
169 f"Error posting to {endpoint}: "
170 f"{type(e).__name__} - {str(e)}"
171 )
172 print(error_msg, file=sys.stderr)
173
174 # Print payload for debugging
175 print(f" Payload: {payload}", file=sys.stderr)
176
177 # Always try to get response details
178 if e.response is not None:
179 print(
180 f" Response status: {e.response.status_code}",
181 file=sys.stderr,
182 )
183 print(
184 f" Response headers: {dict(e.response.headers)}",
185 file=sys.stderr,
186 )
187 try:
188 error_detail = e.response.json()
189 print(
190 f" Response JSON: {error_detail}", file=sys.stderr
191 )
192 except Exception:
193 print(
194 f" Response text: {e.response.text}",
195 file=sys.stderr,
196 )
197 else:
198 print(" No response object available", file=sys.stderr)
199 return {}
200 except requests.exceptions.Timeout:
201 if attempt < max_retries - 1:
202 wait_time = retry_delay * (2**attempt)
203 print(
204 f" Request timeout, retrying in {wait_time}s... "
205 f"(attempt {attempt + 1}/{max_retries})"
206 )
207 time.sleep(wait_time)
208 continue
209 print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
210 return {}
211 except requests.exceptions.RequestException as e:
212 print(f"Error posting to {endpoint}: {e}", file=sys.stderr)
213 return {}
214
215 return {}
216
217 def put(
218 self,
219 endpoint: str,
220 payload: Dict[str, Any],
221 max_retries: int = 3,
222 retry_delay: float = 2.0,
223 ) -> Dict[str, Any]:
224 """
225 Make a PUT request to the *arr API with retry logic.
226
227 Args:
228 endpoint: API endpoint path (e.g., /api/v3/series)
229 payload: JSON payload to send
230 max_retries: Maximum number of retry attempts
231 retry_delay: Initial delay between retries (seconds)
232
233 Returns:
234 JSON response data (empty dict on failure)
235 """
236 url = f"{self.base_url}{endpoint}"
237 headers = {**self.headers, "Content-Type": "application/json"}
238
239 for attempt in range(max_retries):
240 try:
241 response = requests.put(
242 url, headers=headers, json=payload, timeout=30
243 )
244 response.raise_for_status()
245 return response.json()
246 except requests.exceptions.HTTPError as e:
247 status_code = e.response.status_code if e.response else None
248
249 # Retry on server errors (5xx) or rate limiting (429)
250 if status_code in [429, 500, 502, 503, 504]:
251 if attempt < max_retries - 1:
252 wait_time = retry_delay * (2**attempt)
253 print(
254 f" Server error ({status_code}), "
255 f"retrying in {wait_time}s... "
256 f"(attempt {attempt + 1}/{max_retries})"
257 )
258 time.sleep(wait_time)
259 continue
260
261 print(
262 f"Error putting to {endpoint}: HTTP {status_code}",
263 file=sys.stderr,
264 )
265 if e.response:
266 try:
267 error_detail = e.response.json()
268 print(f" Detail: {error_detail}", file=sys.stderr)
269 except Exception:
270 print(
271 f" Response: {e.response.text[:200]}",
272 file=sys.stderr,
273 )
274 return {}
275 except requests.exceptions.Timeout:
276 if attempt < max_retries - 1:
277 wait_time = retry_delay * (2**attempt)
278 print(
279 f" Request timeout, retrying in {wait_time}s... "
280 f"(attempt {attempt + 1}/{max_retries})"
281 )
282 time.sleep(wait_time)
283 continue
284 print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
285 return {}
286 except requests.exceptions.RequestException as e:
287 print(f"Error putting to {endpoint}: {e}", file=sys.stderr)
288 return {}
289
290 return {}
291
292
293def ask_confirmation(prompt: str) -> bool:
294 """
295 Ask user for yes/no confirmation.
296
297 Args:
298 prompt: Question to ask the user
299
300 Returns:
301 True if user confirms (y/yes), False otherwise (n/no)
302 """
303 while True:
304 response = input(f"{prompt} (y/n): ").lower().strip()
305 if response in ["y", "yes"]:
306 return True
307 elif response in ["n", "no"]:
308 return False
309 else:
310 print("Please answer 'y' or 'n'")
311
312
313class CommandContext:
314 """Context object for command execution with common options."""
315
316 def __init__(self, dry_run: bool = False, no_confirm: bool = False):
317 """
318 Initialize command context.
319
320 Args:
321 dry_run: If True, show changes without applying them
322 no_confirm: If True, skip interactive confirmations
323 """
324 self.dry_run = dry_run
325 self.no_confirm = no_confirm
326
327
328def print_separator(char: str = "=", width: int = 80) -> None:
329 """Print a separator line."""
330 print(char * width)
331
332
333def print_section_header(title: str) -> None:
334 """Print a section header with separators."""
335 print("\n" + "=" * 80)
336 print(title)
337 print("=" * 80)
338
339
340def print_item_list(
341 items: List[str], prefix: str, max_display: int = 5
342) -> None:
343 """
344 Print a list of items with optional truncation.
345
346 Args:
347 items: List of item names to display
348 prefix: Prefix message to show before the list
349 max_display: Maximum number of items to show before truncating
350 """
351 if not items:
352 return
353
354 count = len(items)
355 print(f"\n{prefix} ({count} items):")
356 for item in items[:max_display]:
357 print(f" - {item}")
358 if len(items) > max_display:
359 remaining = len(items) - max_display
360 print(f" ... and {remaining} more")
361
362
363def get_confirmation_decision(ctx: CommandContext, prompt: str) -> bool:
364 """
365 Determine whether to proceed based on dry-run, no-confirm, or user input.
366
367 Args:
368 ctx: Command context with dry_run and no_confirm flags
369 prompt: Confirmation prompt to show user
370
371 Returns:
372 True if should proceed, False otherwise
373 """
374 if ctx.dry_run:
375 print("\n[DRY RUN] Skipping actual operation")
376 return False
377 elif ctx.no_confirm:
378 print("\n[NO CONFIRM] Proceeding with operation...")
379 return True
380 else:
381 return ask_confirmation(prompt)
382
383
384def print_final_summary(
385 total: int,
386 processed: int,
387 skipped: int,
388 operation: str,
389 queue_note: bool = True,
390) -> None:
391 """
392 Print final summary of operations.
393
394 Args:
395 total: Total items that needed processing
396 processed: Number of items successfully processed
397 skipped: Number of items skipped
398 operation: Name of the operation (e.g., "Renamed", "Retagged")
399 queue_note: Whether to show the queue check note
400 """
401 print_section_header("FINAL SUMMARY")
402 print(f"\nItems processed: {total}")
403 print(f" - {operation}: {processed}")
404 print(f" - Skipped: {skipped}")
405
406 if processed > 0 and queue_note:
407 print(
408 f"\nNote: {operation} operations are queued. "
409 "Check the service's queue for progress."
410 )
411
412
413def select_with_fzf(
414 items: List[Dict[str, str]],
415 display_format: str,
416 multi: bool = True,
417 enable_star_select: bool = False,
418) -> List[str]:
419 """
420 Use fzf to interactively select items.
421
422 Args:
423 items: List of dictionaries containing item data
424 display_format: Format string for displaying items (e.g.,
425 "{name} ({owner}, {tracks_total} tracks)")
426 multi: Allow multiple selection if True
427 enable_star_select: Add Ctrl-S keybinding to select all ★ items
428
429 Returns:
430 List of selected item IDs (empty list if cancelled)
431 """
432 if not items:
433 return []
434
435 # Create lookup table: display text -> item id
436 lookup = {}
437 lines = []
438 for item in items:
439 display = display_format.format(**item)
440 lines.append(display)
441 lookup[display] = item.get("id")
442
443 # Prepare fzf input
444 fzf_input = "\n".join(lines)
445
446 # Run fzf
447 fzf_args = ["fzf", "--ansi", "--prompt=Select items: "]
448 if multi:
449 fzf_args.append("--multi")
450
451 # Add keybinding to select all starred items
452 if enable_star_select:
453 fzf_args.extend([
454 "--bind", "ctrl-s:select-all+accept",
455 "--header", "TAB: select | ENTER: confirm | Ctrl-S: select all ★ items"
456 ])
457
458 try:
459 result = subprocess.run(
460 fzf_args,
461 input=fzf_input,
462 text=True,
463 capture_output=True,
464 check=True,
465 )
466 # Parse selected lines
467 selected_lines = result.stdout.strip().split("\n")
468 return [lookup[line] for line in selected_lines if line in lookup]
469 except subprocess.CalledProcessError:
470 # User cancelled or fzf not found
471 return []
472 except FileNotFoundError:
473 print("Error: fzf not found. Please install fzf:", file=sys.stderr)
474 print(" On NixOS: nix-env -iA nixpkgs.fzf", file=sys.stderr)
475 print(" On other systems: see https://github.com/junegunn/fzf")
476 sys.exit(1)
477
478
479class JellyfinClient:
480 """Client for Jellyfin API interactions."""
481
482 def __init__(
483 self, base_url: str, api_token: str, user_id: str, debug: bool = False
484 ):
485 """
486 Initialize the Jellyfin API client.
487
488 Args:
489 base_url: Base URL of the Jellyfin service
490 (e.g., http://localhost:8096)
491 api_token: API token for authentication
492 user_id: User ID or username for playlist ownership
493 debug: Enable debug output
494 """
495 self.base_url = base_url.rstrip("/")
496 self.api_token = api_token
497 self.debug = debug
498 self.headers = {
499 "Authorization": f'MediaBrowser Token="{api_token}"',
500 "Content-Type": "application/json",
501 }
502
503 # Resolve username to user ID if needed
504 self.user_id = self._resolve_user_id(user_id)
505
506 def _resolve_user_id(self, user_identifier: str) -> str:
507 """
508 Resolve a username or user ID to a proper user ID.
509
510 If the identifier looks like a GUID, use it as-is.
511 Otherwise, look up the user by username.
512
513 Args:
514 user_identifier: Username or user ID
515
516 Returns:
517 Resolved user ID (GUID)
518 """
519 # Check if it's already a GUID (basic check for 8-4-4-4-12 format)
520 if len(user_identifier) == 32 or (
521 len(user_identifier) == 36 and user_identifier.count("-") == 4
522 ):
523 return user_identifier
524
525 # Otherwise, look up by username
526 try:
527 response = self.get("/Users")
528 if isinstance(response, list):
529 for user in response:
530 user_name = user.get("Name", "").lower()
531 if user_name == user_identifier.lower():
532 return user.get("Id")
533 except Exception:
534 # If lookup fails, return the original identifier
535 # and let subsequent API calls fail with a clearer error
536 pass
537
538 # If not found, return original (might be unrecognized ID)
539 return user_identifier
540
541 def get(
542 self, endpoint: str, params: Optional[Dict[str, Any]] = None
543 ) -> List[Dict[str, Any]] | Dict[str, Any]:
544 """
545 Make a GET request to the Jellyfin API.
546
547 Args:
548 endpoint: API endpoint path (e.g., /Items)
549 params: Optional query parameters
550
551 Returns:
552 JSON response data
553
554 Raises:
555 SystemExit: If the request fails
556 """
557 url = f"{self.base_url}{endpoint}"
558 try:
559 response = requests.get(
560 url, headers=self.headers, params=params, timeout=30
561 )
562 response.raise_for_status()
563 return response.json()
564 except requests.exceptions.RequestException as e:
565 print(
566 f"Error fetching from Jellyfin {endpoint}: {e}",
567 file=sys.stderr,
568 )
569 sys.exit(1)
570
571 def post(self, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
572 """
573 Make a POST request to the Jellyfin API.
574
575 Args:
576 endpoint: API endpoint path (e.g., /Playlists)
577 payload: JSON payload to send
578
579 Returns:
580 JSON response data
581
582 Raises:
583 SystemExit: If the request fails
584 """
585 url = f"{self.base_url}{endpoint}"
586 try:
587 response = requests.post(
588 url, headers=self.headers, json=payload, timeout=30
589 )
590 response.raise_for_status()
591 return response.json()
592 except requests.exceptions.RequestException as e:
593 print(
594 f"Error posting to Jellyfin {endpoint}: {e}",
595 file=sys.stderr,
596 )
597 if hasattr(e, "response") and e.response is not None:
598 try:
599 error_detail = e.response.json()
600 print(f" Detail: {error_detail}", file=sys.stderr)
601 except Exception:
602 print(
603 f" Response: {e.response.text[:200]}",
604 file=sys.stderr,
605 )
606 sys.exit(1)
607
608 def search_tracks(
609 self,
610 query: str,
611 limit: int = 50,
612 artist_name: str = None,
613 track_name: str = None,
614 ) -> List[Dict[str, Any]]:
615 """
616 Search for tracks in Jellyfin library.
617
618 Args:
619 query: Legacy search query string (for backward compatibility)
620 limit: Maximum number of results
621 artist_name: Artist name to search by (preferred)
622 track_name: Track name to search by
623
624 Returns:
625 List of track items
626 """
627 # Prefer searching by artist name when available
628 if artist_name:
629 # Find artist using NameStartsWith (more reliable)
630 artist_words = artist_name.split()
631 artist_first_word = (
632 artist_words[0] if artist_words else artist_name
633 )
634
635 artist_params = {
636 "NameStartsWith": artist_first_word,
637 "IncludeItemTypes": "MusicArtist",
638 "Limit": 50,
639 "Recursive": True,
640 }
641 artist_result = self.get(
642 f"/Users/{self.user_id}/Items", params=artist_params
643 )
644
645 artists = artist_result.get("Items", [])
646
647 if self.debug:
648 print(
649 f"DEBUG: Searching for artist starting with "
650 f"'{artist_first_word}' - Found {len(artists)} artists"
651 )
652 if artists:
653 for idx, artist in enumerate(artists[:5], 1):
654 print(f" {idx}. {artist.get('Name')}")
655
656 if artists:
657 # Find exact or best match
658 artist_name_lower = artist_name.lower()
659 matched_artist = None
660
661 for artist in artists:
662 if artist.get("Name", "").lower() == artist_name_lower:
663 matched_artist = artist
664 break
665
666 # If no exact match, use first result
667 if not matched_artist:
668 matched_artist = artists[0]
669
670 artist_id = matched_artist.get("Id")
671
672 if self.debug:
673 artist_name_str = matched_artist.get("Name")
674 print(
675 f"DEBUG: Using artist: {artist_name_str} "
676 f"(ID: {artist_id})"
677 )
678
679 track_params = {
680 "ArtistIds": artist_id,
681 "IncludeItemTypes": "Audio",
682 "Recursive": True,
683 "Limit": limit,
684 "Fields": (
685 "Artists,Album,AlbumArtist,AlbumArtists,ArtistItems"
686 ),
687 }
688 result = self.get(
689 f"/Users/{self.user_id}/Items", params=track_params
690 )
691 items = result.get("Items", [])
692
693 if self.debug:
694 print(f"DEBUG: Found {len(items)} tracks by this artist")
695
696 return items
697
698 # Fallback: search by track name
699 search_term = track_name or query
700 words = search_term.split()
701 first_word = words[0] if words else search_term
702
703 params = {
704 "NameStartsWith": first_word,
705 "IncludeItemTypes": "Audio",
706 "Recursive": True,
707 "Limit": 200, # Larger limit: filtering client-side
708 "Fields": ("Artists,Album,AlbumArtist,AlbumArtists,ArtistItems"),
709 "EnableUserData": False, # Skip user data for speed
710 }
711
712 result = self.get(f"/Users/{self.user_id}/Items", params=params)
713
714 items = result.get("Items", [])
715
716 # Enrich items with album artist data if track artists are missing
717 for item in items:
718 if not item.get("Artists") and item.get("AlbumId"):
719 try:
720 album_id = item["AlbumId"]
721 album = self.get(
722 f"/Users/{self.user_id}/Items/{album_id}",
723 params={"Fields": "Artists,AlbumArtists"},
724 )
725 # Use album artists as track artists
726 item["Artists"] = album.get("AlbumArtists", [])
727 item["Album"] = album.get("Name", "")
728 except Exception:
729 pass # Continue even if album fetch fails
730
731 return items
732
733 def get_playlists(self) -> List[Dict[str, Any]]:
734 """
735 Get all playlists for the user.
736
737 Returns:
738 List of playlist items
739 """
740 params = {
741 "IncludeItemTypes": "Playlist",
742 "Recursive": "true",
743 }
744 result = self.get(f"/Users/{self.user_id}/Items", params=params)
745 return result.get("Items", [])
746
747 def create_playlist(
748 self, name: str, item_ids: List[str], is_public: bool = False
749 ) -> Dict[str, Any]:
750 """
751 Create a new playlist in Jellyfin.
752
753 Args:
754 name: Playlist name
755 item_ids: List of Jellyfin item IDs to add
756 is_public: Whether the playlist is public
757
758 Returns:
759 Created playlist data
760 """
761 payload = {
762 "Name": name,
763 "Ids": item_ids,
764 "UserId": self.user_id,
765 "IsPublic": is_public,
766 }
767 return self.post("/Playlists", payload)
768
769 def add_to_playlist(
770 self, playlist_id: str, item_ids: List[str]
771 ) -> Dict[str, Any]:
772 """
773 Add items to an existing playlist.
774
775 Args:
776 playlist_id: Jellyfin playlist ID
777 item_ids: List of item IDs to add
778
779 Returns:
780 Response data (empty dict if no content)
781 """
782 params = {"ids": ",".join(item_ids), "userId": self.user_id}
783 url = f"{self.base_url}/Playlists/{playlist_id}/Items"
784 try:
785 response = requests.post(
786 url, headers=self.headers, params=params, timeout=30
787 )
788 response.raise_for_status()
789
790 # Handle empty responses (204 No Content)
791 if response.status_code == 204 or not response.text:
792 return {}
793
794 return response.json()
795 except requests.exceptions.RequestException as e:
796 print(
797 f"Error adding items to playlist: {e}",
798 file=sys.stderr,
799 )
800 return {}
801
802 def get_playlist_items(self, playlist_id: str) -> List[str]:
803 """
804 Get all item IDs in a playlist.
805
806 Args:
807 playlist_id: Jellyfin playlist ID
808
809 Returns:
810 List of item IDs in the playlist
811 """
812 params = {
813 "ParentId": playlist_id,
814 "Fields": "Id",
815 }
816 result = self.get(f"/Users/{self.user_id}/Items", params=params)
817 items = result.get("Items", [])
818 return [item.get("Id") for item in items if item.get("Id")]
819
820 def get_playlist_items_full(
821 self,
822 playlist_id: str,
823 fields: Optional[List[str]] = None,
824 ) -> List[Dict[str, Any]]:
825 """
826 Get all items in a playlist with full metadata.
827
828 Args:
829 playlist_id: Jellyfin playlist ID
830 fields: Additional fields to include in response
831 Defaults to ["Path", "MediaSources"]
832
833 Returns:
834 List of items with full metadata
835 """
836 if fields is None:
837 fields = ["Path", "MediaSources"]
838
839 params = {
840 "ParentId": playlist_id,
841 "Fields": ",".join(fields),
842 }
843
844 result = self.get(f"/Users/{self.user_id}/Items", params=params)
845 return result.get("Items", [])
846
847 def remove_from_playlist(
848 self, playlist_id: str, item_ids: List[str]
849 ) -> bool:
850 """
851 Remove specific items from a playlist.
852
853 Args:
854 playlist_id: Jellyfin playlist ID
855 item_ids: List of item IDs to remove
856
857 Returns:
858 True if successful
859 """
860 if not item_ids:
861 return True # Nothing to remove
862
863 try:
864 url = (
865 f"{self.base_url}/Playlists/{playlist_id}/Items"
866 f"?EntryIds={','.join(item_ids)}"
867 )
868 response = requests.delete(url, headers=self.headers, timeout=30)
869 response.raise_for_status()
870 return True
871 except requests.exceptions.RequestException as e:
872 print(
873 f"Error removing items from playlist: {e}",
874 file=sys.stderr,
875 )
876 return False
877
878 def clear_playlist(self, playlist_id: str) -> bool:
879 """
880 Remove all items from a playlist.
881
882 Args:
883 playlist_id: Jellyfin playlist ID
884
885 Returns:
886 True if successful
887 """
888 # Get current items
889 item_ids = self.get_playlist_items(playlist_id)
890
891 if not item_ids:
892 return True # Already empty
893
894 # Remove all items using the dedicated method
895 return self.remove_from_playlist(playlist_id, item_ids)
896
897 def get_favorites(
898 self,
899 include_types: Optional[List[str]] = None,
900 fields: Optional[List[str]] = None,
901 ) -> List[Dict[str, Any]]:
902 """
903 Get all favorite items for the user.
904
905 Args:
906 include_types: List of item types to include
907 (e.g., ["Movie", "Series"])
908 Defaults to ["Movie", "Series"]
909 fields: Additional fields to include in response
910 Defaults to ["Path", "MediaSources"]
911
912 Returns:
913 List of favorite items
914 """
915 if include_types is None:
916 include_types = ["Movie", "Series"]
917 if fields is None:
918 fields = ["Path", "MediaSources"]
919
920 params = {
921 "IsFavorite": "true",
922 "Recursive": "true",
923 "IncludeItemTypes": ",".join(include_types),
924 "Fields": ",".join(fields),
925 }
926
927 result = self.get(f"/Users/{self.user_id}/Items", params=params)
928 return result.get("Items", [])
929
930 def get_series_episodes(
931 self,
932 series_id: str,
933 fields: Optional[List[str]] = None,
934 ) -> List[Dict[str, Any]]:
935 """
936 Get all episodes for a series.
937
938 Args:
939 series_id: Jellyfin series ID
940 fields: Additional fields to include in response
941 Defaults to ["Path", "MediaSources"]
942
943 Returns:
944 List of episode items
945 """
946 if fields is None:
947 fields = ["Path", "MediaSources"]
948
949 params = {"Fields": ",".join(fields)}
950
951 result = self.get(f"/Shows/{series_id}/Episodes", params=params)
952 return result.get("Items", [])
953
954 def get_movies(
955 self,
956 fields: Optional[List[str]] = None,
957 sort_by: str = "SortName",
958 ) -> List[Dict[str, Any]]:
959 """
960 Get all movies in the library.
961
962 Args:
963 fields: Additional fields to include in response
964 sort_by: Sort field (default: SortName)
965
966 Returns:
967 List of movie items
968 """
969 if fields is None:
970 fields = ["ProductionYear", "CommunityRating", "Genres"]
971
972 params = {
973 "IncludeItemTypes": "Movie",
974 "Recursive": "true",
975 "Fields": ",".join(fields),
976 "SortBy": sort_by,
977 }
978
979 result = self.get(f"/Users/{self.user_id}/Items", params=params)
980 return result.get("Items", [])
981
982 def get_series(
983 self,
984 fields: Optional[List[str]] = None,
985 sort_by: str = "SortName",
986 ) -> List[Dict[str, Any]]:
987 """
988 Get all series in the library.
989
990 Args:
991 fields: Additional fields to include in response
992 sort_by: Sort field (default: SortName)
993
994 Returns:
995 List of series items
996 """
997 if fields is None:
998 fields = ["ProductionYear", "CommunityRating", "Genres"]
999
1000 params = {
1001 "IncludeItemTypes": "Series",
1002 "Recursive": "true",
1003 "Fields": ",".join(fields),
1004 "SortBy": sort_by,
1005 }
1006
1007 result = self.get(f"/Users/{self.user_id}/Items", params=params)
1008 return result.get("Items", [])
1009
1010 def get_items_by_type(
1011 self,
1012 item_types: List[str],
1013 fields: Optional[List[str]] = None,
1014 sort_by: str = "SortName",
1015 ) -> List[Dict[str, Any]]:
1016 """
1017 Get items of specified types from the library.
1018
1019 Args:
1020 item_types: List of item types (e.g., ["Movie", "Series"])
1021 fields: Additional fields to include in response
1022 sort_by: Sort field (default: SortName)
1023
1024 Returns:
1025 List of items
1026 """
1027 if fields is None:
1028 fields = ["ProductionYear", "CommunityRating", "Genres"]
1029
1030 params = {
1031 "IncludeItemTypes": ",".join(item_types),
1032 "Recursive": "true",
1033 "Fields": ",".join(fields),
1034 "SortBy": sort_by,
1035 }
1036
1037 result = self.get(f"/Users/{self.user_id}/Items", params=params)
1038 return result.get("Items", [])
1039
1040
1041class SpotifyClient:
1042 """Client for Spotify API interactions using client credentials flow."""
1043
1044 def __init__(self, client_id: str, client_secret: str):
1045 """
1046 Initialize the Spotify API client with client credentials.
1047
1048 This uses the client credentials flow which can access public
1049 playlists but not private user data.
1050
1051 Args:
1052 client_id: Spotify application client ID
1053 client_secret: Spotify application client secret
1054 """
1055 try:
1056 import spotipy
1057 from spotipy.oauth2 import SpotifyClientCredentials
1058 except ImportError:
1059 print(
1060 "Error: spotipy library not found. Install it with:",
1061 file=sys.stderr,
1062 )
1063 print(" pip install spotipy", file=sys.stderr)
1064 sys.exit(1)
1065
1066 # Use client credentials flow (no OAuth required)
1067 auth_manager = SpotifyClientCredentials(
1068 client_id=client_id, client_secret=client_secret
1069 )
1070 self.sp = spotipy.Spotify(auth_manager=auth_manager)
1071
1072 def get_playlist_tracks(self, playlist_id: str) -> List[Dict[str, Any]]:
1073 """
1074 Fetch all tracks from a Spotify playlist.
1075
1076 Args:
1077 playlist_id: Spotify playlist ID or URI
1078
1079 Returns:
1080 List of track information dictionaries
1081 """
1082 tracks = []
1083 results = self.sp.playlist_tracks(playlist_id)
1084
1085 while results:
1086 for item in results.get("items", []):
1087 if item and item.get("track"):
1088 track = item["track"]
1089 album_obj = track.get("album", {})
1090 tracks.append(
1091 {
1092 "name": track.get("name"),
1093 "artists": [
1094 {
1095 "name": artist.get("name"),
1096 "id": artist.get("id"),
1097 }
1098 for artist in track.get("artists", [])
1099 ],
1100 "album": {
1101 "name": album_obj.get("name"),
1102 "id": album_obj.get("id"),
1103 "artists": [
1104 {
1105 "name": artist.get("name"),
1106 "id": artist.get("id"),
1107 }
1108 for artist in album_obj.get("artists", [])
1109 ],
1110 },
1111 }
1112 )
1113
1114 # Handle pagination
1115 if results.get("next"):
1116 results = self.sp.next(results)
1117 else:
1118 results = None
1119
1120 return tracks
1121
1122 def get_playlist_info(self, playlist_id: str) -> Dict[str, Any]:
1123 """
1124 Get information about a Spotify playlist.
1125
1126 Args:
1127 playlist_id: Spotify playlist ID or URI
1128
1129 Returns:
1130 Playlist information dictionary
1131 """
1132 playlist = self.sp.playlist(playlist_id)
1133 return {
1134 "name": playlist.get("name"),
1135 "description": playlist.get("description"),
1136 "owner": playlist.get("owner", {}).get("display_name"),
1137 "tracks_total": playlist.get("tracks", {}).get("total", 0),
1138 }
1139
1140 def get_user_playlists(self, username: str) -> List[Dict[str, Any]]:
1141 """
1142 Fetch all public playlists for a specific user.
1143
1144 Args:
1145 username: Spotify username (user ID)
1146
1147 Returns:
1148 List of playlist information dictionaries
1149 """
1150 playlists = []
1151 try:
1152 results = self.sp.user_playlists(username)
1153
1154 while results:
1155 for item in results.get("items", []):
1156 if item:
1157 playlists.append(
1158 {
1159 "id": item.get("id"),
1160 "name": item.get("name"),
1161 "owner": item.get("owner", {}).get(
1162 "display_name"
1163 ),
1164 "tracks_total": item.get("tracks", {}).get(
1165 "total", 0
1166 ),
1167 "public": item.get("public", False),
1168 }
1169 )
1170
1171 # Handle pagination
1172 if results.get("next"):
1173 results = self.sp.next(results)
1174 else:
1175 results = None
1176
1177 except Exception as e:
1178 print(
1179 f"Error fetching playlists for user '{username}': {e}",
1180 file=sys.stderr,
1181 )
1182 print(
1183 "Make sure the username is correct and the user has "
1184 "public playlists.",
1185 file=sys.stderr,
1186 )
1187
1188 return playlists