#!/usr/bin/env python3
"""
Report Generator

Generates vendor reports directly from preprocessed Excel data.
"""
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from zoneinfo import ZoneInfo

from data_preprocessor import preprocess_excel_files
from html_generator import generate_html_report
from models import FullReport, VendorMetrics, VendorUpdates24h, PunchlistItem
def convert_item_to_punchlist_item(item: Dict) -> PunchlistItem:
    """Build a PunchlistItem model from a preprocessed item dictionary.

    Falsy values (empty strings, None) for the optional fields are
    normalized to None so the Pydantic model treats them as missing
    rather than blank.
    """
    def _or_none(key: str):
        # Collapse empty-string / falsy values to None.
        return item.get(key) or None

    return PunchlistItem(
        punchlist_name=item.get('punchlist_name', ''),
        description=_or_none('description'),
        priority=_or_none('priority'),
        date_identified=_or_none('date_identified_str'),
        date_completed=_or_none('date_completed_str'),
        status=item.get('status', 'Incomplete'),
        status_updates=_or_none('status_updates'),
        issue_image=_or_none('issue_image'),
        age_days=item.get('age_days'),
    )
def _build_vendor_dict(vendor_name: str, vendor_data: Dict) -> Dict:
    """Build the serialized report entry for a single vendor.

    Combines the VendorMetrics model with extra status-grouped item lists
    (closed/monitor/open/incomplete) that the HTML report's tabs consume.
    """
    # Items added / closed / moved-to-monitor within the last 24 hours.
    updates_24h = VendorUpdates24h(
        added=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_added']],
        closed=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_closed']],
        changed_to_monitor=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_monitor']]
    )

    # Oldest 3 unaddressed items (preprocessor already sorts them oldest-first).
    oldest_unaddressed = [
        convert_item_to_punchlist_item(item)
        for item in vendor_data['unaddressed'][:3]
    ]

    critical_items = [convert_item_to_punchlist_item(item) for item in vendor_data['critical']]
    high_items = [convert_item_to_punchlist_item(item) for item in vendor_data['high']]

    # Group all items by status for the report tabs. The preprocessor sets
    # status to 'Complete', 'Monitor', or 'Incomplete'; is_closed serves as
    # a backup signal for completion.
    all_items = vendor_data.get('items', [])
    closed_items = [convert_item_to_punchlist_item(item) for item in all_items
                    if item.get('status', '').lower() == 'complete' or item.get('is_closed', False)]
    monitor_items = [convert_item_to_punchlist_item(item) for item in all_items
                     if item.get('status', '').lower() == 'monitor']
    incomplete_items = [convert_item_to_punchlist_item(item) for item in all_items
                        if item.get('status', '').lower() == 'incomplete' and not item.get('is_closed', False)]
    # Open items = all non-closed items (Monitor + Incomplete).
    open_items = [convert_item_to_punchlist_item(item) for item in all_items
                  if not item.get('is_closed', False)]

    vendor_metrics = VendorMetrics(
        vendor_name=vendor_name,
        total_items=len(vendor_data['items']),
        closed_count=vendor_data['closed'],
        open_count=len(open_items),  # all non-closed items (Monitor + Incomplete)
        monitor_count=vendor_data['monitor'],
        updates_24h=updates_24h,
        oldest_unaddressed=oldest_unaddressed,
        critical_priority_items=critical_items,
        high_priority_items=high_items
    )

    # Attach the status-grouped lists as extra keys; these are re-applied
    # after FullReport validation, which may strip unknown vendor fields.
    vendor_dict = vendor_metrics.model_dump()
    vendor_dict['closed_items'] = [item.model_dump() for item in closed_items]
    vendor_dict['monitor_items'] = [item.model_dump() for item in monitor_items]
    vendor_dict['open_items'] = [item.model_dump() for item in open_items]
    vendor_dict['incomplete_items'] = [item.model_dump() for item in incomplete_items]
    vendor_dict['incomplete_count'] = len(incomplete_items)
    return vendor_dict


def generate_report(
    reports_dir: str = "reports",
    output_file: Optional[str] = None,
    verbose: bool = True
) -> dict:
    """
    Generate a vendor report directly from preprocessed Excel data.

    Args:
        reports_dir: Directory containing the source Excel files.
        output_file: Optional path for the JSON output; when given, an HTML
            report is also generated from the saved JSON.
        verbose: Whether to print progress messages.

    Returns:
        Dictionary containing the generated report, or an empty dict when
        no data could be processed.
    """
    if verbose:
        print("=" * 70)
        print("REPORT GENERATION")
        print("=" * 70)
        print(f"Loading and preprocessing Excel files from '{reports_dir}'...")

    # Preprocess Excel files using Baltimore/Eastern timezone so the
    # "last 24 hours" buckets line up with the site's local day.
    baltimore_tz = ZoneInfo("America/New_York")
    current_date_baltimore = datetime.now(baltimore_tz)
    if verbose:
        print("Using Baltimore/Eastern timezone (America/New_York) for 24h calculations")
        print(f"Current time: {current_date_baltimore.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    preprocessed_data, summary = preprocess_excel_files(reports_dir, current_date=current_date_baltimore)

    if not summary:
        print("Error: No data processed")
        return {}

    # Save the raw preprocessed text for inspection/debugging.
    preprocessed_output_path = Path("output/preprocessed_data.txt")
    preprocessed_output_path.parent.mkdir(parents=True, exist_ok=True)
    preprocessed_output_path.write_text(preprocessed_data, encoding='utf-8')

    if verbose:
        total_items = sum(len(v['items']) for v in summary.values())
        print(f"✓ Processed {total_items} items from {len(summary)} vendors")
        print(f"✓ Preprocessed data saved to: {preprocessed_output_path}")
        print("Generating report directly from preprocessed data...")

    # Build one serialized entry per vendor, sorted by vendor name.
    vendors = [
        _build_vendor_dict(vendor_name, vendor_data)
        for vendor_name, vendor_data in sorted(summary.items())
    ]

    # Create the full report. Vendors are plain dicts by construction, so
    # the summary totals are simple .get() sums. The report timestamp uses
    # the same Baltimore-aware clock as the 24h calculations above.
    report = FullReport(
        report_generated_at=datetime.now(baltimore_tz).isoformat(),
        vendors=vendors,
        summary={
            "total_vendors": len(vendors),
            "total_items": sum(v.get('total_items', 0) for v in vendors),
            "total_closed": sum(v.get('closed_count', 0) for v in vendors),
            "total_open": sum(v.get('open_count', 0) for v in vendors),
            "total_monitor": sum(v.get('monitor_count', 0) for v in vendors),
            "total_incomplete": sum(v.get('incomplete_count', 0) for v in vendors)
        }
    )

    report_data = report.model_dump()

    # FullReport validation may strip the extra status-grouped keys from
    # each vendor entry, so copy them back from the pre-validation dicts.
    extra_keys = ('closed_items', 'monitor_items', 'open_items',
                  'incomplete_items', 'incomplete_count')
    for i, vendor_dict in enumerate(vendors):
        for key in extra_keys:
            if key in vendor_dict:
                report_data['vendors'][i][key] = vendor_dict[key]

    # Save JSON (and the derived HTML report) if an output path was given.
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)

        if verbose:
            file_size = output_path.stat().st_size / 1024
            print(f"✓ JSON report saved to: {output_path} ({file_size:.1f} KB)")
            print("Generating HTML report...")

        # HTML is rendered from the saved JSON file.
        html_path = generate_html_report(str(output_path))

        if verbose:
            html_size = Path(html_path).stat().st_size / 1024
            print(f"✓ HTML report saved to: {html_path} ({html_size:.1f} KB)")

    if verbose:
        print()
        print("=" * 70)
        print("✓ Report generated successfully!")
        print(f"  Vendors: {len(vendors)}")
        print(f"  Total items: {report_data['summary']['total_items']}")
        print("=" * 70)

    return report_data
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Generate vendor reports from Excel files")
    parser.add_argument(
        "--reports-dir",
        type=str,
        default="reports",
        help="Directory containing Excel files (default: reports)"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="output/report.json",
        help="Output JSON file path (default: output/report.json)"
    )
    # --verbose was store_true with default=True, which made it impossible
    # to disable. It is kept for backward compatibility; --quiet is the
    # new off switch (both write to the same 'verbose' dest).
    parser.add_argument(
        "--verbose",
        action="store_true",
        default=True,
        help="Print verbose output (default: on)"
    )
    parser.add_argument(
        "--quiet",
        action="store_false",
        dest="verbose",
        help="Suppress progress output"
    )

    args = parser.parse_args()

    report = generate_report(
        reports_dir=args.reports_dir,
        output_file=args.output,
        verbose=args.verbose
    )

    # generate_report returns {} on failure; an explicit "error" key is
    # also treated as failure.
    if report and "error" not in report:
        print("\n✓ Report generation complete!")
    else:
        print("\n✗ Report generation failed.")
        sys.exit(1)