# Source path: Scripts/AutoCAD/Compare/check_io_vs_devices.py
# Snapshot: 2025-12-09 15:50:16 +04:00
#
# Viewer metadata: 327 lines, 9.9 KiB, Python

import argparse
import re
import sys
from pathlib import Path
import pandas as pd
# ==== CONFIG YOU CAN TWEAK IF NEEDED ====
# IO rows whose assigned-device string matches any of these regex patterns
# are ignored. load_io_series applies each pattern with pandas
# ``Series.str.match``, so a pattern only has to match from the start of the
# string; add an explicit ``$`` when the end of the tag matters.
SKIP_PATTERNS = [
    r"^PDP.*_CB",  # PDP*_CB* pattern
    r"^PDP.*_FWM",  # PDP*_FWM* pattern
    r"^PDP.*_PWM",  # PDP*_PWM* pattern
    r".*_.*_VSU$",  # *_*_VSU pattern
]
def canonical(tag: str) -> str:
"""
Apply all agreed normalization rules to a device/assigned tag.
"""
if not isinstance(tag, str):
return None
t = tag.strip()
if not t:
return None
# 1) EPC: keep up to ..._EPC<number>
# Examples:
# NCP1_3_EPC1_1 -> NCP1_3_EPC1
# NCP1_3_EPC2_99 -> NCP1_3_EPC2
m = re.search(r"(.*_EPC\d+)_.*", t)
if m:
t = m.group(1)
# 2) VFD variants -> base _VFD
# UL23_25_VFD_DISC -> UL23_25_VFD
# S02_1_VFD_STO1 -> S02_1_VFD
t = re.sub(r"_VFD_DISC$", "_VFD", t)
t = re.sub(r"_VFD_DSIC$", "_VFD", t)
t = re.sub(r"_VFD_STO\d*$", "_VFD", t)
# 3) SS buttons: SPB_LT / SPB / STPB -> base SS
# NCS2_12_SS2_SPB_LT -> NCS2_12_SS2
# NCR1_2_SS1_SPB -> NCR1_2_SS1
# NCR1_2_SS1_STPB -> NCR1_2_SS1
t = re.sub(r"_SPB_LT$", "", t)
t = re.sub(r"_STPB$", "", t)
t = re.sub(r"_SPB$", "", t)
# 4) Generic PB: PB_LT / PB -> base
# NCP1_3_JR1_PB -> NCP1_3_JR1
# NCP1_3_JR1_PB_LT -> NCP1_3_JR1
t = re.sub(r"_PB_LT$", "", t)
t = re.sub(r"_PB$", "", t)
# 5) STO suffix on SR tags -> base SR
# _SR1_STO1 -> _SR1
t = re.sub(r"(_SR\d+)_STO\d+$", r"\1", t)
# 6) R/S endings -> base
# S02_2_LRPE6_R -> S02_2_LRPE6
# S02_2_LRPE6_S -> S02_2_LRPE6
t = re.sub(r"_(R|S)$", "", t)
# 7) EX_ESTOP variants -> base EX
# *_*_EX_ESTOP* -> *_*_EX
t = re.sub(r"_EX_ESTOP.*$", "_EX", t)
return t or None
def load_io_series(io_path: Path, io_column: str | None) -> pd.Series:
    """
    Load IO Excel and return a cleaned Series of Assigned Device strings.

    Args:
        io_path: Path of the IO workbook, read with ``pd.read_excel``.
        io_column: Name of the column holding the assigned devices. If it is
            None (or empty), the first column of the sheet is used.

    Returns:
        Series of stripped, non-blank strings with SPARE rows and rows
        matching any of SKIP_PATTERNS removed. The original row index is
        preserved.

    Raises:
        SystemExit: If ``io_column`` is given but not present in the sheet,
            or if the sheet has no columns at all.
    """
    df = pd.read_excel(io_path)
    if io_column:
        if io_column not in df.columns:
            raise SystemExit(
                f"ERROR: IO column '{io_column}' not found. Available: {list(df.columns)}"
            )
        series = df[io_column]
    else:
        if len(df.columns) == 0:
            raise SystemExit(f"ERROR: IO file has no columns: {io_path}")
        # Use first column
        series = df[df.columns[0]]

    # Blank cells must be dropped BEFORE the cast to str: astype(str) turns
    # NaN into the literal text "nan", which would then survive every later
    # NA check and surface as a bogus "nan" tag in the reports.
    series = series.dropna().astype(str).str.strip()
    series = series[series != ""]

    # Drop SPARE rows
    mask_spare = series.str.contains("SPARE", case=False, na=False)

    # Drop rows matching SKIP_PATTERNS (e.g. PDP*_CB*, PDP*_FWM*)
    mask_skip_pattern = pd.Series(False, index=series.index)
    for pattern in SKIP_PATTERNS:
        mask_skip_pattern = mask_skip_pattern | series.str.match(pattern, na=False)

    return series[~(mask_spare | mask_skip_pattern)]
def load_device_tags(dev_path: Path, dev_column: str | None) -> pd.Series:
    """
    Load device Excel and return Series of device tags.

    Default column is 'P_TAG1' if exists, else first column.
    Filters out VSU devices (matching *_*_VSU pattern).

    Args:
        dev_path: Path of the devices workbook, read with ``pd.read_excel``.
        dev_column: Name of the column holding the device tags, or None to
            use the default described above.

    Returns:
        Series of stripped, non-blank tag strings without VSU devices. The
        original row index is preserved.

    Raises:
        SystemExit: If ``dev_column`` is given but not present in the sheet,
            or if the sheet has no columns at all.
    """
    df = pd.read_excel(dev_path)
    if dev_column:
        if dev_column not in df.columns:
            raise SystemExit(
                f"ERROR: Device column '{dev_column}' not found. Available: {list(df.columns)}"
            )
        series = df[dev_column]
    elif "P_TAG1" in df.columns:
        series = df["P_TAG1"]
    else:
        if len(df.columns) == 0:
            raise SystemExit(f"ERROR: Devices file has no columns: {dev_path}")
        series = df[df.columns[0]]

    # Blank cells must be dropped BEFORE the cast to str: astype(str) turns
    # NaN into the literal text "nan", which would otherwise be reported as
    # a device called "nan".
    series = series.dropna().astype(str).str.strip()
    series = series[series != ""]

    # Filter out VSU devices (matching *_*_VSU pattern)
    mask_vsu = series.str.match(r".*_.*_VSU$", na=False)
    return series[~mask_vsu]
def build_io_map(assigned_series: pd.Series) -> dict[str, set[str]]:
    """
    Group raw IO assigned-device strings by their canonical tag.

    Every value of ``assigned_series`` is normalized with ``canonical``;
    values whose canonical form is falsy (e.g. None) are left out.

    Returns:
        Mapping ``canonical_tag -> set of raw strings`` that normalize to it.
    """
    grouped: dict[str, set[str]] = {}
    canon_tags = assigned_series.map(canonical)
    for raw_value, canon_tag in zip(assigned_series, canon_tags):
        if canon_tag:
            if canon_tag not in grouped:
                grouped[canon_tag] = set()
            grouped[canon_tag].add(raw_value)
    return grouped
def detect_duplicate_assignments(assigned_series: pd.Series) -> pd.DataFrame:
    """
    Report raw IO assigned-device strings that occur more than once.

    Values are compared after stripping whitespace only; no canonical
    normalization is applied, so PB/PB_LT, EPC variants, etc. stay distinct
    and only exact repeats are flagged. Missing and blank values are ignored.

    Returns:
        DataFrame with columns ``Assigned_Device`` and ``Occurrences``, one
        row per repeated value in ``value_counts`` order; an empty frame
        with the same columns when nothing repeats.
    """
    stripped = assigned_series.dropna().str.strip()
    non_blank = stripped[stripped != ""]
    tally = non_blank.value_counts()
    repeated = tally[tally > 1]

    if repeated.empty:
        return pd.DataFrame(columns=["Assigned_Device", "Occurrences"])

    return pd.DataFrame(
        [
            {"Assigned_Device": value, "Occurrences": count}
            for value, count in repeated.items()
        ]
    )
def find_io_without_devices(io_map: dict[str, set[str]], dev_canon: pd.Series) -> pd.DataFrame:
    """
    List IO canonical tags that have no counterpart in the device list.

    Args:
        io_map: Mapping ``canonical_tag -> set of raw IO strings``.
        dev_canon: Canonical device tags; missing values are ignored.

    Returns:
        DataFrame sorted by canonical tag with columns ``Canonical_Tag``,
        ``IO_Assigned_Devices`` (sorted raw strings joined by ", ") and
        ``Occurrences`` (number of distinct raw strings for that tag).
        A frame built from an empty list when every IO tag has a device.
    """
    known_devices = set(dev_canon.dropna())
    orphan_rows = [
        {
            "Canonical_Tag": tag,
            "IO_Assigned_Devices": ", ".join(sorted(raws)),
            "Occurrences": len(raws),
        }
        for tag, raws in sorted(io_map.items())
        if tag not in known_devices
    ]
    return pd.DataFrame(orphan_rows)
def compare(io_path: Path,
            dev_path: Path,
            out_path: Path,
            io_column: str | None = None,
            dev_column: str | None = None) -> None:
    """
    Compare the IO list against the device list, export Excel, print summary.

    Steps:
      1. Load and filter both workbooks.
      2. Match every device to IO rows through its canonical tag.
      3. Write the per-device result to sheet ``Devices_vs_IO``; add sheets
         ``Duplicate_IO_Assignments`` and ``IO_Only_Assignments`` only when
         they have rows.
      4. Print totals plus the missing, duplicate and IO-only lists.

    Args:
        io_path: IO workbook.
        dev_path: Devices workbook.
        out_path: Report path; its suffix is replaced by ``.xlsx``.
        io_column: Optional IO column name, forwarded to load_io_series.
        dev_column: Optional device column name, forwarded to
            load_device_tags.
    """
    print(f"IO file: {io_path}")
    print(f"Devices file: {dev_path}")
    print(f"Output file: {out_path}")
    print("Loading data...")

    io_assigned = load_io_series(io_path, io_column)
    device_tags = load_device_tags(dev_path, dev_column)
    print(f"IO rows after filters: {len(io_assigned)}")
    print(f"Device tags: {len(device_tags)}")

    io_by_canon = build_io_map(io_assigned)
    dup_report = detect_duplicate_assignments(io_assigned)
    device_canon = device_tags.map(canonical)
    io_only_report = find_io_without_devices(io_by_canon, device_canon)

    # One flag and one joined list of raw IO strings per device row.
    presence = ["YES" if c in io_by_canon else "NO" for c in device_canon]
    io_matches = [
        ", ".join(sorted(io_by_canon[c])) if c in io_by_canon else ""
        for c in device_canon
    ]
    res_df = pd.DataFrame({
        "Device_Tag": device_tags,
        "Canonical_Tag_Used_For_Check": device_canon,
        "Present_In_IO": presence,
        "Matching_IO_Assigned_Devices": io_matches,
    })

    # Save result: main sheet always, the two extra sheets only if non-empty.
    out_path = out_path.with_suffix(".xlsx")
    optional_sheets = (
        ("Duplicate_IO_Assignments", dup_report),
        ("IO_Only_Assignments", io_only_report),
    )
    with pd.ExcelWriter(out_path, engine="openpyxl") as writer:
        res_df.to_excel(writer, index=False, sheet_name="Devices_vs_IO")
        for sheet_name, frame in optional_sheets:
            if not frame.empty:
                frame.to_excel(writer, index=False, sheet_name=sheet_name)
    print(f"\nResult saved to: {out_path}")

    # Summary
    missing_tags = res_df.loc[res_df["Present_In_IO"] == "NO", "Device_Tag"]
    total_devices = len(res_df)
    missing_count = len(missing_tags)
    print("\n===== SUMMARY =====")
    print(f"Total devices: {total_devices}")
    print(f"Found in IO: {total_devices - missing_count}")
    print(f"Missing: {missing_count}")
    print(f"IO-only tags: {len(io_only_report)}")

    if missing_count > 0:
        print("\nDevices NOT found in IO (after all rules):")
        for tag in sorted(missing_tags.tolist()):
            print(" -", tag)

    if dup_report.empty:
        print("\nDuplicate IO assignments: none detected.")
    else:
        print("\nDuplicate IO assignments detected (same raw value repeated):")
        for dup in dup_report.sort_values("Assigned_Device").itertuples(index=False):
            print(
                f" - {dup.Assigned_Device} ({dup.Occurrences} occurrences)"
            )

    if io_only_report.empty:
        print("\nIO-only assignments: none detected.")
    else:
        print("\nIO-only assignments (no matching device tag):")
        for orphan in io_only_report.itertuples(index=False):
            print(
                f" - {orphan.Canonical_Tag} ({orphan.Occurrences} occurrences): "
                f"{orphan.IO_Assigned_Devices}"
            )
def main(argv=None):
    """
    Command-line entry point.

    Parses the three positional paths (IO file, devices file, output file)
    and the optional ``--io-column`` / ``--dev-column`` overrides, checks
    that both input files exist, then runs ``compare``.

    Args:
        argv: Argument list to parse; None lets argparse read ``sys.argv``.

    Raises:
        SystemExit: If the IO file or the devices file is not an existing
            file (the IO file is checked first), or on argparse errors.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Check that all devices from device list are present in IO file, "
            "using EPC/VFD/PB/SS/R-S rules."
        )
    )
    cli.add_argument("io_file", help="Input/Output Excel file (IO)")
    cli.add_argument("devices_file", help="Devices Excel file")
    cli.add_argument("output_file", help="Output Excel file path (will be .xlsx)")
    cli.add_argument(
        "--io-column",
        default=None,
        help="Column name in IO file to use as Assigned Device (default: first column)",
    )
    cli.add_argument(
        "--dev-column",
        default=None,
        help="Column name in devices file (default: P_TAG1 if exists, else first column)",
    )
    options = cli.parse_args(argv)

    io_path = Path(options.io_file)
    dev_path = Path(options.devices_file)

    # Fail early, with the same message shape for both inputs.
    for label, candidate in (("IO", io_path), ("Devices", dev_path)):
        if not candidate.is_file():
            raise SystemExit(f"ERROR: {label} file not found: {candidate}")

    compare(
        io_path=io_path,
        dev_path=dev_path,
        out_path=Path(options.output_file),
        io_column=options.io_column,
        dev_column=options.dev_column,
    )
# Run the CLI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()