vendor_report/report_generator.py
#!/usr/bin/env python3
"""
Report Generator - Direct Generation (No LLM Required)
Generates vendor reports directly from preprocessed Excel data.
All requirements fulfilled programmatically - no AI needed!
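
Typical invocation (illustrative only; these are simply the argparse defaults
defined at the bottom of this file):

    python report_generator.py --reports-dir reports --output output/report.json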
"""
import json
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List
from zoneinfo import ZoneInfo
from data_preprocessor import preprocess_excel_files
from models import FullReport, VendorMetrics, VendorUpdates24h, PunchlistItem
from html_generator import generate_html_report


def convert_item_to_punchlist_item(item: Dict) -> PunchlistItem:
    """Convert preprocessed item dict to PunchlistItem Pydantic model."""
    return PunchlistItem(
        punchlist_name=item.get('punchlist_name', ''),
        description=item.get('description') or None,
        priority=item.get('priority') or None,
        date_identified=item.get('date_identified_str') or None,
        date_completed=item.get('date_completed_str') or None,
        status=item.get('status', 'Incomplete'),
        status_updates=item.get('status_updates') or None,
        issue_image=item.get('issue_image') or None,
        age_days=item.get('age_days')
    )


def generate_report(
    reports_dir: str = "reports",
    output_file: Optional[str] = None,
    verbose: bool = True
) -> dict:
    """
    Generate vendor report directly from preprocessed data - NO LLM required!

    Args:
        reports_dir: Directory containing Excel files
        output_file: Optional path to save JSON output
        verbose: Whether to print progress messages

    Returns:
        Dictionary containing the generated report
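
    Example:
        A minimal, illustrative call; the paths simply mirror the argparse
        defaults at the bottom of this file:

            report = generate_report(
                reports_dir="reports",
                output_file="output/report.json",
                verbose=False,
            )
            total = report["summary"]["total_items"]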
"""
    if verbose:
        print("=" * 70)
        print("DIRECT REPORT GENERATION (No LLM Required)")
        print("=" * 70)
        print(f"Loading and preprocessing Excel files from '{reports_dir}'...")

    # Preprocess Excel files using Baltimore/Eastern timezone
    baltimore_tz = ZoneInfo("America/New_York")
    current_date_baltimore = datetime.now(baltimore_tz)
    if verbose:
        print("Using Baltimore/Eastern timezone (America/New_York) for 24h calculations")
        print(f"Current time: {current_date_baltimore.strftime('%Y-%m-%d %H:%M:%S %Z')}")
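    # Shape assumed for `summary` (inferred from how it is consumed in this function,
    # not from data_preprocessor's documentation): a dict keyed by vendor name, whose
    # values carry item lists ('items', 'recent_added', 'recent_closed',
    # 'recent_monitor', 'unaddressed', 'very_high', 'high') and integer counts
    # ('closed', 'open', 'monitor').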
    preprocessed_data, summary = preprocess_excel_files(reports_dir, current_date=current_date_baltimore)
    if not summary:
        print("Error: No data processed")
        return {}

    # Save preprocessed data for inspection
    preprocessed_output_path = Path("output/preprocessed_data.txt")
    preprocessed_output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(preprocessed_output_path, 'w', encoding='utf-8') as f:
        f.write(preprocessed_data)

    if verbose:
        total_items = sum(len(v['items']) for v in summary.values())
        print(f"✓ Processed {total_items} items from {len(summary)} vendors")
        print(f"✓ Preprocessed data saved to: {preprocessed_output_path}")
        print("Generating report directly from preprocessed data...")

    # Build vendors list
    vendors = []
    for vendor_name, vendor_data in sorted(summary.items()):
        # Build 24-hour updates
        updates_24h = VendorUpdates24h(
            added=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_added']],
            closed=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_closed']],
            changed_to_monitor=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_monitor']]
        )

        # Get oldest 3 unaddressed (already sorted)
        oldest_unaddressed = [
            convert_item_to_punchlist_item(item)
            for item in vendor_data['unaddressed'][:3]
        ]

        # Get very high priority items
        very_high_items = [
            convert_item_to_punchlist_item(item)
            for item in vendor_data['very_high']
        ]

        # Get high priority items
        high_items = [
            convert_item_to_punchlist_item(item)
            for item in vendor_data['high']
        ]

        # Get all items grouped by status for tabs
        all_items = vendor_data.get('items', [])

        # Use status field - preprocessor sets status to 'Complete', 'Monitor', or 'Incomplete'
        # Also check is_closed flag as backup
        closed_items = [convert_item_to_punchlist_item(item) for item in all_items
                        if item.get('status', '').lower() == 'complete' or item.get('is_closed', False)]
        monitor_items = [convert_item_to_punchlist_item(item) for item in all_items
                         if item.get('status', '').lower() == 'monitor']
        open_items = [convert_item_to_punchlist_item(item) for item in all_items
                      if item.get('status', '').lower() == 'incomplete' and not item.get('is_closed', False)]

        # Create vendor metrics
        vendor_metrics = VendorMetrics(
            vendor_name=vendor_name,
            total_items=len(vendor_data['items']),
            closed_count=vendor_data['closed'],
            open_count=vendor_data['open'],
            monitor_count=vendor_data['monitor'],
            updates_24h=updates_24h,
            oldest_unaddressed=oldest_unaddressed,
            very_high_priority_items=very_high_items,
            high_priority_items=high_items
        )

        # Serialize the metrics model, then attach the status-grouped item lists as
        # extra keys on the resulting dict (they are not passed to VendorMetrics above)
        vendor_dict = vendor_metrics.model_dump()
        vendor_dict['closed_items'] = [item.model_dump() for item in closed_items]
        vendor_dict['monitor_items'] = [item.model_dump() for item in monitor_items]
        vendor_dict['open_items'] = [item.model_dump() for item in open_items]
        vendors.append(vendor_dict)

    # Create full report
    report = FullReport(
        report_generated_at=datetime.now().isoformat(),
        vendors=vendors,
        summary={
            "total_vendors": len(vendors),
            "total_items": sum(v.get('total_items', 0) if isinstance(v, dict) else v.total_items for v in vendors),
            "total_closed": sum(v.get('closed_count', 0) if isinstance(v, dict) else v.closed_count for v in vendors),
            "total_open": sum(v.get('open_count', 0) if isinstance(v, dict) else v.open_count for v in vendors),
            "total_monitor": sum(v.get('monitor_count', 0) if isinstance(v, dict) else v.monitor_count for v in vendors)
        }
    )

    # Convert to dict - vendors already have closed_items, monitor_items, open_items from above
    report_data = report.model_dump()

    # Restore the status-grouped items that Pydantic might have stripped
    # (FullReport validation may have removed extra fields from vendors)
    for i, vendor_dict in enumerate(vendors):
        if isinstance(vendor_dict, dict):
            # Ensure status-grouped items are preserved
            if 'closed_items' in vendor_dict:
                report_data['vendors'][i]['closed_items'] = vendor_dict['closed_items']
            if 'monitor_items' in vendor_dict:
                report_data['vendors'][i]['monitor_items'] = vendor_dict['monitor_items']
            if 'open_items' in vendor_dict:
                report_data['vendors'][i]['open_items'] = vendor_dict['open_items']

    # Save to file if specified
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)
        if verbose:
            file_size = output_path.stat().st_size / 1024
            print(f"✓ JSON report saved to: {output_path} ({file_size:.1f} KB)")

        # Generate HTML report
        if verbose:
            print("Generating HTML report...")
        html_path = generate_html_report(str(output_path))
        if verbose:
            html_size = Path(html_path).stat().st_size / 1024
            print(f"✓ HTML report saved to: {html_path} ({html_size:.1f} KB)")

    if verbose:
        print()
        print("=" * 70)
        print("✓ Report generated successfully!")
        print(f" Vendors: {len(vendors)}")
        print(f" Total items: {report_data['summary']['total_items']}")
        print("=" * 70)

    return report_data


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Generate vendor reports from Excel files (no LLM required)")
    parser.add_argument(
        "--reports-dir",
        type=str,
        default="reports",
        help="Directory containing Excel files (default: reports)"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="output/report.json",
        help="Output JSON file path (default: output/report.json)"
    )
    parser.add_argument(
        "--verbose",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Print verbose output (default: enabled; use --no-verbose to disable)"
    )
    args = parser.parse_args()

    report = generate_report(
        reports_dir=args.reports_dir,
        output_file=args.output,
        verbose=args.verbose
    )

    if report and "error" not in report:
        print("\n✓ Report generation complete!")
    else:
        print("\n✗ Report generation failed.")
        import sys
        sys.exit(1)