#!/usr/bin/env python3
"""
Data Preprocessor - Clean and normalize Excel data before LLM processing
This module performs programmatic preprocessing to:
1. Parse Excel into structured format
2. Normalize vendor names
3. Normalize statuses and priorities
4. Parse dates into standard format
5. Calculate 24-hour windows
6. Pre-classify items
This reduces LLM errors and improves accuracy.
"""
import logging
import re
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo

import pandas as pd

logger = logging.getLogger(__name__)

class DataPreprocessor:
    """Preprocesses Excel data before sending to LLM."""

    def __init__(self, current_date: Optional[datetime] = None):
        """
        Initialize preprocessor.

        Args:
            current_date: Current date for 24-hour calculations
                (defaults to now in Baltimore/Eastern timezone)
        """
        # Use Baltimore/Eastern timezone (America/New_York)
        baltimore_tz = ZoneInfo("America/New_York")
        if current_date is None:
            self.current_date = datetime.now(baltimore_tz)
        elif current_date.tzinfo is None:
            # If current_date is timezone-naive, assume it is in Baltimore time
            self.current_date = current_date.replace(tzinfo=baltimore_tz)
        else:
            # Convert to Baltimore timezone
            self.current_date = current_date.astimezone(baltimore_tz)
        self.items: List[Dict] = []
        self.vendor_normalization_map: Dict[str, str] = {}
        self._vendor_groups: Dict[str, List[str]] = {}
    def _build_vendor_normalization_map(self, items: List[Dict]) -> None:
        """
        Build vendor normalization map by extracting distinct vendors and grouping similar ones.

        Args:
            items: List of items with a vendor_raw field
        """
        # Group all raw vendor spellings case-insensitively
        vendor_variants: Dict[str, List[str]] = defaultdict(list)
        for item in items:
            vendor_raw = item.get('vendor_raw', '').strip()
            if not vendor_raw:
                continue
            vendor_variants[vendor_raw.lower()].append(vendor_raw)

        # Normalize each vendor group
        for vendor_lower, variants in vendor_variants.items():
            distinct_variants = list(set(variants))
            # Prefer mixed-case variants (like "AutStand"), which indicate
            # intentional capitalization, over all-lowercase or all-uppercase ones
            best_variant = None
            for variant in distinct_variants:
                if variant != variant.lower() and variant != variant.upper():
                    best_variant = variant
                    break
            # If no mixed-case variant exists, use the most common (then longest) variant
            if not best_variant:
                best_variant = max(distinct_variants, key=lambda v: (variants.count(v), len(v)))
            normalized = self._normalize_vendor_case(best_variant)
            # Map every raw spelling to the normalized name
            for variant in distinct_variants:
                self.vendor_normalization_map[variant.lower()] = normalized
            # Store the variant group for reference
            self._vendor_groups[normalized] = distinct_variants
    def _normalize_vendor_case(self, vendor: str) -> str:
        """
        Normalize vendor name case using heuristic rules.

        Args:
            vendor: Raw vendor name

        Returns:
            Normalized vendor name
        """
        if not vendor:
            return "MISC"
        vendor = vendor.strip()
        # Handle combined vendors (e.g., "Autstand/Beumer", "DCS/Autstand")
        if '/' in vendor:
            parts = []
            for part in vendor.split('/'):
                part = part.strip()
                # Title-case each part, but preserve acronyms (all caps or very short)
                if part.isupper() or len(part) <= 3:
                    parts.append(part.upper())
                else:
                    parts.append(part.title())
            return '/'.join(parts)
        # Handle vendors with a parenthetical (e.g., "MFO (Amazon)")
        if '(' in vendor and ')' in vendor:
            main_part = vendor.split('(')[0].strip()
            paren_part = vendor.split('(')[1].split(')')[0].strip()
            normalized_main = self._normalize_vendor_case(main_part)
            normalized_paren = paren_part.title()
            return f"{normalized_main} ({normalized_paren})"
        # Handle acronyms (all-caps or short alphabetic names)
        if vendor.isupper() or (len(vendor) <= 4 and vendor.isalpha()):
            return vendor.upper()
        # Default: title case
        return vendor.title()
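    # Illustrative examples (hypothetical vendor strings, not from the source data):
    #   _normalize_vendor_case("dcs/autstand")  -> "DCS/Autstand"
    #   _normalize_vendor_case("mfo (amazon)")  -> "MFO (Amazon)"
    #   _normalize_vendor_case("BEUMER")        -> "BEUMER"  (all caps kept as acronym)
    #   _normalize_vendor_case("autstand corp") -> "Autstand Corp"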
    def normalize_vendor_name(self, vendor: str) -> str:
        """
        Normalize vendor name using the built normalization map.

        Args:
            vendor: Raw vendor name from Excel

        Returns:
            Normalized vendor name
        """
        if not vendor:
            return "MISC"
        vendor_lower = vendor.strip().lower()
        # Check the normalization map (built from the actual data)
        if vendor_lower in self.vendor_normalization_map:
            return self.vendor_normalization_map[vendor_lower]
        # Fallback: normalize case directly (for vendors not seen during map building)
        return self._normalize_vendor_case(vendor.strip())
    def normalize_status(self, status: str) -> Tuple[str, bool]:
        """
        Normalize status and determine whether the item is closed.

        Args:
            status: Raw status string

        Returns:
            Tuple of (normalized_status, is_closed)
        """
        if not status:
            return "Incomplete", False
        status_lower = status.lower().strip()
        # Check for incomplete status FIRST, because "incomplete" contains
        # "complete" as a substring
        if status_lower.startswith('incomplete'):
            return "Incomplete", False
        # Check for closed status ("complette" handles a common misspelling in the data)
        if status_lower.startswith('complete') or 'complette' in status_lower:
            return "Complete", True
        # Check for monitor status ("montor" handles a common misspelling in the data)
        if 'monitor' in status_lower or 'montor' in status_lower:
            return "Monitor", False
        # Default to incomplete/open
        return "Incomplete", False
    def normalize_priority(self, priority: str) -> Tuple[str, Optional[str]]:
        """
        Normalize priority and classify its level.

        Args:
            priority: Raw priority string

        Returns:
            Tuple of (normalized_priority, priority_level), where priority_level is
            "very_high", "high", "medium", "low", "monitoring", "complete", or None
        """
        if not priority:
            return "", None
        priority_lower = priority.lower()
        # Very High priority ("hgh" handles a common misspelling in the data)
        if 'very high' in priority_lower or 'very hgh' in priority_lower:
            return priority, "very_high"
        # High priority (Very High was already matched above)
        if '(2) high' in priority_lower or '(2) hgh' in priority_lower:
            return priority, "high"
        if priority_lower.startswith('2) high') or priority_lower.startswith('2) hgh'):
            return priority, "high"
        if priority_lower == 'high':
            return priority, "high"
        # Medium priority
        if '(3) medium' in priority_lower or priority_lower == 'medium':
            return priority, "medium"
        # Low priority
        if '(4) low' in priority_lower or priority_lower == 'low':
            return priority, "low"
        # Monitoring priority
        if '(5) monitoring' in priority_lower:
            return priority, "monitoring"
        # Complete priority
        if '(6) complete' in priority_lower:
            return priority, "complete"
        return priority, None
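    # Illustrative examples (hypothetical priority strings):
    #   normalize_priority("(1) Very High") -> ("(1) Very High", "very_high")
    #   normalize_priority("(2) High")      -> ("(2) High", "high")
    #   normalize_priority("(3) Medium")    -> ("(3) Medium", "medium")
    #   normalize_priority("urgent")        -> ("urgent", None)  # unrecognized level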
    def parse_date(self, date_str: str) -> Optional[datetime]:
        """
        Parse a date from various formats and return a timezone-aware datetime
        in Baltimore/Eastern time.

        Args:
            date_str: Date string in one of several supported formats

        Returns:
            Parsed datetime in Baltimore timezone, or None if unparseable
        """
        if not date_str or date_str.strip() == '':
            return None
        date_str = date_str.strip()
        baltimore_tz = ZoneInfo("America/New_York")
        # Try each supported format in turn
        formats = [
            "%m/%d/%y",            # 10/14/25
            "%m/%d/%Y",            # 10/14/2025
            "%Y-%m-%d %H:%M:%S",   # 2025-10-17 00:00:00
            "%Y-%m-%d",            # 2025-10-17
        ]
        for fmt in formats:
            try:
                parsed_date = datetime.strptime(date_str, fmt)
                # strptime with these formats always yields a naive datetime;
                # make it timezone-aware in Baltimore time
                return parsed_date.replace(tzinfo=baltimore_tz)
            except ValueError:
                continue
        return None
    def is_within_24_hours(self, date: Optional[datetime]) -> bool:
        """
        Check whether a date falls on yesterday (the previous calendar day)
        in Baltimore/Eastern time.

        Note: despite the method name, this is a calendar-day check, not a
        rolling 24-hour window; any time on yesterday's date matches.

        Args:
            date: Date to check (ideally timezone-aware in Baltimore timezone)

        Returns:
            True if the date falls on yesterday's calendar date
        """
        if not date:
            return False
        baltimore_tz = ZoneInfo("America/New_York")
        # Convert to Baltimore time (assume naive datetimes are already Baltimore time)
        if date.tzinfo is None:
            date_baltimore = date.replace(tzinfo=baltimore_tz)
        else:
            date_baltimore = date.astimezone(baltimore_tz)
        # current_date should already be in Baltimore time, but convert defensively
        if self.current_date.tzinfo is None:
            current_baltimore = self.current_date.replace(tzinfo=baltimore_tz)
        else:
            current_baltimore = self.current_date.astimezone(baltimore_tz)
        # Compare against yesterday's calendar date
        yesterday_date = (current_baltimore - timedelta(days=1)).date()
        return date_baltimore.date() == yesterday_date
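    # Illustrative example (hypothetical timestamps): with current_date at
    # 2025-10-15 09:00 ET, any timestamp on 2025-10-14 matches, while
    # 2025-10-13 23:59 ET and 2025-10-15 00:01 ET both return False.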
    def parse_excel_row(self, cols: List[str]) -> Optional[Dict]:
        """
        Parse a single Excel row into structured format.

        Expected column order: 0=punchlist name, 1=vendor, 2=priority,
        3=description, 4=date identified, 5=status updates, 6=issue image,
        7=status, 8=date completed (optional).

        Args:
            cols: List of column values

        Returns:
            Structured item dict, or None if the row is invalid
        """
        if len(cols) < 8:
            return None
        punchlist_name = cols[0].strip()
        vendor_raw = cols[1].strip() if len(cols) > 1 else ""
        priority_raw = cols[2].strip() if len(cols) > 2 else ""
        description = cols[3].strip() if len(cols) > 3 else ""
        date_identified_str = cols[4].strip() if len(cols) > 4 else ""
        status_updates = cols[5].strip() if len(cols) > 5 else ""
        issue_image = cols[6].strip() if len(cols) > 6 else ""
        status_raw = cols[7].strip() if len(cols) > 7 else ""
        date_completed_str = cols[8].strip() if len(cols) > 8 else ""
        if not punchlist_name:
            return None
        # Normalize fields
        vendor = self.normalize_vendor_name(vendor_raw)
        status, is_closed = self.normalize_status(status_raw)
        priority, priority_level = self.normalize_priority(priority_raw)
        # Parse dates
        date_identified = self.parse_date(date_identified_str)
        date_completed = self.parse_date(date_completed_str)
        # Calculate age
        age_days = None
        if date_identified:
            age_days = (self.current_date - date_identified).days
        # Flag items added, closed, or monitored yesterday (coerce to bool so
        # the flags are always True/False rather than None/datetime)
        is_recent_added = bool(date_identified and self.is_within_24_hours(date_identified) and not is_closed)
        is_recent_closed = bool(date_completed and self.is_within_24_hours(date_completed) and is_closed)
        is_recent_monitor = bool(status == "Monitor" and (
            (date_identified and self.is_within_24_hours(date_identified)) or
            (date_completed and self.is_within_24_hours(date_completed))
        ))
        return {
            'punchlist_name': punchlist_name,
            'vendor': vendor,
            'vendor_raw': vendor_raw,  # Keep original for reference
            'priority': priority,
            'priority_level': priority_level,
            'description': description,
            'date_identified': date_identified,
            'date_identified_str': date_identified_str,  # Keep original
            'date_completed': date_completed,
            'date_completed_str': date_completed_str,  # Keep original
            'status': status,
            'status_raw': status_raw,  # Keep original
            'is_closed': is_closed,
            'status_updates': status_updates,
            'issue_image': issue_image,
            'age_days': age_days,
            'is_recent_added': is_recent_added,
            'is_recent_closed': is_recent_closed,
            'is_recent_monitor': is_recent_monitor,
        }
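    # Illustrative example (hypothetical row, not from the source data): given
    #   ["Conveyor belt misaligned", "autstand", "(2) High", "Belt drifts left",
    #    "10/14/25", "Tech dispatched", "", "Incomplete", ""]
    # the parsed item has vendor="Autstand", priority_level="high",
    # status="Incomplete", is_closed=False, and a Baltimore-aware date_identified.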
    def process_excel_file(self, excel_path: str) -> List[Dict]:
        """
        Process an Excel file directly using pandas for reliable parsing.

        Args:
            excel_path: Path to Excel file

        Returns:
            List of structured item dictionaries
        """
        items = []
        try:
            xl_file = pd.ExcelFile(excel_path)
            for sheet_name in xl_file.sheet_names:
                # Skip sheets known to be duplicates/backups
                if sheet_name.lower() in ['sheet1', 'comments']:
                    continue
                df = pd.read_excel(xl_file, sheet_name=sheet_name)
                if df.empty:
                    continue
                df = df.fillna("")
                # First pass: collect raw rows and vendor names
                raw_items = []
                for _, row in df.iterrows():
                    # Pad to at least 9 columns so optional trailing columns are safe to index
                    cols = []
                    for i in range(max(9, len(df.columns))):
                        if i < len(df.columns):
                            cols.append(str(row.iloc[i]) if pd.notna(row.iloc[i]) else "")
                        else:
                            cols.append("")
                    vendor_raw = cols[1].strip() if len(cols) > 1 else ""
                    if cols[0].strip():  # Has a punchlist name
                        raw_items.append({'vendor_raw': vendor_raw, 'cols': cols})
                # Build the vendor normalization map from the actual data
                if raw_items:
                    self._build_vendor_normalization_map(raw_items)
                # Second pass: parse with normalization, deduplicating by
                # (punchlist_name, vendor)
                seen_items = set()
                for raw_item in raw_items:
                    item = self.parse_excel_row(raw_item['cols'])
                    if item:
                        item_key = (item['punchlist_name'].strip().lower(), item['vendor'].strip().lower())
                        if item_key not in seen_items:
                            seen_items.add(item_key)
                            items.append(item)
        except Exception as e:
            logger.error(f"Error processing Excel file {excel_path}: {e}")
        self.items = items
        return items
    def process_excel_text(self, excel_text: str) -> List[Dict]:
        """
        Process raw Excel text into structured items (fallback method).

        Args:
            excel_text: Raw text from the Excel loader

        Returns:
            List of structured item dictionaries
        """
        lines = excel_text.split('\n')
        items = []
        current_item_cols = None
        for line in lines:
            # Skip header lines
            if line.startswith('FILENAME') or line.startswith('SHEET') or line.startswith('='):
                continue
            if not line.strip():
                continue
            # Try tab-separated first (more reliable)
            cols = line.split('\t')
            # If no tabs, fall back to runs of 2+ spaces (LangChain output)
            if len(cols) < 8:
                cols = re.split(r'\s{2,}', line)
            # A new item has a punchlist name in the first column
            if len(cols) >= 8 and cols[0].strip():
                # Save the previous item, if any
                if current_item_cols:
                    item = self.parse_excel_row(current_item_cols)
                    if item:
                        items.append(item)
                # Start a new item
                current_item_cols = cols
            elif current_item_cols and len(cols) > 0:
                # Continuation line: status updates often wrap onto the next line,
                # so merge into the status-updates column of the current item
                if len(current_item_cols) > 5:
                    current_item_cols[5] += " " + line.strip()
        # Don't forget the last item
        if current_item_cols:
            item = self.parse_excel_row(current_item_cols)
            if item:
                items.append(item)
        self.items = items
        return items
    def get_preprocessed_summary(self) -> Dict:
        """
        Generate summary statistics from preprocessed data.

        Returns:
            Summary dictionary keyed by vendor, with counts and item lists
        """
        vendors = defaultdict(lambda: {
            'items': [],
            'closed': 0,
            'open': 0,
            'monitor': 0,
            'very_high': [],
            'high': [],
            'unaddressed': [],
            'recent_added': [],
            'recent_closed': [],
            'recent_monitor': []
        })
        for item in self.items:
            vendor = item['vendor']
            vendors[vendor]['items'].append(item)
            if item['is_closed']:
                vendors[vendor]['closed'] += 1
            elif item['status'] == 'Monitor':
                vendors[vendor]['monitor'] += 1
            else:
                vendors[vendor]['open'] += 1
            if item['priority_level'] == 'very_high':
                vendors[vendor]['very_high'].append(item)
            elif item['priority_level'] == 'high':
                vendors[vendor]['high'].append(item)
            # Unaddressed = not closed AND not in Monitor status
            # (open/incomplete items that need action)
            if not item['is_closed'] and item['status'] != 'Monitor':
                vendors[vendor]['unaddressed'].append(item)
            if item['is_recent_added']:
                vendors[vendor]['recent_added'].append(item)
            if item['is_recent_closed']:
                vendors[vendor]['recent_closed'].append(item)
            if item['is_recent_monitor']:
                vendors[vendor]['recent_monitor'].append(item)
        # Sort unaddressed items oldest-first; undated items sort last
        baltimore_tz = ZoneInfo("America/New_York")
        max_datetime = datetime.max.replace(tzinfo=baltimore_tz)
        for vendor in vendors.values():
            vendor['unaddressed'].sort(key=lambda x: x['date_identified'] or max_datetime)
        return dict(vendors)
    def format_for_llm(self) -> str:
        """
        Format preprocessed data into a structured text block for the LLM prompt.

        Returns:
            Formatted string with normalized, structured data
        """
        summary = self.get_preprocessed_summary()
        output_lines = []
        output_lines.append("PREPROCESSED EXCEL DATA")
        output_lines.append("=" * 80)
        # Show the current date, with timezone info when available
        if self.current_date.tzinfo:
            output_lines.append(f"Current Date (Baltimore/Eastern): {self.current_date.strftime('%Y-%m-%d %H:%M:%S %Z')}")
        else:
            output_lines.append(f"Current Date: {self.current_date.strftime('%Y-%m-%d %H:%M:%S')}")
        output_lines.append(f"Total Items: {len(self.items)}")
        output_lines.append("")
        for vendor_name, vendor_data in sorted(summary.items()):
            output_lines.append(f"VENDOR: {vendor_name}")
            output_lines.append("-" * 80)
            output_lines.append(f"Total Items: {len(vendor_data['items'])}")
            output_lines.append(f" Closed: {vendor_data['closed']}")
            output_lines.append(f" Open: {vendor_data['open']}")
            output_lines.append(f" Monitor: {vendor_data['monitor']}")
            output_lines.append("")
            # Recent updates
            if vendor_data['recent_added'] or vendor_data['recent_closed'] or vendor_data['recent_monitor']:
                output_lines.append("RECENT UPDATES (Yesterday's Date):")
                for item in vendor_data['recent_added']:
                    output_lines.append(f" ADDED: {item['punchlist_name']} | {item['date_identified_str']} | {item['status']}")
                for item in vendor_data['recent_closed']:
                    output_lines.append(f" CLOSED: {item['punchlist_name']} | {item['date_completed_str']} | {item['status']}")
                for item in vendor_data['recent_monitor']:
                    output_lines.append(f" MONITOR: {item['punchlist_name']} | {item['date_identified_str']} | {item['status']}")
                output_lines.append("")
            # Oldest unaddressed
            if vendor_data['unaddressed']:
                output_lines.append("OLDEST UNADDRESSED (Top 3):")
                for item in vendor_data['unaddressed'][:3]:
                    output_lines.append(f" {item['punchlist_name']} | Age: {item['age_days']} days | {item['date_identified_str']} | {item['status']}")
                output_lines.append("")
            # Priority items
            if vendor_data['very_high']:
                output_lines.append(f"VERY HIGH PRIORITY ({len(vendor_data['very_high'])} items):")
                for item in vendor_data['very_high']:
                    output_lines.append(f" {item['punchlist_name']} | {item['status']} | {item['date_identified_str']}")
                output_lines.append("")
            if vendor_data['high']:
                output_lines.append(f"HIGH PRIORITY ({len(vendor_data['high'])} items):")
                for item in vendor_data['high']:
                    output_lines.append(f" {item['punchlist_name']} | {item['status']} | {item['date_identified_str']}")
                output_lines.append("")
            # All items
            output_lines.append("ALL ITEMS:")
            for item in vendor_data['items']:
                # Truncate long descriptions; append "..." only when actually truncated
                description = item['description'] or "N/A"
                if len(description) > 50:
                    description = description[:50] + "..."
                output_lines.append(
                    f" {item['punchlist_name']} | "
                    f"Vendor: {item['vendor']} | "
                    f"Priority: {item['priority']} ({item['priority_level']}) | "
                    f"Status: {item['status']} ({'CLOSED' if item['is_closed'] else 'OPEN'}) | "
                    f"Date: {item['date_identified_str']} | "
                    f"Description: {description}"
                )
            output_lines.append("")
            output_lines.append("=" * 80)
            output_lines.append("")
        return "\n".join(output_lines)

def preprocess_excel_data(excel_text: str, current_date: Optional[datetime] = None) -> Tuple[str, Dict]:
    """
    Preprocess Excel text and return a formatted string for the LLM.

    Args:
        excel_text: Raw Excel text from the loader
        current_date: Current date for calculations

    Returns:
        Tuple of (formatted_string, summary_dict)
    """
    preprocessor = DataPreprocessor(current_date=current_date)
    preprocessor.process_excel_text(excel_text)
    formatted = preprocessor.format_for_llm()
    summary = preprocessor.get_preprocessed_summary()
    return formatted, summary

def preprocess_excel_files(reports_dir: str = "reports", current_date: Optional[datetime] = None) -> Tuple[str, Dict]:
    """
    Preprocess Excel files directly (more reliable than text parsing).

    Args:
        reports_dir: Directory containing Excel files
        current_date: Current date for calculations

    Returns:
        Tuple of (formatted_string, summary_dict)
    """
    preprocessor = DataPreprocessor(current_date=current_date)
    reports_path = Path(reports_dir)
    if not reports_path.exists():
        return f"Reports directory '{reports_dir}' not found.", {}
    excel_files = list(reports_path.glob("*.xlsx")) + list(reports_path.glob("*.xls"))
    if not excel_files:
        return f"No Excel files found in '{reports_dir}' directory.", {}
    # Log which files will be processed
    logger.info(f"Processing {len(excel_files)} Excel file(s) from {reports_dir}:")
    for excel_file in excel_files:
        stat = excel_file.stat()
        mtime_str = datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
        logger.info(f" - {excel_file.name} ({stat.st_size} bytes, modified: {mtime_str})")
    # Warn if multiple files are present: their data will be combined
    if len(excel_files) > 1:
        logger.warning(f"Found {len(excel_files)} Excel files. Report will combine data from ALL of them!")
        logger.warning("This may cause incorrect results. Only ONE file should exist in the reports directory.")
        logger.warning(f"Files found: {[f.name for f in excel_files]}")
    # First pass: collect all rows with raw vendor names across all files,
    # so the vendor normalization map is built from the full dataset
    all_raw_items = []
    for excel_file in excel_files:
        try:
            xl_file = pd.ExcelFile(str(excel_file))
            for sheet_name in xl_file.sheet_names:
                # Skip sheets known to be duplicates/backups
                if sheet_name.lower() in ['sheet1', 'comments']:
                    continue
                df = pd.read_excel(xl_file, sheet_name=sheet_name)
                if df.empty:
                    continue
                df = df.fillna("")
                for _, row in df.iterrows():
                    # Pad to at least 9 columns so optional trailing columns are safe to index
                    cols = []
                    for i in range(max(9, len(df.columns))):
                        if i < len(df.columns):
                            cols.append(str(row.iloc[i]) if pd.notna(row.iloc[i]) else "")
                        else:
                            cols.append("")
                    if cols[0].strip():  # Has a punchlist name
                        all_raw_items.append({'vendor_raw': cols[1].strip() if len(cols) > 1 else "", 'cols': cols})
        except Exception as e:
            logger.error(f"Error reading {excel_file}: {e}")
    # Build the vendor normalization map from all collected data
    if all_raw_items:
        preprocessor._build_vendor_normalization_map(all_raw_items)
    # Second pass: process each file with normalization, deduplicating
    # across all files by (punchlist_name, vendor)
    all_items = []
    seen_items_global = set()
    for excel_file in excel_files:
        items = preprocessor.process_excel_file(str(excel_file))
        for item in items:
            item_key = (item['punchlist_name'].strip().lower(), item['vendor'].strip().lower())
            if item_key not in seen_items_global:
                seen_items_global.add(item_key)
                all_items.append(item)
    preprocessor.items = all_items
    formatted = preprocessor.format_for_llm()
    summary = preprocessor.get_preprocessed_summary()
    return formatted, summary
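

# Minimal usage sketch (illustrative; assumes a local "reports" directory
# containing one punchlist workbook, which is not part of this module):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    formatted_text, vendor_summary = preprocess_excel_files(reports_dir="reports")
    print(formatted_text)
    for vendor, data in vendor_summary.items():
        print(f"{vendor}: {len(data['items'])} items, {data['open']} open")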