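"""Merge DESC and NETWORK_PLC device data into a combined DESC_IP sheet.

Given the sheets of the source Excel workbook, this module maps IP addresses,
part numbers, and DPM relationships onto each device, classifies signals,
resolves IO paths, and writes the merged result back out alongside the
original sheets.
"""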
import os
import re
import sys

import pandas as pd

from classifiers import classify_signal, get_device_type
from utils import normalize_vfd_name, normalize_io_path, assign_partnumber, combine_desc
from io_paths import get_io_path

def _is_hub_module(tagname):
    """Check if a TAGNAME represents a hub module (FIO1H1, FIO2H3, etc.)."""
    tagname_str = str(tagname).upper()
    # Match patterns like FIO1H1, FIO2H3, FIOH1, etc.
    hub_pattern = re.compile(r'FIO(\d+)?H\d+', re.IGNORECASE)
    return bool(hub_pattern.search(tagname_str))

def process_data(desc_df, network_df, original_file, apf_df, m12dr_df, hub_df, sorter_hub_df, sio_df, ib16_df, ob16e_df, ib16s_df):
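    """Merge the DESC sheet with NETWORK_PLC data and export a DESC_IP sheet.

    Maps IP, part number, and DPM information onto each DESC row, classifies
    signals and device types, resolves IO paths, appends devices that exist
    only in NETWORK_PLC (or only as DPMs), reports validation issues as
    warnings, and writes the merged result to a new Excel workbook containing
    the original sheets plus a 'DESC_IP' sheet.
    """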
    validation_errors = []

    # Ignore rows from DESC sheet with an empty or NaN TAGNAME
    original_desc_rows = len(desc_df)
    desc_df = desc_df.dropna(subset=['TAGNAME'])
    desc_df = desc_df[desc_df['TAGNAME'].astype(str).str.strip() != '']
    rows_dropped = original_desc_rows - len(desc_df)
    if rows_dropped > 0:
        print(f"\nNOTE: Ignored {rows_dropped} rows from DESC sheet with an empty TAGNAME.")

    desc_df['NORMALIZED_TAGNAME'] = desc_df['TAGNAME'].apply(normalize_vfd_name)
    network_df['NORMALIZED_NAME'] = network_df['Name'].apply(normalize_vfd_name)

    if network_df['NORMALIZED_NAME'].duplicated().any():
        print("\nWARNING: The following names in the 'NETWORK_PLC' sheet become duplicates after normalization (e.g., 'DEV01' and 'DEV1'). Using the first occurrence for IP and PartNumber mapping.")
        duplicated_names = network_df[network_df['NORMALIZED_NAME'].duplicated(keep=False)].sort_values('NORMALIZED_NAME')
        print(duplicated_names[['Name', 'NORMALIZED_NAME']].to_string(index=False))

    unique_network_df = network_df.drop_duplicates(subset=['NORMALIZED_NAME'], keep='first')
    network_mapping = dict(zip(unique_network_df['NORMALIZED_NAME'], unique_network_df['IP']))
    part_number_mapping = dict(zip(unique_network_df['NORMALIZED_NAME'], unique_network_df['PartNumber']))

    # Create a mapping from DPM -> DPM_IP for unique DPM entries
    dpm_mapping = dict(zip(network_df['DPM'], network_df['DPM_IP']))

    # Start with DESC data and add IP and PartNumber columns
    result_df = desc_df.copy()

    # Fill empty DESCA with "SPARE"
    result_df['DESCA'] = result_df['DESCA'].fillna('SPARE')
    result_df.loc[result_df['DESCA'] == '', 'DESCA'] = 'SPARE'

    # Map IP and PARTNUMBER, but skip hub modules (they connect via IO-Link to parent masters)
    def get_ip_for_device(normalized_tagname, tagname):
        # Hub modules don't have their own IP addresses
        if _is_hub_module(tagname):
            return ''
        return network_mapping.get(normalized_tagname, '')

    def get_partnumber_for_device(normalized_tagname, tagname):
        # Hub modules don't get part numbers from the network sheet
        if _is_hub_module(tagname):
            return ''
        return part_number_mapping.get(normalized_tagname, '')

    result_df['IP'] = result_df.apply(lambda row: get_ip_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)
    result_df['PARTNUMBER'] = result_df.apply(lambda row: get_partnumber_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)

    # Add DPM column mapping for FIOM-DPM relationships (skip hub modules)
    dpm_to_devices_mapping = dict(zip(network_df['NORMALIZED_NAME'], network_df['DPM']))

    def get_dpm_for_device(normalized_tagname, tagname):
        # Hub modules don't have DPM relationships
        if _is_hub_module(tagname):
            return ''
        return dpm_to_devices_mapping.get(normalized_tagname, '')

    result_df['DPM'] = result_df.apply(lambda row: get_dpm_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)

    result_df['PARTNUMBER'] = result_df.apply(assign_partnumber, axis=1)
    result_df['DESC'] = result_df.apply(combine_desc, axis=1)

    # Add signal classification and IO path columns
    print("\nClassifying signals and adding IO paths...")

    # Add signal classification column
    result_df['SIGNAL'] = result_df.apply(lambda row: classify_signal(row['DESCA'], row['TAGNAME'], row['DESCB'], original_file), axis=1)

    # Add device type column
    result_df['DEVICE_TYPE'] = result_df['TAGNAME'].apply(get_device_type)

    # Add IO path column
    result_df['IO_PATH'] = result_df.apply(
        lambda row: get_io_path(
            row['TAGNAME'],
            row['TERM'],
            row['SIGNAL'],
            row['DEVICE_TYPE'],
            apf_df,
            m12dr_df,
            hub_df,
            sorter_hub_df,
            sio_df,
            ib16_df,
            ob16e_df,
            ib16s_df
        ), axis=1
    )

    # Print statistics about signal classification
    signal_counts = result_df['SIGNAL'].value_counts()
    print("\nSignal classification results:")
    for signal_type, count in signal_counts.items():
        print(f"  {signal_type}: {count}")

    # Show TAGNAMEs classified as UNKNOWN
    unknown_entries = result_df[result_df['SIGNAL'] == 'UNKNOWN']
    if len(unknown_entries) > 0:
        print(f"\nFound {len(unknown_entries)} entries with UNKNOWN signal classification.")
        for _, row in unknown_entries.iterrows():
            desca = row['DESCA'] if pd.notna(row['DESCA']) and row['DESCA'] != '' else 'N/A'
            validation_errors.append(f"Signal UNKNOWN for TAGNAME: {row['TAGNAME']} (DESCA: '{desca}')")

    device_counts = result_df['DEVICE_TYPE'].value_counts()
    print("\nDevice type distribution:")
    for device_type, count in device_counts.items():
        print(f"  {device_type}: {count}")

    # Count successful IO path mappings
    successful_mappings = result_df['IO_PATH'].notna().sum()
    total_rows = len(result_df)
    print("\nIO Path mapping results:")
    print(f"  Successful mappings: {successful_mappings}/{total_rows} ({successful_mappings/total_rows*100:.1f}%)")

    # Log missing mappings for debugging
    missing_mappings = result_df[result_df['IO_PATH'].isna() & result_df['TERM'].notna()]
    if len(missing_mappings) > 0:
        print(f"\nFound {len(missing_mappings)} entries with missing IO path mappings.")
        for _, row in missing_mappings.iterrows():
            tag, term, sig, dev = row['TAGNAME'], row['TERM'], row['SIGNAL'], row['DEVICE_TYPE']
            desca = row['DESCA'] if pd.notna(row['DESCA']) and row['DESCA'] != '' else 'N/A'
            error_msg = f"On device '{tag}' ({dev}), cannot connect signal '{sig}' to channel number '{term}'. (DESCA: '{desca}')"
            validation_errors.append(error_msg)

            # Convert failed mappings to SPARE
            idx = result_df[(result_df['TAGNAME'] == tag) & (result_df['TERM'] == term)].index
            if len(idx) > 0:
                result_df.loc[idx, 'DESCA'] = 'SPARE'
                result_df.loc[idx, 'SIGNAL'] = 'SPARE'
                # Try to get SPARE path
                spare_path = get_io_path(
                    tag,
                    term,
                    'SPARE',
                    dev,
                    apf_df,
                    m12dr_df,
                    hub_df,
                    sorter_hub_df,
                    sio_df,
                    ib16_df,
                    ob16e_df,
                    ib16s_df
                )
                result_df.loc[idx, 'IO_PATH'] = spare_path

    # Log names in DESC but not in NETWORK (using normalized names for comparison)
    desc_names_norm = set(desc_df['NORMALIZED_TAGNAME'].unique())
    network_names_norm = set(unique_network_df['NORMALIZED_NAME'].unique())
    dpm_names = set(network_df['DPM'].unique())

    missing_in_network_norm = desc_names_norm - network_names_norm
    if missing_in_network_norm:
        # Get the original tagnames that are missing to report them
        missing_desc_rows = desc_df[desc_df['NORMALIZED_TAGNAME'].isin(missing_in_network_norm)]
        original_missing_tagnames = set(missing_desc_rows['TAGNAME'].unique())

        # Filter out hub modules (FIOH and FIO1H1, FIO2H2 patterns) for logging
        missing_in_network_filtered = {name for name in original_missing_tagnames if not _is_hub_module(name)}
        if missing_in_network_filtered:
            print(f"\nFound {len(missing_in_network_filtered)} TAGNAMEs present in DESC but not in NETWORK_PLC.")
            for name in sorted(missing_in_network_filtered):
                validation_errors.append(f"TAGNAME '{name}' from DESC sheet not found in NETWORK_PLC sheet.")

    # Add names from NETWORK_PLC that are missing in DESC
    missing_in_desc_norm = network_names_norm - desc_names_norm
    if missing_in_desc_norm:
        # Get the original rows from network_df to add, ensuring uniqueness
        rows_to_add = unique_network_df[unique_network_df['NORMALIZED_NAME'].isin(missing_in_desc_norm)]

        new_rows = []
        for _, row in rows_to_add.iterrows():
            new_rows.append({
                'TAGNAME': row['Name'],  # Use original name
                'TERM': '', 'DESCA': '', 'DESCB': '',
                'IP': row['IP'], 'PARTNUMBER': row['PartNumber'], 'DESC': '',
                'DPM': row.get('DPM', '')  # Include DPM relationship
            })

        if new_rows:
            new_rows_df = pd.DataFrame(new_rows)
            result_df = pd.concat([result_df, new_rows_df], ignore_index=True)

    # Get all existing tagnames (including those just added)
    all_existing_names = set(result_df['TAGNAME'].unique())

    # Add unique DPM names that are not already in the result
    missing_dpm_names = dpm_names - all_existing_names

    if missing_dpm_names:
        print(f"\nAdding unique DPM names not present in DESC or NETWORK_PLC.Name: {sorted(missing_dpm_names)}")
        # Create rows for missing DPM names
        dpm_rows = []
        for dpm_name in missing_dpm_names:
            dpm_ip = dpm_mapping[dpm_name]
            dpm_rows.append({
                'TAGNAME': dpm_name,
                'TERM': '',
                'DESCA': '',
                'DESCB': '',
                'IP': dpm_ip,
                'PARTNUMBER': '',  # DPM entries don't have part numbers
                'DESC': '',
                'DPM': dpm_name  # DPM devices reference themselves
            })

        # Append DPM rows
        dpm_rows_df = pd.DataFrame(dpm_rows)
        result_df = pd.concat([result_df, dpm_rows_df], ignore_index=True)

        # Apply part number assignment to newly added DPM entries
        mask = result_df['TAGNAME'].isin(missing_dpm_names)
        result_df.loc[mask, 'PARTNUMBER'] = result_df.loc[mask].apply(assign_partnumber, axis=1)

    # Report validation issues; these are warnings only and processing continues
    if validation_errors:
        print("\n" + "=" * 80)
        print("WARNING: The following issues were found but processing will continue:")
        print("=" * 80)
        for error in validation_errors:
            print(f"- {error}")
        print("=" * 80)
        print("\nContinuing with processing...")

    # Normalize TAGNAME, DESC, and IO_PATH in the final result before saving (only for VFDs)
    print("\nNormalizing TAGNAME, DESC, and IO_PATH columns for VFDs only in the final output...")
    result_df['TAGNAME'] = result_df['TAGNAME'].apply(normalize_vfd_name)
    result_df['DESC'] = result_df['DESC'].apply(normalize_vfd_name)
    result_df['IO_PATH'] = result_df['IO_PATH'].apply(normalize_io_path)

    # Drop the temporary normalized column before sorting
    if 'NORMALIZED_TAGNAME' in result_df.columns:
        result_df = result_df.drop(columns=['NORMALIZED_TAGNAME'])

    # Convert TAGNAME to string to prevent sorting errors with mixed types
    result_df['TAGNAME'] = result_df['TAGNAME'].astype(str)

    # Sort by PARTNUMBER, TAGNAME, and then TERM for better organization
    # Fill NaN values to ensure consistent sorting
    result_df['PARTNUMBER'] = result_df['PARTNUMBER'].fillna('')
    result_df['TERM'] = result_df['TERM'].fillna('')
    result_df = result_df.sort_values(by=['PARTNUMBER', 'TAGNAME', 'TERM']).reset_index(drop=True)

    # Remove exact duplicate rows before exporting to Excel
    before_dedup = len(result_df)
    result_df = result_df.drop_duplicates()
    after_dedup = len(result_df)
    if before_dedup != after_dedup:
        print(f"\nRemoved {before_dedup - after_dedup} duplicate rows (exact matches) before saving DESC_IP sheet.")

    print(f"\nFinal result has {len(result_df)} rows")
    print("Sample of merged data:")
    print(result_df.head(10))

    # Create new Excel file name with full project prefix (e.g. MTN6_MCM04)
    project_match = re.search(r"([A-Z0-9]+_MCM\d+)", str(original_file), re.IGNORECASE)
    if project_match:
        subsystem = project_match.group(1).upper()
    else:
        # Fall back to the MCM-only pattern for backward compatibility
        mcm_match = re.search(r"(MCM\d+)", str(original_file), re.IGNORECASE)
        subsystem = mcm_match.group(1).upper() if mcm_match else "MCM"

    new_file = f"{subsystem}_DESC_IP_MERGED.xlsx"

    try:
        # Copy all original sheets and add the new one
        with pd.ExcelWriter(new_file, engine='openpyxl') as writer:
            # Copy original sheets
            xl_original = pd.ExcelFile(original_file)
            for sheet_name in xl_original.sheet_names:
                df = pd.read_excel(xl_original, sheet_name=sheet_name)
                df.to_excel(writer, sheet_name=sheet_name, index=False)

            # Add the new merged sheet
            result_df.to_excel(writer, sheet_name='DESC_IP', index=False)

        print(f"\nNew Excel file created: {new_file}")
        print("The file contains all original sheets plus the new 'DESC_IP' sheet with merged data.")
    except Exception as e:
        # If copying the original sheets fails, save just the merged data and report why
        print(f"\nWARNING: Could not copy original sheets ({e}).")
        result_df.to_excel(new_file, sheet_name='DESC_IP', index=False)
        print(f"\nNew Excel file created with merged data only: {new_file}")