vendor_report/scheduler.py
2025-11-06 20:50:19 +04:00

174 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""
Report Scheduler
Schedules automatic report generation with optional SharePoint downloads.
"""
import logging
from datetime import datetime
from typing import Optional
from pathlib import Path
# APScheduler is an optional dependency: the module must stay importable
# without it, and ReportScheduler.__init__ checks SCHEDULER_AVAILABLE before
# touching any of the names imported here.
try:
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.triggers.interval import IntervalTrigger
    from apscheduler.triggers.cron import CronTrigger
    from apscheduler.triggers.date import DateTrigger
    SCHEDULER_AVAILABLE = True
except ImportError:
    SCHEDULER_AVAILABLE = False
    # Use a module logger rather than logging.warning(): the module-level
    # function implicitly calls basicConfig() on the root logger when it has
    # no handlers, which would configure logging as an import side effect and
    # turn a later basicConfig(...) call by the application into a no-op.
    logging.getLogger(__name__).warning(
        "APScheduler not installed. Scheduling features disabled."
    )
from config import load_config
from report_generator import generate_report
from sharepoint_downloader import download_from_sharepoint
logger = logging.getLogger(__name__)
class ReportScheduler:
    """Manages scheduled report generation.

    Behaviour is driven by three sections of the loaded configuration:

    * ``scheduler`` -- ``enabled``, ``timezone``, ``schedule_type``
      (``interval``, ``cron`` or ``once``), ``interval_hours`` (default 24)
      and ``cron_expression`` (default ``'0 8 * * *'``).
    * ``sharepoint`` -- optional download step executed before each report.
    * ``report`` -- ``reports_dir`` (default ``'reports'``) and ``output_dir``
      (default ``'output'``).
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize scheduler.

        Args:
            config_path: Path to configuration file (passed to ``load_config``
                unchanged; ``None`` is forwarded as-is).

        Raises:
            ImportError: If APScheduler is not installed.
        """
        if not SCHEDULER_AVAILABLE:
            raise ImportError(
                "APScheduler is required for scheduling. "
                "Install it with: pip install apscheduler"
            )
        self.config = load_config(config_path)
        # Read the scheduler section the same tolerant way as the other
        # sections; a missing section then surfaces in start() as
        # "Scheduler is disabled" instead of a bare KeyError here.
        self.scheduler_config = self.config.get('scheduler', {})
        # timezone=None lets APScheduler fall back to its own default zone.
        self.scheduler = BlockingScheduler(
            timezone=self.scheduler_config.get('timezone')
        )
        self.sharepoint_config = self.config.get('sharepoint', {})
        self.report_config = self.config.get('report', {})

    def generate_report_job(self):
        """Job function to generate report.

        Optionally downloads input files from SharePoint, then calls
        ``generate_report``. Exceptions are logged and never propagate, so a
        failing run does not raise into the scheduler.
        """
        logger.info("=" * 70)
        logger.info("SCHEDULED REPORT GENERATION")
        logger.info("=" * 70)
        logger.info(f"Started at: {datetime.now()}")
        try:
            # Download from SharePoint if enabled
            if self.sharepoint_config.get('enabled'):
                logger.info("Downloading files from SharePoint...")
                try:
                    downloaded = download_from_sharepoint(
                        site_url=self.sharepoint_config['site_url'],
                        folder_path=self.sharepoint_config.get('folder_path'),
                        file_path=self.sharepoint_config.get('file_path'),
                        local_dir=self.sharepoint_config.get('local_dir', 'reports'),
                        username=self.sharepoint_config.get('username'),
                        password=self.sharepoint_config.get('password'),
                        client_id=self.sharepoint_config.get('client_id'),
                        client_secret=self.sharepoint_config.get('client_secret'),
                        use_app_authentication=self.sharepoint_config.get(
                            'use_app_authentication', False
                        ),
                        file_pattern=self.sharepoint_config.get('file_pattern'),
                        overwrite=self.sharepoint_config.get('overwrite', True),
                    )
                    logger.info(f"Downloaded {len(downloaded)} file(s) from SharePoint")
                except Exception as e:
                    # Deliberate best-effort: continue with report generation
                    # even if the download fails.
                    logger.error(f"Failed to download from SharePoint: {e}")

            # Generate report
            logger.info("Generating report...")
            reports_dir = self.report_config.get('reports_dir', 'reports')
            output_file = Path(self.report_config.get('output_dir', 'output')) / 'report.json'
            report_data = generate_report(
                reports_dir=reports_dir,
                output_file=str(output_file),
                verbose=True,
            )
            # A falsy return value from generate_report is treated as failure.
            if report_data:
                logger.info("✓ Scheduled report generation completed successfully")
            else:
                logger.error("✗ Scheduled report generation failed")
        except Exception as e:
            logger.error(f"Error in scheduled report generation: {e}", exc_info=True)

    @staticmethod
    def _cron_fields(expression) -> Optional[dict]:
        """Split a five-field cron expression into CronTrigger keyword arguments.

        Args:
            expression: Whitespace-separated
                ``"minute hour day month day_of_week"`` string.

        Returns:
            A dict keyed by CronTrigger argument name, or ``None`` when
            *expression* is not a string with exactly five fields.
        """
        if not isinstance(expression, str):
            return None
        parts = expression.split()
        if len(parts) != 5:
            return None
        names = ('minute', 'hour', 'day', 'month', 'day_of_week')
        return dict(zip(names, parts))

    def _build_trigger(self, schedule_type):
        """Return the trigger for *schedule_type*, or ``None`` after logging why not."""
        # A trigger passed to add_job() as an instance is used as-is; it does
        # not pick up the scheduler's timezone, so bind it explicitly.
        # Otherwise schedules would be evaluated in the host's local zone
        # rather than the configured one.
        tz = self.scheduler.timezone

        if schedule_type == 'interval':
            # Schedule at regular intervals
            interval_hours = self.scheduler_config.get('interval_hours', 24)
            trigger = IntervalTrigger(hours=interval_hours, timezone=tz)
            logger.info(f"Scheduling reports every {interval_hours} hours")
            return trigger

        if schedule_type == 'cron':
            # Schedule using cron expression
            cron_expression = self.scheduler_config.get('cron_expression', '0 8 * * *')
            fields = self._cron_fields(cron_expression)
            if fields is None:
                logger.error(f"Invalid cron expression: {cron_expression}")
                return None
            # NOTE(review): APScheduler appears to number day_of_week from
            # Monday=0, unlike classic crontab (Sunday=0) -- confirm that
            # configured expressions use weekday names or that numbering.
            try:
                trigger = CronTrigger(timezone=tz, **fields)
            except ValueError as exc:
                # Five fields but an invalid value inside one of them.
                logger.error(f"Invalid cron expression: {cron_expression} ({exc})")
                return None
            logger.info(f"Scheduling reports with cron: {cron_expression}")
            return trigger

        logger.error(f"Unknown schedule type: {schedule_type}")
        return None

    def start(self):
        """Start the scheduler.

        Returns immediately (after logging) when the scheduler is disabled or
        the schedule configuration is invalid. With ``schedule_type`` of
        ``once`` the job runs a single time, synchronously, and no scheduler
        is started. Otherwise this call blocks until interrupted with Ctrl+C.
        """
        if not self.scheduler_config.get('enabled'):
            logger.warning("Scheduler is disabled in configuration")
            return

        schedule_type = self.scheduler_config.get('schedule_type', 'interval')

        if schedule_type == 'once':
            # For "once", you'd typically use DateTrigger, but for simplicity,
            # we'll just run it immediately
            logger.info("Running report generation once (immediately)")
            self.generate_report_job()
            return

        trigger = self._build_trigger(schedule_type)
        if trigger is None:
            return

        # Add job to scheduler
        self.scheduler.add_job(
            self.generate_report_job,
            trigger=trigger,
            id='generate_report',
            name='Generate Vendor Report',
            replace_existing=True,
        )
        logger.info("Scheduler started. Press Ctrl+C to stop.")
        try:
            self.scheduler.start()
        except KeyboardInterrupt:
            logger.info("Scheduler stopped by user")
if __name__ == "__main__":
    # Script entry point: configure logging, then run the scheduler using the
    # configuration file named by the first command-line argument, if any.
    import sys

    cli_args = sys.argv[1:]
    config_path = cli_args[0] if cli_args else None

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    ReportScheduler(config_path=config_path).start()