vendor_report/scheduler.py

247 lines
9.9 KiB
Python

#!/usr/bin/env python3
"""
Report Scheduler
Schedules automatic report generation with optional SharePoint downloads.
"""
import logging
from datetime import datetime
from typing import Optional
from pathlib import Path
try:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
SCHEDULER_AVAILABLE = True
except ImportError:
SCHEDULER_AVAILABLE = False
logging.warning("APScheduler not installed. Scheduling features disabled.")
from config import load_config
from report_generator import generate_report
from sharepoint_downloader import download_from_sharepoint
logger = logging.getLogger(__name__)
# Cleanup function (duplicated from api_server to avoid circular import)
def cleanup_old_reports(output_dir: Path, reports_dir: Path, max_reports: int = 10):
    """
    Cleanup old reports and Excel files, keeping only the last max_reports.

    Args:
        output_dir: Directory containing report HTML/JSON files
        reports_dir: Directory containing Excel files
            (NOTE(review): currently unused by this function — only HTML/JSON
            files under output_dir are removed; confirm against api_server copy)
        max_reports: Maximum number of reports to keep
    """
    try:
        # Newest first, so everything past index max_reports is stale.
        by_newest = sorted(
            output_dir.glob('report-*.html'),
            key=lambda f: f.stat().st_mtime,
            reverse=True,
        )
        stale = by_newest[max_reports:]
        if not stale:
            # Nothing to clean up.
            return
        removed = 0
        for page in stale:
            # Remove the HTML page; failure is logged but does not abort the sweep.
            try:
                page.unlink()
                logger.info(f"Deleted old report HTML: {page.name}")
                removed += 1
            except Exception as e:
                logger.warning(f"Failed to delete {page.name}: {e}")
            # Remove the sidecar JSON that shares the report id (the HTML stem).
            sidecar = output_dir / f"{page.stem}.json"
            if sidecar.exists():
                try:
                    sidecar.unlink()
                    logger.info(f"Deleted old report JSON: {sidecar.name}")
                except Exception as e:
                    logger.warning(f"Failed to delete {sidecar.name}: {e}")
        if removed > 0:
            logger.info(f"Cleanup completed: deleted {removed} old report(s)")
    except Exception as e:
        # Best-effort housekeeping: never let cleanup failures propagate.
        logger.error(f"Error during cleanup: {e}", exc_info=True)
class ReportScheduler:
    """Manages scheduled report generation.

    Wraps an APScheduler instance that periodically downloads source files
    from SharePoint (when enabled in config) and regenerates the vendor
    report, pruning old report artifacts afterwards.
    """

    def __init__(self, config_path: Optional[str] = None, use_background: bool = True):
        """
        Initialize scheduler.

        Args:
            config_path: Path to configuration file.
            use_background: When True (default), use a BackgroundScheduler so
                start() returns immediately (suitable when run from the API
                server). Pass False to use a BlockingScheduler for standalone
                runs, where start() blocks until interrupted.

        Raises:
            ImportError: If APScheduler is not installed.
        """
        if not SCHEDULER_AVAILABLE:
            raise ImportError(
                "APScheduler is required for scheduling. "
                "Install it with: pip install apscheduler"
            )
        self.config = load_config(config_path)
        scheduler_timezone = self.config['scheduler'].get('timezone', 'America/New_York')
        # BackgroundScheduler is thread-compatible (API server embedding);
        # BlockingScheduler owns the main thread (standalone execution).
        self.use_background = use_background
        if self.use_background:
            self.scheduler = BackgroundScheduler(timezone=scheduler_timezone)
        else:
            self.scheduler = BlockingScheduler(timezone=scheduler_timezone)
        self.scheduler_config = self.config['scheduler']
        self.sharepoint_config = self.config.get('sharepoint', {})
        self.report_config = self.config.get('report', {})

    def generate_report_job(self):
        """Job function to generate report.

        Optionally downloads fresh input files from SharePoint, then writes a
        timestamped JSON report into the configured output directory and
        prunes old report artifacts. All failures are logged, never raised,
        so a bad run does not kill the scheduler.
        """
        logger.info("=" * 70)
        logger.info("SCHEDULED REPORT GENERATION")
        logger.info("=" * 70)
        logger.info(f"Started at: {datetime.now()}")
        try:
            # Download from SharePoint if enabled
            if self.sharepoint_config.get('enabled'):
                logger.info("Downloading files from SharePoint...")
                try:
                    downloaded = download_from_sharepoint(
                        site_url=self.sharepoint_config['site_url'],
                        folder_path=self.sharepoint_config.get('folder_path'),
                        file_path=self.sharepoint_config.get('file_path'),
                        local_dir=self.sharepoint_config.get('local_dir', 'reports'),
                        tenant_id=self.sharepoint_config.get('tenant_id'),
                        client_id=self.sharepoint_config.get('client_id'),
                        client_secret=self.sharepoint_config.get('client_secret'),
                        use_app_authentication=self.sharepoint_config.get('use_app_authentication', True),
                        file_pattern=self.sharepoint_config.get('file_pattern'),
                        overwrite=self.sharepoint_config.get('overwrite', True)
                    )
                    logger.info(f"Downloaded {len(downloaded)} file(s) from SharePoint")
                except Exception as e:
                    logger.error(f"Failed to download from SharePoint: {e}")
                    # Continue with report generation even if download fails;
                    # stale local files are better than no report at all.
            # Generate report with timestamp
            logger.info("Generating report...")
            reports_dir = self.report_config.get('reports_dir', 'reports')
            output_dir = Path(self.report_config.get('output_dir', 'output'))
            # Create timestamped filename (same format as API server) so the
            # web UI can list scheduler- and API-produced reports uniformly.
            timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
            report_id = f"report-{timestamp}"
            output_file = output_dir / f"{report_id}.json"
            report_data = generate_report(
                reports_dir=reports_dir,
                output_file=str(output_file),
                verbose=True
            )
            if report_data:
                logger.info("✓ Scheduled report generation completed successfully")
                # Cleanup old reports (keep last 10); best-effort only.
                try:
                    cleanup_old_reports(output_dir, Path(reports_dir), max_reports=10)
                except Exception as e:
                    logger.warning(f"Failed to cleanup old reports: {e}")
            else:
                logger.error("✗ Scheduled report generation failed")
        except Exception as e:
            logger.error(f"Error in scheduled report generation: {e}", exc_info=True)

    def start(self):
        """Start the scheduler.

        Reads the configured schedule type ('interval', 'cron' or 'once') and
        registers the report job accordingly. With a BackgroundScheduler this
        returns immediately; with a BlockingScheduler it blocks until
        interrupted. Does nothing if scheduling is disabled in configuration.
        """
        if not self.scheduler_config.get('enabled'):
            logger.warning("Scheduler is disabled in configuration")
            return
        schedule_type = self.scheduler_config.get('schedule_type', 'interval')
        if schedule_type == 'interval':
            # Schedule at regular intervals
            interval_hours = self.scheduler_config.get('interval_hours', 24)
            trigger = IntervalTrigger(hours=interval_hours)
            logger.info(f"Scheduling reports every {interval_hours} hours")
        elif schedule_type == 'cron':
            # Schedule using cron expression
            cron_expression = self.scheduler_config.get('cron_expression', '0 8 * * *')
            # Parse cron expression (format: "minute hour day month day_of_week")
            parts = cron_expression.split()
            if len(parts) == 5:
                trigger = CronTrigger(
                    minute=parts[0],
                    hour=parts[1],
                    day=parts[2],
                    month=parts[3],
                    day_of_week=parts[4]
                )
            else:
                logger.error(f"Invalid cron expression: {cron_expression}")
                return
            logger.info(f"Scheduling reports with cron: {cron_expression}")
        elif schedule_type == 'once':
            # Run once at a specific time
            # For "once", you'd typically use DateTrigger, but for simplicity,
            # we'll just run it immediately (synchronously) and return without
            # starting the underlying APScheduler instance.
            logger.info("Running report generation once (immediately)")
            self.generate_report_job()
            return
        else:
            logger.error(f"Unknown schedule type: {schedule_type}")
            return
        # Add job to scheduler; replace_existing guards against duplicate
        # registration if start() is ever called twice.
        self.scheduler.add_job(
            self.generate_report_job,
            trigger=trigger,
            id='generate_report',
            name='Generate Vendor Report',
            replace_existing=True
        )
        if self.use_background:
            # BackgroundScheduler - just start it, don't block
            self.scheduler.start()
            logger.info("Scheduler started in background mode")
        else:
            # BlockingScheduler - block until interrupted
            logger.info("Scheduler started. Press Ctrl+C to stop.")
            try:
                self.scheduler.start()
            except KeyboardInterrupt:
                logger.info("Scheduler stopped by user")
                self.scheduler.shutdown()
if __name__ == "__main__":
    import sys
    # Configure root logging for standalone runs; when the scheduler is
    # embedded in the API server, the host process owns logging setup instead.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    # Optional first CLI argument: path to the configuration file.
    config_path = sys.argv[1] if len(sys.argv) > 1 else None
    scheduler = ReportScheduler(config_path=config_path)
    # Standalone mode: swap the default BackgroundScheduler for a
    # BlockingScheduler so start() keeps the process alive until Ctrl+C,
    # rebuilding it with the same configured timezone.
    scheduler.use_background = False  # Use BlockingScheduler for standalone mode
    scheduler.scheduler = BlockingScheduler(timezone=scheduler.config['scheduler'].get('timezone', 'America/New_York'))
    scheduler.start()