"""
AI service for OpenAI API integration.

This module provides a class for generating responses using the OpenAI API.
It handles authentication, prompt engineering, error handling, and retries.
"""

import os
import time
import logging
import hashlib
import functools
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple

from openai import OpenAI

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ai_service")

# Simple in-memory cache for responses: cache key -> (stored-at timestamp, response text).
# Module-global, so it is shared by every service instance in the process.
RESPONSE_CACHE: Dict[str, Tuple[float, str]] = {}
CACHE_TTL = 3600  # 1 hour in seconds

# Hard cap on cache size; the oldest entries are evicted once it is exceeded.
_MAX_CACHE_ENTRIES = 1000

# Length of the sliding window used by the request rate limiter.
_RATE_WINDOW_SECONDS = 60

# Language codes that have a dedicated system prompt; anything else falls back to English.
_SUPPORTED_LANGUAGES = ('en', 'ka')

# Metadata keys rendered in a context reference header, with their display labels.
_METADATA_LABELS = (
    ('subject', 'Topic'),
    ('channel', 'Channel'),
    ('sender', 'From'),
)

# Returned to the caller when every API attempt has failed.
_FALLBACK_RESPONSE = (
    "I'm here to help with IT questions. However, I'm currently experiencing technical difficulties. "
    "Please try again later or contact IT support directly."
)

# System prompt used when the query language is detected as Georgian ('ka').
_SYSTEM_INSTRUCTION_KA = """როგორც IT_Bot, თქვენი როლია ორგანიზაციაში IT ტექნიკური დახმარების გაწევა:

## როლი და მიზანი:
- თქვენ ხართ ორგანიზაციის IT დახმარების ბოტი, რომელიც ეხმარება თანამშრომლებს ტექნიკური საკითხების გადაჭრაში.
- გამოიყენეთ მოცემული ისტორიული კონტექსტი ზუსტი და სასარგებლო პასუხების გასაცემად.
- როდესაც კონტექსტი ამბობს რომ რაიმე პრობლემა შეიძლება არსებობდეს, ჩათვალეთ რომ ეს მართლაც პრობლემაა.

## პასუხების მიდგომა:
1. გამოიყენეთ მოცემული კონტექსტი პასუხების შესაქმნელად. თუ კონტექსტში მოცემულია კონკრეტული IT საკითხები და მათი გადაწყვეტა, გამოიყენეთ ეს ინფორმაცია.
2. თუ კონტექსტი შეიცავს ინფორმაციას მსგავსი პრობლემის შესახებ, გააანალიზეთ, როგორ გადაიჭრა ეს პრობლემა წარსულში.
3. მითითებები და ცოდნა მოცემული კონტექსტიდან პრიორიტეტული უნდა იყოს ზოგად ცოდნასთან შედარებით.
4. თუ კითხვა არ უკავშირდება IT თემებს, მიუთითეთ მომხმარებელს, რომ დაუკავშირდეს IT მხარდაჭერას.
5. დეტალური, ნაბიჯ-ნაბიჯ ინსტრუქციები მიაწოდეთ, როცა სთხოვენ ტექნიკური პრობლემის გადაჭრას.

## პასუხის ფორმატი:
- მკაფიო, ზუსტი და კონკრეტული პასუხები გაეცით.
- პასუხები დააფორმატეთ ადვილად წასაკითხად, გამოიყენეთ პუნქტები და ქვესათაურები, როცა საჭიროა.
- მიაწოდეთ კონკრეტული ბრძანებები, კოდის მაგალითები ან ინსტრუქციები, როცა საჭიროა.
- არ გამოიყენოთ [Reference X] ფორმატი პასუხებში - ინფორმაცია პირდაპირ ჩასვით პასუხში წყაროზე მითითების გარეშე."""

# Default (English) system prompt.
_SYSTEM_INSTRUCTION_EN = """As IT_Bot, your role is to provide technical IT support within the organization:

## Role and Purpose:
- You are an IT support bot for the organization, helping employees resolve technical issues.
- Use the provided historical context to give accurate and helpful responses.
- When context mentions that there may be an issue with something, assume there is an issue.

## Response Approach:
1. Use the provided context to craft your answers. If the context contains specific IT issues and resolutions, use that information.
2. If the context contains information about similar problems, analyze how the problem was resolved in the past.
3. Guidance and knowledge from the provided context should take precedence over general knowledge.
4. If a question is unrelated to IT topics, direct the user to contact IT support.
5. Provide detailed, step-by-step instructions when asked about resolving a technical issue.

## Response Format:
- Respond with clear, precise, and specific answers.
- Format answers for easy reading, using bullet points and subheadings when appropriate.
- Provide specific commands, code examples, or instructions when relevant.
- IMPORTANT: DO NOT use reference numbers like [Reference X] in your responses. Instead, directly incorporate the relevant information into your answer without citing sources."""


class OpenAIService:
    """Service for generating responses using the OpenAI API."""

    def __init__(self,
                 api_key: Optional[str] = None,
                 model_name: str = "gpt-4o",
                 enable_cache: bool = True,
                 cache_ttl: int = CACHE_TTL,
                 rate_limit: int = 60):  # 60 requests per minute
        """
        Initialize the OpenAI service.

        Args:
            api_key: API key for OpenAI. If None, uses OPENAI_API_KEY environment variable.
            model_name: Name of the OpenAI model to use.
            enable_cache: Whether to enable response caching.
            cache_ttl: Time-to-live for cached responses in seconds.
            rate_limit: Maximum number of requests allowed per minute.
                A value of zero or less disables rate limiting.

        Raises:
            ValueError: If no API key is passed and OPENAI_API_KEY is not set.
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key not provided. Set OPENAI_API_KEY environment variable or pass api_key parameter.")

        self.model_name = model_name
        self.enable_cache = enable_cache
        self.cache_ttl = cache_ttl
        self.rate_limit = rate_limit

        # Rate limiting state: time.time() values of recently admitted requests
        self.request_timestamps: List[float] = []

        # Configure OpenAI API
        self.client = OpenAI(api_key=self.api_key)

        logger.info("Initialized OpenAIService with model: %s", model_name)

    def _check_rate_limit(self):
        """
        Check if the rate limit has been reached.

        Waits if necessary to stay within the rate limit, then records the
        current request. Does nothing when ``rate_limit`` is zero or negative.
        """
        if self.rate_limit <= 0:
            # No meaningful window can be enforced; this also keeps min() below
            # from ever being called on an empty list.
            return

        current_time = time.time()

        # Remove timestamps that have left the sliding window
        self.request_timestamps = [
            ts for ts in self.request_timestamps
            if current_time - ts < _RATE_WINDOW_SECONDS
        ]

        # Check if we've reached the rate limit
        if len(self.request_timestamps) >= self.rate_limit:
            # Wait until the oldest request in the window expires
            oldest_timestamp = min(self.request_timestamps)
            sleep_time = _RATE_WINDOW_SECONDS - (current_time - oldest_timestamp)

            if sleep_time > 0:
                logger.warning("Rate limit reached. Waiting %.2f seconds...", sleep_time)
                time.sleep(sleep_time)

        # Record this request using the time after any wait
        self.request_timestamps.append(time.time())

    def _detect_language(self, text: str) -> str:
        """
        Detect the language of a text string.

        Sends the first 100 characters of the text to the model and expects a
        bare language code in return.

        Args:
            text: The text to detect the language of.

        Returns:
            A supported language code: 'en' for English or 'ka' for Georgian.
            Falls back to 'en' for empty text, unsupported codes, and any error.
        """
        if not text:
            return 'en'  # Default to English for empty text

        try:
            # Simple language detection using a dedicated small request
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "You are a language detection service. Respond with only the ISO language code ('en' for English, 'ka' for Georgian, etc.)."},
                    {"role": "user", "content": f"Detect the language of this text: {text[:100]}"}
                ],
                max_tokens=10,
                temperature=0
            )
            # The prompt shows the codes in quotes, so the reply may come back
            # quoted or with a trailing period; strip those before validating.
            language_code = response.choices[0].message.content.strip().strip("'\"`.").lower()
        except Exception as e:
            # Detection is best-effort: any failure degrades to English.
            logger.error("Error detecting language: %s", e)
            return 'en'

        logger.info("Detected language: %s", language_code)

        # Validate and default to English for anything unsupported
        if language_code not in _SUPPORTED_LANGUAGES:
            return 'en'
        return language_code

    def _generate_cache_key(self, query: str, context: List[Dict[str, Any]]) -> str:
        """
        Generate a cache key for the query and context.

        The key covers the model name, the query, and the full content of
        every context document that has a 'content' entry.

        Args:
            query: The query string.
            context: The context documents. May be empty or None.

        Returns:
            A hexadecimal hash string for caching.
        """
        # The model name is part of the key because RESPONSE_CACHE is shared
        # by all instances, which may be configured with different models.
        parts = [self.model_name, query]
        parts.extend(doc['content'] for doc in (context or []) if 'content' in doc)

        digest = hashlib.md5()
        for part in parts:
            encoded = part.encode('utf-8')
            # Length-prefix each field so that different splits of the same
            # characters across fields cannot produce the same key.
            digest.update(str(len(encoded)).encode('ascii') + b":")
            digest.update(encoded)
        return digest.hexdigest()

    def _get_cached_response(self, cache_key: str) -> Optional[str]:
        """
        Get a cached response if available and not expired.

        Args:
            cache_key: The cache key.

        Returns:
            The cached response, or None if caching is disabled or the entry
            is missing or expired. Expired entries are removed.
        """
        if not self.enable_cache:
            return None

        entry = RESPONSE_CACHE.get(cache_key)
        if entry is None:
            return None

        timestamp, response = entry
        if time.time() - timestamp < self.cache_ttl:
            logger.info("Using cached response")
            return response

        # Remove expired cache entry
        RESPONSE_CACHE.pop(cache_key, None)
        return None

    def _cache_response(self, cache_key: str, response: str):
        """
        Cache a response.

        Args:
            cache_key: The cache key.
            response: The response to cache.
        """
        if not self.enable_cache:
            return

        RESPONSE_CACHE[cache_key] = (time.time(), response)

        # Trim the cache once it grows past the cap
        if len(RESPONSE_CACHE) > _MAX_CACHE_ENTRIES:
            self._cleanup_cache()

    def _cleanup_cache(self):
        """
        Clean up the response cache.

        Removes entries older than ``cache_ttl``; if the cache is still above
        the size cap afterwards, evicts the oldest remaining entries.
        """
        current_time = time.time()
        expired_keys = [
            key for key, (timestamp, _) in RESPONSE_CACHE.items()
            if current_time - timestamp >= self.cache_ttl
        ]
        for key in expired_keys:
            del RESPONSE_CACHE[key]

        logger.info("Cleaned up %d expired cache entries", len(expired_keys))

        # Without this step a cache full of fresh entries would never shrink
        # and every insert would trigger another full scan.
        overflow = len(RESPONSE_CACHE) - _MAX_CACHE_ENTRIES
        if overflow > 0:
            oldest_first = sorted(RESPONSE_CACHE, key=lambda key: RESPONSE_CACHE[key][0])
            for key in oldest_first[:overflow]:
                del RESPONSE_CACHE[key]
            logger.info("Evicted %d oldest cache entries", overflow)

    def generate_response(self,
                          query: str,
                          context: List[Dict[str, Any]],
                          max_retries: int = 3,
                          temperature: float = 0.7) -> str:
        """
        Generate a response using the OpenAI API.

        A cached answer is returned immediately, before any rate-limit wait or
        API request. Otherwise the query language is detected, the prompt is
        built, and the completion request is attempted up to
        ``max_retries + 1`` times with exponential backoff (2, 4, 8, ... seconds).

        Args:
            query: The user's query.
            context: A list of relevant context documents from ChromaDB.
                    Each document should be a dict with 'content' and 'metadata' keys.
            max_retries: Maximum number of retry attempts for API failures.
            temperature: Controls randomness in the response. Lower is more deterministic.

        Returns:
            The generated response text, or a fixed apology message when no
            attempt succeeds.
        """
        # Check cache first so a hit costs neither a rate-limit slot nor a request.
        # Empty cached text is treated as a miss.
        cache_key = self._generate_cache_key(query, context)
        cached_response = self._get_cached_response(cache_key)
        if cached_response:
            return cached_response

        # NOTE(review): only one rate-limit slot is taken per call; the
        # language-detection request and any retries are not counted.
        self._check_rate_limit()

        # Detect language and construct the messages using the context
        language = self._detect_language(query)
        messages = self._construct_messages(query, context, language)

        total_attempts = max_retries + 1
        for attempt in range(1, total_attempts + 1):
            logger.info("Attempting to generate response (attempt %s/%s)", attempt, total_attempts)
            try:
                # Generate with OpenAI API
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=messages,
                    temperature=temperature,
                    max_tokens=4096,
                    top_p=0.8
                )
                response_text = response.choices[0].message.content
            except Exception as e:
                # Log more details about the error
                logger.error("API call error: %s: %s", type(e).__name__, e)

                if attempt < total_attempts:
                    wait_time = 2 ** attempt  # Exponential backoff
                    logger.warning(
                        "API call failed: %s. Retrying in %s seconds. (Attempt %s/%s)",
                        e, wait_time, attempt, max_retries
                    )
                    time.sleep(wait_time)
                else:
                    logger.error("API call failed after %s retries: %s", max_retries, e)
            else:
                self._cache_response(cache_key, response_text)
                return response_text

        # Reached when every attempt failed, or when max_retries is negative
        # and no attempt was made at all.
        return _FALLBACK_RESPONSE

    @staticmethod
    def _format_reference_header(index: int, metadata: Optional[Dict[str, Any]]) -> str:
        """Build the 'Context N: ...' header for one context document from its metadata."""
        ref_details = []
        if metadata:
            for key, label in _METADATA_LABELS:
                value = metadata.get(key)
                if value:
                    ref_details.append(f"{label}: {value}")

            timestamp = metadata.get('timestamp')
            if timestamp:
                try:
                    # Just use the date part (first 10 characters)
                    ref_details.append(f"Date: {timestamp[:10]}")
                except TypeError:
                    # Timestamp is not sliceable (e.g. a number); leave the date out
                    pass

        header = f"Context {index}"
        if ref_details:
            header += ": " + " | ".join(ref_details)
        return header

    def _construct_messages(self,
                            query: str,
                            context: List[Dict[str, Any]],
                            language: str = 'en') -> List[Dict[str, str]]:
        """
        Construct message list with the query and context.

        Args:
            query: The user's query.
            context: A list of relevant context documents from ChromaDB.
            language: The language code (e.g., 'en', 'ka').

        Returns:
            List of message dictionaries for the OpenAI API: the system
            instruction, an optional second system message carrying the
            reference material, and the user query.
        """
        # System instruction based on language; anything but 'ka' gets English
        system_instruction = _SYSTEM_INSTRUCTION_KA if language == 'ka' else _SYSTEM_INSTRUCTION_EN

        # Process the context data (documents are used in the order given)
        context_text = ""
        if context:
            sections = ["Reference information from IT knowledge base:\n\n"]
            for position, doc in enumerate(context, start=1):
                if 'content' not in doc:
                    # Skipped documents still consume their position number
                    continue
                ref_header = self._format_reference_header(position, doc.get('metadata'))
                sections.append(f"[{ref_header}]\n{doc['content']}\n\n")
            context_text = "".join(sections)

        # Create messages array for the chat completions API
        messages = [
            {"role": "system", "content": system_instruction}
        ]

        # Add context as a separate message from the system if available
        if context_text:
            messages.append({"role": "system", "content": context_text})

        # Add the user query
        messages.append({"role": "user", "content": query})

        return messages


# For backwards compatibility, provide GeminiService as an alias for OpenAIService
GeminiService = OpenAIService