Commit 3aecf1c7f9f8
Changed files (2)
tools
arr
commands
tools/arr/commands/lidarr_sync_spotify.py
@@ -50,14 +50,14 @@ def strip_duplicate_path_segments(path: str) -> tuple[str, bool, str]:
Returns:
Tuple of (cleaned_path, was_modified, duplicate_segment)
"""
- parts = [p for p in path.split('/') if p] # Split and remove empty parts
+ parts = [p for p in path.split("/") if p] # Split and remove empty parts
# Check for any duplicate consecutive segments
for i in range(len(parts) - 1):
if parts[i] == parts[i + 1]:
# Found duplicate - remove it
- cleaned_parts = parts[:i + 1] + parts[i + 2:]
- cleaned = '/' + '/'.join(cleaned_parts)
+ cleaned_parts = parts[: i + 1] + parts[i + 2 :]
+ cleaned = "/" + "/".join(cleaned_parts)
return (cleaned, True, parts[i])
return (path, False, "")
@@ -89,23 +89,31 @@ def get_root_folder_path(client: ArrClient, preferred_path: str = None) -> str:
print("Available root folders:")
for idx, folder in enumerate(folders):
marker = " (using)" if idx == 0 else ""
- print(f" [{idx+1}] {folder.get('path')}{marker}")
+ print(f" [{idx + 1}] {folder.get('path')}{marker}")
selected_path = folders[0].get("path")
# Check for duplicate path segments and fix if found
- cleaned_path, was_modified, duplicate_seg = strip_duplicate_path_segments(selected_path)
+ cleaned_path, was_modified, duplicate_seg = strip_duplicate_path_segments(
+ selected_path
+ )
if was_modified:
- print(f"\nWARNING: Root folder has duplicate path segment '{duplicate_seg}'!")
+ print(
+ f"\nWARNING: Root folder has duplicate path segment '{duplicate_seg}'!"
+ )
print(f" Original: {selected_path}")
print(f" Using: {cleaned_path}")
- print(" Consider fixing the root folder configuration in Lidarr settings.\n")
+ print(
+ " Consider fixing the root folder configuration in Lidarr settings.\n"
+ )
return cleaned_path
return selected_path
-def search_musicbrainz_artist(artist_name: str, debug: bool = False) -> Dict[str, Any] | None:
+def search_musicbrainz_artist(
+ artist_name: str, debug: bool = False
+) -> Dict[str, Any] | None:
"""
Search MusicBrainz directly for an artist.
@@ -113,6 +121,7 @@ def search_musicbrainz_artist(artist_name: str, debug: bool = False) -> Dict[str
Note: MusicBrainz rate limit is 1 request/second, enforced with sleep.
"""
+
def query_mb(search_term: str, require_exact: bool = False):
"""Helper to query MusicBrainz with a specific search term."""
url = "https://musicbrainz.org/ws/2/artist"
@@ -126,14 +135,30 @@ def search_musicbrainz_artist(artist_name: str, debug: bool = False) -> Dict[str
"limit": 5,
}
+ if debug:
+ print(f" DEBUG: MusicBrainz query: {params['query']}")
+
# MusicBrainz rate limit: 1 request per second
time.sleep(1.0)
- response = requests.get(url, headers=headers, params=params, timeout=10)
+ response = requests.get(
+ url, headers=headers, params=params, timeout=10
+ )
response.raise_for_status()
data = response.json()
artists = data.get("artists", [])
+
+ if debug:
+ print(f" DEBUG: MusicBrainz returned {len(artists)} results")
+ if artists:
+ for idx, artist in enumerate(artists[:3]): # Show first 3
+ print(
+ f" [{idx + 1}] {artist.get('name', 'N/A')} (ID: {artist.get('id', 'N/A')})"
+ )
+ if len(artists) > 3:
+ print(f" ... and {len(artists) - 3} more")
+
if not artists:
return None
@@ -144,35 +169,51 @@ def search_musicbrainz_artist(artist_name: str, debug: bool = False) -> Dict[str
# For exact match requirement, compare case-insensitive but preserve accents
if require_exact:
if mb_name.lower() == search_term.lower():
+ if debug:
+ print(f" DEBUG: Exact match found: '{mb_name}'")
return artist
else:
# For normalized match, use our matching function
if artist_name_matches(mb_name, artist_name):
+ if debug:
+ print(f" DEBUG: Normalized match found: '{mb_name}'")
return artist
+ if debug:
+ print(
+ f" DEBUG: No matching artist in results (require_exact={require_exact})"
+ )
return None
try:
# First try: exact match with original name (preserves accents)
if debug:
- print(f" DEBUG: Querying MusicBrainz for exact match: '{artist_name}'")
+ print(
+ f" DEBUG: Querying MusicBrainz for exact match: '{artist_name}'"
+ )
result = query_mb(artist_name, require_exact=True)
if result:
mb_name = result.get("name", "")
if debug:
- print(f" DEBUG: Found exact match in MusicBrainz: '{mb_name}'")
+ print(
+ f" DEBUG: Found exact match in MusicBrainz: '{mb_name}'"
+ )
else:
# Second try: normalized version (without accents)
normalized = normalize_artist_name(artist_name)
if normalized != artist_name:
if debug:
- print(f" DEBUG: No exact match, trying normalized: '{normalized}'")
+ print(
+ f" DEBUG: No exact match, trying normalized: '{normalized}'"
+ )
result = query_mb(normalized, require_exact=False)
if result:
mb_name = result.get("name", "")
if debug:
- print(f" DEBUG: Found normalized match in MusicBrainz: '{mb_name}'")
+ print(
+ f" DEBUG: Found normalized match in MusicBrainz: '{mb_name}'"
+ )
if not result:
if debug:
@@ -226,24 +267,38 @@ def normalize_artist_name(name: str) -> str:
# Normalize different types of hyphens/dashes to regular hyphen
# U+2010 (HYPHEN), U+2011 (NON-BREAKING HYPHEN), U+2012 (FIGURE DASH),
# U+2013 (EN DASH), U+2014 (EM DASH), U+2015 (HORIZONTAL BAR)
- hyphen_chars = ['\u2010', '\u2011', '\u2012', '\u2013', '\u2014', '\u2015']
+ hyphen_chars = ["\u2010", "\u2011", "\u2012", "\u2013", "\u2014", "\u2015"]
for hyphen in hyphen_chars:
- normalized = normalized.replace(hyphen, '-')
+ normalized = normalized.replace(hyphen, "-")
# Normalize different types of apostrophes/quotes to regular apostrophe
# U+2019 (RIGHT SINGLE QUOTATION MARK), U+02BC (MODIFIER LETTER APOSTROPHE)
# U+2018 (LEFT SINGLE QUOTATION MARK), U+201B (SINGLE HIGH-REVERSED-9 QUOTATION MARK)
- apostrophe_chars = ['\u2019', '\u02BC', '\u2018', '\u201B']
+ apostrophe_chars = ["\u2019", "\u02bc", "\u2018", "\u201b"]
for apostrophe in apostrophe_chars:
normalized = normalized.replace(apostrophe, "'")
+ # Handle ligatures before NFD normalization
+ ligatures = {
+ "œ": "oe",
+ "Œ": "OE",
+ "æ": "ae",
+ "Æ": "AE",
+ "fi": "fi",
+ "fl": "fl",
+ }
+ for ligature, replacement in ligatures.items():
+ normalized = normalized.replace(ligature, replacement)
+
# Remove unicode accents
- normalized = unicodedata.normalize('NFD', normalized)
- normalized = ''.join(char for char in normalized if unicodedata.category(char) != 'Mn')
+ normalized = unicodedata.normalize("NFD", normalized)
+ normalized = "".join(
+ char for char in normalized if unicodedata.category(char) != "Mn"
+ )
# Common replacements
- normalized = normalized.replace('&', 'and')
- normalized = normalized.replace('/', ' ')
+ normalized = normalized.replace("&", "and")
+ normalized = normalized.replace("/", " ")
return normalized
@@ -299,9 +354,13 @@ def filter_search_results(
if artist_name_matches(result_artist_name, artist_name):
filtered.append(result)
if debug:
- print(f" DEBUG: Matched '{result_artist_name}' to '{artist_name}'")
+ print(
+ f" DEBUG: Matched '{result_artist_name}' to '{artist_name}'"
+ )
elif debug:
- print(f" DEBUG: Rejected '{result_artist_name}' (doesn't match '{artist_name}')")
+ print(
+ f" DEBUG: Rejected '{result_artist_name}' (doesn't match '{artist_name}')"
+ )
return filtered
@@ -394,7 +453,7 @@ def monitor_artist_albums(
Returns:
Tuple of (matched_count, monitored_count)
"""
- # Get artist with albums
+ # Get artist info
artist = client.get(f"/api/v1/artist/{artist_id}")
if not artist:
if debug:
@@ -402,14 +461,20 @@ def monitor_artist_albums(
return (0, 0)
artist_name = artist.get("artistName", "Unknown")
- albums = artist.get("albums", [])
+
+ # Fetch albums separately using the album endpoint
+ albums = client.get("/api/v1/album", params={"artistId": artist_id})
+ if not isinstance(albums, list):
+ albums = []
# Debug: show artist stats
if debug:
monitored = artist.get("monitored", False)
statistics = artist.get("statistics", {})
album_count = statistics.get("albumCount", 0)
- print(f" DEBUG: Artist monitored={monitored}, albumCount={album_count}, albums in response={len(albums)}")
+ print(
+ f" DEBUG: Artist monitored={monitored}, albumCount={album_count}, albums fetched={len(albums)}"
+ )
if not albums:
if debug:
@@ -428,7 +493,10 @@ def monitor_artist_albums(
# Check if this album matches any in the playlists
for playlist_album in playlist_album_names:
# Normalize both for comparison
- if normalize_artist_name(album_title).lower() == normalize_artist_name(playlist_album).lower():
+ if (
+ normalize_artist_name(album_title).lower()
+ == normalize_artist_name(playlist_album).lower()
+ ):
matched_count += 1
if not already_monitored:
@@ -443,13 +511,18 @@ def monitor_artist_albums(
# Optionally trigger search
if search_albums:
- client.post("/api/v1/command", {
- "name": "AlbumSearch",
- "albumIds": [album_id]
- })
+ client.post(
+ "/api/v1/command",
+ {
+ "name": "AlbumSearch",
+ "albumIds": [album_id],
+ },
+ )
except Exception as e:
if debug:
- print(f" DEBUG: Failed to monitor album {album_title}: {e}")
+ print(
+ f" DEBUG: Failed to monitor album {album_title}: {e}"
+ )
else:
if debug:
print(f" Album already monitored: {album_title}")
@@ -459,33 +532,42 @@ def monitor_artist_albums(
return (matched_count, monitored_count)
-def get_existing_artists(client: ArrClient) -> tuple[Set[str], Set[str], Dict[str, int]]:
+def get_existing_artists(
+ client: ArrClient,
+) -> tuple[Set[str], Set[str], Dict[str, int], Dict[str, int]]:
"""
Get set of artist names and foreign IDs already in Lidarr.
Returns:
- Tuple of (normalized artist names set, foreign artist IDs set, foreign_id -> lidarr_id mapping)
+ Tuple of (normalized artist names set, foreign artist IDs set,
+ foreign_id -> lidarr_id mapping, normalized_name -> lidarr_id mapping)
"""
artists = client.get("/api/v1/artist")
names = set()
foreign_ids = set()
foreign_to_lidarr_id = {}
+ name_to_lidarr_id = {}
for artist in artists:
# Store normalized name
name = artist.get("artistName", "")
+ lidarr_id = artist.get("id")
+
if name:
- names.add(normalize_artist_name(name).lower())
+ normalized_name = normalize_artist_name(name).lower()
+ names.add(normalized_name)
+ # Map normalized name to Lidarr ID
+ if lidarr_id:
+ name_to_lidarr_id[normalized_name] = lidarr_id
# Store foreign artist ID (MusicBrainz ID)
foreign_id = artist.get("foreignArtistId")
- lidarr_id = artist.get("id")
if foreign_id:
foreign_ids.add(foreign_id)
if lidarr_id:
foreign_to_lidarr_id[foreign_id] = lidarr_id
- return names, foreign_ids, foreign_to_lidarr_id
+ return names, foreign_ids, foreign_to_lidarr_id, name_to_lidarr_id
def add_artist_to_lidarr(
@@ -515,7 +597,9 @@ def add_artist_to_lidarr(
# Check for suspicious fields in artist data
if debug:
suspicious_fields = ["folder", "path", "rootFolderPath"]
- found_suspicious = {k: artist.get(k) for k in suspicious_fields if k in artist}
+ found_suspicious = {
+ k: artist.get(k) for k in suspicious_fields if k in artist
+ }
if found_suspicious:
print(f" DEBUG: Found fields in artist data: {found_suspicious}")
@@ -601,7 +685,9 @@ def try_add_single_artist(
try:
# Search for artist in Lidarr's MusicBrainz database
- search_results = search_artist_in_lidarr(lidarr, artist_name, debug=debug)
+ search_results = search_artist_in_lidarr(
+ lidarr, artist_name, debug=debug
+ )
if not search_results:
return (False, "Not found in MusicBrainz", None)
@@ -613,18 +699,28 @@ def try_add_single_artist(
if "artist" in first_result:
artist_match = first_result["artist"]
# If result has 'album' field with nested artist
- elif "album" in first_result and isinstance(first_result["album"], dict):
+ elif "album" in first_result and isinstance(
+ first_result["album"], dict
+ ):
album = first_result["album"]
if "artist" in album:
artist_match = album["artist"]
else:
- return (False, f"Album result has no artist field: {list(album.keys())}", None)
+ return (
+ False,
+ f"Album result has no artist field: {list(album.keys())}",
+ None,
+ )
# If result has 'artistName', it's already an artist result
elif "artistName" in first_result:
artist_match = first_result
else:
# Unknown result type
- return (False, f"Unexpected search result format: {list(first_result.keys())}", None)
+ return (
+ False,
+ f"Unexpected search result format: {list(first_result.keys())}",
+ None,
+ )
artist_mb_name = artist_match.get("artistName", artist_name)
foreign_artist_id = artist_match.get("foreignArtistId")
@@ -668,7 +764,11 @@ def try_add_single_artist(
return (True, "", artist_mb_name)
else:
- return (False, "Failed to add artist (no ID returned)", artist_mb_name)
+ return (
+ False,
+ "Failed to add artist (no ID returned)",
+ artist_mb_name,
+ )
else:
print(" [DRY RUN] Would add this artist")
return (True, "", artist_mb_name)
@@ -747,17 +847,14 @@ def run(
playlist_ids = selected_ids
elif not playlist_ids:
- print(
- "\nError: No playlist IDs provided and no Spotify username set."
- )
+ print("\nError: No playlist IDs provided and no Spotify username set.")
print(
"Either provide playlist IDs as arguments or use "
"--spotify-username for interactive mode."
)
print("\nExamples:")
print(
- " arr lidarr sync-spotify http://localhost:8686 "
- "-u your-username"
+ " arr lidarr sync-spotify http://localhost:8686 -u your-username"
)
print(
" arr lidarr sync-spotify http://localhost:8686 "
@@ -774,13 +871,17 @@ def run(
print("\nChecking Lidarr naming configuration...")
try:
naming_config = lidarr.get("/api/v1/config/naming")
- artist_folder_format = naming_config.get("artistFolderFormat", "Not set")
+ artist_folder_format = naming_config.get(
+ "artistFolderFormat", "Not set"
+ )
print(f" Artist Folder Format: {artist_folder_format}")
except Exception as e:
print(f" Could not fetch naming config: {e}")
# Use specified root folder or auto-detect from Lidarr
- if root_folder and root_folder != "/music": # /music is the default from CLI
+ if (
+ root_folder and root_folder != "/music"
+ ): # /music is the default from CLI
if debug:
print(f"\nUsing user-specified root folder: {root_folder}")
else:
@@ -794,9 +895,13 @@ def run(
print(f" Root Folder: {root_folder}")
# Verify no duplicate segments in root folder
- _, has_duplicate, duplicate_seg = strip_duplicate_path_segments(root_folder)
+ _, has_duplicate, duplicate_seg = strip_duplicate_path_segments(
+ root_folder
+ )
if has_duplicate:
- print(f" WARNING: Root folder has duplicate segment '{duplicate_seg}'!")
+ print(
+ f" WARNING: Root folder has duplicate segment '{duplicate_seg}'!"
+ )
print(" This should have been cleaned by get_root_folder_path()")
# Track all unique artists and their albums
@@ -819,6 +924,7 @@ def run(
# Extract artists and albums
for track in tracks:
+ # Add all track artists (including features) to the list
for artist in track.get("artists", []):
artist_name = artist.get("name")
if artist_name:
@@ -827,11 +933,17 @@ def run(
"spotify_id": artist.get("id"),
"albums": set(),
}
- # Track which albums appear in playlists
- if track.get("album"):
- all_artists[artist_name]["albums"].add(
- track["album"]
- )
+
+ # But only associate the album with the album's actual artists
+ # (not featuring artists on the track)
+ album = track.get("album")
+ if album:
+ album_name = album.get("name")
+ album_artists = album.get("artists", [])
+ for artist in album_artists:
+ artist_name = artist.get("name")
+ if artist_name and artist_name in all_artists:
+ all_artists[artist_name]["albums"].add(album_name)
except Exception as e:
print(f"Error fetching playlist {playlist_id}: {e}")
@@ -846,7 +958,12 @@ def run(
# Check which artists are already in Lidarr
print_section_header("CHECKING LIDARR")
print("Fetching existing artists from Lidarr...")
- existing_names, existing_foreign_ids, foreign_to_lidarr_id = get_existing_artists(lidarr)
+ (
+ existing_names,
+ existing_foreign_ids,
+ foreign_to_lidarr_id,
+ name_to_lidarr_id,
+ ) = get_existing_artists(lidarr)
print(f"Found {len(existing_names)} artists already in Lidarr")
# Separate artists into existing and missing
@@ -987,47 +1104,35 @@ def run(
if not ctx.dry_run:
print_section_header("MONITORING PLAYLIST ALBUMS")
print("Checking which albums from playlists should be monitored...")
- print("\nNote: Newly added artists may take a few moments for Lidarr to")
+ print(
+ "\nNote: Newly added artists may take a few moments for Lidarr to"
+ )
print(" fetch their discography from MusicBrainz. If albums are")
- print(" missing, try running the sync again in a minute or trigger")
+ print(
+ " missing, try running the sync again in a minute or trigger"
+ )
print(" a 'Refresh Artist' in Lidarr's UI.\n")
# Collect artists to monitor (newly added + already existing)
artists_to_monitor = []
- # Add newly added artists (need to fetch their IDs)
- # For now, we'll fetch all artists again to get IDs
+ # Fetch updated artist list (includes newly added artists)
print("Fetching updated artist list...")
- _, _, updated_foreign_to_lidarr_id = get_existing_artists(lidarr)
+ _, _, _, updated_name_to_lidarr_id = get_existing_artists(lidarr)
# Process all artists that have albums in playlists
for artist_name, artist_data in all_artists.items():
if not artist_data.get("albums"):
continue
- # Try to find this artist in Lidarr by checking if we have their foreign ID
- # We need to search for them to get the foreign ID
- search_results = search_artist_in_lidarr(lidarr, artist_name, debug=False)
- if not search_results:
- continue
+ # Look up artist by normalized name (no MusicBrainz search needed!)
+ normalized_name = normalize_artist_name(artist_name).lower()
+ lidarr_id = updated_name_to_lidarr_id.get(normalized_name)
- # Extract artist from search result
- first_result = search_results[0]
- artist_match = None
- if "artist" in first_result:
- artist_match = first_result["artist"]
- elif "album" in first_result and isinstance(first_result["album"], dict):
- artist_match = first_result["album"].get("artist")
- elif "artistName" in first_result:
- artist_match = first_result
-
- if not artist_match:
- continue
-
- foreign_id = artist_match.get("foreignArtistId")
- if foreign_id and foreign_id in updated_foreign_to_lidarr_id:
- lidarr_id = updated_foreign_to_lidarr_id[foreign_id]
- artists_to_monitor.append((artist_name, lidarr_id, artist_data["albums"]))
+ if lidarr_id:
+ artists_to_monitor.append(
+ (artist_name, lidarr_id, artist_data["albums"])
+ )
if not artists_to_monitor:
print("No artists with playlist albums found in Lidarr")
@@ -1044,19 +1149,44 @@ def run(
lidarr_id,
playlist_albums,
search_albums=False, # Don't auto-search for now
- debug=debug
+ debug=debug,
)
total_matched += matched
total_monitored += monitored
if matched == 0:
# Check if artist has no albums at all (might need refresh)
- artist_data = lidarr.get(f"/api/v1/artist/{lidarr_id}")
- if artist_data and not artist_data.get("albums"):
- artists_needing_refresh.append((artist_name, lidarr_id))
+ artist_albums = lidarr.get(
+ "/api/v1/album", params={"artistId": lidarr_id}
+ )
+ if not isinstance(artist_albums, list):
+ artist_albums = []
+
+ if not artist_albums:
+ artists_needing_refresh.append(
+ (artist_name, lidarr_id)
+ )
print(" No albums found - may need refresh")
+ if debug:
+ print(
+ f" DEBUG: Artist has {len(artist_albums)} albums in API response"
+ )
+ print(
+ f" DEBUG: Albums in playlist: {list(playlist_albums)[:3]}"
+ )
else:
print(" No matching albums found in Lidarr")
+ if debug:
+ print(
+ f" DEBUG: Artist has {len(artist_albums)} albums in Lidarr"
+ )
+ if artist_albums:
+ print(
+ f" DEBUG: Sample Lidarr albums: {[a.get('title') for a in artist_albums[:3]]}"
+ )
+ print(
+ f" DEBUG: Albums in playlist: {list(playlist_albums)[:3]}"
+ )
elif monitored == 0:
print(f" {matched} album(s) already monitored")
@@ -1065,27 +1195,40 @@ def run(
# Offer to refresh artists with no albums
if artists_needing_refresh:
- print(f"\n {len(artists_needing_refresh)} artist(s) have no albums and may need refresh:")
+ print(
+ f"\n {len(artists_needing_refresh)} artist(s) have no albums and may need refresh:"
+ )
for name, _ in artists_needing_refresh[:5]:
print(f" - {name}")
if len(artists_needing_refresh) > 5:
- print(f" ... and {len(artists_needing_refresh) - 5} more")
+ print(
+ f" ... and {len(artists_needing_refresh) - 5} more"
+ )
if not ctx.no_confirm:
- response = input("\n Trigger refresh for these artists? (y/n): ").lower().strip()
+ response = (
+ input("\n Trigger refresh for these artists? (y/n): ")
+ .lower()
+ .strip()
+ )
if response in ["y", "yes"]:
print("\n Triggering refresh...")
for artist_name, artist_id in artists_needing_refresh:
print(f" Refreshing {artist_name}...", end=" ")
try:
- lidarr.post("/api/v1/command", {
- "name": "RefreshArtist",
- "artistId": artist_id
- })
+ lidarr.post(
+ "/api/v1/command",
+ {
+ "name": "RefreshArtist",
+ "artistId": artist_id,
+ },
+ )
print("✓")
except Exception as e:
print(f"✗ ({e})")
- print("\n Refresh commands sent. Wait a moment and run sync again")
+ print(
+ "\n Refresh commands sent. Wait a moment and run sync again"
+ )
print(" to monitor albums from playlists.")
# Final summary
@@ -1135,9 +1278,7 @@ def run(
if artist["albums"]:
album_count = len(artist["albums"])
print(f" Albums in playlists: {album_count}")
- print(
- f"\n\nTotal: {len(dry_run_artists)} artists to add manually"
- )
+ print(f"\n\nTotal: {len(dry_run_artists)} artists to add manually")
elif added_count > 0:
print(
f"\nMonitoring mode: {monitor}\n"
tools/arr/lib.py
@@ -111,9 +111,7 @@ class ArrClient:
sys.exit(1)
# Should not reach here, but just in case
- print(
- f"Error: Failed after {max_retries} attempts", file=sys.stderr
- )
+ print(f"Error: Failed after {max_retries} attempts", file=sys.stderr)
sys.exit(1)
def post(
@@ -177,13 +175,24 @@ class ArrClient:
# Always try to get response details
if e.response is not None:
- print(f" Response status: {e.response.status_code}", file=sys.stderr)
- print(f" Response headers: {dict(e.response.headers)}", file=sys.stderr)
+ print(
+ f" Response status: {e.response.status_code}",
+ file=sys.stderr,
+ )
+ print(
+ f" Response headers: {dict(e.response.headers)}",
+ file=sys.stderr,
+ )
try:
error_detail = e.response.json()
- print(f" Response JSON: {error_detail}", file=sys.stderr)
+ print(
+ f" Response JSON: {error_detail}", file=sys.stderr
+ )
except Exception:
- print(f" Response text: {e.response.text}", file=sys.stderr)
+ print(
+ f" Response text: {e.response.text}",
+ file=sys.stderr,
+ )
else:
print(" No response object available", file=sys.stderr)
return {}
@@ -350,9 +359,7 @@ def print_item_list(
print(f" ... and {remaining} more")
-def get_confirmation_decision(
- ctx: CommandContext, prompt: str
-) -> bool:
+def get_confirmation_decision(ctx: CommandContext, prompt: str) -> bool:
"""
Determine whether to proceed based on dry-run, no-confirm, or user input.
@@ -451,12 +458,8 @@ def select_with_fzf(
# User cancelled or fzf not found
return []
except FileNotFoundError:
- print(
- "Error: fzf not found. Please install fzf:", file=sys.stderr
- )
- print(
- " On NixOS: nix-env -iA nixpkgs.fzf", file=sys.stderr
- )
+ print("Error: fzf not found. Please install fzf:", file=sys.stderr)
+ print(" On NixOS: nix-env -iA nixpkgs.fzf", file=sys.stderr)
print(" On other systems: see https://github.com/junegunn/fzf")
sys.exit(1)
@@ -465,8 +468,7 @@ class JellyfinClient:
"""Client for Jellyfin API interactions."""
def __init__(
- self, base_url: str, api_token: str, user_id: str,
- debug: bool = False
+ self, base_url: str, api_token: str, user_id: str, debug: bool = False
):
"""
Initialize the Jellyfin API client.
@@ -504,7 +506,7 @@ class JellyfinClient:
"""
# Check if it's already a GUID (basic check for 8-4-4-4-12 format)
if len(user_identifier) == 32 or (
- len(user_identifier) == 36 and user_identifier.count('-') == 4
+ len(user_identifier) == 36 and user_identifier.count("-") == 4
):
return user_identifier
@@ -554,9 +556,7 @@ class JellyfinClient:
)
sys.exit(1)
- def post(
- self, endpoint: str, payload: Dict[str, Any]
- ) -> Dict[str, Any]:
+ def post(self, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
Make a POST request to the Jellyfin API.
@@ -594,8 +594,11 @@ class JellyfinClient:
sys.exit(1)
def search_tracks(
- self, query: str, limit: int = 50, artist_name: str = None,
- track_name: str = None
+ self,
+ query: str,
+ limit: int = 50,
+ artist_name: str = None,
+ track_name: str = None,
) -> List[Dict[str, Any]]:
"""
Search for tracks in Jellyfin library.
@@ -644,7 +647,7 @@ class JellyfinClient:
matched_artist = None
for artist in artists:
- if artist.get('Name', '').lower() == artist_name_lower:
+ if artist.get("Name", "").lower() == artist_name_lower:
matched_artist = artist
break
@@ -655,7 +658,7 @@ class JellyfinClient:
artist_id = matched_artist.get("Id")
if self.debug:
- artist_name_str = matched_artist.get('Name')
+ artist_name_str = matched_artist.get("Name")
print(
f"DEBUG: Using artist: {artist_name_str} "
f"(ID: {artist_id})"
@@ -667,8 +670,7 @@ class JellyfinClient:
"Recursive": True,
"Limit": limit,
"Fields": (
- "Artists,Album,AlbumArtist,"
- "AlbumArtists,ArtistItems"
+ "Artists,Album,AlbumArtist,AlbumArtists,ArtistItems"
),
}
result = self.get(
@@ -691,31 +693,26 @@ class JellyfinClient:
"IncludeItemTypes": "Audio",
"Recursive": True,
"Limit": 200, # Larger limit: filtering client-side
- "Fields": (
- "Artists,Album,AlbumArtist,"
- "AlbumArtists,ArtistItems"
- ),
+ "Fields": ("Artists,Album,AlbumArtist,AlbumArtists,ArtistItems"),
"EnableUserData": False, # Skip user data for speed
}
- result = self.get(
- f"/Users/{self.user_id}/Items", params=params
- )
+ result = self.get(f"/Users/{self.user_id}/Items", params=params)
items = result.get("Items", [])
# Enrich items with album artist data if track artists are missing
for item in items:
- if not item.get('Artists') and item.get('AlbumId'):
+ if not item.get("Artists") and item.get("AlbumId"):
try:
- album_id = item['AlbumId']
+ album_id = item["AlbumId"]
album = self.get(
f"/Users/{self.user_id}/Items/{album_id}",
- params={"Fields": "Artists,AlbumArtists"}
+ params={"Fields": "Artists,AlbumArtists"},
)
# Use album artists as track artists
- item['Artists'] = album.get('AlbumArtists', [])
- item['Album'] = album.get('Name', '')
+ item["Artists"] = album.get("AlbumArtists", [])
+ item["Album"] = album.get("Name", "")
except Exception:
pass # Continue even if album fetch fails
@@ -825,9 +822,7 @@ class JellyfinClient:
f"{self.base_url}/Playlists/{playlist_id}/Items"
f"?EntryIds={','.join(item_ids)}"
)
- response = requests.delete(
- url, headers=self.headers, timeout=30
- )
+ response = requests.delete(url, headers=self.headers, timeout=30)
response.raise_for_status()
return True
except requests.exceptions.RequestException as e:
@@ -886,6 +881,7 @@ class SpotifyClient:
for item in results.get("items", []):
if item and item.get("track"):
track = item["track"]
+ album_obj = track.get("album", {})
tracks.append(
{
"name": track.get("name"),
@@ -896,8 +892,17 @@ class SpotifyClient:
}
for artist in track.get("artists", [])
],
- "album": track.get("album", {}).get("name"),
- "album_id": track.get("album", {}).get("id"),
+ "album": {
+ "name": album_obj.get("name"),
+ "id": album_obj.get("id"),
+ "artists": [
+ {
+ "name": artist.get("name"),
+ "id": artist.get("id"),
+ }
+ for artist in album_obj.get("artists", [])
+ ],
+ },
}
)