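"""Merge DESC and NETWORK_PLC data from an MCM I/O workbook into a DESC_IP sheet.

Drops blank TAGNAME rows, maps each device to its IP, part number and DPM,
classifies signals, resolves IO paths, and writes all original sheets plus a
new 'DESC_IP' sheet to a <SUBSYSTEM>_DESC_IP_MERGED.xlsx workbook.
"""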
import pandas as pd
import os
import sys
import re
from classifiers import classify_signal, get_device_type
from utils import normalize_vfd_name, normalize_io_path, assign_partnumber, combine_desc
from io_paths import get_io_path
def _is_hub_module(tagname):
    """Return True if a TAGNAME represents a hub module (e.g. FIO1H1, FIO2H3, FIOH1)."""
    tagname_str = str(tagname).upper()
    # Match patterns like FIO1H1, FIO2H3, FIOH1, etc.
    hub_pattern = re.compile(r'FIO(\d+)?H\d+', re.IGNORECASE)
    return bool(hub_pattern.search(tagname_str))
def process_data(desc_df, network_df, original_file, apf_df, m12dr_df, hub_df, sorter_hub_df, sio_df, ib16_df, ob16e_df, ib16s_df):
    """Merge DESC and NETWORK_PLC data, classify signals, resolve IO paths, and write the merged workbook."""
    validation_errors = []

    # Ignore rows from DESC sheet with an empty or NaN TAGNAME
    original_desc_rows = len(desc_df)
    desc_df = desc_df.dropna(subset=['TAGNAME'])
    desc_df = desc_df[desc_df['TAGNAME'].astype(str).str.strip() != '']
    rows_dropped = original_desc_rows - len(desc_df)
    if rows_dropped > 0:
        print(f"\nNOTE: Ignored {rows_dropped} rows from DESC sheet with an empty TAGNAME.")
    desc_df['NORMALIZED_TAGNAME'] = desc_df['TAGNAME'].apply(normalize_vfd_name)
    network_df['NORMALIZED_NAME'] = network_df['Name'].apply(normalize_vfd_name)
    if network_df['NORMALIZED_NAME'].duplicated().any():
        print("\nWARNING: The following names in the 'NETWORK_PLC' sheet become duplicates after normalization (e.g., 'DEV01' and 'DEV1'). Using the first occurrence for IP and PartNumber mapping.")
        duplicated_names = network_df[network_df['NORMALIZED_NAME'].duplicated(keep=False)].sort_values('NORMALIZED_NAME')
        print(duplicated_names[['Name', 'NORMALIZED_NAME']].to_string(index=False))
    unique_network_df = network_df.drop_duplicates(subset=['NORMALIZED_NAME'], keep='first')
    network_mapping = dict(zip(unique_network_df['NORMALIZED_NAME'], unique_network_df['IP']))
    part_number_mapping = dict(zip(unique_network_df['NORMALIZED_NAME'], unique_network_df['PartNumber']))

    # Create a mapping from DPM -> DPM_IP for unique DPM entries
    dpm_mapping = dict(zip(network_df['DPM'], network_df['DPM_IP']))

    # Start with DESC data and add IP and PartNumber columns
    result_df = desc_df.copy()

    # Fill empty DESCA with "SPARE"
    result_df['DESCA'] = result_df['DESCA'].fillna('SPARE')
    result_df.loc[result_df['DESCA'] == '', 'DESCA'] = 'SPARE'
    # Map IP and PARTNUMBER, but skip hub modules (they connect via IO-Link to parent masters)
    def get_ip_for_device(normalized_tagname, tagname):
        # Hub modules don't have their own IP addresses
        if _is_hub_module(tagname):
            return ''
        return network_mapping.get(normalized_tagname, '')

    def get_partnumber_for_device(normalized_tagname, tagname):
        # Hub modules don't get part numbers from network sheet
        if _is_hub_module(tagname):
            return ''
        return part_number_mapping.get(normalized_tagname, '')

    result_df['IP'] = result_df.apply(lambda row: get_ip_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)
    result_df['PARTNUMBER'] = result_df.apply(lambda row: get_partnumber_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)

    # Add DPM column mapping for FIOM-DPM relationships (skip hub modules)
    dpm_to_devices_mapping = dict(zip(network_df['NORMALIZED_NAME'], network_df['DPM']))

    def get_dpm_for_device(normalized_tagname, tagname):
        # Hub modules don't have DPM relationships
        if _is_hub_module(tagname):
            return ''
        return dpm_to_devices_mapping.get(normalized_tagname, '')

    result_df['DPM'] = result_df.apply(lambda row: get_dpm_for_device(row['NORMALIZED_TAGNAME'], row['TAGNAME']), axis=1)
    result_df['PARTNUMBER'] = result_df.apply(assign_partnumber, axis=1)
    result_df['DESC'] = result_df.apply(combine_desc, axis=1)
    # Add signal classification and IO path columns
    print("\nClassifying signals and adding IO paths...")

    # Add signal classification column
    result_df['SIGNAL'] = result_df.apply(lambda row: classify_signal(row['DESCA'], row['TAGNAME'], row['DESCB'], original_file), axis=1)

    # Add device type column
    result_df['DEVICE_TYPE'] = result_df['TAGNAME'].apply(get_device_type)

    # Add IO path column
    result_df['IO_PATH'] = result_df.apply(
        lambda row: get_io_path(
            row['TAGNAME'],
            row['TERM'],
            row['SIGNAL'],
            row['DEVICE_TYPE'],
            apf_df,
            m12dr_df,
            hub_df,
            sorter_hub_df,
            sio_df,
            ib16_df,
            ob16e_df,
            ib16s_df
        ), axis=1
    )

    # Print statistics about signal classification
    signal_counts = result_df['SIGNAL'].value_counts()
    print("\nSignal classification results:")
    for signal_type, count in signal_counts.items():
        print(f" {signal_type}: {count}")
    # Show TAGNAMEs classified as UNKNOWN
    unknown_entries = result_df[result_df['SIGNAL'] == 'UNKNOWN']
    if len(unknown_entries) > 0:
        print(f"\nFound {len(unknown_entries)} entries with UNKNOWN signal classification.")
        for _, row in unknown_entries.iterrows():
            desca = row['DESCA'] if pd.notna(row['DESCA']) and row['DESCA'] != '' else 'N/A'
            validation_errors.append(f"Signal UNKNOWN for TAGNAME: {row['TAGNAME']} (DESCA: '{desca}')")

    device_counts = result_df['DEVICE_TYPE'].value_counts()
    print("\nDevice type distribution:")
    for device_type, count in device_counts.items():
        print(f" {device_type}: {count}")

    # Count successful IO path mappings
    successful_mappings = result_df['IO_PATH'].notna().sum()
    total_rows = len(result_df)
    print("\nIO Path mapping results:")
    print(f" Successful mappings: {successful_mappings}/{total_rows} ({successful_mappings/total_rows*100:.1f}%)")
    # Log missing mappings for debugging
    missing_mappings = result_df[result_df['IO_PATH'].isna() & result_df['TERM'].notna()]
    if len(missing_mappings) > 0:
        print(f"\nFound {len(missing_mappings)} entries with missing IO path mappings.")
        for _, row in missing_mappings.iterrows():
            tag, term, sig, dev = row['TAGNAME'], row['TERM'], row['SIGNAL'], row['DEVICE_TYPE']
            desca = row['DESCA'] if pd.notna(row['DESCA']) and row['DESCA'] != '' else 'N/A'
            error_msg = f"On device '{tag}' ({dev}), cannot connect signal '{sig}' to channel number '{term}'. (DESCA: '{desca}')"
            validation_errors.append(error_msg)

            # Convert failed mappings to SPARE
            idx = result_df[(result_df['TAGNAME'] == tag) & (result_df['TERM'] == term)].index
            if len(idx) > 0:
                result_df.loc[idx, 'DESCA'] = 'SPARE'
                result_df.loc[idx, 'SIGNAL'] = 'SPARE'
                # Try to get SPARE path
                spare_path = get_io_path(
                    tag,
                    term,
                    'SPARE',
                    dev,
                    apf_df,
                    m12dr_df,
                    hub_df,
                    sorter_hub_df,
                    sio_df,
                    ib16_df,
                    ob16e_df,
                    ib16s_df
                )
                result_df.loc[idx, 'IO_PATH'] = spare_path
    # Log names in DESC but not in NETWORK (using normalized names for comparison)
    desc_names_norm = set(desc_df['NORMALIZED_TAGNAME'].unique())
    network_names_norm = set(unique_network_df['NORMALIZED_NAME'].unique())
    dpm_names = set(network_df['DPM'].unique())

    missing_in_network_norm = desc_names_norm - network_names_norm
    if missing_in_network_norm:
        # Get the original tagnames that are missing to report them
        missing_desc_rows = desc_df[desc_df['NORMALIZED_TAGNAME'].isin(missing_in_network_norm)]
        original_missing_tagnames = set(missing_desc_rows['TAGNAME'].unique())
        # Filter out hub modules (FIOH1, FIO1H1, FIO2H2 patterns) for logging
        missing_in_network_filtered = {name for name in original_missing_tagnames if not _is_hub_module(name)}
        if missing_in_network_filtered:
            print(f"\nFound {len(missing_in_network_filtered)} TAGNAMEs present in DESC but not in NETWORK_PLC.")
            for name in sorted(missing_in_network_filtered):
                validation_errors.append(f"TAGNAME '{name}' from DESC sheet not found in NETWORK_PLC sheet.")

    # Add names from NETWORK_PLC that are missing in DESC
    missing_in_desc_norm = network_names_norm - desc_names_norm
    if missing_in_desc_norm:
        # Get the original rows from network_df to add, ensuring uniqueness
        rows_to_add = unique_network_df[unique_network_df['NORMALIZED_NAME'].isin(missing_in_desc_norm)]
        new_rows = []
        for _, row in rows_to_add.iterrows():
            new_rows.append({
                'TAGNAME': row['Name'],  # Use original name
                'TERM': '', 'DESCA': '', 'DESCB': '',
                'IP': row['IP'], 'PARTNUMBER': row['PartNumber'], 'DESC': '',
                'DPM': row.get('DPM', '')  # Include DPM relationship
            })
        if new_rows:
            new_rows_df = pd.DataFrame(new_rows)
            result_df = pd.concat([result_df, new_rows_df], ignore_index=True)
    # Get all existing tagnames (including those just added)
    all_existing_names = set(result_df['TAGNAME'].unique())

    # Add unique DPM names that are not already in the result
    missing_dpm_names = dpm_names - all_existing_names
    if missing_dpm_names:
        print(f"\nAdding unique DPM names not present in DESC or NETWORK_PLC.Name: {sorted(missing_dpm_names)}")
        # Create rows for missing DPM names
        dpm_rows = []
        for dpm_name in missing_dpm_names:
            dpm_ip = dpm_mapping[dpm_name]
            dpm_rows.append({
                'TAGNAME': dpm_name,
                'TERM': '',
                'DESCA': '',
                'DESCB': '',
                'IP': dpm_ip,
                'PARTNUMBER': '',  # DPM entries don't have part numbers
                'DESC': '',
                'DPM': dpm_name  # DPM devices reference themselves
            })

        # Append DPM rows
        dpm_rows_df = pd.DataFrame(dpm_rows)
        result_df = pd.concat([result_df, dpm_rows_df], ignore_index=True)

        # Apply part number assignment to newly added DPM entries
        mask = result_df['TAGNAME'].isin(missing_dpm_names)
        result_df.loc[mask, 'PARTNUMBER'] = result_df.loc[mask].apply(assign_partnumber, axis=1)
    # Report validation issues and continue processing (no hard stop)
    if validation_errors:
        print("\n" + "=" * 80)
        print("WARNING: The following issues were found but processing will continue:")
        print("=" * 80)
        for error in validation_errors:
            print(f"- {error}")
        print("=" * 80)
        print("\nContinuing with processing...")
    # Normalize TAGNAME, DESC, and IO_PATH in the final result before saving (only for VFDs)
    print("\nNormalizing TAGNAME, DESC, and IO_PATH columns for VFDs only in the final output...")
    result_df['TAGNAME'] = result_df['TAGNAME'].apply(normalize_vfd_name)
    result_df['DESC'] = result_df['DESC'].apply(normalize_vfd_name)
    result_df['IO_PATH'] = result_df['IO_PATH'].apply(normalize_io_path)

    # Drop the temporary normalized column before sorting
    if 'NORMALIZED_TAGNAME' in result_df.columns:
        result_df = result_df.drop(columns=['NORMALIZED_TAGNAME'])

    # Convert TAGNAME to string to prevent sorting errors with mixed types
    result_df['TAGNAME'] = result_df['TAGNAME'].astype(str)

    # Sort by PARTNUMBER, TAGNAME, and then TERM for better organization
    # Fill NaN values to ensure consistent sorting
    result_df['PARTNUMBER'] = result_df['PARTNUMBER'].fillna('')
    result_df['TERM'] = result_df['TERM'].fillna('')
    result_df = result_df.sort_values(by=['PARTNUMBER', 'TAGNAME', 'TERM']).reset_index(drop=True)

    # Remove exact duplicate rows before exporting to Excel/CSV
    before_dedup = len(result_df)
    result_df = result_df.drop_duplicates()
    after_dedup = len(result_df)
    if before_dedup != after_dedup:
        print(f"\nRemoved {before_dedup - after_dedup} duplicate rows (exact matches) before saving DESC_IP sheet.")

    print(f"\nFinal result has {len(result_df)} rows")
    print("Sample of merged data:")
    print(result_df.head(10))
    # Create new Excel file name with full project prefix (e.g. MTN6_MCM04)
    project_match = re.search(r"([A-Z0-9]+_MCM\d+)", str(original_file), re.IGNORECASE)
    if project_match:
        subsystem = project_match.group(1).upper()
    else:
        # Fall back to the MCM-only pattern for backward compatibility
        mcm_match = re.search(r"(MCM\d+)", str(original_file), re.IGNORECASE)
        subsystem = mcm_match.group(1).upper() if mcm_match else "MCM"
    new_file = f"{subsystem}_DESC_IP_MERGED.xlsx"
    try:
        # Copy all original sheets and add the new one
        with pd.ExcelWriter(new_file, engine='openpyxl') as writer:
            # Copy original sheets
            xl_original = pd.ExcelFile(original_file)
            for sheet_name in xl_original.sheet_names:
                df = pd.read_excel(xl_original, sheet_name=sheet_name)
                df.to_excel(writer, sheet_name=sheet_name, index=False)
            # Add the new merged sheet
            result_df.to_excel(writer, sheet_name='DESC_IP', index=False)
        print(f"\nNew Excel file created: {new_file}")
        print("The file contains all original sheets plus the new 'DESC_IP' sheet with merged data.")
    except Exception as e:
        # If copying the original sheets fails, report the error and save the merged data alone
        print(f"\nWARNING: Could not copy original sheets ({e}). Saving the merged data only.")
        result_df.to_excel(new_file, sheet_name='DESC_IP', index=False)
        print(f"\nNew Excel file created with merged data only: {new_file}")
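# --- Illustrative usage sketch (an assumption, not part of the original pipeline) ---
# Shows one way process_data() could be driven from the command line. The sheet names
# passed to _read() below (APF, M12DR, HUB, SORTER_HUB, SIO, IB16, OB16E, IB16S) are
# guesses based on the parameter names; only DESC and NETWORK_PLC are sheets the code
# above actually references. Adjust them to the real workbook layout before use.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: python {os.path.basename(sys.argv[0])} <ORIGINAL_WORKBOOK.xlsx>")
        sys.exit(1)

    workbook = sys.argv[1]
    xl = pd.ExcelFile(workbook)

    def _read(sheet):
        # Hypothetical helper: return the sheet as a DataFrame, or an empty one if absent
        if sheet in xl.sheet_names:
            return pd.read_excel(xl, sheet_name=sheet)
        return pd.DataFrame()

    process_data(
        desc_df=_read('DESC'),
        network_df=_read('NETWORK_PLC'),
        original_file=workbook,
        apf_df=_read('APF'),
        m12dr_df=_read('M12DR'),
        hub_df=_read('HUB'),
        sorter_hub_df=_read('SORTER_HUB'),
        sio_df=_read('SIO'),
        ib16_df=_read('IB16'),
        ob16e_df=_read('OB16E'),
        ib16s_df=_read('IB16S'),
    )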