157 lines
5.6 KiB
Python
Executable File
157 lines
5.6 KiB
Python
Executable File
#!/usr/bin/env python
|
|
"""
|
|
Script to sync ALL messages from Zulip to ChromaDB with NO restrictions.
|
|
This script will sync everything - all channels, all users, all time periods.
|
|
"""
|
|
import os
|
|
import sys
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
# Configure logging: INFO and above, each record prefixed with timestamp,
# logger name and level. The logger below is shared by every function in
# this script.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("sync_all_messages")
|
|
|
|
# Add the directory containing this script to the import path so the `app`
# package can be imported. The append is skipped when that directory is
# already listed (running a file as a script normally puts its directory on
# sys.path), which avoids a duplicate entry.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if _SCRIPT_DIR not in sys.path:
    sys.path.append(_SCRIPT_DIR)

# Apply NumPy compatibility patch for ChromaDB.
# NOTE(review): this runs before the remaining `app` imports — presumably so
# the patch is in place before ChromaDB is first imported; confirm.
from app.utils import patch_chromadb_numpy

patch_chromadb_numpy()
|
|
|
|
from app import create_app
|
|
from app.db import get_db_session
|
|
from app.db.zulip_service import ZulipDatabaseService
|
|
from app.db.chroma_service import ChromaDBService
|
|
from app.models.zulip import Message
|
|
|
|
def sync_all_messages():
    """
    Sync ALL messages from Zulip to ChromaDB with no restrictions.

    All rows of ``Message`` are fetched with one query (ordered by id) and
    processed in a single pass. For each message the function first asks
    ChromaDB whether the id is already stored; only messages that are missing
    have their channel and sender names looked up and are then added. An error
    on one message is logged and counted, and the pass continues with the
    next message.

    The database session obtained from ``get_db_session()`` is always closed
    before returning, including when the pass is interrupted.

    Returns:
        dict: Statistics about the sync, with keys:
            ``total_messages`` - number of messages fetched from the database,
            ``total_synced`` - messages newly added to ChromaDB,
            ``already_exists`` - messages skipped because ChromaDB had them,
            ``failed`` - messages that raised an error or that
            ``ChromaDBService.add_message`` reported as not added,
            ``channel_counts`` - mapping of channel name to the number of
            messages newly added for that channel.
    """
    logger.info("Starting unrestricted sync of ALL messages in one pass")

    session = get_db_session()
    total_messages = 0
    total_synced = 0
    already_exists = 0
    failed = 0
    channel_counts = {}

    try:
        # Get all messages at once
        logger.info("Fetching ALL messages from Zulip database")
        messages = session.query(Message).order_by(Message.id).all()
        total_messages = len(messages)
        logger.info(f"Found {total_messages} total messages in Zulip database")

        # Process all messages
        logger.info("Processing all messages")
        for i, message in enumerate(messages):
            message_id = message.id

            # Log progress at intervals (never for i == 0, so no 0% line and
            # no division when the list is empty).
            if i and i % 500 == 0:
                logger.info(f"Progress: {i}/{total_messages} messages processed ({(i/total_messages)*100:.1f}%)")

            try:
                # Check ChromaDB first so messages that are already synced do
                # not trigger the channel/sender lookups at all.
                if ChromaDBService.message_exists(message_id):
                    already_exists += 1
                    continue

                # Get message details
                channel_name = ZulipDatabaseService.get_channel_name_for_message(message)
                sender_name = ZulipDatabaseService.get_sender_name_for_message(message)

                # Handle None channel names
                if channel_name is None:
                    channel_name = "Unknown Channel"
                    logger.warning(f"Message {message_id} has None channel name, using '{channel_name}' instead")

                # Add message to ChromaDB
                success = ChromaDBService.add_message(
                    message_id=message_id,
                    content=message.content,
                    channel_name=channel_name,
                    subject=message.subject,
                    sender_name=sender_name,
                    date_sent=message.date_sent,
                )
            except Exception as e:
                # Best effort: one bad message must not abort the whole sync.
                # logger.exception keeps the traceback for later diagnosis.
                failed += 1
                logger.exception(f"Error processing message {message_id}: {e}")
                continue

            if not success:
                failed += 1
                logger.warning(f"Failed to add message {message_id} to ChromaDB")
                continue

            total_synced += 1
            # Update channel counts
            channel_counts[channel_name] = channel_counts.get(channel_name, 0) + 1
    finally:
        # Release the session even on KeyboardInterrupt or a query failure.
        # Assumes get_db_session() returns a SQLAlchemy-style session (it is
        # used via .query() above) — TODO confirm it exposes close().
        session.close()

    # Print channel statistics
    if channel_counts:
        logger.info("Messages synced by channel:")
        for channel, count in sorted(channel_counts.items()):
            logger.info(f"- {channel}: {count} messages")

    # Return statistics
    return {
        'total_messages': total_messages,
        'total_synced': total_synced,
        'already_exists': already_exists,
        'failed': failed,
        'channel_counts': channel_counts,
    }
|
|
|
|
def main():
    """Main entry point.

    Creates the Flask app, runs ``sync_all_messages()`` inside the app
    context, and prints a start banner and a summary (timings, totals and
    per-channel counts) to stdout.

    Returns:
        int: Process exit status - 0 when the sync completed, 130 when it was
        interrupted with Ctrl-C, 1 when any other error occurred.
    """
    try:
        # Create the Flask app (needed for context)
        app = create_app()

        with app.app_context():
            # Take the timestamp once so the banner and the summary report
            # the same start time.
            start_time = datetime.now()

            print("\n====================================================")
            print("STARTING UNRESTRICTED SYNC OF ALL ZULIP MESSAGES")
            print(f"Started at: {start_time}")
            print("====================================================\n")

            # Sync all messages
            stats = sync_all_messages()
            end_time = datetime.now()
            duration = end_time - start_time

            # Print summary
            print("\n====================================================")
            print("SYNC COMPLETE")
            print(f"Started at: {start_time}")
            print(f"Completed at: {end_time}")
            print(f"Duration: {duration}")
            print(f"Total messages in Zulip: {stats['total_messages']}")
            print(f"Total messages synced: {stats['total_synced']}")
            print(f"Messages already in ChromaDB: {stats['already_exists']}")

            # Print channel counts
            if stats['channel_counts']:
                print("\nMessages synced by channel:")
                for channel, count in sorted(stats['channel_counts'].items()):
                    print(f"- {channel}: {count} messages")

            print("====================================================\n")

    except KeyboardInterrupt:
        print("\nSync process interrupted by user")
        logger.info("Sync process interrupted by user")
        # 128 + SIGINT(2): conventional shell status for an interrupted run.
        return 130
    except Exception as e:
        print(f"\nError during sync: {e}")
        # logger.exception records the traceback along with the message.
        logger.exception(f"Error during sync: {e}")
        return 1

    return 0


if __name__ == "__main__":
    # Propagate main()'s status so callers (cron, CI, shells) can detect a
    # failed or interrupted sync.
    sys.exit(main())