main
  1#!/usr/bin/env python3
  2"""
  3XMPP Research Bot - Automated research assistant via XMPP
  4
  5Listens for dynamic commands and uses Claude/Gemini APIs to generate responses.
  6Results are saved to inbox.org for later review.
  7"""
  8
  9import asyncio
 10import logging
 11import os
 12import sys
 13import yaml
 14from datetime import datetime
 15from pathlib import Path
 16from typing import Dict, Any
 17
 18import slixmpp
 19from anthropic import AnthropicVertex
 20
 21try:
 22    from google import genai
 23    GEMINI_AVAILABLE = True
 24except ImportError:
 25    GEMINI_AVAILABLE = False
 26    logging.warning("google-genai not available, Gemini support disabled")
 27
 28# Configure logging
 29logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 30log = logging.getLogger(__name__)
 31
 32
 33# Default commands configuration
 34DEFAULT_COMMANDS = {
 35    "research": {
 36        "description": "Research assistant for quick, accurate queries",
 37        "system_prompt": """You are a research assistant helping with quick, accurate research queries.
 38
 39Your task is to provide concise, well-structured research summaries that can be saved as notes.
 40
 41Guidelines:
 42- Provide factual, accurate information
 43- Structure responses with clear headings
 44- Include relevant sources or references when possible
 45- Keep responses focused and actionable
 46- Use markdown formatting
 47- Aim for 200-500 words unless more detail is requested""",
 48        "default_model": "sonnet",
 49        "max_tokens": 2000,
 50        "save_to": "inbox.org",
 51    }
 52}
 53
 54
 55class ResearchBot(slixmpp.ClientXMPP):
 56    """XMPP bot that performs research using Claude (Vertex AI) and Gemini (Developer API)"""
 57
 58    def __init__(self, jid, password, owner_jid, project_id, region, inbox_path, commands_path=None, gemini_api_key=None):
 59        super().__init__(jid, password)
 60        self.owner_jid = owner_jid
 61        self.inbox_path = Path(inbox_path)
 62        self.project_id = project_id
 63        self.region = region
 64        self.commands_path = commands_path
 65
 66        # Initialize Vertex AI client for Claude
 67        self.anthropic_client = AnthropicVertex(project_id=project_id, region=region)
 68
 69        # Initialize Gemini client if available
 70        # Uses direct Gemini Developer API with API key (not Vertex AI)
 71        self.gemini_client = None
 72        if GEMINI_AVAILABLE and gemini_api_key:
 73            try:
 74                self.gemini_client = genai.Client(api_key=gemini_api_key)
 75                log.info("Gemini client initialized successfully (Developer API)")
 76            except Exception as e:
 77                log.warning(f"Failed to initialize Gemini client: {e}")
 78        elif GEMINI_AVAILABLE:
 79            log.warning("Gemini API key not provided, Gemini support disabled")
 80
 81        # Load commands configuration
 82        self.commands = self.load_commands()
 83
 84        # Register plugins
 85        self.register_plugin('xep_0030')  # Service Discovery
 86        self.register_plugin('xep_0199')  # XMPP Ping
 87
 88        # Register event handlers
 89        self.add_event_handler("session_start", self.on_session_start)
 90        self.add_event_handler("message", self.on_message)
 91        self.add_event_handler("disconnected", self.on_disconnected)
 92        self.add_event_handler("connection_failed", self.on_connection_failed)
 93        self.add_event_handler("session_end", self.on_session_end)
 94
 95        log.info(f"Bot initialized: {jid}")
 96        log.info(f"Owner: {owner_jid}")
 97        log.info(f"Vertex AI: {project_id} / {region}")
 98        log.info(f"Inbox: {inbox_path}")
 99        log.info(f"Commands loaded: {', '.join(self.commands.keys())}")
100
101    def load_commands(self) -> Dict[str, Any]:
102        """Load commands from YAML file or use defaults"""
103        if self.commands_path and Path(self.commands_path).exists():
104            try:
105                with open(self.commands_path, 'r') as f:
106                    loaded = yaml.safe_load(f)
107                    if loaded and 'commands' in loaded:
108                        log.info(f"Loaded {len(loaded['commands'])} commands from {self.commands_path}")
109                        return loaded['commands']
110            except Exception as e:
111                log.error(f"Failed to load commands from {self.commands_path}: {e}")
112
113        log.info("Using default commands configuration")
114        return DEFAULT_COMMANDS
115
116    async def on_session_start(self, event):
117        """Called when XMPP session starts"""
118        self.send_presence()
119        await self.get_roster()
120        log.info("Session started, presence sent")
121
122    async def on_session_end(self, event):
123        """Called when XMPP session ends"""
124        log.warning("Session ended")
125
126    async def on_disconnected(self, event):
127        """Called when disconnected from XMPP server"""
128        log.warning(f"Disconnected from XMPP server: {event}")
129
130    async def on_connection_failed(self, event):
131        """Called when connection to XMPP server fails"""
132        log.error(f"Connection failed: {event}")
133
134    async def on_message(self, msg):
135        """Handle incoming messages"""
136        # Only respond to chat messages from owner
137        if msg["type"] not in ("chat", "normal"):
138            return
139
140        sender = str(msg["from"]).split("/")[0]  # Remove resource
141        if sender != self.owner_jid:
142            log.warning(f"Ignoring message from non-owner: {sender}")
143            return
144
145        body = msg["body"].strip()
146        log.info(f"Received from {sender}: {body}")
147
148        # Check if message starts with /
149        if not body.startswith("/"):
150            help_msg = """I'm a research assistant bot! 🤖
151
152Use commands like:
153• /research <query> - Research a topic
154• /help - Show all commands
155
156Example: /research how does XMPP work?"""
157            msg.reply(help_msg).send()
158            return
159
160        # Parse command and arguments
161        parts = body[1:].split(None, 1)
162        command = parts[0]
163        args = parts[1] if len(parts) > 1 else ""
164
165        # Handle built-in commands
166        if command == "help":
167            await self.cmd_help(msg)
168        elif command == "ping":
169            await self.cmd_ping(msg)
170        elif command == "reload-commands":
171            await self.cmd_reload_commands(msg)
172        elif command in self.commands:
173            await self.cmd_dynamic(msg, command, args)
174        else:
175            msg.reply(f"Unknown command: /{command}\nType /help for available commands").send()
176
177    async def cmd_help(self, msg):
178        """Show help message with all available commands"""
179        help_lines = ["Available commands:"]
180
181        # Dynamic commands
182        for cmd_name, cmd_config in self.commands.items():
183            desc = cmd_config.get('description', 'No description')
184            help_lines.append(f"/{cmd_name} <query> - {desc}")
185            if cmd_name == "research":  # Show model selection for research
186                help_lines.append("  Model selection:")
187                help_lines.append("    opus: or o: - Use Claude Opus 4.5 (most intelligent)")
188                help_lines.append("    sonnet: or s: - Use Claude Sonnet 4.5 (default, faster)")
189                if self.gemini_client:
190                    help_lines.append("    gemini-pro: or gp: - Use Gemini 3 Pro (advanced reasoning)")
191                    help_lines.append("    gemini: or g: - Use Gemini 3 Flash (fast, cost-effective)")
192                help_lines.append("  Examples:")
193                help_lines.append("    /research how does XMPP work?")
194                help_lines.append("    /research opus: analyze distributed systems complexity")
195                help_lines.append("    /research gp: solve complex reasoning problem")
196
197        # Built-in commands
198        help_lines.append("/help - Show this help message")
199        help_lines.append("/ping - Check if bot is alive")
200        help_lines.append("/reload-commands - Reload commands from YAML file")
201
202        msg.reply("\n".join(help_lines)).send()
203
204    async def cmd_ping(self, msg):
205        """Ping command"""
206        msg.reply("🤖 Pong! Bot is alive.").send()
207
208    async def cmd_reload_commands(self, msg):
209        """Reload commands from YAML file"""
210        old_count = len(self.commands)
211        self.commands = self.load_commands()
212        new_count = len(self.commands)
213        msg.reply(f"✅ Reloaded commands: {old_count}{new_count}").send()
214        log.info("Commands reloaded via /reload-commands")
215
216    async def cmd_dynamic(self, msg, command: str, args: str):
217        """Execute a dynamic command from YAML configuration"""
218        if not args:
219            usage = self.commands[command].get('usage', f'Usage: /{command} <query>')
220            msg.reply(usage).send()
221            return
222
223        cmd_config = self.commands[command]
224        query = args.strip()
225
226        # Parse model selection (format: "model: query")
227        model = cmd_config.get('default_model', 'sonnet')
228        model_name = self.get_model_display_name(model)
229
230        if query.startswith("opus:") or query.startswith("o:"):
231            model = "opus"
232            model_name = "Opus 4.5"
233            query = query.split(":", 1)[1].strip()
234        elif query.startswith("sonnet:") or query.startswith("s:"):
235            model = "sonnet"
236            model_name = "Sonnet 4.5"
237            query = query.split(":", 1)[1].strip()
238        elif query.startswith("gemini:") or query.startswith("g:"):
239            if not self.gemini_client:
240                msg.reply("❌ Gemini is not available (client not initialized)").send()
241                return
242            model = "gemini"
243            model_name = "Gemini 3 Flash"
244            query = query.split(":", 1)[1].strip()
245        elif query.startswith("gemini-pro:") or query.startswith("gp:"):
246            if not self.gemini_client:
247                msg.reply("❌ Gemini is not available (client not initialized)").send()
248                return
249            model = "gemini-pro"
250            model_name = "Gemini 3 Pro"
251            query = query.split(":", 1)[1].strip()
252
253        # Acknowledge we picked up the work
254        msg.reply(f"🔍 {cmd_config.get('description', 'Processing')} with {model_name}: {query}").send()
255
256        try:
257            # Perform research/processing
258            result = await self.process_query(
259                query=query,
260                model=model,
261                system_prompt=cmd_config.get('system_prompt', ''),
262                max_tokens=cmd_config.get('max_tokens', 2000)
263            )
264
265            # Save to file if configured
266            save_to = cmd_config.get('save_to')
267            if save_to:
268                await self.save_to_file(query, result, filename=save_to, model=model_name)
269                msg.reply(f"✅ Complete! Saved to {save_to}\n\nPreview:\n{result[:200]}...").send()
270            else:
271                msg.reply(f"✅ Complete!\n\n{result}").send()
272
273            log.info(f"/{command} completed for: {query} (model: {model})")
274
275        except Exception as e:
276            log.error(f"/{command} failed: {e}", exc_info=True)
277            msg.reply(f"❌ Failed: {str(e)}").send()
278
279    def get_model_display_name(self, model: str) -> str:
280        """Get human-readable model name"""
281        model_names = {
282            "sonnet": "Sonnet 4.5",
283            "opus": "Opus 4.5",
284            "gemini": "Gemini 3 Flash",
285            "gemini-pro": "Gemini 3 Pro",
286        }
287        return model_names.get(model, model.title())
288
289    async def process_query(self, query: str, model: str, system_prompt: str, max_tokens: int) -> str:
290        """
291        Process query using specified model.
292
293        Args:
294            query: Question to answer
295            model: Model to use - "sonnet", "opus", or "gemini"
296            system_prompt: System prompt for the model
297            max_tokens: Maximum tokens to generate
298
299        Returns:
300            Generated response text
301        """
302        log.info(f"Processing query with {model}: {query}")
303
304        if model in ("sonnet", "opus"):
305            return await self.process_claude(query, model, system_prompt, max_tokens)
306        elif model in ("gemini", "gemini-pro"):
307            return await self.process_gemini(query, model, system_prompt, max_tokens)
308        else:
309            raise ValueError(f"Unknown model: {model}")
310
311    async def process_claude(self, query: str, model: str, system_prompt: str, max_tokens: int) -> str:
312        """Process query with Claude (Anthropic)"""
313        # Map model names to Vertex AI model IDs
314        model_ids = {
315            "sonnet": "claude-sonnet-4-5@20250929",
316            "opus": "claude-opus-4-5@20251101",
317        }
318
319        # System prompt with caching
320        system = [
321            {
322                "type": "text",
323                "text": system_prompt,
324                "cache_control": {"type": "ephemeral"},
325            }
326        ]
327
328        # Call Claude API via Vertex AI
329        response = self.anthropic_client.messages.create(
330            model=model_ids[model],
331            max_tokens=max_tokens,
332            system=system,
333            messages=[{"role": "user", "content": query}],
334        )
335
336        result = response.content[0].text
337
338        # Log cache usage
339        usage = response.usage
340        log.info(
341            f"Claude usage - Input: {usage.input_tokens}, "
342            f"Cache creation: {getattr(usage, 'cache_creation_input_tokens', 0)}, "
343            f"Cache read: {getattr(usage, 'cache_read_input_tokens', 0)}, "
344            f"Output: {usage.output_tokens}"
345        )
346
347        return result
348
349    async def process_gemini(self, query: str, model: str, system_prompt: str, max_tokens: int) -> str:
350        """Process query with Gemini"""
351        if not self.gemini_client:
352            raise RuntimeError("Gemini client not initialized")
353
354        # Map model names to Vertex AI model IDs
355        model_ids = {
356            "gemini": "gemini-3-flash-preview",
357            "gemini-pro": "gemini-3-pro-preview",
358        }
359
360        # Combine system prompt with query
361        full_prompt = f"{system_prompt}\n\nUser query: {query}"
362
363        # Call Gemini API via Vertex AI
364        # For Gemini 3 models, use thinking_config for reasoning depth
365        config = {
366            "max_output_tokens": max_tokens,
367            "temperature": 1.0,
368        }
369
370        # Add thinking config for Pro model (deep reasoning)
371        if model == "gemini-pro":
372            config["thinking_config"] = {"thinking_level": "high"}
373
374        response = self.gemini_client.models.generate_content(
375            model=model_ids[model],
376            contents=full_prompt,
377            config=config
378        )
379
380        result = response.text
381
382        log.info(f"Gemini ({model_ids[model]}) response generated successfully")
383
384        return result
385
386    async def save_to_file(self, query: str, result: str, filename: str, model: str = "Sonnet 4.5"):
387        """Save research result to org file"""
388        timestamp = datetime.now().strftime("%Y-%m-%d %a %H:%M")
389
390        entry = f"""* TODO Review research: {query}
391:PROPERTIES:
392:CREATED: [{timestamp}]
393:MODEL: {model}
394:END:
395
396** Query
397{query}
398
399** Result
400#+begin_src markdown
401{result}
402#+end_src
403
404"""
405
406        # Determine full path
407        if filename.startswith("/"):
408            # Absolute path
409            file_path = Path(filename)
410        else:
411            # Relative to inbox directory
412            file_path = self.inbox_path.parent / filename
413
414        # Append to file
415        file_path.parent.mkdir(parents=True, exist_ok=True)
416        with open(file_path, "a", encoding="utf-8") as f:
417            f.write(entry)
418
419        log.info(f"Saved to {file_path}")
420
421
async def main():
    """Main entry point: configure the bot from the environment and run it.

    Required environment variables: XMPP_JID, XMPP_PASSWORD, XMPP_OWNER_JID and
    VERTEX_PROJECT_ID. Optional: VERTEX_REGION (default "global"), INBOX_PATH,
    COMMANDS_PATH and GEMINI_API_KEY. The process exits with status 1 when a
    required variable is missing. The coroutine then idles until it is
    interrupted or cancelled, at which point the bot is disconnected.
    """
    # Read configuration from environment
    jid = os.getenv("XMPP_JID")
    password = os.getenv("XMPP_PASSWORD")
    owner_jid = os.getenv("XMPP_OWNER_JID")
    project_id = os.getenv("VERTEX_PROJECT_ID")
    region = os.getenv("VERTEX_REGION", "global")
    inbox_path = os.getenv("INBOX_PATH", "/home/vincent/desktop/org/inbox.org")
    commands_path = os.getenv("COMMANDS_PATH")
    gemini_api_key = os.getenv("GEMINI_API_KEY")

    required = {
        "XMPP_JID": jid,
        "XMPP_PASSWORD": password,
        "XMPP_OWNER_JID": owner_jid,
        "VERTEX_PROJECT_ID": project_id,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        log.error("Missing required environment variables:")
        log.error("  " + ", ".join(missing))
        sys.exit(1)

    # Create and start bot
    bot = ResearchBot(jid, password, owner_jid, project_id, region, inbox_path, commands_path, gemini_api_key)

    log.info("Connecting to XMPP server...")
    bot.connect()

    # Process events - keep event loop running
    # NOTE(review): the bot's disconnected/connection_failed handlers only log;
    # nothing in this function reconnects after a lost connection.
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() an interrupt normally reaches this coroutine as a
        # cancellation, so both exceptions have to trigger the shutdown.
        log.info("Shutting down...")
        bot.disconnect()
        try:
            # Bounded wait so a stuck connection cannot block the exit.
            await asyncio.wait_for(bot.disconnected, timeout=5)
        except asyncio.TimeoutError:
            log.warning("Timed out waiting for XMPP disconnect")
        # Let the interruption propagate once cleanup is done.
        raise
454
455
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C surfaces here after the event loop has been shut down; exit
        # quietly instead of ending with a traceback.
        log.info("Interrupted, exiting")