402 lines
16 KiB
Python
402 lines
16 KiB
Python
"""
Zulip bot service for handling interactions with Zulip.
"""
|
|
|
|
import os
|
|
import re
|
|
import logging
|
|
import threading
|
|
import time
|
|
import hashlib
|
|
import tempfile
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
import zulip
|
|
from app.db.chroma_service import ChromaDBService
|
|
from app.utils.ai_service import GeminiService
|
|
|
|
# Logging setup, executed once when this module is imported: configure the
# root logger at INFO level with a timestamped format, then create the named
# logger that the rest of this module writes to.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger("bot_service")
|
|
|
|
class ZulipBotService:
    """Per-process singleton that polls Zulip for bot mentions and answers them.

    A background thread (see ``start``/``stop``) repeatedly asks Zulip for
    recent messages that mention the bot, retrieves supporting context through
    ``chroma_service.search_similar``, generates a reply with
    ``ai_service.generate_response`` and posts it back to the conversation the
    mention came from.

    Constructing the class more than once in the same process returns the same
    object; ``__init__`` only runs its body the first time.
    """

    # Singleton state, shared by every caller in this process.
    _instance = None
    _lock = threading.Lock()
    _process_id = os.getpid()  # PID recorded when the class body is executed

    # Tunables of the polling loop (all durations are in seconds).
    LOOKBACK_SECONDS = 60            # how old a mention may be and still be answered
    POLL_INTERVAL_SECONDS = 5.0      # pause between two polls
    PER_MESSAGE_DELAY_SECONDS = 0.5  # pause after each answered message
    ERROR_RETRY_DELAY_SECONDS = 3    # pause after an error that is not a rate limit
    STOP_JOIN_TIMEOUT_SECONDS = 5.0  # how long stop() waits for the worker thread
    MAX_TRACKED_MESSAGE_IDS = 1000   # cap on the processed-ID cache

    # Markers identifying a rate-limited request, either in an error result
    # returned by the API or in the text of a raised exception.
    _RATE_LIMIT_TEXT = "API usage exceeded rate limit"
    _RATE_LIMIT_CODE = "RATE_LIMIT_HIT"

    def __new__(cls, *args, **kwargs):
        """Return the singleton, creating it on first use.

        A fresh instance is also created when the current PID differs from the
        recorded one (presumably so a forked worker does not reuse the parent's
        client and thread -- confirm against the deployment model).
        """
        with cls._lock:
            current_pid = os.getpid()
            if cls._instance is None or cls._process_id != current_pid:
                logger.info(f"Creating new ZulipBotService singleton instance for process {current_pid}")
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
                cls._process_id = current_pid  # remember which process owns the instance
            return cls._instance

    def __init__(self,
                 email: Optional[str] = None,
                 api_key: Optional[str] = None,
                 site: Optional[str] = None,
                 chroma_service: Optional["ChromaDBService"] = None,
                 ai_service: Optional["GeminiService"] = None):
        """Initialize the Zulip bot service.

        Args:
            email: Bot e-mail; falls back to the ``ZULIP_BOT_EMAIL`` env variable.
            api_key: Bot API key; falls back to ``ZULIP_BOT_API_KEY``.
            site: Zulip server URL; falls back to ``ZULIP_SITE``.
            chroma_service: Context store; a new ``ChromaDBService`` if omitted.
            ai_service: Response generator; a new ``GeminiService`` if omitted.

        Raises:
            ValueError: If e-mail, API key or site is missing after the
                environment fallback.
        """
        # The project types above are quoted so the annotations are not
        # evaluated when the function is defined.
        with self._lock:
            # Singleton: every construction after the first is a no-op.
            if self._initialized:
                return

            # Explicit arguments win over environment variables.
            self.email = email or os.getenv("ZULIP_BOT_EMAIL")
            self.api_key = api_key or os.getenv("ZULIP_BOT_API_KEY")
            self.site = site or os.getenv("ZULIP_SITE")

            if not all([self.email, self.api_key, self.site]):
                raise ValueError("Missing Zulip configuration. Set ZULIP_BOT_EMAIL, ZULIP_BOT_API_KEY, and ZULIP_SITE env variables.")

            self.client = zulip.Client(
                email=self.email,
                api_key=self.api_key,
                site=self.site,
            )

            self.chroma_service = chroma_service or ChromaDBService()
            self.ai_service = ai_service or GeminiService()

            # Worker thread state. ``running`` is kept as a plain flag for
            # external readers; the event makes the worker's waits interruptible.
            self.thread = None
            self.running = False
            self._stop_event = threading.Event()

            # IDs of messages already handed to _process_message.
            self.processed_message_ids = set()

            # Exact match for the IT_Bot mention in Zulip's raw markdown format.
            self.bot_mention_pattern = re.compile(r'@\*\*IT_Bot\*\*')

            # Reply used when a mention carries no text besides the mention.
            self.default_response = "Hello. If you have a technical question, please ask. If you require assistance with non-technical matters, please contact IT support."

            # Exponential backoff state for rate limiting.
            self._backoff_time = 1  # current delay, doubled on each hit
            self._consecutive_rate_limit_errors = 0
            self._max_backoff_time = 60  # upper bound for the delay

            self._initialized = True

            logger.info("Initialized ZulipBotService")

    def start(self):
        """Start the polling loop in a daemon thread (no-op if already running)."""
        with self._lock:
            if self.thread and self.thread.is_alive():
                logger.warning("Bot service is already running")
                return

            self._stop_event.clear()
            self.running = True
            self.thread = threading.Thread(target=self._message_loop)
            self.thread.daemon = True
            self.thread.start()
            logger.info("Started ZulipBotService")

    def stop(self):
        """Ask the polling loop to finish and wait briefly for the thread.

        The worker's waits are interrupted immediately; the thread can still
        outlive the join timeout while it is inside an API or AI call, in
        which case a warning is logged instead of the "stopped" message.
        """
        with self._lock:
            if not self.thread or not self.thread.is_alive():
                logger.warning("Bot service is not running")
                return

            self.running = False
            self._stop_event.set()  # wake the worker if it is waiting
            self.thread.join(timeout=self.STOP_JOIN_TIMEOUT_SECONDS)
            if self.thread.is_alive():
                logger.warning(
                    f"Bot service thread did not stop within {self.STOP_JOIN_TIMEOUT_SECONDS} seconds"
                )
                return
            logger.info("Stopped ZulipBotService")

    def _pause(self, seconds):
        """Wait up to ``seconds``; return True if a stop was requested meanwhile."""
        return self._stop_event.wait(seconds)

    @staticmethod
    def _newest_ids(ids, limit):
        """Return a set holding the ``limit`` largest values of ``ids``.

        A set has no order, so "the most recent entries" must be chosen by
        value. NOTE(review): this assumes Zulip message IDs grow over time
        (newer message -> larger ID) -- confirm against the server docs.
        """
        if limit <= 0:
            return set()
        return set(sorted(ids)[-limit:])

    @classmethod
    def _is_rate_limited(cls, result):
        """Tell whether an API error result reports a rate-limit rejection."""
        if result.get("code") == cls._RATE_LIMIT_CODE:
            return True
        return cls._RATE_LIMIT_TEXT in str(result.get("msg", ""))

    def _note_rate_limit(self):
        """Record one rate-limit hit and double the backoff delay (capped)."""
        self._consecutive_rate_limit_errors += 1
        self._backoff_time = min(self._backoff_time * 2, self._max_backoff_time)

    def _message_loop(self):
        """Poll for mentions and answer them until ``running`` becomes False."""
        lookback_period = self.LOOKBACK_SECONDS

        while self.running:
            try:
                # Every fetched message is already marked as processed, so all
                # of them are answered even if a stop arrives in between; only
                # the pauses are cut short.
                for message in self._check_for_mentions(lookback_period):
                    self._process_message(message)
                    self._pause(self.PER_MESSAGE_DELAY_SECONDS)

                # Bound the cache, keeping the newest IDs so that messages
                # still inside the lookback window are not answered twice.
                if len(self.processed_message_ids) > self.MAX_TRACKED_MESSAGE_IDS:
                    self.processed_message_ids = self._newest_ids(
                        self.processed_message_ids, self.MAX_TRACKED_MESSAGE_IDS
                    )

                # Wait before checking again (reduces API usage).
                self._pause(self.POLL_INTERVAL_SECONDS)

            except Exception as e:
                logger.error(f"Error in message loop: {str(e)}")
                if self._RATE_LIMIT_TEXT in str(e):
                    # Only record the hit here: _check_for_mentions performs
                    # the actual wait before its next API call, so the delay
                    # is applied once rather than twice.
                    self._note_rate_limit()
                    logger.info(f"Rate limit hit, backing off for {self._backoff_time} seconds")
                else:
                    self._pause(self.ERROR_RETRY_DELAY_SECONDS)

    def _check_for_mentions(self, lookback_period) -> List[Dict[str, Any]]:
        """Fetch recent, not yet processed messages that mention the bot.

        Args:
            lookback_period: How far back to check for mentions (in seconds).

        Returns:
            The new messages, each already recorded in
            ``processed_message_ids``. An empty list is returned on any
            failure, and when a stop is requested during the backoff wait.
        """
        # Computed before any backoff wait, so the wait does not shrink the window.
        lookback_timestamp = int(time.time() - lookback_period)

        try:
            # Honour the backoff accumulated by earlier rate-limit hits.
            if self._consecutive_rate_limit_errors > 0:
                backoff_delay = min(self._backoff_time, self._max_backoff_time)
                logger.info(f"Rate limit backoff: waiting {backoff_delay} seconds before API call")
                if self._pause(backoff_delay):
                    return []

            # Newest 100 messages that mention the bot in public streams,
            # with raw (unrendered) markdown so the mention regex can match.
            request = {
                "anchor": "newest",
                "num_before": 100,
                "num_after": 0,
                "narrow": [
                    {"operator": "is", "operand": "mentioned"},
                    {"operator": "streams", "operand": "public"},
                ],
                "client_gravatar": False,
                "apply_markdown": False,
            }

            result = self.client.get_messages(request)

            if result.get("result") != "success":
                error_msg = result.get('msg', 'Unknown error')
                if self._is_rate_limited(result):
                    # A rate limit reported as an error result must feed the
                    # backoff just like one reported through an exception.
                    self._note_rate_limit()
                    logger.error(f"Failed to get messages: {error_msg} (backoff: {self._backoff_time}s)")
                else:
                    logger.error(f"Failed to get messages: {error_msg}")
                return []

            # The request went through: clear any backoff state.
            if self._consecutive_rate_limit_errors > 0:
                logger.info("Successful API call, resetting rate limit backoff")
            self._consecutive_rate_limit_errors = 0
            self._backoff_time = 1

            new_messages = []
            for message in result.get("messages", []):
                message_id = message["id"]

                # Already handled in an earlier poll.
                if message_id in self.processed_message_ids:
                    continue

                # Older than the lookback window.
                if message.get("timestamp", 0) < lookback_timestamp:
                    continue

                # Sent by the bot itself.
                if message.get("sender_email") == self.email:
                    continue

                # Flagged as a mention but without the bot's exact mention text.
                if not self.bot_mention_pattern.search(message.get("content", "")):
                    continue

                self.processed_message_ids.add(message_id)
                new_messages.append(message)

            if new_messages:
                logger.info(f"Found {len(new_messages)} new mention(s) of the bot")

            return new_messages

        except Exception as e:
            if self._RATE_LIMIT_TEXT in str(e):
                self._note_rate_limit()
                logger.error(f"Error checking for mentions: {str(e)} (backoff: {self._backoff_time}s)")
            else:
                logger.error(f"Error checking for mentions: {str(e)}")
            return []

    def _process_message(self, message) -> None:
        """Answer one mention.

        The bot mention is stripped from the content; an empty remainder gets
        the default response, anything else is answered by the AI service
        using context from ChromaDB. Any error is logged and answered with an
        apology message.

        Args:
            message: The Zulip message dict to process.
        """
        try:
            content = message.get("content", "")

            logger.info(f"Processing message ID: {message.get('id')}")

            # The user's question is whatever remains after removing the mention.
            query = self.bot_mention_pattern.sub("", content).strip()

            logger.info(f"Extracted query: {query[:50]}...")

            if not query:
                logger.info("Empty query received, sending default response")
                self._send_response(message, self.default_response)
                return

            context = self._retrieve_context(query)
            response_text = self.ai_service.generate_response(query, context)
            self._send_response(message, response_text)

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            self._send_response(message,
                                "I apologize, but I encountered an error while processing your request. "
                                "Please try again or contact the IT support team if the issue persists.")

    def _retrieve_context(self, query, n_results=40) -> List[Dict[str, Any]]:
        """Retrieve context documents for a query from ChromaDB.

        Args:
            query: The user's query.
            n_results: Number of results to request.

        Returns:
            A list of ``{"content": ..., "metadata": ...}`` dicts in result
            order. Non-empty metadata additionally receives a 1-based
            ``relevance_position``. An empty list is returned when nothing is
            found or when the lookup fails.
        """
        try:
            search_results = self.chroma_service.search_similar(query, n_results=n_results)

            if not search_results:
                logger.warning(f"No context found for query: {query[:50]}...")
                return []

            documents = []

            # Presumably the Chroma query layout: one inner list per query, of
            # which only the first is used -- confirm against search_similar.
            doc_batches = search_results.get("documents")
            if doc_batches:
                docs = doc_batches[0] or []
                meta_batches = search_results.get("metadatas")
                metas = (meta_batches[0] if meta_batches else None) or []

                for position, doc in enumerate(docs, start=1):
                    if isinstance(doc, list) and len(doc) > 0:
                        doc = doc[0]  # handle nested lists

                    # Missing metadata must not cost us the document itself.
                    metadata = metas[position - 1] if position <= len(metas) else None
                    if metadata:
                        metadata["relevance_position"] = position

                    documents.append({
                        "content": doc,
                        "metadata": metadata,
                    })

            logger.info(f"Retrieved {len(documents)} context documents for query: {query[:30]}...")
            return documents

        except Exception as e:
            logger.error(f"Error retrieving context: {str(e)}")
            return []

    def _send_response(self, original_message, response_text) -> None:
        """Reply to a message in the conversation it came from.

        Stream messages are answered in the same stream and topic; anything
        else is answered privately to the sender. Failures are logged, never
        raised.

        Args:
            original_message: The message being responded to.
            response_text: The text of the response to send.
        """
        try:
            if original_message.get("type") == "stream":
                response = {
                    "type": "stream",
                    "to": original_message.get("display_recipient"),
                    "subject": original_message.get("subject"),
                    "content": response_text,
                }
            else:
                response = {
                    "type": "private",
                    "to": [original_message.get("sender_email")],
                    "content": response_text,
                }

            result = self.client.send_message(response)

            if result.get("result") != "success":
                error_msg = result.get("msg", "Unknown error")
                logger.error(f"Failed to send response: {error_msg}")
            else:
                logger.info(f"Sent response to message: {original_message.get('id')}")

        except Exception as e:
            logger.error(f"Error sending response: {str(e)}")

    def send_test_message(self, recipient, content):
        """Send a test message to verify the bot is working.

        Args:
            recipient: An e-mail address (anything containing "@") for a
                private message, otherwise a stream name; stream messages go
                to the "Bot Test" topic.
            content: The content of the message.

        Returns:
            The result of the API call.
        """
        if "@" in recipient:
            message = {
                "type": "private",
                "to": [recipient],
                "content": content,
            }
        else:
            message = {
                "type": "stream",
                "to": recipient,
                "subject": "Bot Test",
                "content": content,
            }

        result = self.client.send_message(message)
        logger.info(f"Sent test message to {recipient}, result: {result.get('result')}")
        return result

    def reset_cache(self):
        """Forget all processed message IDs and return a confirmation string."""
        with self._lock:
            logger.info("Resetting message caches")
            self.processed_message_ids = set()
            return "Message cache reset successfully"