#!/usr/bin/env python3
"""
Report Generator

Generates vendor reports directly from preprocessed Excel data.
"""
import json
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List
from zoneinfo import ZoneInfo

from data_preprocessor import preprocess_excel_files
from models import FullReport, VendorMetrics, VendorUpdates24h, PunchlistItem
from html_generator import generate_html_report

# Extra per-vendor fields that live outside the VendorMetrics Pydantic model
# and must be re-attached after FullReport validation/serialization.
_EXTRA_VENDOR_FIELDS = (
    'closed_items',
    'monitor_items',
    'open_items',
    'incomplete_items',
    'incomplete_count',
)


def convert_item_to_punchlist_item(item: Dict) -> PunchlistItem:
    """Convert a preprocessed item dict to a PunchlistItem Pydantic model.

    Empty-string values from preprocessing are normalized to None so optional
    fields serialize as null instead of "".
    """
    return PunchlistItem(
        punchlist_name=item.get('punchlist_name', ''),
        description=item.get('description') or None,
        priority=item.get('priority') or None,
        date_identified=item.get('date_identified_str') or None,
        date_completed=item.get('date_completed_str') or None,
        status=item.get('status', 'Incomplete'),
        status_updates=item.get('status_updates') or None,
        issue_image=item.get('issue_image') or None,
        age_days=item.get('age_days')
    )


def _build_vendor_dict(vendor_name: str, vendor_data: Dict) -> Dict:
    """Build one vendor's report entry.

    Returns the serialized VendorMetrics dict augmented with status-grouped
    item lists (closed/monitor/open/incomplete) that are not part of the
    Pydantic model but are consumed by the HTML report tabs.
    """
    # 24-hour activity buckets computed by the preprocessor
    updates_24h = VendorUpdates24h(
        added=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_added']],
        closed=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_closed']],
        changed_to_monitor=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_monitor']]
    )

    # Oldest 3 unaddressed items (preprocessor supplies them pre-sorted)
    oldest_unaddressed = [
        convert_item_to_punchlist_item(item)
        for item in vendor_data['unaddressed'][:3]
    ]

    critical_items = [convert_item_to_punchlist_item(item) for item in vendor_data['critical']]
    high_items = [convert_item_to_punchlist_item(item) for item in vendor_data['high']]

    # Group all items by status for the report tabs. The preprocessor sets
    # status to 'Complete', 'Monitor', or 'Incomplete'; is_closed is used as
    # a backup signal for closed items.
    all_items = vendor_data.get('items', [])
    closed_items = [
        convert_item_to_punchlist_item(item) for item in all_items
        if item.get('status', '').lower() == 'complete' or item.get('is_closed', False)
    ]
    monitor_items = [
        convert_item_to_punchlist_item(item) for item in all_items
        if item.get('status', '').lower() == 'monitor'
    ]
    incomplete_items = [
        convert_item_to_punchlist_item(item) for item in all_items
        if item.get('status', '').lower() == 'incomplete' and not item.get('is_closed', False)
    ]
    # Open = everything not closed (Monitor + Incomplete)
    open_items = [
        convert_item_to_punchlist_item(item) for item in all_items
        if not item.get('is_closed', False)
    ]

    vendor_metrics = VendorMetrics(
        vendor_name=vendor_name,
        total_items=len(vendor_data['items']),
        closed_count=vendor_data['closed'],
        open_count=len(open_items),
        monitor_count=vendor_data['monitor'],
        updates_24h=updates_24h,
        oldest_unaddressed=oldest_unaddressed,
        critical_priority_items=critical_items,
        high_priority_items=high_items
    )

    # Serialize, then attach the status-grouped lists as plain-dict extras.
    vendor_dict = vendor_metrics.model_dump()
    vendor_dict['closed_items'] = [item.model_dump() for item in closed_items]
    vendor_dict['monitor_items'] = [item.model_dump() for item in monitor_items]
    vendor_dict['open_items'] = [item.model_dump() for item in open_items]
    vendor_dict['incomplete_items'] = [item.model_dump() for item in incomplete_items]
    vendor_dict['incomplete_count'] = len(incomplete_items)
    return vendor_dict


def generate_report(
    reports_dir: str = "reports",
    output_file: Optional[str] = None,
    verbose: bool = True
) -> dict:
    """
    Generate a vendor report directly from preprocessed data.

    Args:
        reports_dir: Directory containing Excel files
        output_file: Optional path to save JSON output (HTML is also
            generated next to it when provided)
        verbose: Whether to print progress messages

    Returns:
        Dictionary containing the generated report, or {} if no data
        could be processed.
    """
    if verbose:
        print("=" * 70)
        print("REPORT GENERATION")
        print("=" * 70)
        print(f"Loading and preprocessing Excel files from '{reports_dir}'...")

    # Preprocess Excel files using the Baltimore/Eastern timezone so the
    # "last 24 hours" windows match local business time.
    baltimore_tz = ZoneInfo("America/New_York")
    current_date_baltimore = datetime.now(baltimore_tz)
    if verbose:
        print("Using Baltimore/Eastern timezone (America/New_York) for 24h calculations")
        print(f"Current time: {current_date_baltimore.strftime('%Y-%m-%d %H:%M:%S %Z')}")

    preprocessed_data, summary = preprocess_excel_files(
        reports_dir, current_date=current_date_baltimore
    )
    if not summary:
        print("Error: No data processed")
        return {}

    # Save the preprocessed text dump for manual inspection.
    preprocessed_output_path = Path("output/preprocessed_data.txt")
    preprocessed_output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(preprocessed_output_path, 'w', encoding='utf-8') as f:
        f.write(preprocessed_data)

    if verbose:
        total_items = sum(len(v['items']) for v in summary.values())
        print(f"✓ Processed {total_items} items from {len(summary)} vendors")
        print(f"✓ Preprocessed data saved to: {preprocessed_output_path}")
        print("Generating report directly from preprocessed data...")

    # Build the per-vendor entries in deterministic (sorted) order.
    vendors = [
        _build_vendor_dict(vendor_name, vendor_data)
        for vendor_name, vendor_data in sorted(summary.items())
    ]

    # Create the full report. Use the timezone-aware Baltimore timestamp for
    # the generation time, consistent with the 24h calculations above.
    report = FullReport(
        report_generated_at=current_date_baltimore.isoformat(),
        vendors=vendors,
        summary={
            "total_vendors": len(vendors),
            "total_items": sum(v.get('total_items', 0) for v in vendors),
            "total_closed": sum(v.get('closed_count', 0) for v in vendors),
            "total_open": sum(v.get('open_count', 0) for v in vendors),
            "total_monitor": sum(v.get('monitor_count', 0) for v in vendors),
            "total_incomplete": sum(v.get('incomplete_count', 0) for v in vendors)
        }
    )

    report_data = report.model_dump()

    # Restore the status-grouped extras that FullReport validation may have
    # stripped from the vendor entries (they are not VendorMetrics fields).
    for i, vendor_dict in enumerate(vendors):
        for key in _EXTRA_VENDOR_FIELDS:
            if key in vendor_dict:
                report_data['vendors'][i][key] = vendor_dict[key]

    # Save JSON (and the derived HTML report) if an output path was given.
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)
        if verbose:
            file_size = output_path.stat().st_size / 1024
            print(f"✓ JSON report saved to: {output_path} ({file_size:.1f} KB)")

        if verbose:
            print("Generating HTML report...")
        html_path = generate_html_report(str(output_path))
        if verbose:
            html_size = Path(html_path).stat().st_size / 1024
            print(f"✓ HTML report saved to: {html_path} ({html_size:.1f} KB)")

    if verbose:
        print()
        print("=" * 70)
        print("✓ Report generated successfully!")
        print(f"  Vendors: {len(vendors)}")
        print(f"  Total items: {report_data['summary']['total_items']}")
        print("=" * 70)

    return report_data


if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Generate vendor reports from Excel files")
    parser.add_argument(
        "--reports-dir",
        type=str,
        default="reports",
        help="Directory containing Excel files (default: reports)"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="output/report.json",
        help="Output JSON file path (default: output/report.json)"
    )
    # Kept for backward compatibility; verbose output is on by default.
    parser.add_argument(
        "--verbose",
        action="store_true",
        default=True,
        help="Print verbose output (default: on)"
    )
    # New flag: the old --verbose/store_true/default=True combination could
    # never be turned off; --quiet provides that switch.
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress progress output"
    )
    args = parser.parse_args()

    report = generate_report(
        reports_dir=args.reports_dir,
        output_file=args.output,
        verbose=args.verbose and not args.quiet
    )

    # generate_report returns {} on failure (it never sets an "error" key).
    if report:
        print("\n✓ Report generation complete!")
    else:
        print("\n✗ Report generation failed.")
        sys.exit(1)