zulip_bot/compare_messages.py
2025-05-16 18:00:22 +04:00

333 lines
12 KiB
Python
Executable File

#!/usr/bin/env python
"""
Script to compare the number of messages in Zulip channels to ChromaDB.
This script will gather statistics on message counts from both Zulip DB and ChromaDB,
then generate a report showing discrepancies between the two.
"""
import os
import sys
import logging
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import argparse
# Configure root logging for this CLI script: INFO level, timestamped and
# level-tagged lines (basicConfig is a no-op if the root logger already has handlers).
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# A fixed logger name (rather than __name__, which is "__main__" when this file
# is run directly) keeps log records tagged "compare_messages".
logger = logging.getLogger("compare_messages")
# Append the directory containing this script to sys.path so the `app`
# package can be imported when the script is run directly. Because it is
# appended (not prepended), entries already on sys.path take precedence.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Apply the NumPy compatibility patch for ChromaDB. This deliberately runs
# before the remaining `app` imports; presumably those imports pull in
# ChromaDB, so the patch must already be in place (NOTE(review): confirm the
# ordering requirement before moving these lines).
from app.utils import patch_chromadb_numpy
patch_chromadb_numpy()
from app import create_app
from app.db import get_chroma_collection, get_db_session
from app.db.zulip_service import ZulipDatabaseService
from app.models.zulip import Message, Stream, Recipient, UserProfile
from sqlalchemy import and_, not_, or_
from app.config import Config
def get_excluded_user_ids():
    """Return the IDs of the bot accounts whose messages should be ignored.

    Looks up ``UserProfile`` rows whose ``full_name`` is ``IT_Bot`` or
    ``ai_bot`` and logs which users (and IDs) will be excluded.

    Returns:
        list: IDs of the matching users; empty if none exist.
    """
    bot_names = ['IT_Bot', 'ai_bot']
    bots = (
        get_db_session()
        .query(UserProfile)
        .filter(UserProfile.full_name.in_(bot_names))
        .all()
    )
    ids = [bot.id for bot in bots]
    names = [bot.full_name for bot in bots]
    logger.info(f"Excluding messages from users: {names} (IDs: {ids})")
    return ids
def get_sandbox_recipient_id():
    """Look up the recipient ID of the stream named ``sandbox``.

    Returns:
        The stream's ``recipient_id`` when a ``sandbox`` stream exists,
        otherwise ``None`` (a warning is logged in that case).
    """
    sandbox = (
        get_db_session()
        .query(Stream)
        .filter(Stream.name == 'sandbox')
        .first()
    )
    if not sandbox:
        logger.warning("Sandbox channel not found")
        return None

    recipient_id = sandbox.recipient_id
    logger.info(f"Excluding messages from sandbox channel (recipient_id={recipient_id})")
    return recipient_id
def _log_included_channels(session, sandbox_recipient_id):
    """Log the non-deactivated streams being analyzed (sorted by name), excluding sandbox."""
    active_streams = session.query(Stream).filter(
        Stream.deactivated == False  # noqa: E712 - SQLAlchemy builds the SQL from ==
    ).all()
    channels = sorted(
        (
            (stream.name, stream.recipient_id)
            for stream in active_streams
            if stream.recipient_id != sandbox_recipient_id
        ),
        key=lambda channel: channel[0],
    )
    logger.info(f"Analyzing messages from {len(channels)} channels:")
    for channel_name, recipient_id in channels:
        logger.info(f"- {channel_name} (recipient_id={recipient_id})")


def get_zulip_message_counts(days=30):
    """
    Count recent Zulip channel messages, excluding the sandbox channel and
    messages sent by IT_Bot / ai_bot.

    Args:
        days: Number of days to look back from now.

    Returns:
        dict with keys:
            'channel_counts': channel name -> number of counted messages
            'total_count': number of counted channel messages
            'unique_count': number of distinct counted message IDs
            'message_ids': set of counted message IDs as strings (the same
                representation ChromaDB uses for its IDs)
        On any error the same keys are returned with empty/zero values and
        the traceback is logged.
    """
    logger.info(f"Getting message counts from Zulip DB for the last {days} days")
    try:
        session = get_db_session()
        excluded_user_ids = get_excluded_user_ids()
        sandbox_recipient_id = get_sandbox_recipient_id()

        # NOTE(review): datetime.now() is naive local time; confirm how the
        # database compares it with Message.date_sent.
        since_date = datetime.now() - timedelta(days=days)
        filters = [Message.date_sent >= since_date]
        if excluded_user_ids:
            filters.append(not_(Message.sender_id.in_(excluded_user_ids)))
        # Compare against None explicitly so a falsy-but-valid ID is still filtered out.
        if sandbox_recipient_id is not None:
            filters.append(Message.recipient_id != sandbox_recipient_id)

        messages = session.query(Message).filter(and_(*filters)).all()

        _log_included_channels(session, sandbox_recipient_id)

        channel_counts = defaultdict(int)
        message_ids = set()
        for message in messages:
            channel_name = ZulipDatabaseService.get_channel_name_for_message(message)
            # Skip messages without a channel name (presumably non-stream
            # messages such as DMs -- TODO confirm) and anything labelled sandbox.
            if not channel_name or channel_name == "sandbox":
                continue
            channel_counts[channel_name] += 1
            # Stored as str so the set compares directly with ChromaDB IDs.
            message_ids.add(str(message.id))

        logger.info("Message counts by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

        return {
            'channel_counts': dict(channel_counts),
            # Only the channel messages counted above. The raw query result can
            # also hold messages with no channel, which the report never compares.
            'total_count': sum(channel_counts.values()),
            'unique_count': len(message_ids),
            'message_ids': message_ids,
        }
    except Exception as e:
        # Best effort: keep the report running with empty data, but keep the traceback.
        logger.exception(f"Error getting Zulip message counts: {e}")
        return {'channel_counts': {}, 'total_count': 0, 'unique_count': 0, 'message_ids': set()}
def get_chromadb_message_counts():
    """
    Count the entries stored in the ChromaDB collection, per channel.

    Entries whose metadata 'channel' is "sandbox" are left out of every
    figure (channel counts, IDs, totals, duplicates), matching the Zulip side
    of the comparison, which also excludes sandbox. Entries with no metadata,
    or no 'channel' key, are counted under 'Unknown'.

    Returns:
        dict with keys 'channel_counts', 'total_count', 'unique_count',
        'message_ids', 'duplicate_count' and 'duplicates' (ID -> occurrence
        count for IDs seen more than once). When the collection is
        unavailable, empty, or an error occurs, only the first four keys are
        returned, with empty/zero values.
    """
    logger.info("Getting message counts from ChromaDB")
    empty_result = {'channel_counts': {}, 'total_count': 0, 'unique_count': 0, 'message_ids': set()}
    try:
        collection = get_chroma_collection()
        if not collection:
            logger.error("Failed to get ChromaDB collection")
            return empty_result

        result = collection.get(include=['metadatas'])
        if not result or 'ids' not in result or not result['ids']:
            logger.info("No entries found in ChromaDB")
            return empty_result

        ids = result['ids']
        # ChromaDB reports entries stored without metadata as None; pad the list
        # too, so every ID is paired with something and none are dropped by zip().
        metadatas = list(result.get('metadatas') or [])
        metadatas += [None] * (len(ids) - len(metadatas))

        channel_counts = defaultdict(int)
        counted_ids = []
        for message_id, metadata in zip(ids, metadatas):
            channel = (metadata or {}).get('channel', 'Unknown')
            if channel == "sandbox":
                continue
            channel_counts[channel] += 1
            counted_ids.append(message_id)

        message_ids = set(counted_ids)
        duplicates = {
            message_id: count
            for message_id, count in Counter(counted_ids).items()
            if count > 1
        }

        logger.info("ChromaDB message counts by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

        return {
            'channel_counts': dict(channel_counts),
            'total_count': len(counted_ids),
            'unique_count': len(message_ids),
            'message_ids': message_ids,
            'duplicate_count': len(duplicates),
            'duplicates': duplicates,
        }
    except Exception as e:
        # Best effort: keep the report running with empty data, but keep the traceback.
        logger.exception(f"Error getting ChromaDB message counts: {e}")
        return empty_result
def compare_counts(zulip_counts, chromadb_counts, days):
    """
    Cross-check the statistics gathered from Zulip and from ChromaDB.

    Args:
        zulip_counts: Result of the Zulip counting step ('channel_counts',
            'total_count', 'unique_count', 'message_ids').
        chromadb_counts: Result of the ChromaDB counting step (same keys,
            optionally 'duplicate_count').
        days: Look-back window used for the Zulip query; copied into the result.

    Returns:
        dict: Per-channel comparison, the sets and counts of IDs missing from
        or extra in ChromaDB, the totals from both sides, and 'days'.
    """
    logger.info("Comparing message counts")

    # Normalize both ID collections to strings so int and str IDs compare equal.
    zulip_ids = {str(mid) for mid in zulip_counts['message_ids']}
    chroma_ids = {str(mid) for mid in chromadb_counts['message_ids']}
    missing = zulip_ids - chroma_ids
    # IDs present in ChromaDB but not in the Zulip look-back window.
    extra = chroma_ids - zulip_ids

    zulip_channels = zulip_counts['channel_counts']
    chroma_channels = chromadb_counts['channel_counts']
    channel_comparison = {}
    for channel in set(zulip_channels) | set(chroma_channels):
        in_zulip = zulip_channels.get(channel, 0)
        in_chroma = chroma_channels.get(channel, 0)
        if in_zulip > 0:
            percentage = in_chroma / in_zulip * 100
        else:
            percentage = 0
        channel_comparison[channel] = {
            'zulip_count': in_zulip,
            'chromadb_count': in_chroma,
            'difference': in_zulip - in_chroma,
            'percentage': percentage,
        }

    return {
        'channel_comparison': channel_comparison,
        'missing_from_chromadb': missing,
        'missing_count': len(missing),
        'extra_in_chromadb': extra,
        'extra_count': len(extra),
        'zulip_total': zulip_counts['total_count'],
        'chromadb_total': chromadb_counts['total_count'],
        'zulip_unique': zulip_counts['unique_count'],
        'chromadb_unique': chromadb_counts['unique_count'],
        'duplicate_count': chromadb_counts.get('duplicate_count', 0),
        'days': days,
    }
def print_comparison_report(comparison):
    """
    Print a human-readable report of a comparison result.

    Args:
        comparison: dict with the keys 'days', 'zulip_total', 'zulip_unique',
            'chromadb_total', 'chromadb_unique', 'duplicate_count',
            'missing_count', 'extra_count', 'missing_from_chromadb' and
            'channel_comparison' (channel name -> dict with 'zulip_count',
            'chromadb_count', 'difference' and 'percentage').

    The "Overall sync rate" is the share of Zulip messages in the look-back
    window whose IDs were found in ChromaDB; it is 0 when the window holds no
    Zulip messages.
    """
    days = comparison['days']
    zulip_unique = comparison['zulip_unique']
    missing_count = comparison['missing_count']
    duplicate_count = comparison['duplicate_count']
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print("\n" + heavy_rule)
    print(f"ZULIP TO CHROMADB COMPARISON REPORT (Last {days} days)")
    print(heavy_rule)

    print("\nSUMMARY:")
    print(f"Zulip total messages: {comparison['zulip_total']}")
    print(f"Zulip unique messages: {zulip_unique}")
    print(f"ChromaDB total entries: {comparison['chromadb_total']}")
    print(f"ChromaDB unique entries: {comparison['chromadb_unique']}")
    print(f"Duplicate entries in ChromaDB: {duplicate_count}")
    # Measure coverage of the Zulip window instead of chromadb_unique / zulip_unique:
    # the ChromaDB figures come from the whole collection (older entries included),
    # so that ratio can exceed 100% and mask missing messages.
    if zulip_unique > 0:
        synced = max(zulip_unique - missing_count, 0)
        sync_percentage = synced / zulip_unique * 100
    else:
        sync_percentage = 0
    print(f"Overall sync rate: {sync_percentage:.2f}%")
    print(f"Messages in Zulip but missing from ChromaDB: {missing_count}")
    print(f"Entries in ChromaDB not found in recent Zulip data: {comparison['extra_count']}")

    print("\nCHANNEL BREAKDOWN:")
    print(light_rule)
    print(f"{'Channel':<25} {'Zulip':<10} {'ChromaDB':<10} {'Diff':<10} {'Sync %':<10}")
    print(light_rule)
    # NOTE: per-channel ChromaDB counts also come from the whole collection, so a
    # channel's "Sync %" can exceed 100% when it has history older than the window.
    for channel, stats in sorted(comparison['channel_comparison'].items()):
        print(f"{channel:<25} {stats['zulip_count']:<10} {stats['chromadb_count']:<10} {stats['difference']:<10} {stats['percentage']:.2f}%")

    if missing_count > 0:
        print("\nMISSING MESSAGE IDS (Sample):")
        # Sort for a deterministic sample; the (length, text) key orders
        # numeric ID strings numerically.
        sample = sorted(
            (str(mid) for mid in comparison['missing_from_chromadb']),
            key=lambda text: (len(text), text),
        )[:10]
        print(", ".join(sample))

    if duplicate_count > 0:
        print("\nDUPLICATE ENTRIES DETECTED")
        print(f"Total messages with duplicates: {duplicate_count}")

    print("\n" + heavy_rule)
    print("RECOMMENDATIONS:")
    if duplicate_count > 0:
        print("- Run ./fix_duplicate_entries.py to remove duplicate entries")
    if missing_count > 0:
        print(f"- Run python sync_all_channels.py --force --days {days} to sync missing messages")
    if sync_percentage < 95:
        print("- Investigate sync service settings and DB connection issues")
    print(heavy_rule + "\n")
def main():
    """Parse the command line, gather counts from Zulip and ChromaDB, and print the report.

    Command-line options:
        --days: look-back window in days for the Zulip query (default 30).
    """
    parser = argparse.ArgumentParser(description="Compare Zulip channel messages to ChromaDB entries")
    parser.add_argument("--days", type=int, default=30, help="Number of days to look back in Zulip history")
    days = parser.parse_args().days

    logger.info("Starting message comparison")

    # The counting helpers run inside the Flask application context.
    flask_app = create_app()
    with flask_app.app_context():
        # Arguments evaluate left to right: Zulip is queried before ChromaDB.
        report = compare_counts(
            get_zulip_message_counts(days=days),
            get_chromadb_message_counts(),
            days,
        )
        print_comparison_report(report)
        logger.info("Comparison completed")


if __name__ == "__main__":
    main()