#!/usr/bin/env python
"""
Script to compare the number of messages in Zulip channels to ChromaDB.

This script gathers statistics on message counts from both the Zulip DB and
ChromaDB, then prints a report showing discrepancies between the two.
"""

import os
import sys
import logging
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import argparse

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("compare_messages")

# Add the current directory to the path so we can import the app module
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Apply NumPy compatibility patch for ChromaDB.
# NOTE(review): presumably this must run before the app/ChromaDB imports
# below - keep this ordering.
from app.utils import patch_chromadb_numpy
patch_chromadb_numpy()

from app import create_app
from app.db import get_chroma_collection, get_db_session
from app.db.zulip_service import ZulipDatabaseService
from app.models.zulip import Message, Stream, Recipient, UserProfile
from sqlalchemy import and_, not_, or_
from app.config import Config

# Full names of bot users whose messages are excluded from the Zulip side.
EXCLUDED_USER_NAMES = ('IT_Bot', 'ai_bot')
# Channel excluded from the comparison on both sides.
SANDBOX_CHANNEL = 'sandbox'
# Overall sync rate (percent) below which the report suggests investigating.
SYNC_WARNING_THRESHOLD = 95
# How many missing message IDs to show in the report.
MISSING_SAMPLE_SIZE = 10


def get_excluded_user_ids(user_names=EXCLUDED_USER_NAMES):
    """
    Get the user IDs of the users whose messages are excluded.

    Args:
        user_names: Full names of the users to exclude (defaults to IT_Bot
            and ai_bot).

    Returns:
        list: IDs of the matching UserProfile rows (may be empty).
    """
    session = get_db_session()
    excluded_users = session.query(UserProfile).filter(
        UserProfile.full_name.in_(list(user_names))
    ).all()
    excluded_user_ids = [user.id for user in excluded_users]
    logger.info(
        f"Excluding messages from users: {[u.full_name for u in excluded_users]} "
        f"(IDs: {excluded_user_ids})"
    )
    return excluded_user_ids


def get_sandbox_recipient_id():
    """
    Get the recipient ID of the sandbox channel.

    Returns:
        The sandbox stream's recipient_id, or None if no stream named
        'sandbox' exists.
    """
    session = get_db_session()
    sandbox_stream = session.query(Stream).filter(
        Stream.name == SANDBOX_CHANNEL
    ).first()

    if sandbox_stream is None:
        logger.warning("Sandbox channel not found")
        return None

    logger.info(
        f"Excluding messages from sandbox channel "
        f"(recipient_id={sandbox_stream.recipient_id})"
    )
    return sandbox_stream.recipient_id


def _log_active_channels(session, sandbox_recipient_id):
    """Log the non-deactivated channels (excluding sandbox), sorted by name."""
    streams = session.query(Stream).filter(
        Stream.deactivated == False  # noqa: E712 - SQLAlchemy column expression
    ).all()
    channels = sorted(
        (
            (stream.name, stream.recipient_id)
            for stream in streams
            if stream.recipient_id != sandbox_recipient_id
        ),
        key=lambda channel: channel[0],
    )
    logger.info(f"Analyzing messages from {len(channels)} channels:")
    for channel_name, recipient_id in channels:
        logger.info(f"- {channel_name} (recipient_id={recipient_id})")


def _empty_zulip_counts():
    """Return the Zulip counts structure with empty/zero values."""
    return {
        'channel_counts': {},
        'total_count': 0,
        'unique_count': 0,
        'message_ids': set(),
        'message_channels': {},
    }


def get_zulip_message_counts(days=30):
    """
    Get message counts from the Zulip database for all channels except
    sandbox, also excluding messages sent by IT_Bot and ai_bot.

    Args:
        days: Number of days to look back.

    Returns:
        dict with keys:
            'channel_counts': channel name -> message count
            'total_count': number of channel messages counted (equals the
                sum of 'channel_counts')
            'unique_count': number of distinct message IDs counted
            'message_ids': set of counted message IDs, as strings
            'message_channels': message ID (string) -> channel name
        On any error the same keys are returned with empty/zero values.
    """
    logger.info(f"Getting message counts from Zulip DB for the last {days} days")

    try:
        session = get_db_session()

        excluded_user_ids = get_excluded_user_ids()
        sandbox_recipient_id = get_sandbox_recipient_id()

        # NOTE(review): naive local time. If Message.date_sent is stored
        # timezone-aware (UTC), the cutoff may be shifted by the server's
        # UTC offset - confirm.
        since_date = datetime.now() - timedelta(days=days)
        filters = [Message.date_sent >= since_date]

        if excluded_user_ids:
            filters.append(not_(Message.sender_id.in_(excluded_user_ids)))

        # Compare against None explicitly rather than relying on truthiness.
        if sandbox_recipient_id is not None:
            filters.append(Message.recipient_id != sandbox_recipient_id)

        messages = session.query(Message).filter(and_(*filters)).all()

        # Informational only: the message query above is not restricted to
        # these active streams, so messages in deactivated channels are
        # still counted below.
        _log_active_channels(session, sandbox_recipient_id)

        channel_counts = defaultdict(int)
        message_channels = {}
        skipped = 0
        for message in messages:
            channel_name = ZulipDatabaseService.get_channel_name_for_message(message)
            # Messages with no resolved channel name (or in sandbox) are not
            # part of the channel comparison, so they are not counted anywhere.
            if not channel_name or channel_name == SANDBOX_CHANNEL:
                skipped += 1
                continue
            channel_counts[channel_name] += 1
            # Keyed by string ID so it compares directly with ChromaDB IDs.
            message_channels[str(message.id)] = channel_name

        if skipped:
            logger.info(f"Skipped {skipped} messages without a countable channel")

        logger.info("Message counts by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

        return {
            'channel_counts': dict(channel_counts),
            'total_count': sum(channel_counts.values()),
            'unique_count': len(message_channels),
            'message_ids': set(message_channels),
            'message_channels': message_channels,
        }
    except Exception as e:
        # Best-effort: report the failure with a traceback, return empty stats.
        logger.exception(f"Error getting Zulip message counts: {e}")
        return _empty_zulip_counts()


def _empty_chromadb_counts():
    """Return the ChromaDB counts structure with empty/zero values."""
    return {
        'channel_counts': {},
        'total_count': 0,
        'unique_count': 0,
        'message_ids': set(),
        'duplicate_count': 0,
        'duplicates': {},
    }


def get_chromadb_message_counts():
    """
    Get message counts from ChromaDB.

    Entries whose 'channel' metadata is sandbox are excluded everywhere,
    mirroring the Zulip side. Entries with no 'channel' metadata are counted
    under 'Unknown'.

    Returns:
        dict with keys 'channel_counts', 'total_count' (non-sandbox entries),
        'unique_count', 'message_ids' (set of entry IDs), 'duplicate_count'
        and 'duplicates' (entry ID -> occurrence count). On any error, or
        when the collection is unavailable or empty, the same keys are
        returned with empty/zero values.
    """
    logger.info("Getting message counts from ChromaDB")

    try:
        collection = get_chroma_collection()
        if not collection:
            logger.error("Failed to get ChromaDB collection")
            return _empty_chromadb_counts()

        # Get all entries
        result = collection.get(include=['metadatas'])
        if not result or 'ids' not in result or not result['ids']:
            logger.info("No entries found in ChromaDB")
            return _empty_chromadb_counts()

        entry_ids = result['ids']
        metadatas = result.get('metadatas') or []

        channel_counts = defaultdict(int)
        counted_ids = []
        for index, entry_id in enumerate(entry_ids):
            metadata = metadatas[index] if index < len(metadatas) else None
            # Individual metadata entries may be None; guard so .get() does
            # not raise and abort the whole count.
            channel = (metadata or {}).get('channel', 'Unknown')
            if channel == SANDBOX_CHANNEL:
                continue
            channel_counts[channel] += 1
            counted_ids.append(entry_id)

        id_counts = Counter(counted_ids)
        # NOTE(review): ChromaDB entry IDs are normally unique within a
        # collection, so this may always be empty; detecting the same Zulip
        # message stored under different IDs would need a metadata-based
        # check - confirm the metadata schema.
        duplicates = {entry_id: count for entry_id, count in id_counts.items() if count > 1}

        logger.info("ChromaDB message counts by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

        return {
            'channel_counts': dict(channel_counts),
            'total_count': len(counted_ids),
            'unique_count': len(id_counts),
            'message_ids': set(id_counts),
            'duplicate_count': len(duplicates),
            'duplicates': duplicates,
        }
    except Exception as e:
        # Best-effort: report the failure with a traceback, return empty stats.
        logger.exception(f"Error getting ChromaDB message counts: {e}")
        return _empty_chromadb_counts()


def compare_counts(zulip_counts, chromadb_counts, days):
    """
    Compare message counts between Zulip and ChromaDB.

    Args:
        zulip_counts: Counts from get_zulip_message_counts().
        chromadb_counts: Counts from get_chromadb_message_counts().
        days: Number of days looked back.

    Returns:
        dict: Comparison statistics. Per channel, 'percentage' is the share
        of that channel's in-window Zulip messages found in ChromaDB (falls
        back to the raw count ratio if no per-message channel data is
        available). 'synced_count' is the number of in-window Zulip messages
        present in ChromaDB.
    """
    logger.info("Comparing message counts")

    # Normalise IDs to strings so both sides compare equal.
    zulip_ids = {str(mid) for mid in zulip_counts['message_ids']}
    chroma_ids = {str(mid) for mid in chromadb_counts['message_ids']}

    missing_from_chromadb = zulip_ids - chroma_ids
    # ChromaDB is not filtered by date, so this also contains entries for
    # messages older than the look-back window.
    extra_in_chromadb = chroma_ids - zulip_ids
    synced_ids = zulip_ids & chroma_ids

    # Per-channel number of in-window Zulip messages that exist in ChromaDB.
    message_channels = zulip_counts.get('message_channels', {})
    synced_by_channel = Counter(
        message_channels[mid] for mid in synced_ids if mid in message_channels
    )

    zulip_channel_counts = zulip_counts['channel_counts']
    chroma_channel_counts = chromadb_counts['channel_counts']

    channel_comparison = {}
    for channel in set(zulip_channel_counts) | set(chroma_channel_counts):
        zulip_count = zulip_channel_counts.get(channel, 0)
        chromadb_count = chroma_channel_counts.get(channel, 0)

        if zulip_count == 0:
            percentage = 0
        elif message_channels:
            percentage = synced_by_channel[channel] / zulip_count * 100
        else:
            # No per-message channel data: fall back to the raw count ratio.
            percentage = chromadb_count / zulip_count * 100

        channel_comparison[channel] = {
            'zulip_count': zulip_count,
            'chromadb_count': chromadb_count,
            'synced_count': synced_by_channel[channel],
            'difference': zulip_count - chromadb_count,
            'percentage': percentage,
        }

    return {
        'channel_comparison': channel_comparison,
        'missing_from_chromadb': missing_from_chromadb,
        'missing_count': len(missing_from_chromadb),
        'extra_in_chromadb': extra_in_chromadb,
        'extra_count': len(extra_in_chromadb),
        'synced_count': len(synced_ids),
        'zulip_total': zulip_counts['total_count'],
        'chromadb_total': chromadb_counts['total_count'],
        'zulip_unique': zulip_counts['unique_count'],
        'chromadb_unique': chromadb_counts['unique_count'],
        'duplicate_count': chromadb_counts.get('duplicate_count', 0),
        'days': days,
    }


def _id_sort_key(message_id):
    """Sort key ordering non-negative decimal ID strings numerically."""
    text = str(message_id)
    return (len(text), text)


def print_comparison_report(comparison):
    """
    Print a report of the comparison to stdout.

    Args:
        comparison: Comparison statistics from compare_counts().
    """
    zulip_unique = comparison['zulip_unique']
    # Messages from the look-back window that are present in ChromaDB.
    synced = comparison.get('synced_count', zulip_unique - comparison['missing_count'])
    # Coverage of in-window Zulip messages; comparing against the (all-time)
    # ChromaDB count instead could exceed 100% while messages are missing.
    sync_percentage = (synced / zulip_unique * 100) if zulip_unique > 0 else 0

    print("\n" + "=" * 80)
    print(f"ZULIP TO CHROMADB COMPARISON REPORT (Last {comparison['days']} days)")
    print("=" * 80)

    print("\nSUMMARY:")
    print(f"Zulip total messages: {comparison['zulip_total']}")
    print(f"Zulip unique messages: {zulip_unique}")
    print(f"ChromaDB total entries: {comparison['chromadb_total']}")
    print(f"ChromaDB unique entries: {comparison['chromadb_unique']}")
    print(f"Duplicate entries in ChromaDB: {comparison['duplicate_count']}")
    print(f"Zulip messages found in ChromaDB: {synced}")
    print(f"Overall sync rate: {sync_percentage:.2f}%")
    print(f"Messages in Zulip but missing from ChromaDB: {comparison['missing_count']}")
    print(f"Entries in ChromaDB not found in recent Zulip data: {comparison['extra_count']}")

    print("\nCHANNEL BREAKDOWN:")
    print("-" * 80)
    print(f"{'Channel':<25} {'Zulip':<10} {'ChromaDB':<10} {'Diff':<10} {'Sync %':<10}")
    print("-" * 80)
    for channel, stats in sorted(comparison['channel_comparison'].items()):
        print(
            f"{channel:<25} {stats['zulip_count']:<10} {stats['chromadb_count']:<10} "
            f"{stats['difference']:<10} {stats['percentage']:.2f}%"
        )

    if comparison['missing_count'] > 0:
        print("\nMISSING MESSAGE IDS (Sample):")
        # Sorted so the sample is stable between runs.
        sample = sorted(comparison['missing_from_chromadb'], key=_id_sort_key)
        print(", ".join(str(mid) for mid in sample[:MISSING_SAMPLE_SIZE]))

    if comparison['duplicate_count'] > 0:
        print("\nDUPLICATE ENTRIES DETECTED")
        print(f"Total messages with duplicates: {comparison['duplicate_count']}")

    print("\n" + "=" * 80)
    print("RECOMMENDATIONS:")
    if comparison['duplicate_count'] > 0:
        print("- Run ./fix_duplicate_entries.py to remove duplicate entries")
    if comparison['missing_count'] > 0:
        print("- Run python sync_all_channels.py --force --days {0} to sync missing messages".format(comparison['days']))
    # With no Zulip messages in the window there is nothing to sync, so a 0%
    # rate is not a problem worth flagging.
    if zulip_unique > 0 and sync_percentage < SYNC_WARNING_THRESHOLD:
        print("- Investigate sync service settings and DB connection issues")
    print("=" * 80 + "\n")


def main():
    """Main entry point: parse arguments, gather counts, print the report."""
    parser = argparse.ArgumentParser(
        description="Compare Zulip channel messages to ChromaDB entries"
    )
    parser.add_argument(
        "--days", type=int, default=30,
        help="Number of days to look back in Zulip history"
    )
    args = parser.parse_args()
    if args.days < 1:
        parser.error("--days must be a positive integer")

    logger.info("Starting message comparison")

    # Create the Flask app (needed for context)
    app = create_app()
    with app.app_context():
        zulip_counts = get_zulip_message_counts(days=args.days)
        chromadb_counts = get_chromadb_message_counts()

        comparison = compare_counts(zulip_counts, chromadb_counts, args.days)
        print_comparison_report(comparison)

    logger.info("Comparison completed")


if __name__ == "__main__":
    main()