#!/usr/bin/env python3
"""
XMPP Research Bot - Automated research assistant via XMPP

Listens for slash commands from a single owner JID and answers them with the
Claude (Vertex AI) or Gemini (Developer API) models.
Results of commands that set ``save_to`` are appended to an org file
(inbox.org by default) for later review.
"""
8
import asyncio
import logging
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any

import slixmpp
import yaml
from anthropic import AnthropicVertex

try:
    from google import genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    logging.warning("google-genai not available, Gemini support disabled")
27
# Configure logging.
# force=True is required: when google-genai is missing, the import fallback
# above calls logging.warning() first, which implicitly configures the root
# logger with a WARNING-level handler. Without force, this basicConfig() call
# would then do nothing and every INFO message would be dropped.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)
log = logging.getLogger(__name__)
31
32
# Built-in command table, used by ResearchBot.load_commands() when no commands
# YAML file is configured or it cannot be loaded. Each key is a command name
# (invoked as "/<name>"); each value holds the command's description, system
# prompt, default model key, token limit and the org file results go to.
DEFAULT_COMMANDS = dict(
    research=dict(
        description="Research assistant for quick, accurate queries",
        system_prompt=(
            "You are a research assistant helping with quick, accurate research queries.\n"
            "\n"
            "Your task is to provide concise, well-structured research summaries that can be saved as notes.\n"
            "\n"
            "Guidelines:\n"
            "- Provide factual, accurate information\n"
            "- Structure responses with clear headings\n"
            "- Include relevant sources or references when possible\n"
            "- Keep responses focused and actionable\n"
            "- Use markdown formatting\n"
            "- Aim for 200-500 words unless more detail is requested"
        ),
        default_model="sonnet",
        max_tokens=2000,
        save_to="inbox.org",
    ),
)
53
54
class ResearchBot(slixmpp.ClientXMPP):
    """XMPP bot that performs research using Claude (Vertex AI) and Gemini (Developer API).

    Only "chat"/"normal" messages from ``owner_jid`` are handled. A message of
    the form ``/<command> [alias:] <query>`` runs a command from the commands
    table (loaded from the ``commands_path`` YAML file, else DEFAULT_COMMANDS):
    the query is sent to the selected model, and the result is either replied
    directly or appended to an org file when the command sets ``save_to``.
    """

    # Inline model selectors accepted as "<alias>: <query>" -> internal model key.
    MODEL_ALIASES = {
        "opus": "opus",
        "o": "opus",
        "sonnet": "sonnet",
        "s": "sonnet",
        "gemini": "gemini",
        "g": "gemini",
        "gemini-pro": "gemini-pro",
        "gp": "gemini-pro",
    }

    # Internal model key -> name shown to the user and written to the org file.
    MODEL_DISPLAY_NAMES = {
        "sonnet": "Sonnet 4.5",
        "opus": "Opus 4.5",
        "gemini": "Gemini 3 Flash",
        "gemini-pro": "Gemini 3 Pro",
    }

    # Internal model key -> Claude model ID on Vertex AI.
    CLAUDE_MODEL_IDS = {
        "sonnet": "claude-sonnet-4-5@20250929",
        "opus": "claude-opus-4-5@20251101",
    }

    # Internal model key -> Gemini Developer API model name.
    GEMINI_MODEL_IDS = {
        "gemini": "gemini-3-flash-preview",
        "gemini-pro": "gemini-3-pro-preview",
    }

    # Seconds to wait before reconnecting after an unexpected disconnect.
    RECONNECT_DELAY = 10

    # Number of result characters echoed back when a result is saved to a file.
    PREVIEW_CHARS = 200

    def __init__(self, jid, password, owner_jid, project_id, region, inbox_path, commands_path=None, gemini_api_key=None):
        """Create the bot, its model clients, and register XMPP plugins/handlers.

        Args:
            jid: The bot's own JID.
            password: The bot's XMPP password.
            owner_jid: Bare JID whose messages are obeyed; all others are ignored.
            project_id: Google Cloud project used for Claude on Vertex AI.
            region: Vertex AI region.
            inbox_path: Org file path; relative ``save_to`` names resolve
                against its directory.
            commands_path: Optional YAML file with a top-level ``commands`` mapping.
            gemini_api_key: Optional Gemini Developer API key; Gemini is
                disabled without it.
        """
        super().__init__(jid, password)
        self.owner_jid = owner_jid
        self.inbox_path = Path(inbox_path)
        self.project_id = project_id
        self.region = region
        self.commands_path = commands_path
        # True while a disconnect requested through disconnect() (by main() or
        # by slixmpp's own reconnect()) is in progress, so on_disconnected does
        # not start a second, competing connection attempt.
        self._disconnect_requested = False

        # Claude via Vertex AI.
        self.anthropic_client = AnthropicVertex(project_id=project_id, region=region)

        # Gemini via the Developer API with an API key (not Vertex AI); optional.
        self.gemini_client = None
        if GEMINI_AVAILABLE and gemini_api_key:
            try:
                self.gemini_client = genai.Client(api_key=gemini_api_key)
                log.info("Gemini client initialized successfully (Developer API)")
            except Exception as e:
                log.warning(f"Failed to initialize Gemini client: {e}")
        elif GEMINI_AVAILABLE:
            log.warning("Gemini API key not provided, Gemini support disabled")

        self.commands = self.load_commands()

        self.register_plugin('xep_0030')  # Service Discovery
        self.register_plugin('xep_0199')  # XMPP Ping

        self.add_event_handler("session_start", self.on_session_start)
        self.add_event_handler("message", self.on_message)
        self.add_event_handler("disconnected", self.on_disconnected)
        self.add_event_handler("connection_failed", self.on_connection_failed)
        self.add_event_handler("session_end", self.on_session_end)

        log.info(f"Bot initialized: {jid}")
        log.info(f"Owner: {owner_jid}")
        log.info(f"Vertex AI: {project_id} / {region}")
        log.info(f"Inbox: {inbox_path}")
        log.info(f"Commands loaded: {', '.join(self.commands.keys())}")

    def load_commands(self) -> Dict[str, Any]:
        """Load the commands table from the YAML file, falling back to defaults.

        The file must contain a top-level ``commands`` mapping of command name
        to a mapping of settings. An entry with no settings is treated as an
        empty mapping; any other non-mapping entry is skipped with a warning.
        If no file is configured, it does not exist, it cannot be read or
        parsed, or it has no ``commands`` mapping, a copy of DEFAULT_COMMANDS
        is returned.
        """
        if self.commands_path and Path(self.commands_path).exists():
            try:
                with open(self.commands_path, 'r', encoding='utf-8') as f:
                    loaded = yaml.safe_load(f)
            except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
                log.error(f"Failed to load commands from {self.commands_path}: {e}")
            else:
                commands = loaded.get('commands') if isinstance(loaded, dict) else None
                if isinstance(commands, dict):
                    valid = {}
                    for name, cfg in commands.items():
                        if cfg is None:
                            cfg = {}  # a bare "name:" line in YAML parses to None
                        if isinstance(cfg, dict):
                            # str(): YAML may parse keys such as 42 as ints, but
                            # lookups use the text typed after "/".
                            valid[str(name)] = cfg
                        else:
                            log.warning(f"Ignoring command {name!r} in {self.commands_path}: expected a mapping")
                    log.info(f"Loaded {len(valid)} commands from {self.commands_path}")
                    return valid
                log.error(f"No 'commands' mapping found in {self.commands_path}")

        log.info("Using default commands configuration")
        # Return a copy so callers cannot mutate the shared module-level defaults.
        return {name: dict(cfg) for name, cfg in DEFAULT_COMMANDS.items()}

    def disconnect(self, *args, **kwargs):
        """Disconnect on purpose; same arguments and return value as slixmpp's.

        Marks the disconnect as requested so on_disconnected does not
        reconnect on its own.
        """
        self._disconnect_requested = True
        return super().disconnect(*args, **kwargs)

    async def on_session_start(self, event):
        """Announce presence and fetch the roster once the XMPP session is up."""
        # A fresh session means any earlier requested disconnect is over.
        self._disconnect_requested = False
        self.send_presence()
        await self.get_roster()
        log.info("Session started, presence sent")

    async def on_session_end(self, event):
        """Log the end of the XMPP session."""
        log.warning("Session ended")

    async def on_disconnected(self, event):
        """Log the disconnect and reconnect unless it was requested via disconnect()."""
        log.warning(f"Disconnected from XMPP server: {event}")
        if self._disconnect_requested:
            return
        # Delay so a server that drops us right away is not hammered in a loop.
        await asyncio.sleep(self.RECONNECT_DELAY)
        if self._disconnect_requested:
            return
        log.info("Reconnecting to XMPP server...")
        self.connect()

    async def on_connection_failed(self, event):
        """Log a failed connection attempt."""
        log.error(f"Connection failed: {event}")

    async def on_message(self, msg):
        """Dispatch a message from the owner to a built-in or configured command."""
        # Only respond to chat messages from owner
        if msg["type"] not in ("chat", "normal"):
            return

        sender = str(msg["from"]).split("/")[0]  # Remove resource
        # Local and domain parts of a JID are case-insensitive.
        if sender.lower() != self.owner_jid.lower():
            log.warning(f"Ignoring message from non-owner: {sender}")
            return

        body = msg["body"].strip()
        if not body:
            # Body-less stanzas (e.g. chat-state notifications such as
            # "composing", or delivery receipts) are not commands; replying to
            # them would spam the owner with help text while they type.
            return
        log.info(f"Received from {sender}: {body}")

        if not body.startswith("/"):
            help_msg = """I'm a research assistant bot! 🤖

Use commands like:
• /research <query> - Research a topic
• /help - Show all commands

Example: /research how does XMPP work?"""
            msg.reply(help_msg).send()
            return

        # Parse command and arguments; a bare "/" yields no parts at all and
        # falls through to the "Unknown command" reply below.
        parts = body[1:].split(None, 1)
        command = parts[0] if parts else ""
        args = parts[1] if len(parts) > 1 else ""

        if command == "help":
            await self.cmd_help(msg)
        elif command == "ping":
            await self.cmd_ping(msg)
        elif command == "reload-commands":
            await self.cmd_reload_commands(msg)
        elif command in self.commands:
            await self.cmd_dynamic(msg, command, args)
        else:
            msg.reply(f"Unknown command: /{command}\nType /help for available commands").send()

    async def cmd_help(self, msg):
        """Reply with every configured command plus the built-in ones."""
        help_lines = ["Available commands:"]

        # Dynamic commands
        for cmd_name, cmd_config in self.commands.items():
            desc = cmd_config.get('description', 'No description')
            help_lines.append(f"/{cmd_name} <query> - {desc}")
            if cmd_name == "research":  # Show model selection for research
                help_lines.append(" Model selection:")
                help_lines.append(" opus: or o: - Use Claude Opus 4.5 (most intelligent)")
                help_lines.append(" sonnet: or s: - Use Claude Sonnet 4.5 (default, faster)")
                if self.gemini_client:
                    help_lines.append(" gemini-pro: or gp: - Use Gemini 3 Pro (advanced reasoning)")
                    help_lines.append(" gemini: or g: - Use Gemini 3 Flash (fast, cost-effective)")
                help_lines.append(" Examples:")
                help_lines.append(" /research how does XMPP work?")
                help_lines.append(" /research opus: analyze distributed systems complexity")
                help_lines.append(" /research gp: solve complex reasoning problem")

        # Built-in commands
        help_lines.append("/help - Show this help message")
        help_lines.append("/ping - Check if bot is alive")
        help_lines.append("/reload-commands - Reload commands from YAML file")

        msg.reply("\n".join(help_lines)).send()

    async def cmd_ping(self, msg):
        """Reply to show the bot is alive."""
        msg.reply("🤖 Pong! Bot is alive.").send()

    async def cmd_reload_commands(self, msg):
        """Reload the commands table and report the old and new command counts."""
        old_count = len(self.commands)
        self.commands = self.load_commands()
        new_count = len(self.commands)
        msg.reply(f"✅ Reloaded commands: {old_count} → {new_count}").send()
        log.info("Commands reloaded via /reload-commands")

    def _split_model_prefix(self, query: str) -> tuple:
        """Split a leading ``alias:`` model selector off ``query``.

        Returns ``(model_key, rest)`` with ``rest`` stripped, or
        ``(None, query)`` unchanged when ``query`` does not start with a
        known alias followed by a colon.
        """
        alias, sep, rest = query.partition(":")
        model = self.MODEL_ALIASES.get(alias) if sep else None
        if model is None:
            return None, query
        return model, rest.strip()

    async def cmd_dynamic(self, msg, command: str, args: str):
        """Run a configured command: choose a model, query it, then reply and/or save.

        ``args`` may begin with a model selector (``opus:``/``o:``,
        ``sonnet:``/``s:``, ``gemini:``/``g:``, ``gemini-pro:``/``gp:``);
        otherwise the command's ``default_model`` (or ``"sonnet"``) is used.
        Failures of the model call or file write are reported to the sender.
        """
        cmd_config = self.commands[command]
        usage = cmd_config.get('usage', f'Usage: /{command} <query>')
        if not args:
            msg.reply(usage).send()
            return

        selected, query = self._split_model_prefix(args.strip())
        if selected in ("gemini", "gemini-pro") and not self.gemini_client:
            msg.reply("❌ Gemini is not available (client not initialized)").send()
            return
        if not query:
            # Only a model selector was given (e.g. "/research opus:").
            msg.reply(usage).send()
            return

        model = selected or cmd_config.get('default_model', 'sonnet')
        model_name = self.get_model_display_name(model)

        # Acknowledge we picked up the work
        msg.reply(f"🔍 {cmd_config.get('description', 'Processing')} with {model_name}: {query}").send()

        try:
            result = await self.process_query(
                query=query,
                model=model,
                system_prompt=cmd_config.get('system_prompt', ''),
                max_tokens=cmd_config.get('max_tokens', 2000),
            )

            save_to = cmd_config.get('save_to')
            if save_to:
                await self.save_to_file(query, result, filename=save_to, model=model_name)
                limit = self.PREVIEW_CHARS
                preview = result if len(result) <= limit else f"{result[:limit]}..."
                msg.reply(f"✅ Complete! Saved to {save_to}\n\nPreview:\n{preview}").send()
            else:
                msg.reply(f"✅ Complete!\n\n{result}").send()

            log.info(f"/{command} completed for: {query} (model: {model})")

        except Exception as e:
            log.error(f"/{command} failed: {e}", exc_info=True)
            msg.reply(f"❌ Failed: {str(e)}").send()

    def get_model_display_name(self, model: str) -> str:
        """Return the human-readable name for a model key (title-cased if unknown)."""
        return self.MODEL_DISPLAY_NAMES.get(model, model.title())

    async def process_query(self, query: str, model: str, system_prompt: str, max_tokens: int) -> str:
        """
        Process query using specified model.

        Args:
            query: Question to answer
            model: Model key - "sonnet", "opus", "gemini" or "gemini-pro"
            system_prompt: System prompt for the model (may be empty)
            max_tokens: Maximum tokens to generate

        Returns:
            Generated response text

        Raises:
            ValueError: if ``model`` is not a known model key.
        """
        log.info(f"Processing query with {model}: {query}")

        if model in self.CLAUDE_MODEL_IDS:
            return await self.process_claude(query, model, system_prompt, max_tokens)
        if model in self.GEMINI_MODEL_IDS:
            return await self.process_gemini(query, model, system_prompt, max_tokens)
        raise ValueError(f"Unknown model: {model}")

    async def _run_blocking(self, func, *args, **kwargs):
        """Run a synchronous SDK call in the default executor and await its result.

        The Anthropic and Gemini clients used here are synchronous; calling
        them directly would stall the event loop, and with it the XMPP
        connection, for the whole generation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def process_claude(self, query: str, model: str, system_prompt: str, max_tokens: int) -> str:
        """Generate a reply with Claude on Vertex AI.

        Raises:
            RuntimeError: if the response contains no text blocks.
        """
        request = {
            "model": self.CLAUDE_MODEL_IDS[model],
            "max_tokens": max_tokens,
            "messages": [{"role": "user", "content": query}],
        }
        if system_prompt:
            # The API rejects empty text blocks, so only send a system prompt
            # when one is configured. Mark it cacheable for repeated commands.
            request["system"] = [
                {
                    "type": "text",
                    "text": system_prompt,
                    "cache_control": {"type": "ephemeral"},
                }
            ]

        response = await self._run_blocking(self.anthropic_client.messages.create, **request)

        # Log cache usage
        usage = response.usage
        log.info(
            f"Claude usage - Input: {usage.input_tokens}, "
            f"Cache creation: {getattr(usage, 'cache_creation_input_tokens', 0)}, "
            f"Cache read: {getattr(usage, 'cache_read_input_tokens', 0)}, "
            f"Output: {usage.output_tokens}"
        )

        text_parts = [block.text for block in response.content if getattr(block, "type", None) == "text"]
        if not text_parts:
            raise RuntimeError(f"Claude returned no text (stop_reason={response.stop_reason})")
        return "".join(text_parts)

    async def process_gemini(self, query: str, model: str, system_prompt: str, max_tokens: int) -> str:
        """Generate a reply with Gemini through the Developer API.

        Raises:
            RuntimeError: if the Gemini client is unavailable or the response
                carries no text.
        """
        if not self.gemini_client:
            raise RuntimeError("Gemini client not initialized")

        config = {
            "max_output_tokens": max_tokens,
            "temperature": 1.0,
        }
        if system_prompt:
            config["system_instruction"] = system_prompt
        # Request deep reasoning for the Pro model.
        if model == "gemini-pro":
            config["thinking_config"] = {"thinking_level": "high"}

        model_id = self.GEMINI_MODEL_IDS[model]
        response = await self._run_blocking(
            self.gemini_client.models.generate_content,
            model=model_id,
            contents=query,
            config=config,
        )

        result = response.text
        if not result:
            # .text is None when no text part came back (e.g. the output token
            # budget ran out or the response was blocked).
            candidates = getattr(response, "candidates", None) or []
            finish_reason = getattr(candidates[0], "finish_reason", None) if candidates else None
            raise RuntimeError(f"Gemini returned no text (finish_reason={finish_reason})")

        log.info(f"Gemini ({model_id}) response generated successfully")
        return result

    @staticmethod
    def _org_escape(text: str) -> str:
        """Comma-escape lines that Org would otherwise parse inside a block.

        Follows Org's convention (``org-escape-code-in-string``): a line whose
        first non-blank characters are ``*`` or ``#+``, optionally preceded by
        commas, gets one extra leading comma. Without this, a markdown bullet
        such as ``* item`` at column 0 would become a new Org headline.
        """
        return re.sub(r"^([ \t]*)(,*(?:\*|#\+))", r"\1,\2", text, flags=re.MULTILINE)

    async def save_to_file(self, query: str, result: str, filename: str, model: str = "Sonnet 4.5"):
        """Append a TODO entry with the query and result to an org file.

        ``filename`` may be absolute (``~`` is expanded); otherwise it is
        resolved relative to the directory containing ``inbox_path``. Missing
        parent directories are created.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %a %H:%M")
        # Org headlines are a single line; collapse newlines/runs of whitespace.
        title = " ".join(query.split())

        entry = (
            f"* TODO Review research: {title}\n"
            ":PROPERTIES:\n"
            f":CREATED: [{timestamp}]\n"
            f":MODEL: {model}\n"
            ":END:\n"
            "\n"
            "** Query\n"
            f"{query}\n"
            "\n"
            "** Result\n"
            "#+begin_src markdown\n"
            f"{self._org_escape(result)}\n"
            "#+end_src\n"
            "\n"
        )

        target = Path(filename).expanduser()
        file_path = target if target.is_absolute() else self.inbox_path.parent / target

        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, "a", encoding="utf-8") as f:
            f.write(entry)

        log.info(f"Saved to {file_path}")
420
421
async def main():
    """Read configuration from the environment and run the bot until shutdown.

    Required: XMPP_JID, XMPP_PASSWORD, XMPP_OWNER_JID, VERTEX_PROJECT_ID.
    Optional: VERTEX_REGION (default "global"), INBOX_PATH, COMMANDS_PATH,
    GEMINI_API_KEY. Runs until SIGTERM, or until asyncio.run() cancels this
    task on Ctrl-C, then disconnects from the XMPP server.
    """
    import signal  # only needed for shutdown handling

    jid = os.getenv("XMPP_JID")
    password = os.getenv("XMPP_PASSWORD")
    owner_jid = os.getenv("XMPP_OWNER_JID")
    project_id = os.getenv("VERTEX_PROJECT_ID")
    region = os.getenv("VERTEX_REGION", "global")
    inbox_path = os.getenv("INBOX_PATH", "/home/vincent/desktop/org/inbox.org")
    commands_path = os.getenv("COMMANDS_PATH")
    gemini_api_key = os.getenv("GEMINI_API_KEY")

    required = {
        "XMPP_JID": jid,
        "XMPP_PASSWORD": password,
        "XMPP_OWNER_JID": owner_jid,
        "VERTEX_PROJECT_ID": project_id,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        log.error("Missing required environment variables:")
        log.error(f"  {', '.join(missing)}")
        sys.exit(1)

    bot = ResearchBot(jid, password, owner_jid, project_id, region, inbox_path, commands_path, gemini_api_key)

    log.info("Connecting to XMPP server...")
    bot.connect()

    stop = asyncio.Event()
    try:
        # Service managers such as systemd stop processes with SIGTERM; turn
        # it into the same clean shutdown as Ctrl-C.
        asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, stop.set)
    except (NotImplementedError, RuntimeError):
        pass  # signal handlers are unsupported on this loop/platform/thread

    try:
        await stop.wait()
    finally:
        # Ctrl-C reaches this coroutine as a cancellation from asyncio.run()
        # rather than as KeyboardInterrupt, so cleanup must live in `finally`.
        log.info("Shutting down...")
        bot.disconnect()
        try:
            # shield(): a timeout must not cancel slixmpp's own future.
            await asyncio.wait_for(asyncio.shield(bot.disconnected), timeout=10)
        except asyncio.TimeoutError:
            log.warning("Timed out waiting for the XMPP connection to close")
454
455
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() propagates KeyboardInterrupt on Ctrl-C; exit quietly
        # instead of printing a traceback.
        pass