333 lines
12 KiB
Python
Executable File
333 lines
12 KiB
Python
Executable File
#!/usr/bin/env python
|
|
"""
|
|
Script to compare the number of messages in Zulip channels to ChromaDB.
|
|
|
|
This script will gather statistics on message counts from both Zulip DB and ChromaDB,
|
|
then generate a report showing discrepancies between the two.
|
|
"""
|
|
import os
|
|
import sys
|
|
import logging
|
|
from collections import defaultdict, Counter
|
|
from datetime import datetime, timedelta
|
|
import argparse
|
|
|
|
# Configure the root logger (via basicConfig) so that INFO-and-above records
# carry a timestamp, the logger name and the level on every line.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Named logger used by every function in this script.
logger = logging.getLogger("compare_messages")
|
|
|
|
# Put this script's own directory on the import path so the local `app`
# package can be imported regardless of the working directory.
_script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(_script_dir)

# Apply the NumPy compatibility shim for ChromaDB before the remaining `app`
# imports below run (presumably those modules import ChromaDB, which is why
# the shim must come first — keep this ordering).
from app.utils import patch_chromadb_numpy  # noqa: E402

patch_chromadb_numpy()
|
|
|
|
from app import create_app
|
|
from app.db import get_chroma_collection, get_db_session
|
|
from app.db.zulip_service import ZulipDatabaseService
|
|
from app.models.zulip import Message, Stream, Recipient, UserProfile
|
|
from sqlalchemy import and_, not_, or_
|
|
from app.config import Config
|
|
|
|
def get_excluded_user_ids():
    """Return the database IDs of the bot accounts whose messages are ignored.

    Looks up ``UserProfile`` rows whose ``full_name`` is ``IT_Bot`` or
    ``ai_bot`` and logs the names and IDs that were found.

    Returns:
        list: IDs of the matching users (empty when none exist).
    """
    bot_names = ['IT_Bot', 'ai_bot']
    bots = (
        get_db_session()
        .query(UserProfile)
        .filter(UserProfile.full_name.in_(bot_names))
        .all()
    )

    ids = []
    names = []
    for bot in bots:
        ids.append(bot.id)
        names.append(bot.full_name)

    logger.info(f"Excluding messages from users: {names} (IDs: {ids})")
    return ids
|
|
|
|
def get_sandbox_recipient_id():
    """Look up the recipient ID of the stream named ``sandbox``.

    Returns:
        The ``recipient_id`` of the first ``Stream`` named ``sandbox``, or
        ``None`` (after logging a warning) when no such stream exists.
    """
    stream = (
        get_db_session()
        .query(Stream)
        .filter(Stream.name == 'sandbox')
        .first()
    )
    if not stream:
        logger.warning("Sandbox channel not found")
        return None

    logger.info(f"Excluding messages from sandbox channel (recipient_id={stream.recipient_id})")
    return stream.recipient_id
|
|
|
|
def get_zulip_message_counts(days=30):
    """
    Get message counts from the Zulip database for all channels except
    sandbox, also excluding messages sent by IT_Bot and ai_bot.

    Args:
        days: Number of days to look back from now.

    Returns:
        dict with keys:
            channel_counts: channel name -> number of counted messages
            total_count: number of messages attributed to a non-sandbox channel
            unique_count: number of distinct IDs among those messages
            message_ids: set of those message IDs as strings (the form used
                for comparison with ChromaDB IDs)
        On any error the error is logged with its traceback and an all-zero
        result with the same keys is returned.
    """
    logger.info(f"Getting message counts from Zulip DB for the last {days} days")

    try:
        session = get_db_session()
        excluded_user_ids = get_excluded_user_ids()  # IT_Bot and ai_bot
        sandbox_recipient_id = get_sandbox_recipient_id()

        filters = _build_zulip_message_filters(days, excluded_user_ids, sandbox_recipient_id)
        messages = session.query(Message).filter(and_(*filters)).all()

        _log_analyzed_channels(session, sandbox_recipient_id)

        channel_counts, message_ids = _count_messages_by_channel(messages)

        logger.info("Message counts by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

        return {
            'channel_counts': dict(channel_counts),
            # Count only messages attributed to a channel so the total covers
            # the same population as channel_counts and message_ids;
            # len(messages) would also include DMs and unattributed rows.
            'total_count': sum(channel_counts.values()),
            'unique_count': len(message_ids),
            'message_ids': message_ids,
        }
    except Exception as e:
        # Best-effort: report zeros, but keep the traceback for diagnosis.
        logger.exception(f"Error getting Zulip message counts: {e}")
        return {'channel_counts': {}, 'total_count': 0, 'unique_count': 0, 'message_ids': set()}


def _build_zulip_message_filters(days, excluded_user_ids, sandbox_recipient_id):
    """Return the SQLAlchemy filter clauses that select the Zulip messages to count."""
    # NOTE(review): datetime.now() is naive local time; if Message.date_sent is
    # stored timezone-aware (UTC), the cutoff may be shifted by the local UTC
    # offset — confirm against the model definition.
    since_date = datetime.now() - timedelta(days=days)
    filters = [Message.date_sent >= since_date]

    if excluded_user_ids:
        filters.append(not_(Message.sender_id.in_(excluded_user_ids)))

    # Compare with None explicitly rather than relying on truthiness of an ID.
    if sandbox_recipient_id is not None:
        filters.append(Message.recipient_id != sandbox_recipient_id)

    return filters


def _log_analyzed_channels(session, sandbox_recipient_id):
    """Log the non-deactivated, non-sandbox streams by name (informational only)."""
    streams = session.query(Stream).filter(
        Stream.deactivated == False  # noqa: E712 - SQLAlchemy column expression
    ).all()

    channels = sorted(
        ((stream.name, stream.recipient_id)
         for stream in streams
         if stream.recipient_id != sandbox_recipient_id),
        key=lambda channel: channel[0],
    )

    logger.info(f"Analyzing messages from {len(channels)} channels:")
    for channel_name, recipient_id in channels:
        logger.info(f"- {channel_name} (recipient_id={recipient_id})")


def _count_messages_by_channel(messages):
    """Tally messages per channel name, skipping unnamed and sandbox channels.

    Returns:
        tuple: (``defaultdict(int)`` of channel name -> count, set of the
        counted message IDs as strings).
    """
    channel_counts = defaultdict(int)
    message_ids = set()

    for message in messages:
        channel_name = ZulipDatabaseService.get_channel_name_for_message(message)
        # A falsy name presumably means the message is not in a stream
        # (e.g. a direct message) — such messages are not counted.
        if channel_name and channel_name != "sandbox":
            channel_counts[channel_name] += 1
            message_ids.add(str(message.id))  # string form matches ChromaDB IDs

    return channel_counts, message_ids
|
|
|
|
def get_chromadb_message_counts():
    """
    Get message counts from ChromaDB.

    Reads every entry of the collection returned by get_chroma_collection()
    together with its metadata. Each entry is attributed to the channel in
    its ``channel`` metadata key ('Unknown' when the key is absent or the
    entry has no metadata). Sandbox entries are left out of the per-channel
    counts.

    Returns:
        dict with keys:
            channel_counts: channel name -> entry count (sandbox excluded)
            total_count: number of IDs returned by the collection
            unique_count: number of distinct IDs
            message_ids: set of distinct IDs (all channels, incl. sandbox)
            duplicate_count: number of IDs occurring more than once
            duplicates: ID -> occurrence count for those IDs
        Every return path, including empty and error results, uses this
        same set of keys.
    """
    logger.info("Getting message counts from ChromaDB")

    try:
        collection = get_chroma_collection()

        if not collection:
            logger.error("Failed to get ChromaDB collection")
            return _empty_chromadb_counts()

        # Get all entries (no filter, so this spans the whole collection).
        result = collection.get(include=['metadatas'])

        if not result or 'ids' not in result or not result['ids']:
            logger.info("No entries found in ChromaDB")
            return _empty_chromadb_counts()

        ids = result['ids']
        metadatas = result.get('metadatas') or []

        channel_counts = defaultdict(int)
        for i, _message_id in enumerate(ids):
            if i >= len(metadatas):
                continue
            # An entry stored without metadata comes back as None; treat it
            # like metadata without a 'channel' key instead of crashing.
            metadata = metadatas[i] or {}
            channel = metadata.get('channel', 'Unknown')
            if channel != "sandbox":
                channel_counts[channel] += 1

        message_ids = set(ids)

        # NOTE(review): ChromaDB IDs are expected to be unique within a
        # collection, so this will normally be empty; confirm how duplicates
        # are meant to be detected.
        id_counts = Counter(ids)
        duplicates = {message_id: count for message_id, count in id_counts.items() if count > 1}

        logger.info("ChromaDB message counts by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

        return {
            'channel_counts': dict(channel_counts),
            'total_count': len(ids),
            'unique_count': len(message_ids),
            'message_ids': message_ids,
            'duplicate_count': len(duplicates),
            'duplicates': duplicates,
        }
    except Exception as e:
        # Best-effort: report zeros, but keep the traceback for diagnosis.
        logger.exception(f"Error getting ChromaDB message counts: {e}")
        return _empty_chromadb_counts()


def _empty_chromadb_counts():
    """Return an all-zero ChromaDB result with the same keys as a successful one."""
    return {
        'channel_counts': {},
        'total_count': 0,
        'unique_count': 0,
        'message_ids': set(),
        'duplicate_count': 0,
        'duplicates': {},
    }
|
|
|
|
def compare_counts(zulip_counts, chromadb_counts, days):
    """
    Build comparison statistics between the Zulip and ChromaDB tallies.

    Args:
        zulip_counts: Result dict of get_zulip_message_counts().
        chromadb_counts: Result dict of get_chromadb_message_counts().
        days: Look-back window (days) used for the Zulip query; passed
            through unchanged for reporting.

    Returns:
        dict: per-channel comparison, the ID sets missing from / extra in
        ChromaDB with their sizes, totals and unique counts from both sides,
        ChromaDB's duplicate count (0 when not reported) and ``days``.
    """
    logger.info("Comparing message counts")

    # Both sides are compared as strings.
    zulip_ids = set(map(str, zulip_counts['message_ids']))
    chroma_ids = set(map(str, chromadb_counts['message_ids']))

    missing = zulip_ids.difference(chroma_ids)
    # The ChromaDB side is read without a date filter, so "extra" also
    # includes entries for messages older than the Zulip look-back window.
    extra = chroma_ids.difference(zulip_ids)

    zulip_by_channel = zulip_counts['channel_counts']
    chroma_by_channel = chromadb_counts['channel_counts']

    def _channel_stats(name):
        # Per-channel counts, their difference and ChromaDB coverage in percent.
        z = zulip_by_channel.get(name, 0)
        c = chroma_by_channel.get(name, 0)
        return {
            'zulip_count': z,
            'chromadb_count': c,
            'difference': z - c,
            'percentage': (c / z * 100) if z > 0 else 0,
        }

    channel_comparison = {
        name: _channel_stats(name)
        for name in set(zulip_by_channel.keys()) | set(chroma_by_channel.keys())
    }

    return {
        'channel_comparison': channel_comparison,
        'missing_from_chromadb': missing,
        'missing_count': len(missing),
        'extra_in_chromadb': extra,
        'extra_count': len(extra),
        'zulip_total': zulip_counts['total_count'],
        'chromadb_total': chromadb_counts['total_count'],
        'zulip_unique': zulip_counts['unique_count'],
        'chromadb_unique': chromadb_counts['unique_count'],
        'duplicate_count': chromadb_counts.get('duplicate_count', 0),
        'days': days,
    }
|
|
|
|
def print_comparison_report(comparison):
    """
    Print a human-readable report of the comparison to stdout.

    Args:
        comparison: Statistics dict as produced by compare_counts().

    The overall sync rate is the share of the Zulip messages in the look-back
    window whose ID was found in ChromaDB. It is derived from
    ``missing_count``, not from ChromaDB's unique count. ChromaDB's count may
    include entries outside the window and could push the rate above 100%.
    """
    zulip_unique = comparison['zulip_unique']
    missing_count = comparison['missing_count']

    print("\n" + "=" * 80)
    print(f"ZULIP TO CHROMADB COMPARISON REPORT (Last {comparison['days']} days)")
    print("=" * 80)

    print("\nSUMMARY:")
    print(f"Zulip total messages: {comparison['zulip_total']}")
    print(f"Zulip unique messages: {zulip_unique}")
    print(f"ChromaDB total entries: {comparison['chromadb_total']}")
    print(f"ChromaDB unique entries: {comparison['chromadb_unique']}")
    print(f"Duplicate entries in ChromaDB: {comparison['duplicate_count']}")

    if zulip_unique > 0:
        synced = max(zulip_unique - missing_count, 0)
        sync_percentage = synced / zulip_unique * 100
    else:
        # Nothing on the Zulip side (possibly a failed query): report 0% so the
        # "investigate" recommendation below still fires.
        sync_percentage = 0
    print(f"Overall sync rate: {sync_percentage:.2f}%")

    print(f"Messages in Zulip but missing from ChromaDB: {missing_count}")
    print(f"Entries in ChromaDB not found in recent Zulip data: {comparison['extra_count']}")

    print("\nCHANNEL BREAKDOWN:")
    print("-" * 80)
    print(f"{'Channel':<25} {'Zulip':<10} {'ChromaDB':<10} {'Diff':<10} {'Sync %':<10}")
    print("-" * 80)

    for channel, stats in sorted(comparison['channel_comparison'].items()):
        print(f"{channel:<25} {stats['zulip_count']:<10} {stats['chromadb_count']:<10} {stats['difference']:<10} {stats['percentage']:.2f}%")

    if missing_count > 0:
        print("\nMISSING MESSAGE IDS (Sample):")
        # Sort so the sample is deterministic; ordering by (length, text)
        # puts numeric ID strings in numeric order.
        sample = sorted(
            (str(mid) for mid in comparison['missing_from_chromadb']),
            key=lambda s: (len(s), s),
        )[:10]
        print(", ".join(sample))

    if comparison['duplicate_count'] > 0:
        print("\nDUPLICATE ENTRIES DETECTED")
        print(f"Total messages with duplicates: {comparison['duplicate_count']}")

    print("\n" + "=" * 80)
    print("RECOMMENDATIONS:")

    if comparison['duplicate_count'] > 0:
        print("- Run ./fix_duplicate_entries.py to remove duplicate entries")

    if missing_count > 0:
        print(f"- Run python sync_all_channels.py --force --days {comparison['days']} to sync missing messages")

    if sync_percentage < 95:
        print("- Investigate sync service settings and DB connection issues")

    print("=" * 80 + "\n")
|
|
|
|
def main():
    """Command-line entry point: parse ``--days``, compare counts, print the report."""
    parser = argparse.ArgumentParser(description="Compare Zulip channel messages to ChromaDB entries")
    parser.add_argument("--days", type=int, default=30, help="Number of days to look back in Zulip history")
    args = parser.parse_args()

    # A non-positive window puts the cutoff at or after "now" and would
    # silently match no Zulip messages, so reject it up front.
    if args.days <= 0:
        parser.error("--days must be a positive integer")

    logger.info("Starting message comparison")

    # Create the Flask app; the work below runs inside its app context.
    app = create_app()

    with app.app_context():
        zulip_counts = get_zulip_message_counts(days=args.days)
        chromadb_counts = get_chromadb_message_counts()

        comparison = compare_counts(zulip_counts, chromadb_counts, args.days)

        print_comparison_report(comparison)

    logger.info("Comparison completed")


if __name__ == "__main__":
    main()