main
1"""
2Manage Lidarr queue items with interactive selection.
3
4This script:
51. Fetches all items from the Lidarr queue
62. Filters for items that need manual import or have errors
73. Displays them in an interactive selector (fzf)
84. Allows removal of selected queue items
9"""
10
11import json
12import subprocess
13import sys
14from typing import Any, Dict, List
15
16from lib import (
17 ArrClient,
18 CommandContext,
19 get_confirmation_decision,
20 print_section_header,
21)
22
23
def get_queue_items(
    client: "ArrClient",
    include_unknown_artist: bool = True,
    page_size: int = 1000,
) -> List[Dict[str, Any]]:
    """
    Fetch all items from the Lidarr queue, following pagination.

    Pages of ``/api/v1/queue`` are requested one after another until the
    number of collected records reaches the ``totalRecords`` value reported
    by the response. If a response carries no integer ``totalRecords``
    there is no reliable way to tell whether more pages exist, so only the
    records gathered so far are returned.

    Args:
        client: ArrClient instance (only its ``get`` method is used)
        include_unknown_artist: Include items without matched artists
        page_size: Number of items requested per page

    Returns:
        List of queue item dictionaries (empty list when nothing is queued)
    """
    base_params = {
        "pageSize": page_size,
        "includeUnknownArtistItems": include_unknown_artist,
        "includeAlbum": True,
        "includeArtist": True,
    }

    items: List[Dict[str, Any]] = []
    page = 1
    while True:
        response = client.get(
            "/api/v1/queue", params={"page": page, **base_params}
        )

        if not isinstance(response, dict):
            # Unpaged payload: the records were returned directly.
            items.extend(response or [])
            break

        # `or []` also covers a "records" key that is present but null.
        records = response.get("records") or []
        items.extend(records)

        total = response.get("totalRecords")
        if (
            not records
            or not isinstance(total, int)
            or len(items) >= total
        ):
            break
        page += 1

    return items
53
54
def delete_queue_item(
    client: ArrClient,
    queue_id: int,
    remove_from_client: bool = True,
    blocklist: bool = False,
    skip_redownload: bool = False,
) -> bool:
    """
    Remove a single entry from the queue via an HTTP DELETE.

    The request is sent to ``{client.base_url}/api/v1/queue/{queue_id}``
    with the client's headers and a 30 second timeout. The three boolean
    options travel as lowercase ``"true"``/``"false"`` query parameters.

    Args:
        client: ArrClient instance (its base_url and headers are used)
        queue_id: Queue item ID to delete
        remove_from_client: Remove from download client
        blocklist: Add to blocklist
        skip_redownload: Skip automatic redownload

    Returns:
        True if the request completed without a requests exception,
        False otherwise (the error is printed)
    """
    option_flags = {
        "removeFromClient": remove_from_client,
        "blocklist": blocklist,
        "skipRedownload": skip_redownload,
    }
    query = {
        name: str(enabled).lower() for name, enabled in option_flags.items()
    }

    # requests is used directly here for a DELETE carrying query params.
    import requests

    endpoint = f"{client.base_url}/api/v1/queue/{queue_id}"
    try:
        reply = requests.delete(
            endpoint, headers=client.headers, params=query, timeout=30
        )
        reply.raise_for_status()
    except requests.exceptions.RequestException as exc:
        print(f"Error deleting queue item {queue_id}: {exc}")
        return False
    return True
94
95
def format_queue_item(item: Dict[str, Any]) -> str:
    """
    Format a queue item as a single display line.

    Fields that are missing or explicitly null fall back to placeholders
    instead of raising, and whitespace in the error message (including
    newlines) is collapsed so that one item always yields exactly one line.

    Args:
        item: Queue item dictionary

    Returns:
        Formatted single-line string for display
    """
    queue_id = item.get("id", "?")

    # `or` instead of a .get() default so keys present with a null value
    # are treated the same as missing keys.
    status = str(item.get("status") or "unknown")
    status_lower = status.lower()
    tracked_status = item.get("trackedDownloadStatus") or ""
    error_message = " ".join(str(item.get("errorMessage") or "").split())

    # Artist and album info, if available
    artist = item.get("artist") or {}
    album = item.get("album") or {}
    artist_name = artist.get("artistName") or "Unknown Artist"
    album_title = album.get("title") or "Unknown Album"

    # Quality info lives at item["quality"]["quality"]["name"]
    quality_wrapper = item.get("quality") or {}
    quality_profile = quality_wrapper.get("quality") or {}
    quality = quality_profile.get("name") or "Unknown"

    # Size, with download progress while something is still outstanding
    size_str = ""
    size = item.get("size") or 0
    sizeleft = item.get("sizeleft") or 0
    if size > 0:
        size_mb = size / (1024 * 1024)
        if sizeleft > 0:
            percent = ((size - sizeleft) / size) * 100
            size_str = f"{size_mb:.1f}MB ({percent:.0f}%)"
        else:
            size_str = f"{size_mb:.1f}MB"

    protocol = item.get("protocol") or ""
    download_client = item.get("downloadClient") or ""

    # Status indicator: the warning sign is the default (and therefore
    # also covers "warning"); all comparisons are case-insensitive.
    status_icon = "⚠️"
    if status_lower == "error":
        status_icon = "❌"
    else:
        keyword_icons = (
            ("completed", "✓"),
            ("downloading", "⬇"),
            ("manual", "🔧"),
        )
        for keyword, icon in keyword_icons:
            if keyword in status_lower:
                status_icon = icon
                break

    parts = [
        f"[{queue_id}]",
        status_icon,
        f"{artist_name} - {album_title}",
    ]

    # Quality, size, protocol and download client, when known
    details = []
    if quality != "Unknown":
        details.append(quality)
    if size_str:
        details.append(size_str)
    if protocol:
        details.append(str(protocol).upper())
    if download_client:
        details.append(f"via {download_client}")
    if details:
        parts.append(f"[{' | '.join(details)}]")

    # Status, plus the tracked status when it adds information
    status_parts = [status]
    if tracked_status and tracked_status != status:
        status_parts.append(str(tracked_status))
    parts.append(f"({', '.join(status_parts)})")

    if error_message:
        # Truncate long error messages
        if len(error_message) > 60:
            error_message = error_message[:60] + "..."
        parts.append(f"- {error_message}")

    return " ".join(parts)
195
196
def format_queue_item_preview(item: Dict[str, Any]) -> str:
    """
    Format a detailed, multi-line preview of a queue item.

    Sections are emitted in a fixed order (basic info, download info,
    quality, output path, error message, status messages, download ID,
    timestamps). Optional values that are missing or explicitly null are
    skipped instead of raising.

    Args:
        item: Queue item dictionary

    Returns:
        Formatted preview string with full details
    """
    lines = []

    # Header
    queue_id = item.get("id", "?")
    title = item.get("title", "Unknown")
    rule = "=" * 80
    lines.extend([rule, f"QUEUE ITEM #{queue_id}", rule, ""])

    # Basic Info
    lines.append("BASIC INFO:")
    lines.append(f" Title: {title}")

    artist = item.get("artist")
    if artist:
        artist_name = artist.get("artistName", "Unknown")
        lines.append(f" Artist: {artist_name}")

    album = item.get("album")
    if album:
        album_title = album.get("title", "Unknown")
        release_date = album.get("releaseDate", "Unknown")
        lines.append(f" Album: {album_title}")
        lines.append(f" Release Date: {release_date}")

    lines.append("")

    # Download Info
    lines.append("DOWNLOAD INFO:")
    status = item.get("status") or "unknown"
    lines.append(f" Status: {status}")

    tracked_status = item.get("trackedDownloadStatus")
    if tracked_status:
        lines.append(f" Tracked Status: {tracked_status}")

    tracked_state = item.get("trackedDownloadState")
    if tracked_state:
        lines.append(f" Tracked State: {tracked_state}")

    protocol = item.get("protocol")
    if protocol:
        lines.append(f" Protocol: {str(protocol).upper()}")

    download_client = item.get("downloadClient")
    if download_client:
        lines.append(f" Download Client: {download_client}")

    # Size info; `or 0` keeps null sizes from reaching the comparisons.
    size = item.get("size") or 0
    sizeleft = item.get("sizeleft") or 0
    if size > 0:
        size_mb = size / (1024 * 1024)
        lines.append(f" Total Size: {size_mb:.2f} MB")
        if sizeleft > 0:
            sizeleft_mb = sizeleft / (1024 * 1024)
            percent = ((size - sizeleft) / size) * 100
            downloaded_mb = size_mb - sizeleft_mb
            lines.append(
                f" Downloaded: {downloaded_mb:.2f} MB ({percent:.1f}%)"
            )
            lines.append(f" Remaining: {sizeleft_mb:.2f} MB")

    lines.append("")

    # Quality Info
    quality_wrapper = item.get("quality")
    if quality_wrapper:
        quality_profile = quality_wrapper.get("quality") or {}
        quality_name = quality_profile.get("name", "Unknown")
        lines.append("QUALITY:")
        lines.append(f" Quality: {quality_name}")
        lines.append("")

    # Output Path
    output_path = item.get("outputPath")
    if output_path:
        lines.append("OUTPUT PATH:")
        lines.append(f" {output_path}")
        lines.append("")

    # Error/Warning Messages
    error_message = item.get("errorMessage")
    if error_message:
        lines.append("ERROR MESSAGE:")
        lines.append(f" {error_message}")
        lines.append("")

    status_messages = item.get("statusMessages")
    if status_messages:
        lines.append("STATUS MESSAGES:")
        for entry in status_messages:
            entry = entry or {}
            entry_title = entry.get("title")
            if entry_title:
                lines.append(f" • {entry_title}")
            # A message group may carry a null "messages" list.
            for detail in entry.get("messages") or []:
                lines.append(f" - {detail}")
        lines.append("")

    # Download ID
    download_id = item.get("downloadId")
    if download_id:
        lines.append("DOWNLOAD ID:")
        lines.append(f" {download_id}")
        lines.append("")

    # Timestamps
    lines.append("TIMESTAMPS:")
    added = item.get("added")
    if added:
        lines.append(f" Added: {added}")

    estimated_completion = item.get("estimatedCompletionTime")
    if estimated_completion:
        lines.append(f" Estimated Completion: {estimated_completion}")

    return "\n".join(lines)
323
324
def select_queue_items_with_preview(
    items: List[Dict[str, Any]]
) -> List[str]:
    """
    Use fzf with preview to interactively select queue items.

    Each item is rendered to one display line for fzf. The detailed
    previews are written to a temporary JSON file, and a small temporary
    Python script looks up the preview for the highlighted line. Both
    temporary files are removed before returning.

    Args:
        items: List of queue item dictionaries

    Returns:
        List of selected item IDs as strings (empty list if nothing was
        selected or fzf exited with a non-zero status)

    Raises:
        SystemExit: If the fzf executable cannot be found
    """
    if not items:
        return []

    import os
    import shlex
    import tempfile

    # Lookup tables keyed by the exact line handed to fzf
    id_by_display: Dict[str, str] = {}
    preview_by_display: Dict[str, str] = {}
    display_lines = []
    for item in items:
        display = format_queue_item(item)
        display_lines.append(display)
        id_by_display[display] = str(item.get("id"))
        preview_by_display[display] = format_queue_item_preview(item)

    temp_paths: List[str] = []
    try:
        # Preview data: display line -> preview text
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", delete=False, encoding="utf-8"
        ) as data_file:
            temp_paths.append(data_file.name)
            json.dump(preview_by_display, data_file)
            data_path = data_file.name

        # Helper script that fzf runs for the highlighted line
        script_source = "\n".join([
            "import json",
            "import sys",
            "",
            f"preview_file = {data_path!r}",
            'line = sys.argv[1] if len(sys.argv) > 1 else ""',
            "",
            "try:",
            "    with open(preview_file, 'r', encoding='utf-8') as f:",
            "        data = json.load(f)",
            '    print(data.get(line, "No preview available"))',
            "except Exception as e:",
            '    print(f"Error: {e}")',
            "",
        ])
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as script_file:
            temp_paths.append(script_file.name)
            script_file.write(script_source)
            script_path = script_file.name

        # Run the script with the current interpreter and quote both
        # paths, so the preview does not depend on a shebang, an execute
        # bit, or the temp directory being free of shell metacharacters.
        interpreter = sys.executable or "python3"
        preview_command = (
            f"{shlex.quote(interpreter)} {shlex.quote(script_path)} {{}}"
        )

        prompt = (
            "Select queue items "
            "(TAB to select, ENTER to confirm, Ctrl-/ to toggle preview): "
        )
        header = (
            "TAB: select | ENTER: confirm | Ctrl-/: toggle preview | "
            "Ctrl-↑/↓: scroll preview | ESC: cancel"
        )
        fzf_args = [
            "fzf",
            "--ansi",
            "--multi",
            f"--prompt={prompt}",
            f"--preview={preview_command}",
            "--preview-window=right:60%:wrap",
            "--bind=ctrl-/:toggle-preview",
            "--bind=ctrl-up:preview-page-up",
            "--bind=ctrl-down:preview-page-down",
            "--bind=ctrl-u:preview-half-page-up",
            "--bind=ctrl-d:preview-half-page-down",
            f"--header={header}",
        ]

        try:
            # Capture stdout only. stderr stays attached to the terminal
            # because fzf may render its interface there; piping it can
            # leave the user with no visible selector.
            result = subprocess.run(
                fzf_args,
                input="\n".join(display_lines),
                text=True,
                stdout=subprocess.PIPE,
                check=True,
            )
        except FileNotFoundError:
            print(
                "Error: fzf not found. Please install fzf:", file=sys.stderr
            )
            print(
                " On NixOS: nix-env -iA nixpkgs.fzf",
                file=sys.stderr
            )
            print(
                " On other systems: see https://github.com/junegunn/fzf",
                file=sys.stderr
            )
            sys.exit(1)
        except subprocess.CalledProcessError:
            # Non-zero exit: treated as "nothing selected" (e.g. the user
            # cancelled the selector).
            return []

        # splitlines() rather than strip().split(): a display line may
        # legitimately end in whitespace and must match its lookup key.
        return [
            id_by_display[line]
            for line in result.stdout.splitlines()
            if line in id_by_display
        ]
    finally:
        # Best-effort cleanup; each file is removed independently so one
        # failure cannot leave the other behind.
        for path in temp_paths:
            try:
                os.unlink(path)
            except OSError:
                pass
468
469
def filter_queue_items(
    items: List[Dict[str, Any]],
    filter_type: str = "all",
    tracked_state: str = None
) -> List[Dict[str, Any]]:
    """
    Filter queue items by type and tracked state.

    All comparisons are case-insensitive, and fields that are missing or
    explicitly null are treated as empty text. An unrecognised
    ``filter_type`` matches nothing.

    Args:
        items: List of queue items
        filter_type: Filter type - 'all', 'manual', 'warning',
            'error', 'completed'
        tracked_state: Specific tracked download state to filter by
            (e.g., 'importFailed', 'imported', 'importing');
            None or empty disables this filter

    Returns:
        New filtered list of queue items (the input list is not modified)
    """

    def lowered(item: Dict[str, Any], key: str) -> str:
        # `or ""` covers keys that are present with a null value.
        return str(item.get(key) or "").lower()

    def matches_type(item: Dict[str, Any]) -> bool:
        status = lowered(item, "status")
        tracked_download_status = lowered(item, "trackedDownloadStatus")

        if filter_type == "manual":
            # Items that need manual import
            return (
                "warning" in status
                or "manual" in tracked_download_status
                or "manual import" in lowered(item, "errorMessage")
            )
        if filter_type == "warning":
            return "warning" in status
        if filter_type == "error":
            return "error" in status or bool(item.get("errorMessage"))
        if filter_type == "completed":
            return (
                "completed" in status
                or "completed" in tracked_download_status
            )
        return False

    # First apply the filter_type filter
    if filter_type == "all":
        filtered = list(items)
    else:
        filtered = [item for item in items if matches_type(item)]

    # Then apply the tracked_state filter if specified
    if tracked_state:
        wanted_state = tracked_state.lower()
        filtered = [
            item
            for item in filtered
            if lowered(item, "trackedDownloadState") == wanted_state
        ]

    return filtered
530
531
def run(
    url: str,
    api_key: str,
    filter_type: str,
    tracked_state: str,
    remove_from_client: bool,
    blocklist: bool,
    skip_redownload: bool,
    dry_run: bool,
    no_confirm: bool,
):
    """
    Execute the lidarr manage-queue command.

    Flow: fetch the queue, narrow it with the requested filters, print a
    per-status summary, let the user pick entries in the interactive
    selector, ask for confirmation, then delete each picked entry and
    print a final tally. Returns early (with a message) when the queue is
    empty, nothing matches the filters, nothing is selected, or the
    confirmation is declined.

    Args:
        url: Passed to ArrClient together with api_key
        api_key: Passed to ArrClient together with url
        filter_type: Type filter handed to filter_queue_items
        tracked_state: Tracked-state filter handed to filter_queue_items
        remove_from_client: Forwarded to delete_queue_item
        blocklist: Forwarded to delete_queue_item
        skip_redownload: Forwarded to delete_queue_item
        dry_run: Passed to CommandContext; its effect is decided by
            get_confirmation_decision, which is not defined in this file
        no_confirm: Passed to CommandContext (same caveat as dry_run)
    """
    client = ArrClient(url, api_key)
    ctx = CommandContext(dry_run, no_confirm)

    # --- Fetch ---------------------------------------------------------
    print_section_header("FETCHING LIDARR QUEUE")
    print(f"Connecting to {client.base_url}...")

    queue = get_queue_items(client)
    print(f"Found {len(queue)} total queue items")
    if not queue:
        print("\nQueue is empty!")
        return

    # --- Filter --------------------------------------------------------
    candidates = filter_queue_items(queue, filter_type, tracked_state)

    active_filters = []
    if filter_type != "all":
        active_filters.append(f"type: {filter_type}")
    if tracked_state:
        active_filters.append(f"tracked state: {tracked_state}")
    if active_filters:
        summary = ", ".join(active_filters)
        print(f"Filtered to {len(candidates)} items ({summary})")

    if not candidates:
        if tracked_state:
            criteria = (
                f"filters (type: {filter_type}, "
                f"tracked state: {tracked_state})"
            )
        else:
            criteria = f"filter '{filter_type}'"
        print(f"\nNo items matching {criteria}")
        return

    def first_match(predicate):
        # First candidate satisfying predicate, or None.
        for candidate in candidates:
            if predicate(candidate):
                return candidate
        return None

    # --- Summary by status --------------------------------------------
    print_section_header("QUEUE ITEMS")

    tally = {}
    for entry in candidates:
        key = entry.get("status", "unknown")
        tally[key] = tally.get(key, 0) + 1

    print("\nItems by status:")
    for key in sorted(tally):
        print(f" - {key}: {tally[key]}")

    # --- Interactive selection ----------------------------------------
    print("\nOpening interactive selector with preview...")
    print(
        "Controls: TAB=select | Ctrl-/=toggle preview | "
        "Ctrl-↑/↓=scroll preview | ENTER=confirm"
    )

    selected_ids = select_queue_items_with_preview(candidates)
    if not selected_ids:
        print("\nNo items selected. Exiting.")
        return

    chosen_total = len(selected_ids)
    print(f"\n{chosen_total} item(s) selected for removal")

    print("\nItems to remove:")
    for item_id in selected_ids:
        chosen = first_match(lambda c: str(c.get("id")) == item_id)
        if chosen:
            print(f" - {format_queue_item(chosen)}")

    print("\nRemoval options:")
    print(f" - Remove from download client: {remove_from_client}")
    print(f" - Add to blocklist: {blocklist}")
    print(f" - Skip automatic redownload: {skip_redownload}")

    # --- Confirmation --------------------------------------------------
    question = f"\nRemove {chosen_total} item(s) from queue?"
    if not get_confirmation_decision(ctx, question):
        print("Cancelled.")
        return

    # --- Removal -------------------------------------------------------
    print_section_header("REMOVING QUEUE ITEMS")

    removed = 0
    failures = 0
    for item_id in selected_ids:
        queue_id = int(item_id)
        chosen = first_match(lambda c: c.get("id") == queue_id)
        if chosen:
            label = chosen.get("title", f"ID {queue_id}")
        else:
            label = queue_id

        print(f"\nRemoving: {label}")

        deleted = delete_queue_item(
            client,
            queue_id,
            remove_from_client,
            blocklist,
            skip_redownload,
        )
        if deleted:
            print(" ✓ Removed successfully")
            removed += 1
        else:
            print(" ✗ Failed to remove")
            failures += 1

    # --- Final summary -------------------------------------------------
    print_section_header("FINAL SUMMARY")
    print(f"\nItems selected: {chosen_total}")
    print(f" - Successfully removed: {removed}")
    print(f" - Failed: {failures}")

    if removed > 0:
        print("\n✓ Queue items removed successfully")