fedora-csb-system-manager
  1#!/usr/bin/env -S uv run --script
  2# /// script
  3# dependencies = [
  4#     "requests",
  5#     "PyYAML",
  6#     "google-generativeai",
  7# ]
  8# ///
  9
 10import os
 11import os.path
 12import subprocess
 13import socket
 14import sys
 15from typing import Any, Dict, List, Optional
 16
 17import requests
 18import yaml  # pip install pyyaml types-pyyaml
 19import urllib.parse as urlparse
 20import google.generativeai as genai
 21
 22
 23def debug(msg: str):
 24    print(f"[DEBUG] {msg}", file=sys.stderr)
 25
 26
 27def check_running(api_base, timeout=0.5) -> bool:
 28    """Quickly check if Ollama is accessible at the given host and port."""
 29    url = urlparse.urlparse(api_base)
 30    port = url.port or (80 if url.scheme == "http" else 443)
 31    debug(f"Checking if {url.hostname}:{port} is running (api_base={api_base})")
 32    try:
 33        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 34        sock.settimeout(timeout)
 35        result = sock.connect_ex((url.hostname, port))
 36        sock.close()
 37        debug(f"Socket connect_ex result for {url.hostname}:{port}: {result}")
 38        return result == 0
 39    except Exception as e:
 40        debug(f"Exception in check_running: {e}")
 41        raise e
 42
 43
 44def load_config(config_path: str) -> Dict[str, Any]:
 45    debug(f"Loading config from {config_path}")
 46    with open(config_path, "r") as file:
 47        config = yaml.safe_load(file)
 48        debug(f"Loaded config: {config}")
 49        return config
 50
 51
 52def get_models(api_base: str, api_key_config: str | None) -> List[Dict[str, str]]:
 53    """Query the models endpoint and return a list of model data"""
 54    debug(
 55        f"get_models called with api_base={api_base}, api_key_config={api_key_config}"
 56    )
 57    actual_api_key = None
 58    if api_key_config:
 59        if api_key_config.startswith("passage::"):
 60            passage_path = api_key_config.split("::", 1)[1]
 61            debug(f"Retrieving API key from passage at {passage_path}")
 62            actual_api_key = get_passageword(passage_path)
 63            if not actual_api_key:
 64                print(
 65                    f"Could not retrieve API key from passage for path: {passage_path}",
 66                    file=sys.stderr,
 67                )
 68                debug(f"Failed to retrieve API key from passage for {passage_path}")
 69                # Decide how to handle failure: skip, return empty, etc.
 70                # Here we'll proceed without a key, which might fail later.
 71        else:
 72            actual_api_key = api_key_config  # Use the key directly if not a passage path
 73            debug("Using API key directly from config")
 74
 75    headers = {}
 76    if actual_api_key:
 77        headers["Authorization"] = f"Bearer {actual_api_key}"
 78        debug("Authorization header set")
 79
 80    # Ensure the URL is properly formatted
 81    if not api_base.endswith("/"):
 82        api_base = api_base + "/"
 83        debug(f"api_base adjusted to {api_base}")
 84
 85    models_url = f"{api_base}models"
 86    debug(f"Querying models endpoint: {models_url}")
 87
 88    try:
 89        response = requests.get(models_url, headers=headers, timeout=10)
 90        debug(f"HTTP GET {models_url} status_code={response.status_code}")
 91        response.raise_for_status()
 92        data = response.json()
 93        debug(f"Response JSON: {data}")
 94
 95        # Extract models from response
 96        models = data.get("data", [])
 97        debug(f"Extracted models: {models}")
 98        return [
 99            {"name": model.get("id"), "description": model.get("id")}
100            for model in models
101        ]
102    except Exception as e:
103        print(f"Error querying {api_base}: {str(e)}", file=sys.stderr)
104        debug(f"Exception in get_models: {e}")
105        return []
106
107
108def get_gemini_models(api_key: Optional[str]) -> List[Dict[str, str]]:
109    """Query Google's Gemini API and return a list of available models"""
110    debug(f"get_gemini_models called with api_key={'***' if api_key else None}")
111    if not api_key:
112        print("Error: API key is required for Google Gemini API", file=sys.stderr)
113        debug("No API key provided to get_gemini_models")
114        return []
115
116    try:
117        # Configure the Gemini API with the provided key
118        debug("Configuring genai with provided API key")
119        genai.configure(api_key=api_key)
120
121        # Get list of available models
122        debug("Listing models from genai")
123        models_list = genai.list_models()
124        debug(f"Models list: {models_list}")
125
126        # Filter for Gemini models
127        gemini_models = [
128            {"name": model.name.split("/")[-1], "description": model.description}
129            for model in models_list
130            if "gemini" in model.name.lower()
131        ]
132        debug(f"Filtered Gemini models: {gemini_models}")
133
134        return gemini_models
135    except Exception as e:
136        print(f"Error querying Google Gemini API: {str(e)}", file=sys.stderr)
137        debug(f"Exception in get_gemini_models: {e}")
138        return []
139
140
141def get_passageword(passage_path: str) -> str | None:
142    """Retrieve passageword from passage using the given path."""
143    debug(f"get_passageword called for passage_path={passage_path}")
144    try:
145        result = subprocess.run(
146            ["passage", "show", passage_path], capture_output=True, text=True, check=True
147        )
148        # Return the first line of the output, stripping newline
149        passageword = result.stdout.splitlines()[0]
150        debug(f"Passageword retrieved from passage for {passage_path}")
151        return passageword
152    except FileNotFoundError:
153        print(
154            "Error: 'passage' command not found. Is passage installed and in your PATH?",
155            file=sys.stderr,
156        )
157        debug("'passage' command not found")
158        return None
159    except subprocess.CalledProcessError as e:
160        print(f"Error running passage show {passage_path}: {e.stderr}", file=sys.stderr)
161        debug(f"subprocess.CalledProcessError in get_passageword: {e}")
162        return None
163    except IndexError:
164        print(f"Error: 'passage show {passage_path}' returned empty output.", file=sys.stderr)
165        debug(f"IndexError: passage show {passage_path} returned empty output")
166        return None
167
168
169def main():
170    # Support reading from script directory or ~/.config/aichat/
171    script_dir = os.path.dirname(os.path.abspath(__file__))
172    config_path = os.path.join(script_dir, "config.yaml.in")
173
174    if not os.path.exists(config_path):
175        config_path = os.path.expanduser("~/.config/aichat/config.yaml.in")
176
177    debug(f"main: config_path={config_path}")
178    config_data = load_config(config_path)
179
180    if "clients" in config_data:
181        updated_clients = []
182        debug(f"main: found {len(config_data.get('clients', []))} clients")
183        for client in config_data.get("clients", []):
184            # Make a copy to avoid modifying the original dict during iteration if needed elsewhere
185            updated_client = client.copy()
186            debug(f"Processing client: {updated_client}")
187
188            actual_api_key = None
189            api_key_config = updated_client.get("api_key")
190            if api_key_config and api_key_config.startswith("passage::"):
191                passage_path = api_key_config.split("::", 1)[1]
192                debug(f"main: retrieving api_key from passage for {passage_path}")
193                actual_api_key = get_passageword(passage_path)
194            else:
195                actual_api_key = api_key_config
196            updated_client["api_key"] = actual_api_key
197            debug("main: actual_api_key set for client")
198
199            # For OpenAI-compatible clients, query and potentially add models
200            if updated_client.get("type") == "openai-compatible":
201                # Check if models are NOT already defined in config or are empty
202                if not updated_client.get("models"):
203                    api_base = updated_client.get("api_base")
204                    api_key = updated_client.get("api_key")
205                    debug(
206                        f"main: openai-compatible client, api_base={api_base}, api_key={'***' if api_key else None}"
207                    )
208
209                    # Skip ollama explicitly if needed, or handle based on your logic
210                    if api_base:
211                        if not check_running(api_base):
212                            debug(f"main: {api_base} not running, skipping client")
213                            continue
214                        # Try to fetch models from API
215                        fetched_models = get_models(api_base, api_key)
216                        if fetched_models:
217                            updated_client["models"] = fetched_models
218                            debug("main: models fetched and set for client")
219                        else:
220                            # Keep models empty/undefined or add an empty list
221                            updated_client["models"] = []
222                            debug("main: no models fetched, set empty list")
223                    else:
224                        # Handle cases where type is openai-compatible but no api_base
225                        updated_client["models"] = []
226                        debug(
227                            "main: openai-compatible client with no api_base, set empty models"
228                        )
229
230            # For Google Gemini clients, query and potentially add models
231            elif updated_client.get("type") == "gemini":
232                # Check if models are NOT already defined in config or are empty
233                if not updated_client.get("models"):
234                    api_key = updated_client.get("api_key")
235                    debug(f"main: gemini client, api_key={'***' if api_key else None}")
236                    fetched_models = get_gemini_models(api_key)
237                    if fetched_models:
238                        updated_client["models"] = fetched_models
239                        debug("main: gemini models fetched and set for client")
240                    else:
241                        updated_client["models"] = []
242                        debug("main: no gemini models fetched, set empty list")
243
244            updated_clients.append(updated_client)
245            debug("main: client processed and added to updated_clients")
246
247        # Replace the original clients list with the updated one
248        config_data["clients"] = updated_clients
249        debug("main: updated_clients set in config_data")
250
251    # Print the entire (potentially updated) configuration as YAML
252    debug("main: dumping config_data as YAML")
253    print(yaml.dump(config_data, default_flow_style=False, sort_keys=False))
254
255
256if __name__ == "__main__":
257    main()