141 lines
5.6 KiB
Python
Executable File
141 lines
5.6 KiB
Python
Executable File
#!/usr/bin/env python
|
|
"""
|
|
Script to manually sync messages from Zulip to ChromaDB.
|
|
This can be run standalone or as a scheduled cron job.
|
|
"""
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
# Add the directory containing this script to sys.path so the local `app`
# package can be imported regardless of the working directory the script is
# launched from (e.g. when run from cron).
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Apply NumPy compatibility patch for ChromaDB.
# NOTE(review): the patch is applied before the `app` / service imports below,
# presumably so it is in place before anything imports chromadb — keep this
# ordering when reorganizing these imports.
from app.utils import patch_chromadb_numpy

patch_chromadb_numpy()

from app import create_app
from app.utils.sync_service import MessageSyncService
from app.db.zulip_service import ZulipDatabaseService

# Configure logging for standalone / cron runs. This runs at import time and
# configures the root logger via basicConfig (a no-op if the root logger
# already has handlers).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Named logger used by sync_messages() for progress reporting.
logger = logging.getLogger("sync_messages")
|
|
|
|
def _run_forced_batches(sync_service, days, batch_size, max_messages):
    """Run ``sync_service.sync_now()`` batch by batch over the lookback window.

    The lookback window is the last ``days`` days, or 30 days when ``days`` is
    falsy. Batches are run until the number of messages in the window (capped
    at ``max_messages``) has been synced, until a batch leaves no message ID,
    or until a batch makes no progress.

    Returns:
        tuple: ``(total_synced, batch_count)``.
    """
    since_date = datetime.now() - timedelta(days=days if days else 30)

    if max_messages <= 0:
        # Nothing may be synced; also avoids handing a non-positive limit to
        # the count query, whose handling of such values is unknown here.
        logger.warning(f"max_messages is {max_messages}; nothing to sync")
        return 0, 0

    # Count the messages to sync. The query limit follows max_messages; a
    # hard-coded 5000 here would silently cap any larger --max-messages.
    all_messages = ZulipDatabaseService.get_messages_from_it_channels(
        since=since_date,
        limit=max_messages
    )
    total_messages = len(all_messages)
    logger.info(f"Found a total of {total_messages} messages to sync")

    target = min(total_messages, max_messages)
    total_synced = 0
    batch_count = 0

    while total_synced < target:
        logger.info(f"Running batch {batch_count+1}, synced {total_synced} messages so far")

        # The first batch uses the state prepared by the caller; later batches
        # continue from the last_message_id the previous batch left behind.
        previous_message_id = sync_service.last_message_id
        sync_service._set_batch_size(batch_size)
        sync_service.sync_now()
        batch_count += 1

        if not sync_service.last_message_id:
            # If no message ID was set, something went wrong
            logger.warning("No message ID set after sync, may not have found any messages")
            break

        # Progress = number of messages in the window up to the highest
        # message ID synced so far.
        synced_in_batch = ZulipDatabaseService.count_messages_up_to_id(
            sync_service.last_message_id,
            since=since_date
        )
        # Use max so the running total never decreases if the count is off.
        total_synced = max(total_synced, synced_in_batch)
        logger.info(f"Processed {synced_in_batch} messages out of {total_messages}")

        # Equivalent to "synced everything or reached max_messages".
        if synced_in_batch >= target:
            break

        if sync_service.last_message_id == previous_message_id:
            # sync_now() did not advance past the previous batch; repeating it
            # with unchanged state would loop forever.
            logger.warning("Sync made no progress in the last batch; stopping early")
            break

    return total_synced, batch_count


def sync_messages(days=None, force=False, batch_size=200, max_messages=5000):
    """
    Sync messages from Zulip to ChromaDB.

    Without ``force`` this runs a single ``MessageSyncService.sync_now()`` pass
    that continues from the service's stored sync state.

    With ``force`` the messages in the lookback window are counted first and
    ``sync_now()`` is then called repeatedly, ``batch_size`` messages at a
    time, until that count (capped at ``max_messages``) has been synced, a
    batch leaves no message ID, or a batch makes no progress.

    Args:
        days (int): Number of days to look back for messages (default: use sync
            state). When forcing without ``days`` the counting window falls
            back to 30 days and the stored sync state is NOT reset.
        force (bool): Whether to force sync all messages from the lookback period
        batch_size (int): Number of messages to process in each batch
        max_messages (int): Maximum total number of messages to sync

    Returns:
        dict: ``last_sync_time`` and ``last_message_id`` (the service's state
        after syncing), plus ``total_synced`` and ``batches``. The last two
        are only tracked in force mode and are 0 otherwise.
    """
    # Create the Flask app; the sync service is used inside its app context.
    app = create_app()

    with app.app_context():
        sync_service = MessageSyncService()

        if force:
            if days:
                # Force syncing a specific window: rewind the sync state so the
                # first batch starts at the beginning of that window.
                sync_service.last_sync_time = datetime.now() - timedelta(days=days)
                sync_service.last_message_id = None
                logger.info(f"Force syncing messages from the last {days} days")

            total_synced, batch_count = _run_forced_batches(
                sync_service, days, batch_size, max_messages
            )
        else:
            # Just run a single sync with default settings
            sync_service.sync_now()
            total_synced, batch_count = 0, 0

        stats = {
            'last_sync_time': sync_service.last_sync_time,
            'last_message_id': sync_service.last_message_id,
            'total_synced': total_synced,
            'batches': batch_count
        }

        logger.info(f"Sync completed. Current state: {stats}")

        return stats
|
|
|
|
def positive_int(value):
    """Parse a command-line string as an integer greater than zero.

    Used as an ``argparse`` ``type=`` callable so that nonsensical values such
    as ``--batch-size 0`` or ``--days -3`` (which would put the lookback start
    in the future) are rejected with a usage error instead of being passed on.

    Raises:
        ValueError: if *value* is not an integer literal (argparse reports it
            as an invalid value).
        argparse.ArgumentTypeError: if the integer is zero or negative.
    """
    number = int(value)
    if number <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value!r}")
    return number


def build_parser():
    """Build the command-line parser for this script."""
    parser = argparse.ArgumentParser(description="Sync messages from Zulip to ChromaDB")
    parser.add_argument("--days", type=positive_int, help="Number of days to look back for messages")
    parser.add_argument("--force", action="store_true", help="Force sync all messages from the lookback period")
    parser.add_argument("--batch-size", type=positive_int, default=200, help="Number of messages to process in each batch")
    parser.add_argument("--max-messages", type=positive_int, default=5000, help="Maximum total number of messages to sync")
    return parser


def main(argv=None):
    """Parse *argv* (``sys.argv[1:]`` when None), run the sync and print a summary.

    Returns:
        dict: the stats returned by ``sync_messages``.
    """
    args = build_parser().parse_args(argv)

    # Sync messages
    stats = sync_messages(
        days=args.days,
        force=args.force,
        batch_size=args.batch_size,
        max_messages=args.max_messages
    )

    print(f"\nSync completed at {datetime.now()}")
    print(f"Last sync time: {stats['last_sync_time']}")
    print(f"Last message ID: {stats['last_message_id']}")
    print(f"Total messages synced: {stats['total_synced']}")
    print(f"Number of batches: {stats['batches']}")
    return stats


if __name__ == "__main__":
    main()