#!/usr/bin/env python3 """ Data Preprocessor - Clean and normalize Excel data before LLM processing This module performs programmatic preprocessing to: 1. Parse Excel into structured format 2. Normalize vendor names 3. Normalize statuses and priorities 4. Parse dates into standard format 5. Calculate 24-hour windows 6. Pre-classify items This reduces LLM errors and improves accuracy. """ import re from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple from pathlib import Path from collections import defaultdict from zoneinfo import ZoneInfo import pandas as pd class DataPreprocessor: """Preprocesses Excel data before sending to LLM.""" def __init__(self, current_date: Optional[datetime] = None): """ Initialize preprocessor. Args: current_date: Current date for 24-hour calculations (defaults to now in Baltimore/Eastern timezone) """ # Use Baltimore/Eastern timezone (America/New_York) baltimore_tz = ZoneInfo("America/New_York") if current_date is None: self.current_date = datetime.now(baltimore_tz) else: # If current_date is timezone-naive, assume it's in Baltimore time if current_date.tzinfo is None: self.current_date = current_date.replace(tzinfo=baltimore_tz) else: # Convert to Baltimore timezone self.current_date = current_date.astimezone(baltimore_tz) self.items: List[Dict] = [] self.vendor_normalization_map: Dict[str, str] = {} self._vendor_groups: Dict[str, List[str]] = {} def _build_vendor_normalization_map(self, items: List[Dict]) -> None: """ Build vendor normalization map by extracting distinct vendors and grouping similar ones. Args: items: List of items with vendor_raw field """ # Extract all distinct vendor names (case-insensitive) vendor_variants: Dict[str, List[str]] = defaultdict(list) for item in items: vendor_raw = item.get('vendor_raw', '').strip() if not vendor_raw: continue vendor_lower = vendor_raw.lower() vendor_variants[vendor_lower].append(vendor_raw) # Normalize each vendor group for vendor_lower, variants in vendor_variants.items(): # Get distinct variants distinct_variants = list(set(variants)) # Find the best normalized form # Prefer variants with mixed case (like "AutStand") over all lowercase best_variant = None for variant in distinct_variants: # Check if variant has mixed case (indicates intentional capitalization) if variant != variant.lower() and variant != variant.upper(): best_variant = variant break # If no mixed case found, use most common variant if not best_variant: best_variant = max(distinct_variants, key=lambda v: (variants.count(v), len(v))) # Normalize the vendor name normalized = self._normalize_vendor_case(best_variant) # Map all variants to normalized name for variant in distinct_variants: self.vendor_normalization_map[variant.lower()] = normalized # Store variant group for reference self._vendor_groups[normalized] = distinct_variants def _normalize_vendor_case(self, vendor: str) -> str: """ Normalize vendor name case using intelligent rules. Args: vendor: Raw vendor name Returns: Normalized vendor name """ if not vendor: return "MISC" vendor = vendor.strip() # Handle combined vendors (Autstand/Beumer, DCS/Autstand) if '/' in vendor: parts = [] for part in vendor.split('/'): part = part.strip() # Title case each part, but preserve acronyms (all caps) if part.isupper() or len(part) <= 3: parts.append(part.upper()) else: parts.append(part.title()) return '/'.join(parts) # Handle vendors in parentheses (e.g., "MFO (Amazon)") if '(' in vendor and ')' in vendor: main_part = vendor.split('(')[0].strip() paren_part = vendor.split('(')[1].split(')')[0].strip() normalized_main = self._normalize_vendor_case(main_part) normalized_paren = paren_part.title() return f"{normalized_main} ({normalized_paren})" # Handle acronyms (all caps short names) if vendor.isupper() or (len(vendor) <= 4 and vendor.isalpha()): return vendor.upper() # Default: title case return vendor.title() def normalize_vendor_name(self, vendor: str) -> str: """ Normalize vendor name using the built normalization map. Args: vendor: Raw vendor name from Excel Returns: Normalized vendor name """ if not vendor: return "MISC" vendor_lower = vendor.strip().lower() # Check normalization map (built from actual data) if vendor_lower in self.vendor_normalization_map: return self.vendor_normalization_map[vendor_lower] # Fallback: normalize case (for new vendors not seen before) return self._normalize_vendor_case(vendor.strip()) def normalize_status(self, status: str) -> Tuple[str, bool]: """ Normalize status and determine if closed. Args: status: Raw status string Returns: Tuple of (normalized_status, is_closed) """ if not status: return "Incomplete", False status_lower = status.lower().strip() # Check for incomplete status FIRST (before checking for "complete" substring) # because "incomplete" contains "complete" as a substring! if status_lower == 'incomplete' or status_lower.startswith('incomplete'): return "Incomplete", False # Check for closed status (exact match or starts with) if status_lower == 'complete' or status_lower.startswith('complete') or 'complette' in status_lower: return "Complete", True # Check for monitor status if 'monitor' in status_lower or 'montor' in status_lower: return "Monitor", False # Default to incomplete/open return "Incomplete", False def normalize_priority(self, priority: str) -> Tuple[str, Optional[str]]: """ Normalize priority and classify level. Args: priority: Raw priority string Returns: Tuple of (normalized_priority, priority_level) priority_level: "very_high", "high", "medium", "low", "monitoring", "complete", None """ if not priority: return "", None priority_lower = priority.lower() # Very High priority if '(1) very high' in priority_lower or '(1) very hgh' in priority_lower: return priority, "very_high" if 'very high' in priority_lower or 'very hgh' in priority_lower: return priority, "very_high" # High priority (but not Very High) if '(2) high' in priority_lower or '(2) hgh' in priority_lower: return priority, "high" if priority_lower.startswith('2) high') or priority_lower.startswith('2) hgh'): return priority, "high" if priority_lower == 'high' and 'very' not in priority_lower: return priority, "high" # Medium priority if '(3) medium' in priority_lower: return priority, "medium" if priority_lower == 'medium': return priority, "medium" # Low priority if '(4) low' in priority_lower: return priority, "low" if priority_lower == 'low': return priority, "low" # Monitoring priority if '(5) monitoring' in priority_lower: return priority, "monitoring" # Complete priority if '(6) complete' in priority_lower: return priority, "complete" return priority, None def parse_date(self, date_str: str) -> Optional[datetime]: """ Parse date from various formats and return timezone-aware datetime in Baltimore/Eastern timezone. Args: date_str: Date string in various formats Returns: Parsed datetime in Baltimore timezone or None """ if not date_str or date_str.strip() == '': return None date_str = date_str.strip() baltimore_tz = ZoneInfo("America/New_York") # Try different formats formats = [ "%m/%d/%y", # 10/14/25 "%m/%d/%Y", # 10/14/2025 "%Y-%m-%d %H:%M:%S", # 2025-10-17 00:00:00 "%Y-%m-%d", # 2025-10-17 ] for fmt in formats: try: parsed_date = datetime.strptime(date_str, fmt) # Make timezone-aware in Baltimore timezone if parsed_date.tzinfo is None: return parsed_date.replace(tzinfo=baltimore_tz) else: return parsed_date.astimezone(baltimore_tz) except ValueError: continue return None def is_within_24_hours(self, date: Optional[datetime]) -> bool: """ Check if date falls within yesterday (previous calendar day) in Baltimore/Eastern timezone. This checks if the date is on yesterday's date, regardless of the exact time. Args: date: Date to check (should be timezone-aware in Baltimore timezone) Returns: True if date is yesterday (previous calendar day) """ if not date: return False # Ensure both dates are timezone-aware in Baltimore timezone baltimore_tz = ZoneInfo("America/New_York") # Convert date to Baltimore timezone if needed if date.tzinfo is None: # If date is timezone-naive, assume it's in Baltimore time date_baltimore = date.replace(tzinfo=baltimore_tz) else: # Convert to Baltimore timezone date_baltimore = date.astimezone(baltimore_tz) # Ensure current_date is also in Baltimore timezone (should already be, but defensive check) if self.current_date.tzinfo is None: current_baltimore = self.current_date.replace(tzinfo=baltimore_tz) else: current_baltimore = self.current_date.astimezone(baltimore_tz) # Get yesterday's date (previous calendar day) yesterday = current_baltimore - timedelta(days=1) yesterday_date = yesterday.date() date_to_check = date_baltimore.date() # Check if the date falls on yesterday return date_to_check == yesterday_date def parse_excel_row(self, cols: List[str]) -> Optional[Dict]: """ Parse a single Excel row into structured format. Args: cols: List of column values (tab-separated) Returns: Structured item dict or None if invalid """ if len(cols) < 8: return None punchlist_name = cols[0].strip() vendor_raw = cols[1].strip() if len(cols) > 1 else "" priority_raw = cols[2].strip() if len(cols) > 2 else "" description = cols[3].strip() if len(cols) > 3 else "" date_identified_str = cols[4].strip() if len(cols) > 4 else "" status_updates = cols[5].strip() if len(cols) > 5 else "" issue_image = cols[6].strip() if len(cols) > 6 else "" status_raw = cols[7].strip() if len(cols) > 7 else "" date_completed_str = cols[8].strip() if len(cols) > 8 else "" if not punchlist_name: return None # Normalize fields vendor = self.normalize_vendor_name(vendor_raw) status, is_closed = self.normalize_status(status_raw) priority, priority_level = self.normalize_priority(priority_raw) # Parse dates date_identified = self.parse_date(date_identified_str) date_completed = self.parse_date(date_completed_str) # Calculate age age_days = None if date_identified: age_days = (self.current_date - date_identified).days # Check 24-hour updates is_recent_added = date_identified and self.is_within_24_hours(date_identified) and not is_closed is_recent_closed = date_completed and self.is_within_24_hours(date_completed) and is_closed is_recent_monitor = status == "Monitor" and (date_identified and self.is_within_24_hours(date_identified) or date_completed and self.is_within_24_hours(date_completed)) return { 'punchlist_name': punchlist_name, 'vendor': vendor, 'vendor_raw': vendor_raw, # Keep original for reference 'priority': priority, 'priority_level': priority_level, 'description': description, 'date_identified': date_identified, 'date_identified_str': date_identified_str, # Keep original 'date_completed': date_completed, 'date_completed_str': date_completed_str, # Keep original 'status': status, 'status_raw': status_raw, # Keep original 'is_closed': is_closed, 'status_updates': status_updates, 'issue_image': issue_image, 'age_days': age_days, 'is_recent_added': is_recent_added, 'is_recent_closed': is_recent_closed, 'is_recent_monitor': is_recent_monitor, } def process_excel_file(self, excel_path: str) -> List[Dict]: """ Process Excel file directly using pandas for reliable parsing. Args: excel_path: Path to Excel file Returns: List of structured item dictionaries """ items = [] try: xl_file = pd.ExcelFile(excel_path) for sheet_name in xl_file.sheet_names: # Skip certain sheets that are known to be duplicates/backups if sheet_name.lower() in ['sheet1', 'comments']: continue # Read sheet df = pd.read_excel(xl_file, sheet_name=sheet_name) # Handle empty dataframe if df.empty: continue # Fill NaN values df = df.fillna("") # Process each row (first pass - collect raw vendor names) raw_items = [] for _, row in df.iterrows(): # Get columns (handle different column names) cols = [] for i in range(max(9, len(df.columns))): if i < len(df.columns): cols.append(str(row.iloc[i]) if pd.notna(row.iloc[i]) else "") else: cols.append("") # Parse without normalization first vendor_raw = cols[1].strip() if len(cols) > 1 else "" if cols[0].strip(): # Has punchlist name raw_items.append({'vendor_raw': vendor_raw, 'cols': cols}) # Build vendor normalization map from actual data if raw_items: self._build_vendor_normalization_map(raw_items) # Second pass - parse with normalization seen_items = set() # Track seen items by (punchlist_name, vendor) tuple for raw_item in raw_items: item = self.parse_excel_row(raw_item['cols']) if item: # Create unique key for deduplication item_key = (item['punchlist_name'].strip().lower(), item['vendor'].strip().lower()) if item_key not in seen_items: seen_items.add(item_key) items.append(item) except Exception as e: print(f"Error processing Excel file {excel_path}: {e}") self.items = items return items def process_excel_text(self, excel_text: str) -> List[Dict]: """ Process raw Excel text into structured items (fallback method). Args: excel_text: Raw text from Excel loader Returns: List of structured item dictionaries """ lines = excel_text.split('\n') items = [] current_item_cols = None for line in lines: # Skip header lines if line.startswith('FILENAME') or line.startswith('SHEET') or line.startswith('='): continue if not line.strip(): continue # Try tab-separated first (more reliable) cols = line.split('\t') # If no tabs, try space-separated (LangChain output) if len(cols) < 8: # Split by multiple spaces cols = re.split(r'\s{2,}', line) # Check if this looks like a new item (has punchlist name in first column) if len(cols) >= 8 and cols[0].strip(): # Save previous item if exists if current_item_cols: item = self.parse_excel_row(current_item_cols) if item: items.append(item) # Start new item current_item_cols = cols elif current_item_cols and len(cols) > 0: # Continuation line - merge with current item # Usually status updates continue on next line if len(current_item_cols) > 5: current_item_cols[5] += " " + line.strip() # Don't forget last item if current_item_cols: item = self.parse_excel_row(current_item_cols) if item: items.append(item) self.items = items return items def get_preprocessed_summary(self) -> Dict: """ Generate summary statistics from preprocessed data. Returns: Summary dictionary with vendor counts, etc. """ vendors = defaultdict(lambda: { 'items': [], 'closed': 0, 'open': 0, 'monitor': 0, 'very_high': [], 'high': [], 'unaddressed': [], 'recent_added': [], 'recent_closed': [], 'recent_monitor': [] }) for item in self.items: vendor = item['vendor'] vendors[vendor]['items'].append(item) if item['is_closed']: vendors[vendor]['closed'] += 1 elif item['status'] == 'Monitor': vendors[vendor]['monitor'] += 1 else: vendors[vendor]['open'] += 1 if item['priority_level'] == 'very_high': vendors[vendor]['very_high'].append(item) elif item['priority_level'] == 'high': vendors[vendor]['high'].append(item) # Unaddressed = not closed AND not in Monitor status (open/incomplete items that need action) if not item['is_closed'] and item['status'] != 'Monitor': vendors[vendor]['unaddressed'].append(item) if item['is_recent_added']: vendors[vendor]['recent_added'].append(item) if item['is_recent_closed']: vendors[vendor]['recent_closed'].append(item) if item['is_recent_monitor']: vendors[vendor]['recent_monitor'].append(item) # Sort unaddressed by date (oldest first) baltimore_tz = ZoneInfo("America/New_York") max_datetime = datetime.max.replace(tzinfo=baltimore_tz) for vendor in vendors.values(): vendor['unaddressed'].sort(key=lambda x: x['date_identified'] or max_datetime) return dict(vendors) def format_for_llm(self) -> str: """ Format preprocessed data for human inspection. Returns: Formatted string with normalized, structured data """ summary = self.get_preprocessed_summary() output_lines = [] output_lines.append("PREPROCESSED EXCEL DATA") output_lines.append("=" * 80) # Show timezone-aware datetime with timezone info if self.current_date.tzinfo: output_lines.append(f"Current Date (Baltimore/Eastern): {self.current_date.strftime('%Y-%m-%d %H:%M:%S %Z')}") else: output_lines.append(f"Current Date: {self.current_date.strftime('%Y-%m-%d %H:%M:%S')}") output_lines.append(f"Total Items: {len(self.items)}") output_lines.append("") for vendor_name, vendor_data in sorted(summary.items()): output_lines.append(f"VENDOR: {vendor_name}") output_lines.append("-" * 80) output_lines.append(f"Total Items: {len(vendor_data['items'])}") output_lines.append(f" Closed: {vendor_data['closed']}") output_lines.append(f" Open: {vendor_data['open']}") output_lines.append(f" Monitor: {vendor_data['monitor']}") output_lines.append("") # Recent updates if vendor_data['recent_added'] or vendor_data['recent_closed'] or vendor_data['recent_monitor']: output_lines.append("RECENT UPDATES (Yesterday's Date):") for item in vendor_data['recent_added']: output_lines.append(f" ADDED: {item['punchlist_name']} | {item['date_identified_str']} | {item['status']}") for item in vendor_data['recent_closed']: output_lines.append(f" CLOSED: {item['punchlist_name']} | {item['date_completed_str']} | {item['status']}") for item in vendor_data['recent_monitor']: output_lines.append(f" MONITOR: {item['punchlist_name']} | {item['date_identified_str']} | {item['status']}") output_lines.append("") # Oldest unaddressed if vendor_data['unaddressed']: output_lines.append("OLDEST UNADDRESSED (Top 3):") for item in vendor_data['unaddressed'][:3]: output_lines.append(f" {item['punchlist_name']} | Age: {item['age_days']} days | {item['date_identified_str']} | {item['status']}") output_lines.append("") # Priority items if vendor_data['very_high']: output_lines.append(f"VERY HIGH PRIORITY ({len(vendor_data['very_high'])} items):") for item in vendor_data['very_high']: output_lines.append(f" {item['punchlist_name']} | {item['status']} | {item['date_identified_str']}") output_lines.append("") if vendor_data['high']: output_lines.append(f"HIGH PRIORITY ({len(vendor_data['high'])} items):") for item in vendor_data['high']: output_lines.append(f" {item['punchlist_name']} | {item['status']} | {item['date_identified_str']}") output_lines.append("") # All items output_lines.append("ALL ITEMS:") for item in vendor_data['items']: output_lines.append( f" {item['punchlist_name']} | " f"Vendor: {item['vendor']} | " f"Priority: {item['priority']} ({item['priority_level']}) | " f"Status: {item['status']} ({'CLOSED' if item['is_closed'] else 'OPEN'}) | " f"Date: {item['date_identified_str']} | " f"Description: {item['description'][:50] if item['description'] else 'N/A'}..." ) output_lines.append("") output_lines.append("=" * 80) output_lines.append("") return "\n".join(output_lines) def preprocess_excel_data(excel_text: str, current_date: Optional[datetime] = None) -> Tuple[str, Dict]: """ Preprocess Excel data and return formatted string for LLM. Args: excel_text: Raw Excel text from loader current_date: Current date for calculations Returns: Tuple of (formatted_string, summary_dict) """ preprocessor = DataPreprocessor(current_date=current_date) preprocessor.process_excel_text(excel_text) formatted = preprocessor.format_for_llm() summary = preprocessor.get_preprocessed_summary() return formatted, summary def preprocess_excel_files(reports_dir: str = "reports", current_date: Optional[datetime] = None) -> Tuple[str, Dict]: """ Preprocess Excel files directly (more reliable than text parsing). Args: reports_dir: Directory containing Excel files current_date: Current date for calculations Returns: Tuple of (formatted_string, summary_dict) """ preprocessor = DataPreprocessor(current_date=current_date) reports_path = Path(reports_dir) if not reports_path.exists(): return f"Reports directory '{reports_dir}' not found.", {} excel_files = list(reports_path.glob("*.xlsx")) + list(reports_path.glob("*.xls")) if not excel_files: return f"No Excel files found in '{reports_dir}' directory.", {} # First pass: collect all items with raw vendor names all_raw_items = [] for excel_file in excel_files: try: xl_file = pd.ExcelFile(str(excel_file)) for sheet_name in xl_file.sheet_names: # Skip certain sheets that are known to be duplicates/backups if sheet_name.lower() in ['sheet1', 'comments']: continue df = pd.read_excel(xl_file, sheet_name=sheet_name) if df.empty: continue df = df.fillna("") for _, row in df.iterrows(): cols = [] for i in range(max(9, len(df.columns))): if i < len(df.columns): cols.append(str(row.iloc[i]) if pd.notna(row.iloc[i]) else "") else: cols.append("") if cols[0].strip(): # Has punchlist name all_raw_items.append({'vendor_raw': cols[1].strip() if len(cols) > 1 else "", 'cols': cols}) except Exception as e: print(f"Error reading {excel_file}: {e}") # Build vendor normalization map from all collected data if all_raw_items: preprocessor._build_vendor_normalization_map(all_raw_items) # Second pass: process with normalization all_items = [] seen_items_global = set() # Track seen items across all files by (punchlist_name, vendor) tuple for excel_file in excel_files: items = preprocessor.process_excel_file(str(excel_file)) for item in items: # Deduplicate across all files item_key = (item['punchlist_name'].strip().lower(), item['vendor'].strip().lower()) if item_key not in seen_items_global: seen_items_global.add(item_key) all_items.append(item) preprocessor.items = all_items formatted = preprocessor.format_for_llm() summary = preprocessor.get_preprocessed_summary() return formatted, summary