zulip_bot/app/utils/bot_service.py
2025-05-16 18:00:22 +04:00

402 lines
16 KiB
Python

"""
Zulip bot service for handling interactions with Zulip.
"""
import os
import re
import logging
import threading
import time
import hashlib
import tempfile
from typing import Optional, List, Dict, Any
import zulip
from app.db.chroma_service import ChromaDBService
from app.utils.ai_service import GeminiService
# Set up root logging (INFO level, timestamped format) when this module is
# imported; logging.basicConfig is a no-op if the root logger already has
# handlers. Then fetch the named logger this module writes to.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("bot_service")
class ZulipBotService:
    """Answer @-mentions of the IT bot in Zulip.

    A background daemon thread (see ``start``/``stop``) polls Zulip for recent
    messages in public streams that mention the bot. For each new message it
    strips the mention, retrieves related documents from ChromaDB, and asks
    the AI service for an answer. The answer is posted back to the same stream
    and topic, or privately to the sender for non-stream messages.

    The class is a per-process singleton. Every constructor call in a process
    returns the same instance, and ``__init__`` only runs its body until the
    first successful initialization. A process whose PID differs from the
    recorded one (e.g. a forked child) gets a fresh instance.
    """

    # Singleton instance for the current process.
    _instance = None
    # Guards singleton creation/initialization, start/stop and reset_cache.
    _lock = threading.Lock()
    # PID that owns ``_instance``; compared in __new__ to detect forked children.
    _process_id = os.getpid()

    # Error code Zulip puts on rate-limited responses.
    _RATE_LIMIT_CODE = "RATE_LIMIT_HIT"
    # Error text Zulip uses for rate-limited requests.
    _RATE_LIMIT_MSG = "API usage exceeded rate limit"
    # How many processed message IDs to remember for de-duplication.
    _MAX_TRACKED_IDS = 1000
    # Reply sent when a message cannot be answered.
    _ERROR_RESPONSE = (
        "I apologize, but I encountered an error while processing your request. "
        "Please try again or contact the IT support team if the issue persists."
    )

    def __new__(cls, *args, **kwargs):
        """Return the process-wide instance, creating it on first use or after a fork."""
        with cls._lock:
            current_pid = os.getpid()
            if cls._instance is None or cls._process_id != current_pid:
                logger.info(f"Creating new ZulipBotService singleton instance for process {current_pid}")
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
                cls._process_id = current_pid  # Remember which process owns the instance
            return cls._instance

    def __init__(self,
                 email: Optional[str] = None,
                 api_key: Optional[str] = None,
                 site: Optional[str] = None,
                 chroma_service: Optional[ChromaDBService] = None,
                 ai_service: Optional[GeminiService] = None):
        """
        Initialize the Zulip bot service (only once per singleton instance).

        Args:
            email: Bot email; defaults to the ZULIP_BOT_EMAIL env variable.
            api_key: Bot API key; defaults to the ZULIP_BOT_API_KEY env variable.
            site: Zulip server URL; defaults to the ZULIP_SITE env variable.
            chroma_service: Vector store used for context retrieval; a new
                ChromaDBService is created when omitted.
            ai_service: Response generator; a new GeminiService is created
                when omitted.

        Raises:
            ValueError: If email, API key or site is missing after applying
                the environment fallbacks.
        """
        with self._lock:
            # Later constructor calls on the singleton are no-ops.
            if self._initialized:
                return

            self.email = email or os.getenv("ZULIP_BOT_EMAIL")
            self.api_key = api_key or os.getenv("ZULIP_BOT_API_KEY")
            self.site = site or os.getenv("ZULIP_SITE")
            if not all([self.email, self.api_key, self.site]):
                raise ValueError("Missing Zulip configuration. Set ZULIP_BOT_EMAIL, ZULIP_BOT_API_KEY, and ZULIP_SITE env variables.")

            self.client = zulip.Client(
                email=self.email,
                api_key=self.api_key,
                site=self.site
            )

            self.chroma_service = chroma_service or ChromaDBService()
            self.ai_service = ai_service or GeminiService()

            # Polling thread state. ``_stop_event`` lets stop() interrupt the
            # thread's waits instead of waiting for them to run out.
            self.thread = None
            self.running = False
            self._stop_event = threading.Event()

            # IDs of messages already picked up, so a mention is answered once.
            self.processed_message_ids = set()

            # Mention in raw Zulip markdown: @**IT_Bot**, or the disambiguated
            # @**IT_Bot|<user_id>** form.
            self.bot_mention_pattern = re.compile(r'@\*\*IT_Bot(?:\|\d+)?\*\*')

            # Reply for a mention with no question text.
            self.default_response = "Hello. If you have a technical question, please ask. If you require assistance with non-technical matters, please contact IT support."

            # Rate-limit backoff state (seconds).
            self._backoff_time = 1
            self._consecutive_rate_limit_errors = 0
            self._max_backoff_time = 60

            self._initialized = True
            logger.info("Initialized ZulipBotService")

    def start(self):
        """Start polling for mentions in a background daemon thread (no-op if already running)."""
        with self._lock:
            if self.thread and self.thread.is_alive():
                logger.warning("Bot service is already running")
                return
            self.running = True
            self._stop_event.clear()
            self.thread = threading.Thread(target=self._message_loop, daemon=True)
            self.thread.start()
            logger.info("Started ZulipBotService")

    def stop(self, timeout: float = 5.0):
        """
        Ask the polling thread to exit and wait for it.

        Pending waits in the thread end immediately. A reply that is being
        generated or sent still has to finish, so the thread may outlive
        ``timeout``; that case is logged as a warning.

        Args:
            timeout: Maximum number of seconds to wait for the thread to exit.
        """
        with self._lock:
            if not self.thread or not self.thread.is_alive():
                logger.warning("Bot service is not running")
                return
            self.running = False
            self._stop_event.set()
            self.thread.join(timeout=timeout)
            if self.thread.is_alive():
                logger.warning(f"Bot service thread did not exit within {timeout} seconds")
            else:
                logger.info("Stopped ZulipBotService")

    def _sleep(self, seconds: float):
        """Wait up to ``seconds``, returning early once stop() has been called."""
        self._stop_event.wait(seconds)

    def _message_loop(self):
        """
        Poll for mentions and answer them until ``self.running`` is cleared.

        Each iteration does the following:
        - fetches new mentions and answers them one at a time, 0.5 s apart;
        - prunes the de-duplication set;
        - waits 5 s before polling again.
        """
        # Only messages newer than this many seconds are considered.
        # NOTE(review): if one iteration (answering + waits + backoff) takes
        # longer than this, mentions posted in the gap are never fetched;
        # consider anchoring the window on the last successful poll instead.
        lookback_period = 60

        while self.running:
            try:
                for message in self._check_for_mentions(lookback_period):
                    self._process_message(message)
                    # Small delay between replies.
                    self._sleep(0.5)

                self._prune_processed_ids()

                # Wait before checking again (reduces API usage).
                self._sleep(5.0)
            except Exception as e:
                logger.exception(f"Error in message loop: {str(e)}")
                if self._RATE_LIMIT_MSG in str(e):
                    # _check_for_mentions waits out the backoff before its next
                    # API call, so only record the hit here (no double sleep).
                    self._register_rate_limit()
                    logger.info(f"Rate limit hit, backing off for {self._backoff_time} seconds")
                else:
                    # For other errors, just wait a bit.
                    self._sleep(3)

    def _prune_processed_ids(self):
        """
        Keep only the newest ``_MAX_TRACKED_IDS`` processed message IDs.

        Zulip assigns message IDs in increasing order, so the largest IDs are
        the most recent ones. Only recent IDs can reappear inside the lookback
        window, so those are the ones that must be kept.
        """
        ids = self.processed_message_ids
        if len(ids) > self._MAX_TRACKED_IDS:
            self.processed_message_ids = set(sorted(ids)[-self._MAX_TRACKED_IDS:])

    def _register_rate_limit(self, retry_after: Any = None):
        """
        Record a rate-limit hit and grow the backoff delay.

        The delay doubles on every consecutive hit. It is raised to the
        server-provided ``retry_after`` (seconds) when that is larger, and is
        capped at ``_max_backoff_time``.
        """
        self._consecutive_rate_limit_errors += 1
        backoff = self._backoff_time * 2
        if retry_after is not None:
            try:
                backoff = max(backoff, float(retry_after))
            except (TypeError, ValueError):
                # Unparseable hint: fall back to plain doubling.
                pass
        self._backoff_time = min(backoff, self._max_backoff_time)

    @classmethod
    def _is_rate_limit_result(cls, result: Dict[str, Any]) -> bool:
        """Return True if a Zulip API result dict reports a rate-limit error."""
        return (result.get("code") == cls._RATE_LIMIT_CODE
                or cls._RATE_LIMIT_MSG in str(result.get("msg", "")))

    def _check_for_mentions(self, lookback_period: int) -> List[Dict[str, Any]]:
        """
        Fetch recent messages that mention the bot and were not handled yet.

        Args:
            lookback_period: How far back to check for mentions (in seconds).

        Returns:
            Matching Zulip message dicts, each now recorded in
            ``processed_message_ids``. Returns an empty list on any error.
        """
        lookback_timestamp = int(time.time() - lookback_period)

        try:
            # After rate-limit hits, wait out the current backoff first.
            if self._consecutive_rate_limit_errors > 0:
                backoff_delay = min(self._backoff_time, self._max_backoff_time)
                logger.info(f"Rate limit backoff: waiting {backoff_delay} seconds before API call")
                self._sleep(backoff_delay)

            # Newest 100 messages in public streams where the bot is mentioned;
            # raw markdown so the mention pattern can be matched and stripped.
            request = {
                "anchor": "newest",
                "num_before": 100,
                "num_after": 0,
                "narrow": [
                    {"operator": "is", "operand": "mentioned"},
                    {"operator": "streams", "operand": "public"}
                ],
                "client_gravatar": False,
                "apply_markdown": False
            }
            result = self.client.get_messages(request)

            if result.get("result") != "success":
                error_msg = result.get('msg', 'Unknown error')
                if self._is_rate_limit_result(result):
                    # Rate limiting arrives as an error result (code
                    # RATE_LIMIT_HIT, optional retry-after) rather than an
                    # exception, so the backoff has to be registered here.
                    self._register_rate_limit(result.get("retry-after"))
                    logger.error(f"Failed to get messages: {error_msg} (backoff: {self._backoff_time}s)")
                else:
                    logger.error(f"Failed to get messages: {error_msg}")
                return []

            if self._consecutive_rate_limit_errors > 0:
                logger.info("Successful API call, resetting rate limit backoff")
                self._consecutive_rate_limit_errors = 0
                self._backoff_time = 1

            new_messages = []
            for message in result.get("messages", []):
                message_id = message["id"]
                if message_id in self.processed_message_ids:
                    continue
                if message.get("timestamp", 0) < lookback_timestamp:
                    continue
                # Never answer the bot's own messages.
                if message.get("sender_email") == self.email:
                    continue
                # "mentioned" also matches e.g. wildcard mentions; require an
                # explicit mention of the bot in the content.
                if not self.bot_mention_pattern.search(message.get("content", "")):
                    continue
                self.processed_message_ids.add(message_id)
                new_messages.append(message)

            if new_messages:
                logger.info(f"Found {len(new_messages)} new mention(s) of the bot")
            return new_messages

        except Exception as e:
            if self._RATE_LIMIT_MSG in str(e):
                self._register_rate_limit()
                logger.error(f"Error checking for mentions: {str(e)} (backoff: {self._backoff_time}s)")
            else:
                logger.error(f"Error checking for mentions: {str(e)}")
            return []

    def _process_message(self, message: Dict[str, Any]):
        """
        Answer one mention message.

        Sends ``default_response`` when the message contains only the mention.
        Otherwise, sends the AI-generated answer for the remaining text. Sends
        an apology reply if anything fails or the generated answer is empty.

        Args:
            message: The Zulip message dict to answer.
        """
        try:
            content = message.get("content", "")
            logger.info(f"Processing message ID: {message.get('id')}")

            # The question is whatever remains after removing the mention(s).
            query = self.bot_mention_pattern.sub("", content).strip()
            logger.info(f"Extracted query: {query[:50]}...")

            if not query:
                logger.info("Empty query received, sending default response")
                self._send_response(message, self.default_response)
                return

            context = self._retrieve_context(query)
            response_text = self.ai_service.generate_response(query, context)

            # Zulip rejects empty message content, so never send a blank reply.
            if not response_text or not str(response_text).strip():
                logger.warning(f"Empty response generated for message ID: {message.get('id')}")
                response_text = self._ERROR_RESPONSE

            self._send_response(message, response_text)

        except Exception as e:
            logger.exception(f"Error processing message: {str(e)}")
            self._send_response(message, self._ERROR_RESPONSE)

    def _retrieve_context(self, query: str, n_results: int = 40) -> List[Dict[str, Any]]:
        """
        Retrieve documents related to ``query`` from ChromaDB.

        Args:
            query: The user's query.
            n_results: Number of results to request from the search.

        Returns:
            A list of ``{"content": ..., "metadata": ...}`` dicts in search
            order. Non-empty metadata is copied and given a 1-based
            ``relevance_position``. A document without metadata gets ``None``.
            Returns an empty list if nothing is found or the search fails.
        """
        try:
            search_results = self.chroma_service.search_similar(query, n_results=n_results)
            if not search_results:
                logger.warning(f"No context found for query: {query[:50]}...")
                return []

            documents = []
            # Results appear to be nested one inner list per query (Chroma
            # style; confirm against ChromaDBService.search_similar). Only one
            # query was issued, so only the first inner list is used.
            doc_batches = search_results.get("documents") or []
            if doc_batches:
                docs = doc_batches[0] or []
                metas = (search_results.get("metadatas") or [[]])[0] or []
                for position, doc in enumerate(docs, start=1):
                    # Missing or short metadata yields None instead of dropping documents.
                    metadata = metas[position - 1] if position <= len(metas) else None
                    if isinstance(doc, list) and doc:
                        doc = doc[0]  # Handle nested lists
                    if metadata:
                        # Copy so the search result itself is not mutated.
                        metadata = {**metadata, "relevance_position": position}
                    documents.append({
                        "content": doc,
                        "metadata": metadata,
                    })

            logger.info(f"Retrieved {len(documents)} context documents for query: {query[:30]}...")
            return documents

        except Exception as e:
            logger.error(f"Error retrieving context: {str(e)}")
            return []

    def _send_response(self, original_message: Dict[str, Any], response_text: str):
        """
        Reply to a message: same stream and topic for stream messages,
        otherwise a private message to the sender. Failures are logged,
        never raised.

        Args:
            original_message: The message being answered.
            response_text: The text of the reply.
        """
        try:
            if original_message.get("type") == "stream":
                response = {
                    "type": "stream",
                    "to": original_message.get("display_recipient"),
                    "subject": original_message.get("subject"),
                    "content": response_text
                }
            else:
                response = {
                    "type": "private",
                    "to": [original_message.get("sender_email")],
                    "content": response_text
                }

            result = self.client.send_message(response)
            if result.get("result") != "success":
                error_msg = result.get("msg", "Unknown error")
                logger.error(f"Failed to send response: {error_msg}")
            else:
                logger.info(f"Sent response to message: {original_message.get('id')}")
        except Exception as e:
            logger.exception(f"Error sending response: {str(e)}")

    def send_test_message(self, recipient: str, content: str) -> Dict[str, Any]:
        """
        Send a test message to verify the bot is working.

        Args:
            recipient: An email address (containing "@") for a private
                message, otherwise a stream name; stream messages go to the
                "Bot Test" topic.
            content: The content of the message.

        Returns:
            The result dict of the send_message API call.
        """
        if "@" in recipient:
            message = {
                "type": "private",
                "to": [recipient],
                "content": content
            }
        else:
            message = {
                "type": "stream",
                "to": recipient,
                "subject": "Bot Test",
                "content": content
            }
        result = self.client.send_message(message)
        logger.info(f"Sent test message to {recipient}, result: {result.get('result')}")
        return result

    def reset_cache(self) -> str:
        """Forget all processed message IDs and return a confirmation string."""
        with self._lock:
            logger.info("Resetting message caches")
            self.processed_message_ids = set()
            return "Message cache reset successfully"