# 2025-09-17 11:20:20 +04:00

# 201 lines
# 9.1 KiB
# Python

import pandas as pd
import re
import sys
# Case-insensitive regex matched against DESCB; matching rows are dropped.
_EXCLUDED_DESCB_PATTERN = r'ZMX|EXTENDO|DPM'
# One output row is generated per colour for every IOLink beacon (BCN) row.
_BEACON_SEGMENT_COLORS = ('GREEN', 'AMBER', 'BLUE')
# Number of valve rows generated for every IOLink solenoid (SOL) row.
_SOLENOID_VALVE_COUNT = 8


def _as_text(series):
    """Return *series* as stripped strings, with missing values mapped to ''.

    Going through ``object`` first keeps this safe for columns that were read
    as float64 (entirely empty) or that hold numbers, where ``.str`` would
    otherwise raise ``AttributeError``.
    """
    as_objects = series.astype(object)
    return as_objects.where(as_objects.notna(), '').astype(str).str.strip()


def _drop_spare_after(df, trigger_mask):
    """Drop rows whose DESC is 'SPARE' and whose previous row is a trigger.

    Returns a ``(frame, removed_count)`` tuple; the frame has a fresh
    0..n-1 index. All decisions are made against the frame as passed in,
    so removals do not cascade.
    """
    is_spare = _as_text(df['DESC']).str.upper().eq('SPARE')
    # shift(1) moves each trigger flag down onto the row that follows it.
    follows_trigger = trigger_mask.astype(bool).shift(1, fill_value=False)
    doomed = is_spare & follows_trigger
    return df[~doomed].reset_index(drop=True), int(doomed.sum())


def _expand_beacons(beacons, colors=_BEACON_SEGMENT_COLORS):
    """Build one row per colour segment for every row of *beacons*.

    Each generated row is a copy of the source row with IO_PATH, DESC and
    SIGNAL overwritten; segments are numbered from 1 in *colors* order.
    """
    generated = []
    for _, source in beacons.iterrows():
        device = str(source['DESCA']).strip()
        for segment, color in enumerate(colors, start=1):
            row = source.copy()
            row['IO_PATH'] = (
                f"{device}:O.ProcessDataOut.Segment_{segment}_Animation_Type.0"
            )
            row['DESC'] = f"{device} {color} SEGMENT"
            row['SIGNAL'] = ''
            generated.append(row)
    return generated


def _expand_solenoids(solenoids, valve_count=_SOLENOID_VALVE_COUNT):
    """Build *valve_count* valve rows for every row of *solenoids*.

    Valves are paired into modules: valves 1 and 2 belong to module 1,
    valves 3 and 4 to module 2, and so on. Odd valves are LEFT, even RIGHT.
    """
    generated = []
    for _, source in solenoids.iterrows():
        device = str(source['DESCA']).strip()
        for valve in range(1, valve_count + 1):
            module = (valve + 1) // 2
            direction = "LEFT" if valve % 2 == 1 else "RIGHT"
            row = source.copy()
            row['IO_PATH'] = f"{device}:O.ProcessDataOut.Valve_{valve}_solenoid_14"
            row['DESC'] = f"{device} DIVERT MODULE {module} {direction}"
            row['SIGNAL'] = 'O'
            generated.append(row)
    return generated


def post_process_io_data(input_file, output_file):
    """Filter and expand the 'DESC_IP' sheet of *input_file* into a CSV.

    Steps, in order:
      1. Drop rows whose DESCB matches ZMX, EXTENDO or DPM.
      2. Drop SPARE rows directly following a row whose DESCA matches
         ``FIO<number>H``.
      3. Drop SPARE rows whose TAGNAME contains both M12DR and VS.
      4. Drop SPARE rows directly following an IOLink row with SOL in DESCA.
      5. Remove every IOLink row, appending generated beacon-segment rows
         (DESCA contains BCN) and solenoid-valve rows (DESCA contains SOL).
      6. Add a Subsystem column taken from the first ``MCM<digits>`` match in
         the input path ('UNKNOWN' when there is none).
      7. Rename IO_PATH/DESC to Name/Description, drop rows whose Name still
         mentions IOLink and rows where Name and Description are both blank.
      8. Write Name, Description and Subsystem to *output_file*.

    Progress is reported with ``print``.

    Args:
        input_file: Path of the Excel workbook (``str`` or path-like).
        output_file: Destination path of the CSV file.

    Raises:
        SystemExit: With status 1 when the workbook or its 'DESC_IP' sheet
            cannot be read.
        KeyError: When a required column (TAGNAME, IO_PATH, SIGNAL, DESCA,
            DESCB, DESC) is missing from the sheet.
    """
    print(f"Reading input file: {input_file}")
    try:
        df = pd.read_excel(input_file, sheet_name='DESC_IP')
    except ValueError:
        print(f"ERROR: Could not find 'DESC_IP' sheet in {input_file}")
        sys.exit(1)
    except Exception as e:
        print(f"ERROR: Failed to read {input_file}: {str(e)}")
        sys.exit(1)

    # 1. Exclusions apply only when the keyword appears in DESCB.
    excluded = _as_text(df['DESCB']).str.contains(
        _EXCLUDED_DESCB_PATTERN, case=False, na=False
    )
    if excluded.any():
        print(f"Removed {excluded.sum()} rows containing ZMX/EXTENDO/DPM")
    df = df[~excluded].reset_index(drop=True)

    # 2. SPARE rows directly after FIO<number>H rows (FIO1H, FIO2H, ...).
    fioh = _as_text(df['DESCA']).str.contains(r'FIO\d+H', case=False, na=False)
    df, removed = _drop_spare_after(df, fioh)
    if removed:
        print(f"Removed {removed} SPARE rows following FIO[number]H entries")

    # 3. SPARE rows on M12DR devices whose tag also contains VS.
    tag = _as_text(df['TAGNAME'])
    spare_text = (
        _as_text(df['DESCA']).str.upper().eq('SPARE')
        | _as_text(df['DESC']).str.upper().eq('SPARE')
    )
    m12dr_vs_spare = (
        tag.str.contains(r'M12DR', case=False, na=False)
        & tag.str.contains(r'VS', case=False, na=False)
        & spare_text
    )
    if m12dr_vs_spare.any():
        print(
            f"Removed {m12dr_vs_spare.sum()} SPARE rows for M12DR devices containing VS"
        )
    df = df[~m12dr_vs_spare].reset_index(drop=True)

    # 4. SPARE rows directly after IOLink solenoid rows. A row counts as
    #    IOLink here when SIGNAL says so or IO_PATH mentions it.
    iolink_like = (
        _as_text(df['SIGNAL']).eq('IOLink')
        | _as_text(df['IO_PATH']).str.contains('IOLink', regex=False)
    )
    has_sol = _as_text(df['DESCA']).str.upper().str.contains('SOL', regex=False)
    df, removed = _drop_spare_after(df, iolink_like & has_sol)
    if removed:
        print(f"Removed {removed} SPARE rows following IOLink SOL entries")

    # 5. Capture the beacon/solenoid rows before removing every IOLink row.
    #    NOTE: every BCN row is expanded as a 3-segment beacon; 2-segment
    #    beacons are not distinguished yet.
    desca = _as_text(df['DESCA'])
    iolink = _as_text(df['SIGNAL']).eq('IOLink')
    beacons = df[iolink & desca.str.contains(r'BCN', case=False, na=False)]
    solenoids = df[iolink & desca.str.contains(r'SOL', case=False, na=False)]
    removed = int(iolink.sum())
    df = df[~iolink]
    if removed:
        print(
            f"Removed {removed} IOLink rows (including beacons, FIOH channels, and all other IOLink entries)"
        )

    beacon_rows = _expand_beacons(beacons)
    if beacon_rows:
        print(f"Added {len(beacon_rows)} rows for beacon lights")
    solenoid_rows = _expand_solenoids(solenoids)
    if solenoid_rows:
        print(f"Added {len(solenoid_rows)} rows for solenoid valves")
    generated = beacon_rows + solenoid_rows
    if generated:
        # Generated rows go after the surviving rows: beacons, then solenoids.
        df = pd.concat([df, pd.DataFrame(generated)], ignore_index=True)

    # 6. Only the MCM part of the file name is used, not the project prefix.
    #    str() lets path-like objects through re.search.
    mcm_match = re.search(r'(MCM\d+)', str(input_file), re.IGNORECASE)
    subsystem = mcm_match.group(1).upper() if mcm_match else 'UNKNOWN'

    # 7. Final column names, then residual clean-up.
    df = df.rename(columns={'IO_PATH': 'Name', 'DESC': 'Description'})
    df = df.assign(Subsystem=subsystem)

    residual_iolink = _as_text(df['Name']).str.contains(
        r'IOLink', case=False, na=False
    )
    if residual_iolink.any():
        print(f"Removed {residual_iolink.sum()} residual IOLink rows")
    df = df[~residual_iolink].reset_index(drop=True)

    blank = _as_text(df['Name']).eq('') & _as_text(df['Description']).eq('')
    if blank.any():
        print(f"Removed {blank.sum()} blank name/description rows")
    df = df[~blank]

    # 8. Save to CSV with headers.
    print(f"Saving output file: {output_file}")
    df[['Name', 'Description', 'Subsystem']].to_csv(
        output_file, index=False, header=True
    )
    print("Processing complete!")