import pandas as pd
import os
import sys
import re

from classifiers import classify_signal, get_device_type
from utils import normalize_vfd_name, normalize_io_path, assign_partnumber, combine_desc
from io_paths import get_io_path


def _is_hub_module(tagname):
    """Check if a TAGNAME represents a hub module (FIO1H1, FIO2H3, etc.)."""
    tagname_str = str(tagname).upper()
    # Match patterns like FIO1H1, FIO2H3, FIOH1, etc.
    hub_pattern = re.compile(r'FIO(\d+)?H\d+', re.IGNORECASE)
    return bool(hub_pattern.search(tagname_str))


def process_data(desc_df, network_df, original_file, apf_df, m12dr_df, hub_df,
                 sorter_hub_df, sio_df, ib16_df, ob16e_df, ib16s_df):
    """Merge DESC and NETWORK_PLC data, classify signals, map IO paths, and
    write the result as a 'DESC_IP' sheet in a new Excel workbook."""
    validation_errors = []

    # Ignore rows from DESC sheet with an empty or NaN TAGNAME
    original_desc_rows = len(desc_df)
    desc_df = desc_df.dropna(subset=['TAGNAME'])
    desc_df = desc_df[desc_df['TAGNAME'].astype(str).str.strip() != '']
    rows_dropped = original_desc_rows - len(desc_df)
    if rows_dropped > 0:
        print(f"\nNOTE: Ignored {rows_dropped} rows from DESC sheet with an empty TAGNAME.")

    desc_df['NORMALIZED_TAGNAME'] = desc_df['TAGNAME'].apply(normalize_vfd_name)
    network_df['NORMALIZED_NAME'] = network_df['Name'].apply(normalize_vfd_name)

    if network_df['NORMALIZED_NAME'].duplicated().any():
        print("\nWARNING: The following names in the 'NETWORK_PLC' sheet become duplicates "
              "after normalization (e.g., 'DEV01' and 'DEV1'). Using the first occurrence "
              "for IP and PartNumber mapping.")
        duplicated_names = network_df[network_df['NORMALIZED_NAME'].duplicated(keep=False)].sort_values('NORMALIZED_NAME')
        print(duplicated_names[['Name', 'NORMALIZED_NAME']].to_string(index=False))

    unique_network_df = network_df.drop_duplicates(subset=['NORMALIZED_NAME'], keep='first')
    network_mapping = dict(zip(unique_network_df['NORMALIZED_NAME'], unique_network_df['IP']))
    part_number_mapping = dict(zip(unique_network_df['NORMALIZED_NAME'], unique_network_df['PartNumber']))

    # Create a mapping from DPM -> DPM_IP for unique DPM entries
    dpm_mapping = dict(zip(network_df['DPM'], network_df['DPM_IP']))

    # Start with DESC data and add IP and PartNumber columns
    result_df = desc_df.copy()

    # Fill empty DESCA with "SPARE"
    result_df['DESCA'] = result_df['DESCA'].fillna('SPARE')
    result_df.loc[result_df['DESCA'] == '', 'DESCA'] = 'SPARE'

    # Map IP and PARTNUMBER, but skip hub modules (they connect via IO-Link to parent masters)
    def get_ip_for_device(normalized_tagname, tagname):
        # Hub modules don't have their own IP addresses
        if _is_hub_module(tagname):
            return ''
        return network_mapping.get(normalized_tagname, '')

    def get_partnumber_for_device(normalized_tagname, tagname):
        # Hub modules don't get part numbers from network sheet
        if _is_hub_module(tagname):
            return ''
        return part_number_mapping.get(normalized_tagname, '')

    result_df['IP'] = result_df.apply(
        lambda row: get_ip_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)
    result_df['PARTNUMBER'] = result_df.apply(
        lambda row: get_partnumber_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)

    # Add DPM column mapping for FIOM-DPM relationships (skip hub modules)
    dpm_to_devices_mapping = dict(zip(network_df['NORMALIZED_NAME'], network_df['DPM']))

    def get_dpm_for_device(normalized_tagname, tagname):
        # Hub modules don't have DPM relationships
        if _is_hub_module(tagname):
            return ''
        return dpm_to_devices_mapping.get(normalized_tagname, '')

    result_df['DPM'] = result_df.apply(
        lambda row: get_dpm_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)

    result_df['PARTNUMBER'] = result_df.apply(assign_partnumber, axis=1)
    result_df['DESC'] = result_df.apply(combine_desc, axis=1)

    # Add signal classification and IO path columns
    print("\nClassifying signals and adding IO paths...")

    # Add signal classification column
    result_df['SIGNAL'] = result_df.apply(
        lambda row: classify_signal(row['DESCA'], row['TAGNAME'], row['DESCB'], original_file), axis=1)

    # Add device type column
    result_df['DEVICE_TYPE'] = result_df['TAGNAME'].apply(get_device_type)

    # Add IO path column
    result_df['IO_PATH'] = result_df.apply(
        lambda row: get_io_path(
            row['TAGNAME'], row['TERM'], row['SIGNAL'], row['DEVICE_TYPE'],
            apf_df, m12dr_df, hub_df, sorter_hub_df, sio_df, ib16_df, ob16e_df, ib16s_df
        ),
        axis=1
    )

    # Print statistics about signal classification
    signal_counts = result_df['SIGNAL'].value_counts()
    print("\nSignal classification results:")
    for signal_type, count in signal_counts.items():
        print(f" {signal_type}: {count}")

    # Show TAGNAMEs classified as UNKNOWN
    unknown_entries = result_df[result_df['SIGNAL'] == 'UNKNOWN']
    if len(unknown_entries) > 0:
        print(f"\nFound {len(unknown_entries)} entries with UNKNOWN signal classification.")
        for _, row in unknown_entries.iterrows():
            desca = row['DESCA'] if pd.notna(row['DESCA']) and row['DESCA'] != '' else 'N/A'
            validation_errors.append(f"Signal UNKNOWN for TAGNAME: {row['TAGNAME']} (DESCA: '{desca}')")

    device_counts = result_df['DEVICE_TYPE'].value_counts()
    print("\nDevice type distribution:")
    for device_type, count in device_counts.items():
        print(f" {device_type}: {count}")

    # Count successful IO path mappings
    successful_mappings = result_df['IO_PATH'].notna().sum()
    total_rows = len(result_df)
    print("\nIO Path mapping results:")
    print(f" Successful mappings: {successful_mappings}/{total_rows} ({successful_mappings/total_rows*100:.1f}%)")

    # Log missing mappings for debugging
    missing_mappings = result_df[result_df['IO_PATH'].isna() & result_df['TERM'].notna()]
    if len(missing_mappings) > 0:
        print(f"\nFound {len(missing_mappings)} entries with missing IO path mappings.")
        for _, row in missing_mappings.iterrows():
            tag, term, sig, dev = row['TAGNAME'], row['TERM'], row['SIGNAL'], row['DEVICE_TYPE']
            desca = row['DESCA'] if pd.notna(row['DESCA']) and row['DESCA'] != '' else 'N/A'
            error_msg = (f"On device '{tag}' ({dev}), cannot connect signal '{sig}' "
                         f"to channel number '{term}'. (DESCA: '{desca}')")
            validation_errors.append(error_msg)

            # Convert failed mappings to SPARE
            idx = result_df[(result_df['TAGNAME'] == tag) & (result_df['TERM'] == term)].index
            if len(idx) > 0:
                result_df.loc[idx, 'DESCA'] = 'SPARE'
                result_df.loc[idx, 'SIGNAL'] = 'SPARE'
                # Try to get SPARE path
                spare_path = get_io_path(
                    tag, term, 'SPARE', dev,
                    apf_df, m12dr_df, hub_df, sorter_hub_df, sio_df, ib16_df, ob16e_df, ib16s_df
                )
                result_df.loc[idx, 'IO_PATH'] = spare_path

    # Log names in DESC but not in NETWORK (using normalized names for comparison)
    desc_names_norm = set(desc_df['NORMALIZED_TAGNAME'].unique())
    network_names_norm = set(unique_network_df['NORMALIZED_NAME'].unique())
    dpm_names = set(network_df['DPM'].unique())

    missing_in_network_norm = desc_names_norm - network_names_norm
    if missing_in_network_norm:
        # Get the original tagnames that are missing to report them
        missing_desc_rows = desc_df[desc_df['NORMALIZED_TAGNAME'].isin(missing_in_network_norm)]
        original_missing_tagnames = set(missing_desc_rows['TAGNAME'].unique())
        # Filter out hub modules (FIOH and FIO1H1, FIO2H2 patterns) for logging
        missing_in_network_filtered = {name for name in original_missing_tagnames if not _is_hub_module(name)}
        if missing_in_network_filtered:
            print(f"\nFound {len(missing_in_network_filtered)} TAGNAMEs present in DESC but not in NETWORK_PLC.")
            for name in sorted(list(missing_in_network_filtered)):
                validation_errors.append(f"TAGNAME '{name}' from DESC sheet not found in NETWORK_PLC sheet.")

    # Add names from NETWORK_PLC that are missing in DESC
    missing_in_desc_norm = network_names_norm - desc_names_norm
    if missing_in_desc_norm:
        # Get the original rows from network_df to add, ensuring uniqueness
        rows_to_add = unique_network_df[unique_network_df['NORMALIZED_NAME'].isin(missing_in_desc_norm)]
        new_rows = []
        for _, row in rows_to_add.iterrows():
            new_rows.append({
                'TAGNAME': row['Name'],  # Use original name
                'TERM': '',
                'DESCA': '',
                'DESCB': '',
                'IP': row['IP'],
                'PARTNUMBER': row['PartNumber'],
                'DESC': '',
                'DPM': row.get('DPM', '')  # Include DPM relationship
            })
        if new_rows:
            new_rows_df = pd.DataFrame(new_rows)
            result_df = pd.concat([result_df, new_rows_df], ignore_index=True)

    # Get all existing tagnames (including those just added)
    all_existing_names = set(result_df['TAGNAME'].unique())

    # Add unique DPM names that are not already in the result
    missing_dpm_names = dpm_names - all_existing_names
    if missing_dpm_names:
        print(f"\nAdding unique DPM names not present in DESC or NETWORK_PLC.Name: {sorted(missing_dpm_names)}")
        # Create rows for missing DPM names
        dpm_rows = []
        for dpm_name in missing_dpm_names:
            dpm_ip = dpm_mapping[dpm_name]
            dpm_rows.append({
                'TAGNAME': dpm_name,
                'TERM': '',
                'DESCA': '',
                'DESCB': '',
                'IP': dpm_ip,
                'PARTNUMBER': '',  # DPM entries don't have part numbers
                'DESC': '',
                'DPM': dpm_name  # DPM devices reference themselves
            })
        # Append DPM rows
        dpm_rows_df = pd.DataFrame(dpm_rows)
        result_df = pd.concat([result_df, dpm_rows_df], ignore_index=True)
        # Apply part number assignment to newly added DPM entries
        mask = result_df['TAGNAME'].isin(missing_dpm_names)
        result_df.loc[mask, 'PARTNUMBER'] = result_df.loc[mask].apply(assign_partnumber, axis=1)

    # Report validation issues, but continue processing
    if validation_errors:
        print("\n" + "=" * 80)
        print("WARNING: The following issues were found but processing will continue:")
        print("=" * 80)
        for error in validation_errors:
            print(f"- {error}")
        print("=" * 80)
        print("\nContinuing with processing...")
    # Normalize TAGNAME, DESC, and IO_PATH in the final result before saving (only for VFDs)
    print("\nNormalizing TAGNAME, DESC, and IO_PATH columns for VFDs only in the final output...")
    result_df['TAGNAME'] = result_df['TAGNAME'].apply(normalize_vfd_name)
    result_df['DESC'] = result_df['DESC'].apply(normalize_vfd_name)
    result_df['IO_PATH'] = result_df['IO_PATH'].apply(normalize_io_path)

    # Drop the temporary normalized column before sorting
    if 'NORMALIZED_TAGNAME' in result_df.columns:
        result_df = result_df.drop(columns=['NORMALIZED_TAGNAME'])

    # Convert TAGNAME to string to prevent sorting errors with mixed types
    result_df['TAGNAME'] = result_df['TAGNAME'].astype(str)

    # Sort by PARTNUMBER, TAGNAME, and then TERM for better organization
    # Fill NaN values to ensure consistent sorting
    result_df['PARTNUMBER'] = result_df['PARTNUMBER'].fillna('')
    result_df['TERM'] = result_df['TERM'].fillna('')
    result_df = result_df.sort_values(by=['PARTNUMBER', 'TAGNAME', 'TERM']).reset_index(drop=True)

    # Remove exact duplicate rows before exporting to Excel/CSV
    before_dedup = len(result_df)
    result_df = result_df.drop_duplicates()
    after_dedup = len(result_df)
    if before_dedup != after_dedup:
        print(f"\nRemoved {before_dedup - after_dedup} duplicate rows (exact matches) before saving DESC_IP sheet.")

    print(f"\nFinal result has {len(result_df)} rows")
    print("Sample of merged data:")
    print(result_df.head(10))

    # Create new Excel file name with full project prefix (e.g. MTN6_MCM04)
    project_match = re.search(r"([A-Z0-9]+_MCM\d+)", str(original_file), re.IGNORECASE)
    if project_match:
        subsystem = project_match.group(1).upper()
    else:
        # Fall back to MCM-only pattern for backward compatibility
        mcm_match = re.search(r"(MCM\d+)", str(original_file), re.IGNORECASE)
        subsystem = mcm_match.group(1).upper() if mcm_match else "MCM"

    new_file = f"{subsystem}_DESC_IP_MERGED.xlsx"

    try:
        # Copy all original sheets and add the new one
        with pd.ExcelWriter(new_file, engine='openpyxl') as writer:
            # Copy original sheets
            xl_original = pd.ExcelFile(original_file)
            for sheet_name in xl_original.sheet_names:
                df = pd.read_excel(xl_original, sheet_name=sheet_name)
                df.to_excel(writer, sheet_name=sheet_name, index=False)
            # Add the new merged sheet
            result_df.to_excel(writer, sheet_name='DESC_IP', index=False)
        print(f"\nNew Excel file created: {new_file}")
        print("The file contains all original sheets plus the new 'DESC_IP' sheet with merged data.")
    except Exception as e:
        # If that fails, just save the merged data alone
        result_df.to_excel(new_file, sheet_name='DESC_IP', index=False)
        print(f"\nNew Excel file created with merged data only: {new_file}")
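

# Example driver (a minimal sketch, not part of the original module): shows one way
# process_data might be invoked with dataframes read from a single workbook. Only the
# DESC and NETWORK_PLC sheet names appear in the code above; the remaining sheet names
# (APF, M12DR, HUB, SORTER_HUB, SIO, IB16, OB16E, IB16S) and the default file name are
# assumptions for illustration only.
if __name__ == "__main__":
    original_file = sys.argv[1] if len(sys.argv) > 1 else "MTN6_MCM04_IO_LIST.xlsx"  # assumed file name
    xl = pd.ExcelFile(original_file)

    def read_sheet(name):
        # Return the sheet if present, otherwise an empty DataFrame
        return pd.read_excel(xl, sheet_name=name) if name in xl.sheet_names else pd.DataFrame()

    process_data(
        read_sheet("DESC"),
        read_sheet("NETWORK_PLC"),
        original_file,
        read_sheet("APF"),         # assumed sheet name
        read_sheet("M12DR"),       # assumed sheet name
        read_sheet("HUB"),         # assumed sheet name
        read_sheet("SORTER_HUB"),  # assumed sheet name
        read_sheet("SIO"),         # assumed sheet name
        read_sheet("IB16"),        # assumed sheet name
        read_sheet("OB16E"),       # assumed sheet name
        read_sheet("IB16S"),       # assumed sheet name
    )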