"""Compare a device tag list against an IO assignment list (Excel vs Excel).

Normalizes tags with EPC/VFD/PB/SS/R-S rules, reports devices missing from
the IO file, duplicate IO assignments, and IO-only tags, and writes the
results to a multi-sheet .xlsx workbook.
"""
import argparse
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
|
|
|
|
# ==== CONFIG YOU CAN TWEAK IF NEEDED ====
# IO rows matching these regex patterns will be ignored.
# Patterns are applied with Series.str.match, i.e. anchored at the start
# of the value (the leading ^ is therefore redundant but harmless).
SKIP_PATTERNS = [
    r"^PDP.*_CB",  # PDP*_CB* pattern
    r"^PDP.*_FWM",  # PDP*_FWM* pattern
    r"^PDP.*_PWM",  # PDP*_PWM* pattern
]
|
|
|
|
|
|
def canonical(tag: str) -> str:
|
|
"""
|
|
Apply all agreed normalization rules to a device/assigned tag.
|
|
"""
|
|
if not isinstance(tag, str):
|
|
return None
|
|
t = tag.strip()
|
|
if not t:
|
|
return None
|
|
|
|
# 1) EPC: keep up to ..._EPC<number>
|
|
# Examples:
|
|
# NCP1_3_EPC1_1 -> NCP1_3_EPC1
|
|
# NCP1_3_EPC2_99 -> NCP1_3_EPC2
|
|
m = re.search(r"(.*_EPC\d+)_.*", t)
|
|
if m:
|
|
t = m.group(1)
|
|
|
|
# 2) VFD variants -> base _VFD
|
|
# UL23_25_VFD_DISC -> UL23_25_VFD
|
|
# S02_1_VFD_STO1 -> S02_1_VFD
|
|
t = re.sub(r"_VFD_DISC$", "_VFD", t)
|
|
t = re.sub(r"_VFD_DSIC$", "_VFD", t)
|
|
t = re.sub(r"_VFD_STO\d*$", "_VFD", t)
|
|
|
|
# 3) SS buttons: SPB_LT / SPB / STPB -> base SS
|
|
# NCS2_12_SS2_SPB_LT -> NCS2_12_SS2
|
|
# NCR1_2_SS1_SPB -> NCR1_2_SS1
|
|
# NCR1_2_SS1_STPB -> NCR1_2_SS1
|
|
t = re.sub(r"_SPB_LT$", "", t)
|
|
t = re.sub(r"_STPB$", "", t)
|
|
t = re.sub(r"_SPB$", "", t)
|
|
|
|
# 4) Generic PB: PB_LT / PB -> base
|
|
# NCP1_3_JR1_PB -> NCP1_3_JR1
|
|
# NCP1_3_JR1_PB_LT -> NCP1_3_JR1
|
|
t = re.sub(r"_PB_LT$", "", t)
|
|
t = re.sub(r"_PB$", "", t)
|
|
|
|
# 5) STO suffix on SR tags -> base SR
|
|
# _SR1_STO1 -> _SR1
|
|
t = re.sub(r"(_SR\d+)_STO\d+$", r"\1", t)
|
|
|
|
# 6) R/S endings -> base
|
|
# S02_2_LRPE6_R -> S02_2_LRPE6
|
|
# S02_2_LRPE6_S -> S02_2_LRPE6
|
|
t = re.sub(r"_(R|S)$", "", t)
|
|
|
|
return t or None
|
|
|
|
|
|
def load_io_series(io_path: Path, io_column: str | None) -> pd.Series:
    """
    Load IO Excel and return a cleaned Series of Assigned Device strings.

    If io_column is None, uses the first column.

    Rows that are NaN/empty, contain 'SPARE' (case-insensitive), or match
    any pattern in SKIP_PATTERNS are dropped.

    Raises SystemExit when io_column is given but absent from the file.
    """
    df = pd.read_excel(io_path)

    if io_column:
        if io_column not in df.columns:
            raise SystemExit(
                f"ERROR: IO column '{io_column}' not found. Available: {list(df.columns)}"
            )
        series = df[io_column]
    else:
        # Use first column
        first_col = df.columns[0]
        series = df[first_col]

    # Drop NaN cells BEFORE astype(str): otherwise each NaN becomes the
    # literal string "nan", survives the filters below, and shows up as a
    # bogus "nan" canonical tag. Empty strings are dropped for the same
    # reason.
    series = series.dropna().astype(str).str.strip()
    series = series[series != ""]

    # Drop SPARE rows
    mask_spare = series.str.contains("SPARE", case=False, na=False)

    # Drop rows matching SKIP_PATTERNS (e.g. PDP*_CB*, PDP*_FWM*).
    # Start from a real boolean Series (the original OR'd patterns into a
    # plain Python False, which only works by accident of operator
    # broadcasting).
    mask_skip_pattern = pd.Series(False, index=series.index)
    for pattern in SKIP_PATTERNS:
        mask_skip_pattern = mask_skip_pattern | series.str.match(pattern, na=False)

    return series[~(mask_spare | mask_skip_pattern)]
|
|
|
|
|
|
def load_device_tags(dev_path: Path, dev_column: str | None) -> pd.Series:
    """
    Load device Excel and return Series of device tags.

    Default column is 'P_TAG1' if exists, else first column.

    Raises SystemExit when dev_column is given but absent from the file.
    """
    df = pd.read_excel(dev_path)

    if dev_column:
        if dev_column not in df.columns:
            raise SystemExit(
                f"ERROR: Device column '{dev_column}' not found. Available: {list(df.columns)}"
            )
        series = df[dev_column]
    else:
        # Prefer the conventional tag column when present.
        if "P_TAG1" in df.columns:
            series = df["P_TAG1"]
        else:
            first_col = df.columns[0]
            series = df[first_col]

    # Drop NaN cells before astype(str) so empty spreadsheet cells do not
    # become literal "nan" device tags.
    return series.dropna().astype(str).str.strip()
|
|
|
|
|
|
def build_io_map(assigned_series: pd.Series) -> dict[str, set[str]]:
    """
    Build mapping: canonical_tag -> set of raw IO assigned device strings.

    Raw values whose canonical form is falsy (None/empty) are skipped.
    """
    io_map: dict[str, set[str]] = {}

    for raw_value in assigned_series:
        canon_tag = canonical(raw_value)
        if canon_tag:
            io_map.setdefault(canon_tag, set()).add(raw_value)

    return io_map
|
|
|
|
|
|
def detect_duplicate_assignments(assigned_series: pd.Series) -> pd.DataFrame:
    """
    Return dataframe of raw IO assigned device strings that appear more than once.

    This keeps PB/PB_LT, EPC variants, etc. separate and only flags exact
    duplicates. Columns: Assigned_Device, Occurrences.
    """
    # Normalize whitespace and discard NaN / empty entries before counting.
    values = assigned_series.dropna().str.strip()
    values = values[values != ""]

    occurrence_counts = values.value_counts()
    repeated = occurrence_counts[occurrence_counts > 1]

    if repeated.empty:
        return pd.DataFrame(columns=["Assigned_Device", "Occurrences"])

    # value_counts already yields (value, count) in descending-count order;
    # flatten it into the two-column result frame.
    result = repeated.reset_index()
    result.columns = ["Assigned_Device", "Occurrences"]
    return result
|
|
|
|
|
|
def find_io_without_devices(io_map: dict[str, set[str]], dev_canon: pd.Series) -> pd.DataFrame:
    """
    Return dataframe of IO canonical tags that do not exist in the device list.

    Columns: Canonical_Tag, IO_Assigned_Devices (comma-joined sorted raw
    values), Occurrences (number of distinct raw values).
    """
    # Set for O(1) membership tests; dropna guards against tags that
    # canonical() mapped to None.
    dev_canon_set = set(dev_canon.dropna())

    rows = [
        {
            "Canonical_Tag": canon_tag,
            "IO_Assigned_Devices": ", ".join(sorted(raw_values)),
            "Occurrences": len(raw_values),
        }
        for canon_tag, raw_values in sorted(io_map.items())
        if canon_tag not in dev_canon_set
    ]

    if not rows:
        # Keep a stable schema even when nothing is missing, consistent
        # with detect_duplicate_assignments (the original returned a
        # column-less empty DataFrame here).
        return pd.DataFrame(
            columns=["Canonical_Tag", "IO_Assigned_Devices", "Occurrences"]
        )
    return pd.DataFrame(rows)
|
|
|
|
|
|
def compare(io_path: Path,
            dev_path: Path,
            out_path: Path,
            io_column: str | None = None,
            dev_column: str | None = None) -> None:
    """
    Main compare routine: IO vs Devices, export Excel, print summary.

    Loads both spreadsheets, checks every device tag (canonicalized) against
    the canonicalized IO assignments, writes a multi-sheet .xlsx report
    (main sheet plus optional duplicate/IO-only sheets), and prints a
    human-readable summary to stdout.

    Parameters
    ----------
    io_path : Path
        Excel file with IO assignments.
    dev_path : Path
        Excel file with the device list.
    out_path : Path
        Output path; its suffix is forced to .xlsx before writing.
    io_column : str | None
        Column holding assigned devices in the IO file (default: first column).
    dev_column : str | None
        Column holding device tags (default: 'P_TAG1' if present, else first).
    """
    print(f"IO file: {io_path}")
    print(f"Devices file: {dev_path}")
    print(f"Output file: {out_path}")
    print("Loading data...")

    assigned_series = load_io_series(io_path, io_column)
    dev_tags = load_device_tags(dev_path, dev_column)

    print(f"IO rows after filters: {len(assigned_series)}")
    print(f"Device tags: {len(dev_tags)}")

    # canonical tag -> set of raw IO strings that normalize to it
    io_map = build_io_map(assigned_series)
    # exact raw duplicates in the IO list (before normalization)
    duplicate_df = detect_duplicate_assignments(assigned_series)
    # device tags normalized with the same rules as the IO side
    dev_canon = dev_tags.map(canonical)
    # canonical IO tags with no matching device tag
    io_only_df = find_io_without_devices(io_map, dev_canon)

    # For each device (in original order) record whether its canonical form
    # appears in the IO map, and which raw IO strings matched it.
    present_flags: list[str] = []
    matching_assigned: list[str] = []

    for d_tag, d_c in zip(dev_tags, dev_canon):
        if d_c in io_map:
            present_flags.append("YES")
            matching_assigned.append(", ".join(sorted(io_map[d_c])))
        else:
            present_flags.append("NO")
            matching_assigned.append("")

    res_df = pd.DataFrame({
        "Device_Tag": dev_tags,
        "Canonical_Tag_Used_For_Check": dev_canon,
        "Present_In_IO": present_flags,
        "Matching_IO_Assigned_Devices": matching_assigned,
    })

    # Save result (main sheet + duplicates if any)
    out_path = out_path.with_suffix(".xlsx")
    with pd.ExcelWriter(out_path, engine="openpyxl") as writer:
        res_df.to_excel(writer, index=False, sheet_name="Devices_vs_IO")
        if not duplicate_df.empty:
            duplicate_df.to_excel(
                writer, index=False, sheet_name="Duplicate_IO_Assignments"
            )
        if not io_only_df.empty:
            io_only_df.to_excel(
                writer, index=False, sheet_name="IO_Only_Assignments"
            )
    print(f"\nResult saved to: {out_path}")

    # Summary
    missing_df = res_df[res_df["Present_In_IO"] == "NO"]
    total_devices = len(res_df)
    missing_count = len(missing_df)
    found_count = total_devices - missing_count

    print("\n===== SUMMARY =====")
    print(f"Total devices: {total_devices}")
    print(f"Found in IO: {found_count}")
    print(f"Missing: {missing_count}")
    print(f"IO-only tags: {len(io_only_df)}")

    if missing_count > 0:
        print("\nDevices NOT found in IO (after all rules):")
        for tag in sorted(missing_df["Device_Tag"].tolist()):
            print(" -", tag)

    if duplicate_df.empty:
        print("\nDuplicate IO assignments: none detected.")
    else:
        print("\nDuplicate IO assignments detected (same raw value repeated):")
        for _, row in duplicate_df.sort_values("Assigned_Device").iterrows():
            print(
                f" - {row['Assigned_Device']} ({row['Occurrences']} occurrences)"
            )

    if io_only_df.empty:
        print("\nIO-only assignments: none detected.")
    else:
        print("\nIO-only assignments (no matching device tag):")
        for _, row in io_only_df.iterrows():
            print(
                f" - {row['Canonical_Tag']} ({row['Occurrences']} occurrences): "
                f"{row['IO_Assigned_Devices']}"
            )
|
|
|
|
|
|
def main(argv=None):
    """Parse CLI arguments, validate the input files exist, and run compare()."""
    parser = argparse.ArgumentParser(
        description="Check that all devices from device list are present in IO file, "
        "using EPC/VFD/PB/SS/R-S rules."
    )
    parser.add_argument("io_file", help="Input/Output Excel file (IO)")
    parser.add_argument("devices_file", help="Devices Excel file")
    parser.add_argument("output_file", help="Output Excel file path (will be .xlsx)")
    parser.add_argument(
        "--io-column",
        default=None,
        help="Column name in IO file to use as Assigned Device (default: first column)",
    )
    parser.add_argument(
        "--dev-column",
        default=None,
        help="Column name in devices file (default: P_TAG1 if exists, else first column)",
    )

    args = parser.parse_args(argv)

    io_path, dev_path, out_path = (
        Path(args.io_file),
        Path(args.devices_file),
        Path(args.output_file),
    )

    # Fail fast with a clear message when either input is missing.
    for label, path in (("IO", io_path), ("Devices", dev_path)):
        if not path.is_file():
            raise SystemExit(f"ERROR: {label} file not found: {path}")

    compare(
        io_path=io_path,
        dev_path=dev_path,
        out_path=out_path,
        io_column=args.io_column,
        dev_column=args.dev_column,
    )
|
|
|
|
|
|
# Script entry point: delegate to the CLI handler.
if __name__ == "__main__":
    main()
|