zulip_bot/sync_all_messages.py
2025-05-16 18:00:22 +04:00

157 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python
"""
Script to sync ALL messages from Zulip to ChromaDB with NO restrictions.
This script will sync everything - all channels, all users, all time periods.
"""
import os
import sys
import logging
from datetime import datetime
# Configure root logging for this script: INFO level and above, each record
# rendered as "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Script-wide logger; its explicit name is what appears in the %(name)s field.
logger = logging.getLogger("sync_all_messages")
# Add the directory containing this script (not the process's current working
# directory) to the import path so the sibling `app` package can be imported
# when the file is run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Apply NumPy compatibility patch for ChromaDB.
# NOTE(review): this call sits before the `app.db...` imports below, presumably
# so the patch is in place before ChromaDB is first imported — keep this order.
from app.utils import patch_chromadb_numpy
patch_chromadb_numpy()
from app import create_app
from app.db import get_db_session
from app.db.zulip_service import ZulipDatabaseService
from app.db.chroma_service import ChromaDBService
from app.models.zulip import Message
def sync_all_messages():
    """
    Sync ALL messages from Zulip to ChromaDB with no restrictions.

    Every row of the Zulip ``Message`` table is loaded (ordered by id) and
    processed in a single pass. Messages that ChromaDB already reports as
    present are skipped; the rest are added one at a time through
    ``ChromaDBService.add_message``. A failure on one message is logged (with
    traceback) and counted, and the sync continues with the next message.

    Returns:
        dict: Statistics about the sync:
            - 'total_messages': number of messages read from Zulip
            - 'total_synced': number of messages newly added to ChromaDB
            - 'already_exists': number skipped because ChromaDB already had them
            - 'failed': number that could not be added (``add_message``
              returned a falsy value, or an exception was raised)
            - 'channel_counts': dict mapping channel name -> newly synced count
    """
    logger.info("Starting unrestricted sync of ALL messages in one pass")
    # NOTE(review): the session is not closed here; presumably its lifetime is
    # managed by get_db_session / the Flask app context — confirm.
    session = get_db_session()
    total_synced = 0
    already_exists = 0
    failed = 0
    channel_counts = {}

    # Get all messages at once.
    # NOTE(review): .all() materializes the whole table in memory; for very
    # large instances consider batching (e.g. Query.yield_per), after checking
    # that the helper services tolerate an open streaming cursor.
    logger.info("Fetching ALL messages from Zulip database")
    messages = session.query(Message).order_by(Message.id).all()
    total_messages = len(messages)
    logger.info(f"Found {total_messages} total messages in Zulip database")

    logger.info("Processing all messages")
    for i, message in enumerate(messages):
        message_id = message.id

        # Log progress at intervals (i > 0 also guarantees total_messages > 0).
        if i % 500 == 0 and i > 0:
            logger.info(f"Progress: {i}/{total_messages} messages processed ({(i/total_messages)*100:.1f}%)")

        try:
            # Check existence first so already-synced messages skip the
            # channel/sender lookups and never emit spurious warnings.
            if ChromaDBService.message_exists(message_id):
                already_exists += 1
                continue

            channel_name = ZulipDatabaseService.get_channel_name_for_message(message)
            sender_name = ZulipDatabaseService.get_sender_name_for_message(message)

            # Handle None channel names
            if channel_name is None:
                channel_name = "Unknown Channel"
                logger.warning(f"Message {message_id} has None channel name, using '{channel_name}' instead")

            success = ChromaDBService.add_message(
                message_id=message_id,
                content=message.content,
                channel_name=channel_name,
                subject=message.subject,
                sender_name=sender_name,
                date_sent=message.date_sent,
            )
        except Exception as e:
            # Best-effort: one bad message must not abort the whole sync, but
            # keep the traceback so the cause can be diagnosed.
            failed += 1
            logger.exception(f"Error processing message {message_id}: {e}")
            continue

        if success:
            total_synced += 1
            channel_counts[channel_name] = channel_counts.get(channel_name, 0) + 1
        else:
            failed += 1
            logger.warning(f"Failed to add message {message_id} to ChromaDB")

    logger.info(
        f"Finished processing {total_messages} messages: {total_synced} synced, "
        f"{already_exists} already existed, {failed} failed"
    )

    # Print channel statistics
    if channel_counts:
        logger.info("Messages synced by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

    return {
        'total_messages': total_messages,
        'total_synced': total_synced,
        'already_exists': already_exists,
        'failed': failed,
        'channel_counts': channel_counts,
    }
def main():
    """Main entry point.

    Creates the Flask app, runs ``sync_all_messages()`` inside its application
    context, and prints a human-readable summary of the returned statistics.

    Returns:
        int: process exit status. 0 on success, 1 if the sync raised an
        exception, and 130 (128 + SIGINT, the shell convention) if the user
        interrupted it with Ctrl-C.
    """
    try:
        # Create the Flask app (needed for context)
        app = create_app()
        with app.app_context():
            # Take the start timestamp once so the banner, the summary and the
            # computed duration all agree.
            start_time = datetime.now()
            print("\n====================================================")
            print("STARTING UNRESTRICTED SYNC OF ALL ZULIP MESSAGES")
            print(f"Started at: {start_time}")
            print("====================================================\n")

            stats = sync_all_messages()
            end_time = datetime.now()
            duration = end_time - start_time

            # Print summary
            print("\n====================================================")
            print("SYNC COMPLETE")
            print(f"Started at: {start_time}")
            print(f"Completed at: {end_time}")
            print(f"Duration: {duration}")
            print(f"Total messages in Zulip: {stats['total_messages']}")
            print(f"Total messages synced: {stats['total_synced']}")
            print(f"Messages already in ChromaDB: {stats['already_exists']}")

            # Print channel counts
            if stats['channel_counts']:
                print("\nMessages synced by channel:")
                for channel, count in sorted(stats['channel_counts'].items()):
                    print(f"- {channel}: {count} messages")
            print("====================================================\n")
    except KeyboardInterrupt:
        print("\nSync process interrupted by user", file=sys.stderr)
        logger.info("Sync process interrupted by user")
        return 130
    except Exception as e:
        # Top-level boundary: report with traceback and signal failure
        # instead of exiting with a success status.
        print(f"\nError during sync: {e}", file=sys.stderr)
        logger.exception(f"Error during sync: {e}")
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so callers (cron, CI, shell scripts) can
    # detect a failed or interrupted sync.
    sys.exit(main())