#!/usr/bin/env python3
"""
Data Preprocessor - Clean and normalize Excel data before LLM processing

This module performs programmatic preprocessing to:
1. Parse Excel into structured format
2. Normalize vendor names
3. Normalize statuses and priorities
4. Parse dates into standard format
5. Calculate 24-hour windows
6. Pre-classify items

This reduces LLM errors and improves accuracy.
"""

import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from pathlib import Path
from collections import defaultdict
from zoneinfo import ZoneInfo

import pandas as pd


class DataPreprocessor:
    """Preprocesses Excel data before sending to LLM."""

    def __init__(self, current_date: Optional[datetime] = None):
        """
        Initialize preprocessor.

        Args:
            current_date: Current date for 24-hour calculations
                (defaults to now in the Baltimore/Eastern timezone)
        """
        # Use Baltimore/Eastern timezone (America/New_York)
        baltimore_tz = ZoneInfo("America/New_York")
        if current_date is None:
            self.current_date = datetime.now(baltimore_tz)
        else:
            # If current_date is timezone-naive, assume it's in Baltimore time
            if current_date.tzinfo is None:
                self.current_date = current_date.replace(tzinfo=baltimore_tz)
            else:
                # Convert to Baltimore timezone
                self.current_date = current_date.astimezone(baltimore_tz)
        self.items: List[Dict] = []
        self.vendor_normalization_map: Dict[str, str] = {}
        self._vendor_groups: Dict[str, List[str]] = {}

    def _build_vendor_normalization_map(self, items: List[Dict]) -> None:
        """
        Build vendor normalization map by extracting distinct vendors and grouping similar ones.

        Args:
            items: List of items with vendor_raw field
        """
        # Extract all distinct vendor names (case-insensitive)
        vendor_variants: Dict[str, List[str]] = defaultdict(list)

        for item in items:
            vendor_raw = item.get('vendor_raw', '').strip()
            if not vendor_raw:
                continue

            vendor_lower = vendor_raw.lower()
            vendor_variants[vendor_lower].append(vendor_raw)

        # Normalize each vendor group
        for vendor_lower, variants in vendor_variants.items():
            # Get distinct variants
            distinct_variants = list(set(variants))

            # Find the best normalized form.
            # Prefer variants with mixed case (like "AutStand") over all lowercase.
            best_variant = None
            for variant in distinct_variants:
                # Check if variant has mixed case (indicates intentional capitalization)
                if variant != variant.lower() and variant != variant.upper():
                    best_variant = variant
                    break

            # If no mixed case found, use the most common (then longest) variant
            if not best_variant:
                best_variant = max(distinct_variants, key=lambda v: (variants.count(v), len(v)))

            # Normalize the vendor name
            normalized = self._normalize_vendor_case(best_variant)

            # Map all variants to the normalized name
            for variant in distinct_variants:
                self.vendor_normalization_map[variant.lower()] = normalized

            # Store the variant group for reference
            self._vendor_groups[normalized] = distinct_variants

    def _normalize_vendor_case(self, vendor: str) -> str:
        """
        Normalize vendor name case using heuristic rules.

        Args:
            vendor: Raw vendor name

        Returns:
            Normalized vendor name
        """
        if not vendor:
            return "MISC"

        vendor = vendor.strip()

        # Handle combined vendors (Autstand/Beumer, DCS/Autstand)
        if '/' in vendor:
            parts = []
            for part in vendor.split('/'):
                part = part.strip()
                # Title case each part, but uppercase acronyms (all caps or <= 3 chars)
                if part.isupper() or len(part) <= 3:
                    parts.append(part.upper())
                else:
                    parts.append(part.title())
            return '/'.join(parts)

        # Handle vendors in parentheses (e.g., "MFO (Amazon)")
        if '(' in vendor and ')' in vendor:
            main_part = vendor.split('(')[0].strip()
            paren_part = vendor.split('(')[1].split(')')[0].strip()
            normalized_main = self._normalize_vendor_case(main_part)
            normalized_paren = paren_part.title()
            return f"{normalized_main} ({normalized_paren})"

        # Handle acronyms (all caps, or short alphabetic names)
        if vendor.isupper() or (len(vendor) <= 4 and vendor.isalpha()):
            return vendor.upper()

        # Default: title case
        return vendor.title()

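    # Illustrative inputs and the case normalization they would receive under
    # the rules above (hypothetical vendor names, not taken from source data):
    #   "autstand/beumer" -> "Autstand/Beumer"   (combined vendors split on "/")
    #   "dcs/autstand"    -> "DCS/Autstand"      (parts of <= 3 chars uppercased)
    #   "mfo (amazon)"    -> "MFO (Amazon)"      (parenthesized part title-cased)
    #   "beumer"          -> "Beumer"            (default: title case)
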
    def normalize_vendor_name(self, vendor: str) -> str:
        """
        Normalize vendor name using the built normalization map.

        Args:
            vendor: Raw vendor name from Excel

        Returns:
            Normalized vendor name
        """
        if not vendor:
            return "MISC"

        vendor_lower = vendor.strip().lower()

        # Check normalization map (built from actual data)
        if vendor_lower in self.vendor_normalization_map:
            return self.vendor_normalization_map[vendor_lower]

        # Fallback: normalize case (for new vendors not seen before)
        return self._normalize_vendor_case(vendor.strip())

    def normalize_status(self, status: str) -> Tuple[str, bool]:
        """
        Normalize status and determine if closed.

        Args:
            status: Raw status string

        Returns:
            Tuple of (normalized_status, is_closed)
        """
        if not status:
            return "Incomplete", False

        status_lower = status.lower().strip()

        # Check for incomplete status FIRST (before checking for "complete")
        # because "incomplete" contains "complete" as a substring!
        if status_lower.startswith('incomplete'):
            return "Incomplete", False

        # Check for closed status (also catches the "complette" misspelling)
        if status_lower.startswith('complete') or 'complette' in status_lower:
            return "Complete", True

        # Check for monitor status (including the "montor" misspelling)
        if 'monitor' in status_lower or 'montor' in status_lower:
            return "Monitor", False

        # Default to incomplete/open
        return "Incomplete", False

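    # Illustrative status mappings (hypothetical raw values), assuming the
    # rules above:
    #   "Incomplete - awaiting parts" -> ("Incomplete", False)
    #   "Complete"                    -> ("Complete", True)
    #   "Complette 10/12"             -> ("Complete", True)   (misspelling caught)
    #   "Monitoring"                  -> ("Monitor", False)
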
    def normalize_priority(self, priority: str) -> Tuple[str, Optional[str]]:
        """
        Normalize priority and classify level.

        Args:
            priority: Raw priority string

        Returns:
            Tuple of (normalized_priority, priority_level)
            priority_level: "very_high", "high", "medium", "low", "monitoring", "complete", None
        """
        if not priority:
            return "", None

        priority_lower = priority.lower()

        # Very High priority (also catches the "very hgh" misspelling)
        if 'very high' in priority_lower or 'very hgh' in priority_lower:
            return priority, "very_high"

        # High priority (Very High was already handled above)
        if '(2) high' in priority_lower or '(2) hgh' in priority_lower:
            return priority, "high"
        if priority_lower.startswith('2) high') or priority_lower.startswith('2) hgh'):
            return priority, "high"
        if priority_lower == 'high':
            return priority, "high"

        # Medium priority
        if '(3) medium' in priority_lower or priority_lower == 'medium':
            return priority, "medium"

        # Low priority
        if '(4) low' in priority_lower or priority_lower == 'low':
            return priority, "low"

        # Monitoring priority
        if '(5) monitoring' in priority_lower:
            return priority, "monitoring"

        # Complete priority
        if '(6) complete' in priority_lower:
            return priority, "complete"

        return priority, None

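    # Illustrative priority mappings (hypothetical raw values):
    #   "(1) Very High"  -> ("(1) Very High", "very_high")
    #   "(2) High"       -> ("(2) High", "high")
    #   "Medium"         -> ("Medium", "medium")
    #   "(5) Monitoring" -> ("(5) Monitoring", "monitoring")
    #   "urgent"         -> ("urgent", None)   (unrecognized -> level is None)
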
    def parse_date(self, date_str: str) -> Optional[datetime]:
        """
        Parse a date from various formats and return a timezone-aware datetime
        in the Baltimore/Eastern timezone.

        Args:
            date_str: Date string in various formats

        Returns:
            Parsed datetime in Baltimore timezone, or None
        """
        if not date_str or date_str.strip() == '':
            return None

        date_str = date_str.strip()
        baltimore_tz = ZoneInfo("America/New_York")

        # Try different formats
        formats = [
            "%m/%d/%y",           # 10/14/25
            "%m/%d/%Y",           # 10/14/2025
            "%Y-%m-%d %H:%M:%S",  # 2025-10-17 00:00:00
            "%Y-%m-%d",           # 2025-10-17
        ]

        for fmt in formats:
            try:
                parsed_date = datetime.strptime(date_str, fmt)
                # Make timezone-aware in Baltimore timezone
                if parsed_date.tzinfo is None:
                    return parsed_date.replace(tzinfo=baltimore_tz)
                else:
                    return parsed_date.astimezone(baltimore_tz)
            except ValueError:
                continue

        return None

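    # Illustrative parses (assuming the format list above); results are
    # timezone-aware in America/New_York:
    #   "10/14/25"            -> 2025-10-14 00:00:00 Eastern
    #   "2025-10-17 00:00:00" -> 2025-10-17 00:00:00 Eastern
    #   "next week"           -> None   (no format matches)
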
    def is_within_24_hours(self, date: Optional[datetime]) -> bool:
        """
        Check if a date falls on yesterday (the previous calendar day) in the
        Baltimore/Eastern timezone, regardless of the exact time.

        Args:
            date: Date to check (should be timezone-aware in Baltimore timezone)

        Returns:
            True if the date is yesterday (previous calendar day)
        """
        if not date:
            return False

        # Ensure both dates are timezone-aware in Baltimore timezone
        baltimore_tz = ZoneInfo("America/New_York")

        # Convert date to Baltimore timezone if needed
        if date.tzinfo is None:
            # If date is timezone-naive, assume it's in Baltimore time
            date_baltimore = date.replace(tzinfo=baltimore_tz)
        else:
            # Convert to Baltimore timezone
            date_baltimore = date.astimezone(baltimore_tz)

        # Ensure current_date is also in Baltimore timezone (should already be, but defensive check)
        if self.current_date.tzinfo is None:
            current_baltimore = self.current_date.replace(tzinfo=baltimore_tz)
        else:
            current_baltimore = self.current_date.astimezone(baltimore_tz)

        # Get yesterday's date (previous calendar day)
        yesterday = current_baltimore - timedelta(days=1)
        yesterday_date = yesterday.date()
        date_to_check = date_baltimore.date()

        # Check if the date falls on yesterday
        return date_to_check == yesterday_date

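    # Example: with current_date = 2025-10-18 09:00 Eastern, a date anywhere on
    # 2025-10-17 returns True, while 2025-10-16 or 2025-10-18 returns False --
    # the check compares calendar days, not a rolling 24-hour window.
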
    def parse_excel_row(self, cols: List[str]) -> Optional[Dict]:
        """
        Parse a single Excel row into structured format.

        Args:
            cols: List of column values (tab-separated)

        Returns:
            Structured item dict, or None if invalid
        """
        if len(cols) < 8:
            return None

        # Column layout: 0=punchlist name, 1=vendor, 2=priority, 3=description,
        # 4=date identified, 5=status updates, 6=issue image, 7=status, 8=date completed
        punchlist_name = cols[0].strip()
        vendor_raw = cols[1].strip() if len(cols) > 1 else ""
        priority_raw = cols[2].strip() if len(cols) > 2 else ""
        description = cols[3].strip() if len(cols) > 3 else ""
        date_identified_str = cols[4].strip() if len(cols) > 4 else ""
        status_updates = cols[5].strip() if len(cols) > 5 else ""
        issue_image = cols[6].strip() if len(cols) > 6 else ""
        status_raw = cols[7].strip() if len(cols) > 7 else ""
        date_completed_str = cols[8].strip() if len(cols) > 8 else ""

        if not punchlist_name:
            return None

        # Normalize fields
        vendor = self.normalize_vendor_name(vendor_raw)
        status, is_closed = self.normalize_status(status_raw)
        priority, priority_level = self.normalize_priority(priority_raw)

        # Parse dates
        date_identified = self.parse_date(date_identified_str)
        date_completed = self.parse_date(date_completed_str)

        # Calculate age
        age_days = None
        if date_identified:
            age_days = (self.current_date - date_identified).days

        # Check 24-hour updates (is_within_24_hours returns False for a None date,
        # so these are always plain booleans)
        is_recent_added = self.is_within_24_hours(date_identified) and not is_closed
        is_recent_closed = self.is_within_24_hours(date_completed) and is_closed
        is_recent_monitor = status == "Monitor" and (
            self.is_within_24_hours(date_identified) or
            self.is_within_24_hours(date_completed)
        )

        return {
            'punchlist_name': punchlist_name,
            'vendor': vendor,
            'vendor_raw': vendor_raw,  # Keep original for reference
            'priority': priority,
            'priority_level': priority_level,
            'description': description,
            'date_identified': date_identified,
            'date_identified_str': date_identified_str,  # Keep original
            'date_completed': date_completed,
            'date_completed_str': date_completed_str,  # Keep original
            'status': status,
            'status_raw': status_raw,  # Keep original
            'is_closed': is_closed,
            'status_updates': status_updates,
            'issue_image': issue_image,
            'age_days': age_days,
            'is_recent_added': is_recent_added,
            'is_recent_closed': is_recent_closed,
            'is_recent_monitor': is_recent_monitor,
        }

    def process_excel_file(self, excel_path: str) -> List[Dict]:
        """
        Process Excel file directly using pandas for reliable parsing.

        Args:
            excel_path: Path to Excel file

        Returns:
            List of structured item dictionaries
        """
        items = []

        try:
            xl_file = pd.ExcelFile(excel_path)

            for sheet_name in xl_file.sheet_names:
                # Skip certain sheets that are known to be duplicates/backups
                if sheet_name.lower() in ['sheet1', 'comments']:
                    continue
                # Read sheet
                df = pd.read_excel(xl_file, sheet_name=sheet_name)

                # Handle empty dataframe
                if df.empty:
                    continue

                # Fill NaN values
                df = df.fillna("")

                # Process each row (first pass - collect raw vendor names)
                raw_items = []
                for _, row in df.iterrows():
                    # Get columns (handle different column counts)
                    cols = []
                    for i in range(max(9, len(df.columns))):
                        if i < len(df.columns):
                            cols.append(str(row.iloc[i]) if pd.notna(row.iloc[i]) else "")
                        else:
                            cols.append("")

                    # Parse without normalization first
                    vendor_raw = cols[1].strip() if len(cols) > 1 else ""
                    if cols[0].strip():  # Has punchlist name
                        raw_items.append({'vendor_raw': vendor_raw, 'cols': cols})

                # Build vendor normalization map from actual data
                if raw_items:
                    self._build_vendor_normalization_map(raw_items)

                # Second pass - parse with normalization
                seen_items = set()  # Track seen items by (punchlist_name, vendor) tuple
                for raw_item in raw_items:
                    item = self.parse_excel_row(raw_item['cols'])
                    if item:
                        # Create unique key for deduplication
                        item_key = (item['punchlist_name'].strip().lower(), item['vendor'].strip().lower())
                        if item_key not in seen_items:
                            seen_items.add(item_key)
                            items.append(item)

        except Exception as e:
            print(f"Error processing Excel file {excel_path}: {e}")

        self.items = items
        return items

    def process_excel_text(self, excel_text: str) -> List[Dict]:
        """
        Process raw Excel text into structured items (fallback method).

        Args:
            excel_text: Raw text from Excel loader

        Returns:
            List of structured item dictionaries
        """
        lines = excel_text.split('\n')
        items = []
        current_item_cols = None

        for line in lines:
            # Skip header lines
            if line.startswith('FILENAME') or line.startswith('SHEET') or line.startswith('='):
                continue

            if not line.strip():
                continue

            # Try tab-separated first (more reliable)
            cols = line.split('\t')

            # If no tabs, try space-separated (LangChain output)
            if len(cols) < 8:
                # Split by multiple spaces
                cols = re.split(r'\s{2,}', line)

            # Check if this looks like a new item (has punchlist name in first column)
            if len(cols) >= 8 and cols[0].strip():
                # Save previous item if it exists
                if current_item_cols:
                    item = self.parse_excel_row(current_item_cols)
                    if item:
                        items.append(item)

                # Start new item
                current_item_cols = cols
            elif current_item_cols and len(cols) > 0:
                # Continuation line - merge with current item;
                # usually status updates continue on the next line
                if len(current_item_cols) > 5:
                    current_item_cols[5] += " " + line.strip()

        # Don't forget the last item
        if current_item_cols:
            item = self.parse_excel_row(current_item_cols)
            if item:
                items.append(item)

        self.items = items
        return items

    def get_preprocessed_summary(self) -> Dict:
        """
        Generate summary statistics from preprocessed data.

        Returns:
            Summary dictionary keyed by vendor, with counts and item lists
        """
        vendors = defaultdict(lambda: {
            'items': [],
            'closed': 0,
            'open': 0,
            'monitor': 0,
            'very_high': [],
            'high': [],
            'unaddressed': [],
            'recent_added': [],
            'recent_closed': [],
            'recent_monitor': []
        })

        for item in self.items:
            vendor = item['vendor']
            vendors[vendor]['items'].append(item)

            if item['is_closed']:
                vendors[vendor]['closed'] += 1
            elif item['status'] == 'Monitor':
                vendors[vendor]['monitor'] += 1
            else:
                vendors[vendor]['open'] += 1

            if item['priority_level'] == 'very_high':
                vendors[vendor]['very_high'].append(item)
            elif item['priority_level'] == 'high':
                vendors[vendor]['high'].append(item)

            # Unaddressed = not closed AND not in Monitor status
            # (open/incomplete items that need action)
            if not item['is_closed'] and item['status'] != 'Monitor':
                vendors[vendor]['unaddressed'].append(item)

            if item['is_recent_added']:
                vendors[vendor]['recent_added'].append(item)
            if item['is_recent_closed']:
                vendors[vendor]['recent_closed'].append(item)
            if item['is_recent_monitor']:
                vendors[vendor]['recent_monitor'].append(item)

        # Sort unaddressed by date (oldest first); items without a date sort last
        baltimore_tz = ZoneInfo("America/New_York")
        max_datetime = datetime.max.replace(tzinfo=baltimore_tz)
        for vendor in vendors.values():
            vendor['unaddressed'].sort(key=lambda x: x['date_identified'] or max_datetime)

        return dict(vendors)

    def format_for_llm(self) -> str:
        """
        Format preprocessed data into a structured text block for the LLM prompt
        (also readable for human inspection).

        Returns:
            Formatted string with normalized, structured data
        """
        summary = self.get_preprocessed_summary()
        output_lines = []

        output_lines.append("PREPROCESSED EXCEL DATA")
        output_lines.append("=" * 80)
        # Show timezone-aware datetime with timezone info
        if self.current_date.tzinfo:
            output_lines.append(f"Current Date (Baltimore/Eastern): {self.current_date.strftime('%Y-%m-%d %H:%M:%S %Z')}")
        else:
            output_lines.append(f"Current Date: {self.current_date.strftime('%Y-%m-%d %H:%M:%S')}")
        output_lines.append(f"Total Items: {len(self.items)}")
        output_lines.append("")

        for vendor_name, vendor_data in sorted(summary.items()):
            output_lines.append(f"VENDOR: {vendor_name}")
            output_lines.append("-" * 80)
            output_lines.append(f"Total Items: {len(vendor_data['items'])}")
            output_lines.append(f"  Closed: {vendor_data['closed']}")
            output_lines.append(f"  Open: {vendor_data['open']}")
            output_lines.append(f"  Monitor: {vendor_data['monitor']}")
            output_lines.append("")

            # Recent updates
            if vendor_data['recent_added'] or vendor_data['recent_closed'] or vendor_data['recent_monitor']:
                output_lines.append("RECENT UPDATES (Yesterday's Date):")
                for item in vendor_data['recent_added']:
                    output_lines.append(f"  ADDED: {item['punchlist_name']} | {item['date_identified_str']} | {item['status']}")
                for item in vendor_data['recent_closed']:
                    output_lines.append(f"  CLOSED: {item['punchlist_name']} | {item['date_completed_str']} | {item['status']}")
                for item in vendor_data['recent_monitor']:
                    output_lines.append(f"  MONITOR: {item['punchlist_name']} | {item['date_identified_str']} | {item['status']}")
                output_lines.append("")

            # Oldest unaddressed
            if vendor_data['unaddressed']:
                output_lines.append("OLDEST UNADDRESSED (Top 3):")
                for item in vendor_data['unaddressed'][:3]:
                    output_lines.append(f"  {item['punchlist_name']} | Age: {item['age_days']} days | {item['date_identified_str']} | {item['status']}")
                output_lines.append("")

            # Priority items
            if vendor_data['very_high']:
                output_lines.append(f"VERY HIGH PRIORITY ({len(vendor_data['very_high'])} items):")
                for item in vendor_data['very_high']:
                    output_lines.append(f"  {item['punchlist_name']} | {item['status']} | {item['date_identified_str']}")
                output_lines.append("")

            if vendor_data['high']:
                output_lines.append(f"HIGH PRIORITY ({len(vendor_data['high'])} items):")
                for item in vendor_data['high']:
                    output_lines.append(f"  {item['punchlist_name']} | {item['status']} | {item['date_identified_str']}")
                output_lines.append("")

            # All items
            output_lines.append("ALL ITEMS:")
            for item in vendor_data['items']:
                output_lines.append(
                    f"  {item['punchlist_name']} | "
                    f"Vendor: {item['vendor']} | "
                    f"Priority: {item['priority']} ({item['priority_level']}) | "
                    f"Status: {item['status']} ({'CLOSED' if item['is_closed'] else 'OPEN'}) | "
                    f"Date: {item['date_identified_str']} | "
                    f"Description: {item['description'][:50] if item['description'] else 'N/A'}..."
                )
            output_lines.append("")
            output_lines.append("=" * 80)
            output_lines.append("")

        return "\n".join(output_lines)


def preprocess_excel_data(excel_text: str, current_date: Optional[datetime] = None) -> Tuple[str, Dict]:
    """
    Preprocess Excel data and return a formatted string for the LLM.

    Args:
        excel_text: Raw Excel text from loader
        current_date: Current date for calculations

    Returns:
        Tuple of (formatted_string, summary_dict)
    """
    preprocessor = DataPreprocessor(current_date=current_date)
    preprocessor.process_excel_text(excel_text)
    formatted = preprocessor.format_for_llm()
    summary = preprocessor.get_preprocessed_summary()

    return formatted, summary


def preprocess_excel_files(reports_dir: str = "reports", current_date: Optional[datetime] = None) -> Tuple[str, Dict]:
    """
    Preprocess Excel files directly (more reliable than text parsing).

    Args:
        reports_dir: Directory containing Excel files
        current_date: Current date for calculations

    Returns:
        Tuple of (formatted_string, summary_dict)
    """
    preprocessor = DataPreprocessor(current_date=current_date)
    reports_path = Path(reports_dir)

    if not reports_path.exists():
        return f"Reports directory '{reports_dir}' not found.", {}

    excel_files = list(reports_path.glob("*.xlsx")) + list(reports_path.glob("*.xls"))

    if not excel_files:
        return f"No Excel files found in '{reports_dir}' directory.", {}

    # First pass: collect all items with raw vendor names
    all_raw_items = []
    for excel_file in excel_files:
        try:
            xl_file = pd.ExcelFile(str(excel_file))
            for sheet_name in xl_file.sheet_names:
                # Skip certain sheets that are known to be duplicates/backups
                if sheet_name.lower() in ['sheet1', 'comments']:
                    continue
                df = pd.read_excel(xl_file, sheet_name=sheet_name)
                if df.empty:
                    continue
                df = df.fillna("")
                for _, row in df.iterrows():
                    cols = []
                    for i in range(max(9, len(df.columns))):
                        if i < len(df.columns):
                            cols.append(str(row.iloc[i]) if pd.notna(row.iloc[i]) else "")
                        else:
                            cols.append("")
                    if cols[0].strip():  # Has punchlist name
                        all_raw_items.append({'vendor_raw': cols[1].strip() if len(cols) > 1 else "", 'cols': cols})
        except Exception as e:
            print(f"Error reading {excel_file}: {e}")

    # Build vendor normalization map from all collected data
    if all_raw_items:
        preprocessor._build_vendor_normalization_map(all_raw_items)

    # Second pass: process with normalization
    all_items = []
    seen_items_global = set()  # Track seen items across all files by (punchlist_name, vendor) tuple
    for excel_file in excel_files:
        items = preprocessor.process_excel_file(str(excel_file))
        for item in items:
            # Deduplicate across all files
            item_key = (item['punchlist_name'].strip().lower(), item['vendor'].strip().lower())
            if item_key not in seen_items_global:
                seen_items_global.add(item_key)
                all_items.append(item)

    preprocessor.items = all_items
    formatted = preprocessor.format_for_llm()
    summary = preprocessor.get_preprocessed_summary()

    return formatted, summary
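

if __name__ == "__main__":
    # Minimal usage sketch: preprocess every Excel file in the "reports"
    # directory (this module's default; adjust for your layout) and print the
    # formatted output plus a per-vendor item count.
    formatted_text, vendor_summary = preprocess_excel_files("reports")
    print(formatted_text)
    for vendor_name, data in sorted(vendor_summary.items()):
        print(f"{vendor_name}: {len(data['items'])} items "
              f"({data['open']} open, {data['closed']} closed, {data['monitor']} monitor)")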