nftable-migration
1#!/usr/bin/env python3
2"""
3Shared library for *arr (Sonarr, Radarr, Lidarr) automation scripts.
4
5Provides common functionality for API interaction, user confirmation,
6and output formatting across all *arr stack scripts.
7"""
8
9import subprocess
10import sys
11import time
12from typing import Any, Dict, List, Optional
13
14import requests
15
16
17class ArrClient:
18 """Base client for *arr API interactions."""
19
20 def __init__(self, base_url: str, api_key: str):
21 """
22 Initialize the *arr API client.
23
24 Args:
25 base_url: Base URL of the *arr service
26 (e.g., http://localhost:8989)
27 api_key: API key for authentication
28 """
29 self.base_url = base_url.rstrip("/")
30 self.api_key = api_key
31 self.headers = {"X-Api-Key": api_key}
32
33 def get(
34 self,
35 endpoint: str,
36 params: Optional[Dict[str, Any]] = None,
37 max_retries: int = 3,
38 retry_delay: float = 2.0,
39 ) -> List[Dict[str, Any]] | Dict[str, Any]:
40 """
41 Make a GET request to the *arr API with retry logic.
42
43 Args:
44 endpoint: API endpoint path (e.g., /api/v3/series)
45 params: Optional query parameters
46 max_retries: Maximum number of retry attempts
47 retry_delay: Initial delay between retries (seconds)
48
49 Returns:
50 JSON response data
51
52 Raises:
53 SystemExit: If the request fails after all retries
54 """
55 url = f"{self.base_url}{endpoint}"
56
57 for attempt in range(max_retries):
58 try:
59 response = requests.get(
60 url, headers=self.headers, params=params, timeout=30
61 )
62 response.raise_for_status()
63 return response.json()
64 except requests.exceptions.HTTPError as e:
65 status_code = e.response.status_code if e.response else None
66
67 # Retry on server errors (5xx) or rate limiting (429)
68 if status_code in [429, 500, 502, 503, 504]:
69 if attempt < max_retries - 1:
70 wait_time = retry_delay * (2**attempt)
71 print(
72 f" Server error ({status_code}), "
73 f"retrying in {wait_time}s... "
74 f"(attempt {attempt + 1}/{max_retries})"
75 )
76 time.sleep(wait_time)
77 continue
78
79 # Don't retry on client errors (4xx except 429)
80 print(
81 f"Error fetching from {endpoint}: HTTP {status_code}",
82 file=sys.stderr,
83 )
84 if params:
85 print(f" Params: {params}", file=sys.stderr)
86 if e.response:
87 try:
88 error_detail = e.response.json()
89 print(f" Detail: {error_detail}", file=sys.stderr)
90 except Exception:
91 print(
92 f" Response: {e.response.text[:200]}",
93 file=sys.stderr,
94 )
95 sys.exit(1)
96 except requests.exceptions.Timeout:
97 if attempt < max_retries - 1:
98 wait_time = retry_delay * (2**attempt)
99 print(
100 f" Request timeout, retrying in {wait_time}s... "
101 f"(attempt {attempt + 1}/{max_retries})"
102 )
103 time.sleep(wait_time)
104 continue
105 print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
106 sys.exit(1)
107 except requests.exceptions.RequestException as e:
108 print(f"Error fetching from {endpoint}: {e}", file=sys.stderr)
109 if params:
110 print(f" Params: {params}", file=sys.stderr)
111 sys.exit(1)
112
113 # Should not reach here, but just in case
114 print(
115 f"Error: Failed after {max_retries} attempts", file=sys.stderr
116 )
117 sys.exit(1)
118
119 def post(
120 self,
121 endpoint: str,
122 payload: Dict[str, Any],
123 max_retries: int = 3,
124 retry_delay: float = 2.0,
125 ) -> Dict[str, Any]:
126 """
127 Make a POST request to the *arr API with retry logic.
128
129 Args:
130 endpoint: API endpoint path (e.g., /api/v3/command)
131 payload: JSON payload to send
132 max_retries: Maximum number of retry attempts
133 retry_delay: Initial delay between retries (seconds)
134
135 Returns:
136 JSON response data (empty dict on failure)
137 """
138 url = f"{self.base_url}{endpoint}"
139 headers = {**self.headers, "Content-Type": "application/json"}
140
141 for attempt in range(max_retries):
142 try:
143 response = requests.post(
144 url, headers=headers, json=payload, timeout=30
145 )
146 response.raise_for_status()
147 return response.json()
148 except requests.exceptions.HTTPError as e:
149 status_code = e.response.status_code if e.response else None
150
151 # Retry on server errors (5xx) or rate limiting (429)
152 if status_code in [429, 500, 502, 503, 504]:
153 if attempt < max_retries - 1:
154 wait_time = retry_delay * (2**attempt)
155 print(
156 f" Server error ({status_code}), "
157 f"retrying in {wait_time}s... "
158 f"(attempt {attempt + 1}/{max_retries})"
159 )
160 time.sleep(wait_time)
161 continue
162
163 print(
164 f"Error posting to {endpoint}: HTTP {status_code}",
165 file=sys.stderr,
166 )
167 if e.response:
168 try:
169 error_detail = e.response.json()
170 print(f" Detail: {error_detail}", file=sys.stderr)
171 except Exception:
172 print(
173 f" Response: {e.response.text[:200]}",
174 file=sys.stderr,
175 )
176 return {}
177 except requests.exceptions.Timeout:
178 if attempt < max_retries - 1:
179 wait_time = retry_delay * (2**attempt)
180 print(
181 f" Request timeout, retrying in {wait_time}s... "
182 f"(attempt {attempt + 1}/{max_retries})"
183 )
184 time.sleep(wait_time)
185 continue
186 print(f"Error: Request timeout on {endpoint}", file=sys.stderr)
187 return {}
188 except requests.exceptions.RequestException as e:
189 print(f"Error posting to {endpoint}: {e}", file=sys.stderr)
190 return {}
191
192 return {}
193
194
195def ask_confirmation(prompt: str) -> bool:
196 """
197 Ask user for yes/no confirmation.
198
199 Args:
200 prompt: Question to ask the user
201
202 Returns:
203 True if user confirms (y/yes), False otherwise (n/no)
204 """
205 while True:
206 response = input(f"{prompt} (y/n): ").lower().strip()
207 if response in ["y", "yes"]:
208 return True
209 elif response in ["n", "no"]:
210 return False
211 else:
212 print("Please answer 'y' or 'n'")
213
214
215class CommandContext:
216 """Context object for command execution with common options."""
217
218 def __init__(self, dry_run: bool = False, no_confirm: bool = False):
219 """
220 Initialize command context.
221
222 Args:
223 dry_run: If True, show changes without applying them
224 no_confirm: If True, skip interactive confirmations
225 """
226 self.dry_run = dry_run
227 self.no_confirm = no_confirm
228
229
230def print_separator(char: str = "=", width: int = 80) -> None:
231 """Print a separator line."""
232 print(char * width)
233
234
235def print_section_header(title: str) -> None:
236 """Print a section header with separators."""
237 print("\n" + "=" * 80)
238 print(title)
239 print("=" * 80)
240
241
242def print_item_list(
243 items: List[str], prefix: str, max_display: int = 5
244) -> None:
245 """
246 Print a list of items with optional truncation.
247
248 Args:
249 items: List of item names to display
250 prefix: Prefix message to show before the list
251 max_display: Maximum number of items to show before truncating
252 """
253 if not items:
254 return
255
256 count = len(items)
257 print(f"\n{prefix} ({count} items):")
258 for item in items[:max_display]:
259 print(f" - {item}")
260 if len(items) > max_display:
261 remaining = len(items) - max_display
262 print(f" ... and {remaining} more")
263
264
265def get_confirmation_decision(
266 ctx: CommandContext, prompt: str
267) -> bool:
268 """
269 Determine whether to proceed based on dry-run, no-confirm, or user input.
270
271 Args:
272 ctx: Command context with dry_run and no_confirm flags
273 prompt: Confirmation prompt to show user
274
275 Returns:
276 True if should proceed, False otherwise
277 """
278 if ctx.dry_run:
279 print("\n[DRY RUN] Skipping actual operation")
280 return False
281 elif ctx.no_confirm:
282 print("\n[NO CONFIRM] Proceeding with operation...")
283 return True
284 else:
285 return ask_confirmation(prompt)
286
287
288def print_final_summary(
289 total: int,
290 processed: int,
291 skipped: int,
292 operation: str,
293 queue_note: bool = True,
294) -> None:
295 """
296 Print final summary of operations.
297
298 Args:
299 total: Total items that needed processing
300 processed: Number of items successfully processed
301 skipped: Number of items skipped
302 operation: Name of the operation (e.g., "Renamed", "Retagged")
303 queue_note: Whether to show the queue check note
304 """
305 print_section_header("FINAL SUMMARY")
306 print(f"\nItems processed: {total}")
307 print(f" - {operation}: {processed}")
308 print(f" - Skipped: {skipped}")
309
310 if processed > 0 and queue_note:
311 print(
312 f"\nNote: {operation} operations are queued. "
313 "Check the service's queue for progress."
314 )
315
316
317def select_with_fzf(
318 items: List[Dict[str, str]], display_format: str, multi: bool = True
319) -> List[str]:
320 """
321 Use fzf to interactively select items.
322
323 Args:
324 items: List of dictionaries containing item data
325 display_format: Format string for displaying items (e.g.,
326 "{name} ({owner}, {tracks_total} tracks)")
327 multi: Allow multiple selection if True
328
329 Returns:
330 List of selected item IDs (empty list if cancelled)
331 """
332 if not items:
333 return []
334
335 # Create lookup table: display text -> item id
336 lookup = {}
337 lines = []
338 for item in items:
339 display = display_format.format(**item)
340 lines.append(display)
341 lookup[display] = item.get("id")
342
343 # Prepare fzf input
344 fzf_input = "\n".join(lines)
345
346 # Run fzf
347 fzf_args = ["fzf", "--ansi", "--prompt=Select playlists: "]
348 if multi:
349 fzf_args.append("--multi")
350
351 try:
352 result = subprocess.run(
353 fzf_args,
354 input=fzf_input,
355 text=True,
356 capture_output=True,
357 check=True,
358 )
359 # Parse selected lines
360 selected_lines = result.stdout.strip().split("\n")
361 return [lookup[line] for line in selected_lines if line in lookup]
362 except subprocess.CalledProcessError:
363 # User cancelled or fzf not found
364 return []
365 except FileNotFoundError:
366 print(
367 "Error: fzf not found. Please install fzf:", file=sys.stderr
368 )
369 print(
370 " On NixOS: nix-env -iA nixpkgs.fzf", file=sys.stderr
371 )
372 print(" On other systems: see https://github.com/junegunn/fzf")
373 sys.exit(1)
374
375
376class SpotifyClient:
377 """Client for Spotify API interactions using client credentials flow."""
378
379 def __init__(self, client_id: str, client_secret: str):
380 """
381 Initialize the Spotify API client with client credentials.
382
383 This uses the client credentials flow which can access public
384 playlists but not private user data.
385
386 Args:
387 client_id: Spotify application client ID
388 client_secret: Spotify application client secret
389 """
390 try:
391 import spotipy
392 from spotipy.oauth2 import SpotifyClientCredentials
393 except ImportError:
394 print(
395 "Error: spotipy library not found. Install it with:",
396 file=sys.stderr,
397 )
398 print(" pip install spotipy", file=sys.stderr)
399 sys.exit(1)
400
401 # Use client credentials flow (no OAuth required)
402 auth_manager = SpotifyClientCredentials(
403 client_id=client_id, client_secret=client_secret
404 )
405 self.sp = spotipy.Spotify(auth_manager=auth_manager)
406
407 def get_playlist_tracks(self, playlist_id: str) -> List[Dict[str, Any]]:
408 """
409 Fetch all tracks from a Spotify playlist.
410
411 Args:
412 playlist_id: Spotify playlist ID or URI
413
414 Returns:
415 List of track information dictionaries
416 """
417 tracks = []
418 results = self.sp.playlist_tracks(playlist_id)
419
420 while results:
421 for item in results.get("items", []):
422 if item and item.get("track"):
423 track = item["track"]
424 tracks.append(
425 {
426 "name": track.get("name"),
427 "artists": [
428 {
429 "name": artist.get("name"),
430 "id": artist.get("id"),
431 }
432 for artist in track.get("artists", [])
433 ],
434 "album": track.get("album", {}).get("name"),
435 "album_id": track.get("album", {}).get("id"),
436 }
437 )
438
439 # Handle pagination
440 if results.get("next"):
441 results = self.sp.next(results)
442 else:
443 results = None
444
445 return tracks
446
447 def get_playlist_info(self, playlist_id: str) -> Dict[str, Any]:
448 """
449 Get information about a Spotify playlist.
450
451 Args:
452 playlist_id: Spotify playlist ID or URI
453
454 Returns:
455 Playlist information dictionary
456 """
457 playlist = self.sp.playlist(playlist_id)
458 return {
459 "name": playlist.get("name"),
460 "description": playlist.get("description"),
461 "owner": playlist.get("owner", {}).get("display_name"),
462 "tracks_total": playlist.get("tracks", {}).get("total", 0),
463 }
464
465 def get_user_playlists(self, username: str) -> List[Dict[str, Any]]:
466 """
467 Fetch all public playlists for a specific user.
468
469 Args:
470 username: Spotify username (user ID)
471
472 Returns:
473 List of playlist information dictionaries
474 """
475 playlists = []
476 try:
477 results = self.sp.user_playlists(username)
478
479 while results:
480 for item in results.get("items", []):
481 if item:
482 playlists.append(
483 {
484 "id": item.get("id"),
485 "name": item.get("name"),
486 "owner": item.get("owner", {}).get(
487 "display_name"
488 ),
489 "tracks_total": item.get("tracks", {}).get(
490 "total", 0
491 ),
492 "public": item.get("public", False),
493 }
494 )
495
496 # Handle pagination
497 if results.get("next"):
498 results = self.sp.next(results)
499 else:
500 results = None
501
502 except Exception as e:
503 print(
504 f"Error fetching playlists for user '{username}': {e}",
505 file=sys.stderr,
506 )
507 print(
508 "Make sure the username is correct and the user has "
509 "public playlists.",
510 file=sys.stderr,
511 )
512
513 return playlists