vendor_report/report_generator.py
ilia gu cf28d462b2 Fix status handling and deduplication
- Fix incomplete status detection: check for 'incomplete' before 'complete' to avoid the substring-match bug (see the sketch below the commit summary)
- Change incomplete status badge color to yellow/orange (warning) instead of red
- Remove redundant 'Open' status card, keep only actual statuses (Closed, Monitor, Incomplete)
- Add deduplication at Excel processing level to prevent duplicate items
- Skip Sheet1 and Comments sheets which contain duplicate data
- Improve HTML deduplication when combining items for All tab
- Fix open_count to use incomplete_count for consistency
2025-11-08 18:20:06 +04:00
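The status-ordering and sheet-level deduplication fixes described in the bullets above live in data_preprocessor.py, which is not shown on this page. The sketch below only illustrates the pattern the commit message describes; the names SKIP_SHEETS, classify_status, and deduplicate_items, and the dedup key, are placeholders, not the preprocessor's actual API.

# Illustrative sketch only - the real logic lives in data_preprocessor.py.
SKIP_SHEETS = {"Sheet1", "Comments"}  # sheets named in the commit as holding duplicate data

def classify_status(raw: str) -> str:
    """Map a free-text status cell to Complete / Monitor / Incomplete."""
    status = (raw or "").strip().lower()
    # 'incomplete' must be tested before 'complete': the string "incomplete"
    # contains the substring "complete", so the reverse order misclassifies it.
    if "incomplete" in status:
        return "Incomplete"
    if "complete" in status:
        return "Complete"
    if "monitor" in status:
        return "Monitor"
    return "Incomplete"

def deduplicate_items(items: list[dict]) -> list[dict]:
    """Keep the first occurrence of each item (dedup key is assumed here)."""
    seen = set()
    unique = []
    for item in items:
        key = (item.get("vendor", ""), item.get("punchlist_name", ""))
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique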


#!/usr/bin/env python3
"""
Report Generator
Generates vendor reports directly from preprocessed Excel data.
"""
import json
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List
from zoneinfo import ZoneInfo
from data_preprocessor import preprocess_excel_files
from models import FullReport, VendorMetrics, VendorUpdates24h, PunchlistItem
from html_generator import generate_html_report


def convert_item_to_punchlist_item(item: Dict) -> PunchlistItem:
    """Convert preprocessed item dict to PunchlistItem Pydantic model."""
    return PunchlistItem(
        punchlist_name=item.get('punchlist_name', ''),
        description=item.get('description') or None,
        priority=item.get('priority') or None,
        date_identified=item.get('date_identified_str') or None,
        date_completed=item.get('date_completed_str') or None,
        status=item.get('status', 'Incomplete'),
        status_updates=item.get('status_updates') or None,
        issue_image=item.get('issue_image') or None,
        age_days=item.get('age_days')
    )


def generate_report(
    reports_dir: str = "reports",
    output_file: Optional[str] = None,
    verbose: bool = True
) -> dict:
    """
    Generate vendor report directly from preprocessed data.

    Args:
        reports_dir: Directory containing Excel files
        output_file: Optional path to save JSON output
        verbose: Whether to print progress messages

    Returns:
        Dictionary containing the generated report
    """
    if verbose:
        print("=" * 70)
        print("REPORT GENERATION")
        print("=" * 70)
        print(f"Loading and preprocessing Excel files from '{reports_dir}'...")

    # Preprocess Excel files using Baltimore/Eastern timezone
    baltimore_tz = ZoneInfo("America/New_York")
    current_date_baltimore = datetime.now(baltimore_tz)
    if verbose:
        print("Using Baltimore/Eastern timezone (America/New_York) for 24h calculations")
        print(f"Current time: {current_date_baltimore.strftime('%Y-%m-%d %H:%M:%S %Z')}")

    preprocessed_data, summary = preprocess_excel_files(reports_dir, current_date=current_date_baltimore)
    if not summary:
        print("Error: No data processed")
        return {}

    # Save preprocessed data for inspection
    preprocessed_output_path = Path("output/preprocessed_data.txt")
    preprocessed_output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(preprocessed_output_path, 'w', encoding='utf-8') as f:
        f.write(preprocessed_data)

    if verbose:
        total_items = sum(len(v['items']) for v in summary.values())
        print(f"✓ Processed {total_items} items from {len(summary)} vendors")
        print(f"✓ Preprocessed data saved to: {preprocessed_output_path}")
        print("Generating report directly from preprocessed data...")

    # Build vendors list
    vendors = []
    for vendor_name, vendor_data in sorted(summary.items()):
        # Build 24-hour updates
        updates_24h = VendorUpdates24h(
            added=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_added']],
            closed=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_closed']],
            changed_to_monitor=[convert_item_to_punchlist_item(item) for item in vendor_data['recent_monitor']]
        )

        # Get oldest 3 unaddressed (already sorted)
        oldest_unaddressed = [
            convert_item_to_punchlist_item(item)
            for item in vendor_data['unaddressed'][:3]
        ]

        # Get very high priority items
        very_high_items = [
            convert_item_to_punchlist_item(item)
            for item in vendor_data['very_high']
        ]

        # Get high priority items
        high_items = [
            convert_item_to_punchlist_item(item)
            for item in vendor_data['high']
        ]

        # Get all items grouped by status for tabs
        all_items = vendor_data.get('items', [])

        # Use status field - preprocessor sets status to 'Complete', 'Monitor', or 'Incomplete'
        # Also check is_closed flag as backup
        closed_items = [convert_item_to_punchlist_item(item) for item in all_items
                        if item.get('status', '').lower() == 'complete' or item.get('is_closed', False)]
        monitor_items = [convert_item_to_punchlist_item(item) for item in all_items
                         if item.get('status', '').lower() == 'monitor']
        incomplete_items = [convert_item_to_punchlist_item(item) for item in all_items
                            if item.get('status', '').lower() == 'incomplete' and not item.get('is_closed', False)]

        # Open items tab = all non-closed items (Monitor + Incomplete)
        open_items = [convert_item_to_punchlist_item(item) for item in all_items
                      if not item.get('is_closed', False)]

        # Calculate counts
        incomplete_count = len(incomplete_items)
        # open_count uses incomplete_count for consistency (see commit message)
        open_count = incomplete_count

        # Create vendor metrics
        vendor_metrics = VendorMetrics(
            vendor_name=vendor_name,
            total_items=len(vendor_data['items']),
            closed_count=vendor_data['closed'],
            open_count=open_count,
            monitor_count=vendor_data['monitor'],
            updates_24h=updates_24h,
            oldest_unaddressed=oldest_unaddressed,
            very_high_priority_items=very_high_items,
            high_priority_items=high_items
        )

        # Add status-grouped items to vendor metrics (will be serialized to dict)
        vendor_dict = vendor_metrics.model_dump()
        # Convert Pydantic models to dicts
        vendor_dict['closed_items'] = [item.model_dump() for item in closed_items]
        vendor_dict['monitor_items'] = [item.model_dump() for item in monitor_items]
        vendor_dict['open_items'] = [item.model_dump() for item in open_items]
        vendor_dict['incomplete_items'] = [item.model_dump() for item in incomplete_items]
        vendor_dict['incomplete_count'] = incomplete_count
        vendors.append(vendor_dict)

    # Create full report
    report = FullReport(
        report_generated_at=datetime.now().isoformat(),
        vendors=vendors,
        summary={
            "total_vendors": len(vendors),
            "total_items": sum(v.get('total_items', 0) if isinstance(v, dict) else v.total_items for v in vendors),
            "total_closed": sum(v.get('closed_count', 0) if isinstance(v, dict) else v.closed_count for v in vendors),
            "total_open": sum(v.get('open_count', 0) if isinstance(v, dict) else v.open_count for v in vendors),
            "total_monitor": sum(v.get('monitor_count', 0) if isinstance(v, dict) else v.monitor_count for v in vendors),
            "total_incomplete": sum(v.get('incomplete_count', 0) if isinstance(v, dict) else 0 for v in vendors)
        }
    )

    # Convert to dict - vendors already have closed_items, monitor_items, open_items from above
    report_data = report.model_dump()

    # Restore the status-grouped items that Pydantic might have stripped
    # (FullReport validation may have removed extra fields from vendors)
    for i, vendor_dict in enumerate(vendors):
        if isinstance(vendor_dict, dict):
            # Ensure status-grouped items are preserved
            if 'closed_items' in vendor_dict:
                report_data['vendors'][i]['closed_items'] = vendor_dict['closed_items']
            if 'monitor_items' in vendor_dict:
                report_data['vendors'][i]['monitor_items'] = vendor_dict['monitor_items']
            if 'open_items' in vendor_dict:
                report_data['vendors'][i]['open_items'] = vendor_dict['open_items']
            if 'incomplete_items' in vendor_dict:
                report_data['vendors'][i]['incomplete_items'] = vendor_dict['incomplete_items']
            if 'incomplete_count' in vendor_dict:
                report_data['vendors'][i]['incomplete_count'] = vendor_dict['incomplete_count']

    # Save to file if specified
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)
        if verbose:
            file_size = output_path.stat().st_size / 1024
            print(f"✓ JSON report saved to: {output_path} ({file_size:.1f} KB)")

        # Generate HTML report from the saved JSON
        if verbose:
            print("Generating HTML report...")
        html_path = generate_html_report(str(output_path))
        if verbose:
            html_size = Path(html_path).stat().st_size / 1024
            print(f"✓ HTML report saved to: {html_path} ({html_size:.1f} KB)")

    if verbose:
        print()
        print("=" * 70)
        print("✓ Report generated successfully!")
        print(f"  Vendors: {len(vendors)}")
        print(f"  Total items: {report_data['summary']['total_items']}")
        print("=" * 70)

    return report_data
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Generate vendor reports from Excel files")
parser.add_argument(
"--reports-dir",
type=str,
default="reports",
help="Directory containing Excel files (default: reports)"
)
parser.add_argument(
"--output",
type=str,
default="output/report.json",
help="Output JSON file path (default: output/report.json)"
)
parser.add_argument(
"--verbose",
action="store_true",
default=True,
help="Print verbose output"
)
args = parser.parse_args()
report = generate_report(
reports_dir=args.reports_dir,
output_file=args.output,
verbose=args.verbose
)
if report and "error" not in report:
print("\n✓ Report generation complete!")
else:
print("\n✗ Report generation failed.")
import sys
sys.exit(1)