# IO_checkout/compare-plc-generated/compare-plc-generated.py
# 2026-01-20 08:14:59 +04:00
#
# 551 lines
# 15 KiB
# Python

import pandas as pd
import re
import os
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
from openpyxl.formatting.rule import FormulaRule
from datetime import datetime
# Ordered (class label, keywords) pairs consulted by extract_device_class().
# Keywords are tested as plain substrings of the upper-cased description and
# the first class with any hit wins, so the order of this list matters.
# NOTE(review): because matching is by substring, short keywords such as "PE"
# or "PB" also hit inside longer words -- confirm that this is intended.
DEVICE_CLASSES = [
("SAFETY", ["E-STOP", "ESTOP", "EMERGENCY", "SSP", "PULL CORD", "EPC"]),
("PB", ["PB", "PUSHBUTTON"]),
("TPE", ["TPE", "PHOTOEYE", "PE"]),
("ENC", ["ENC", "ENCODER"]),
("VFD", ["VFD"]),
("DCM", ["DCM", "DIVERT"]),
("JDS", ["JDS", "JAM"]),
("DISC", ["DISC", "DISCONNECT"]),
]
# -----------------------------
# Helpers
# -----------------------------
def extract_core_ids(desc):
    """Collect the underscore-bearing identifiers found in a description.

    The text is upper-cased, the separators ``-``, ``/``, ``(`` and ``)`` are
    turned into spaces, and the result is split on whitespace. A token is kept
    when it contains an underscore and is at least five characters long
    (e.g. SDP1_SEND, LRP_SEND, CHUTE14_DCM_CABLE).

    Returns:
        A set of upper-case identifier strings; empty for a falsy *desc*.
    """
    if not desc:
        return set()

    # Underscores are deliberately left alone so engineered names stay whole.
    spaced = re.sub(r"[-/()]", " ", desc.upper())

    core_ids = set()
    for token in re.split(r"\s+", spaced.strip()):
        if len(token) >= 5 and "_" in token:
            core_ids.add(token)
    return core_ids
def has_shared_core_id(gen_desc, plc_desc):
    """Return True when both descriptions yield at least one common core id.

    The identifiers are the ones produced by extract_core_ids() for each text.
    """
    common = extract_core_ids(gen_desc).intersection(extract_core_ids(plc_desc))
    return bool(common)
def extract_device_class(desc):
    """Return the first DEVICE_CLASSES label whose keyword occurs in *desc*.

    Matching is a case-insensitive substring test; classes are tried in the
    order they are listed in DEVICE_CLASSES.

    Returns:
        The class label, or None for a falsy description or when no keyword
        is found.
    """
    if not desc:
        return None

    text = desc.upper()
    return next(
        (
            label
            for label, keywords in DEVICE_CLASSES
            if any(word in text for word in keywords)
        ),
        None,
    )
def extract_numbers(desc):
    """Return the set of digit runs (as strings) that appear in *desc*.

    A falsy description gives an empty set. Leading zeros are preserved
    because the runs are not converted to integers.
    """
    if desc:
        return {hit.group() for hit in re.finditer(r"\d+", desc)}
    return set()
def normalize_point(p):
    """Trim and upper-case an IO point string.

    Returns:
        The normalized string, or None when *p* is not a string.
    """
    if isinstance(p, str):
        trimmed = p.strip()
        return trimmed.upper()
    return None
def normalize_desc(d):
    """Trim a description and treat the placeholder "SPARE" as empty.

    Returns:
        The stripped text; an empty string when *d* is not a string or when
        the stripped text equals "SPARE" in any letter case.
    """
    if not isinstance(d, str):
        return ""

    text = d.strip()
    if text.upper() == "SPARE":
        return ""
    return text
def normalize_channel(ch):
    """Drop every dot from a channel path and upper-case it.

    Returns:
        The normalized channel (e.g. "pt00.data" -> "PT00DATA"), or None when
        *ch* is not a string.
    """
    if isinstance(ch, str):
        return "".join(ch.split(".")).upper()
    return None
def is_output_device(desc):
    """Tell whether a description mentions HORN, BEACON or LIGHT.

    The check is a case-insensitive substring test; a falsy description is
    never an output device.
    """
    if not desc:
        return False

    text = desc.upper()
    for keyword in ("HORN", "BEACON", "LIGHT"):
        if keyword in text:
            return True
    return False
def split_section_tag_name(name):
    """Split a TAG row NAME such as ``PS2_4PT_APF1:SI`` into its two parts.

    The name is stripped and upper-cased first; it must end in ``:I``, ``:O``,
    ``:SI`` or ``:SO``.

    Returns:
        ``(device, section)``, or ``(None, None)`` when *name* is not a string
        or does not have that shape.
    """
    if not isinstance(name, str):
        return None, None

    found = re.match(r"(.+?):(I|O|SI|SO)$", name.strip().upper())
    return (found.group(1), found.group(2)) if found else (None, None)
def split_point(point):
    """Split an IO point into device, section and normalized channel.

    Examples of accepted points::

        PS1_6PT_APF1:I.IN_0
        PS1_6PT_APF1:I.PT00.DATA

    Args:
        point: The point text. Matching is case-sensitive, so callers are
            expected to pass an already upper-cased point.

    Returns:
        ``(device, section, channel)`` where the channel has been passed
        through normalize_channel(), or ``(None, None, None)`` when *point*
        is not a string or does not look like ``<device>:<I|O|SI|SO>.<rest>``.
    """
    # normalize_point() hands back None for non-string cells; report that as
    # "no match" instead of letting re.match raise TypeError.
    if not isinstance(point, str):
        return None, None, None

    m = re.match(r"(.+?):(I|O|SI|SO)\.(.+)", point)
    if not m:
        return None, None, None

    device = m.group(1)
    section = m.group(2)
    channel = normalize_channel(m.group(3))
    return device, section, channel
def get_subsystem_from_generated_csv(path):
    """Return the first non-blank value of the generated CSV's third column.

    The file is read with every column as text and missing cells replaced by
    empty strings. The value found is stripped and upper-cased.

    Returns:
        The subsystem name, or "UNKNOWN" when the file has fewer than three
        columns or the third column holds only blank cells.
    """
    table = pd.read_csv(path, dtype=str).fillna("")

    if table.shape[1] >= 3:
        for cell in table.iloc[:, 2]:
            text = str(cell).strip()
            if text:
                return text.upper()

    return "UNKNOWN"
def is_apf_io_channel(device, channel):
    """Tell whether *channel* is an ``IO...`` channel of an ``_APF`` device.

    The device name is compared case-insensitively; the channel prefix check
    is case-sensitive and is only evaluated for APF devices.
    """
    if "_APF" not in device.upper():
        return False
    return channel.startswith("IO")
def find_apf_io_in_plc(plc, device, channel):
    """Look the point up under the PLC input section "I".

    APF IO_x outputs may be wired as PLC inputs, so callers use this as a
    fallback when the point is not found where the assignment expects it.

    Returns:
        The PLC record stored under ``(device, "I", channel)``, or None.
    """
    input_key = (device, "I", channel)
    return plc.get(input_key)
# -----------------------------
# File selection helper
# -----------------------------
def select_csv_file(prompt_text):
    """Ask the user to pick one of the CSV files in the current directory.

    The ``*.csv`` entries (extension compared case-insensitively) are listed
    in sorted order with 1-based numbers, and the prompt repeats until a
    valid number is entered.

    Args:
        prompt_text: Text shown in front of "(number): " in the prompt.

    Returns:
        The chosen file name, relative to the current directory.

    Raises:
        RuntimeError: If the current directory contains no CSV file.
    """
    # os.listdir() order is arbitrary; sorting keeps the menu numbering stable
    # from one run to the next.
    csv_files = sorted(f for f in os.listdir(".") if f.lower().endswith(".csv"))
    if not csv_files:
        raise RuntimeError("No CSV files found in current directory.")

    print("\nAvailable CSV files:")
    for i, f in enumerate(csv_files, start=1):
        print(f"{i}) {f}")

    while True:
        choice = input(f"\n{prompt_text} (number): ").strip()
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which made int() raise ValueError.
        if choice.isdecimal():
            idx = int(choice) - 1
            if 0 <= idx < len(csv_files):
                return csv_files[idx]
        print("Invalid selection, try again.")
# -----------------------------
# Load PLC CSV
# -----------------------------
def load_plc_csv(path):
    """Load a PLC tag export and index its COMMENT rows by IO point.

    Everything before the header row (the first line starting with ``TYPE,``)
    is skipped. The table is parsed from the text that was already decoded
    for the header search, so bytes that are not valid UTF-8 are dropped
    consistently instead of making the parser fail afterwards, and a
    byte-order mark in front of the header does not hide it.

    Args:
        path: Path of the PLC export CSV.

    Returns:
        A tuple ``(records, all_points, tag_sections)``:

        * ``records`` maps ``(device, section, channel)`` to a dict with the
          keys "section", "desc" and "raw_point", built from COMMENT rows
          whose SPECIFIER parses as an IO point.
        * ``all_points`` is the set of every non-empty normalized SPECIFIER
          value, whatever the row type.
        * ``tag_sections`` is the set of ``(device, section)`` pairs parsed
          from the NAME of TAG rows.

    Raises:
        RuntimeError: If no line starts with ``TYPE,``.
    """
    import io  # local import: only this loader needs an in-memory text buffer

    with open(path, "r", encoding="utf-8-sig", errors="ignore") as f:
        lines = f.readlines()

    header_idx = next(
        (i for i, line in enumerate(lines) if line.startswith("TYPE,")),
        None,
    )
    if header_idx is None:
        raise RuntimeError("Could not find TYPE header row in PLC export")

    table_text = "".join(lines[header_idx:])
    df = pd.read_csv(io.StringIO(table_text), dtype=str).fillna("")

    # Every SPECIFIER that appears anywhere (mostly on COMMENT rows).
    all_points = {
        p for p in map(normalize_point, df.get("SPECIFIER", [])) if p
    }

    # (device, section) pairs declared by TAG rows.
    tag_sections = set()
    for name in df[df["TYPE"] == "TAG"].get("NAME", []):
        dev, sec = split_section_tag_name(name)
        if dev and sec:
            tag_sections.add((dev, sec))

    # Only COMMENT rows become records.
    records = {}
    for row in df[df["TYPE"] == "COMMENT"].to_dict("records"):
        point = normalize_point(row.get("SPECIFIER", ""))
        if point is None:
            continue
        desc = normalize_desc(row.get("DESCRIPTION", ""))
        device, section, channel = split_point(point)
        if not device:
            continue
        records[(device, section, channel)] = {
            "section": section,
            "desc": desc,
            "raw_point": point,
        }

    return records, all_points, tag_sections
def normalize_point_path(p):
    """Normalize an IO path for strict comparison.

    The path keeps its structure: it is upper-cased, trimmed, and every
    ``.DATA`` member is rewritten as ``_DATA`` so both spellings compare equal.

    Returns:
        The normalized path, or an empty string when *p* is not a string.
    """
    if isinstance(p, str):
        return p.upper().strip().replace(".DATA", "_DATA")
    return ""
def io_path_mismatch(gen_point, plc_point):
    """Return True when the two IO paths still differ after normalization.

    Both sides go through normalize_point_path() before being compared.
    """
    gen_path = normalize_point_path(gen_point)
    plc_path = normalize_point_path(plc_point)
    return gen_path != plc_path
def section_implies_members(device, section, channel=None):
    """Decide whether a PLC section implies a member without a COMMENT row.

    All three arguments are compared in upper case.

    Rules:
        * ``_APF`` devices: sections SI/SO always; section I for channels
          starting with ``IN_`` or ``IO_``; section O for channels starting
          with ``IO_``.
        * ``_SIO`` devices (checked only when the name has no ``_APF``):
          sections I/O for channels starting with ``PT``.
        * Any other device: never.

    Returns:
        True when the member is implied, otherwise False.
    """
    dev_name = device.upper()
    sec_name = section.upper()
    chan = (channel or "").upper()

    if "_APF" in dev_name:
        if sec_name in ("SI", "SO"):
            # Safety sections
            return True
        if sec_name == "I":
            # Standard inputs
            return chan.startswith(("IN_", "IO_"))
        if sec_name == "O":
            # Standard outputs
            return chan.startswith("IO_")
        return False

    if "_SIO" in dev_name:
        return sec_name in ("I", "O") and chan.startswith("PT")

    # FIO and everything else
    return False
# -----------------------------
# Load Generated CSV
# -----------------------------
def load_generated_csv(path):
    """Load the generated IO assignment CSV and index it by IO point.

    The first column is taken as the point and the second as its description;
    every cell is read as text with missing values replaced by "". Rows whose
    point does not parse with split_point() are skipped, and a later row with
    the same key replaces an earlier one.

    Returns:
        A dict mapping ``(device, section, channel)`` to a dict with the keys
        "section", "desc" and "raw_point".
    """
    frame = pd.read_csv(path, dtype=str).fillna("")

    generated = {}
    for cells in frame.itertuples(index=False, name=None):
        raw_point = normalize_point(cells[0])
        description = normalize_desc(cells[1])

        device, section, channel = split_point(raw_point)
        if not device:
            continue

        generated[(device, section, channel)] = {
            "section": section,
            "desc": description,
            "raw_point": raw_point,
        }
    return generated
def build_gen_point_set(gen):
    """Gather the "raw_point" of every generated record into a set.

    The set gives constant-time membership tests on generated points.
    """
    points = set()
    for record in gen.values():
        points.add(record["raw_point"])
    return points
# -----------------------------
# Build Report
# -----------------------------
# Column order of the report sheet. Passing it to the DataFrame keeps the
# header row present even when there is nothing to report.
_REPORT_COLUMNS = [
    "Point",
    "Assingnment_Description",
    "PLC_Point",
    "PLC_Description",
    "Result",
    "Reason",
]

# Row fill colour (RGB hex) for each value of the Result column, in the order
# the conditional-formatting rules are added.
_RESULT_FILLS = {
    "PASS": "C6EFCE",
    "FAIL": "FFC7CE",
    "OK": "E7E6E6",
    "REVIEW": "FFEB9C",
}


def _classify_spare(device, section, channel, gen_point, plc_row, plc_all_points):
    """Classify a point that has no description on either side."""
    if plc_row is not None:
        return "OK", "SPARE"
    # APF / SIO sections imply their members even without COMMENT rows.
    if section_implies_members(device, section, channel):
        return "OK", "SPARE"
    # For other devices the member must appear explicitly in the PLC export.
    if gen_point in plc_all_points:
        return "OK", "SPARE"
    return "REVIEW", "Should be deleted"


def _classify_missing_in_plc(plc, key, gen_desc):
    """Classify a described assignment point with no described PLC point.

    Returns ``(result, reason, alt_row)``; ``alt_row`` is the PLC record found
    under section "I" for an APF IO_x output, otherwise None.
    """
    device, section, channel = key
    # The fallback only makes sense for outputs: for section "I" it would
    # look up the very key that has just been found empty or missing.
    if section == "O" and is_apf_io_channel(device, channel):
        alt_row = find_apf_io_in_plc(plc, device, channel)
        if alt_row:
            return "REVIEW", "APF OUTPUT WIRED AS PLC INPUT", alt_row
        return "FAIL", "OUTPUT NOT FOUND AT PLC ADDRESS", None
    if is_output_device(gen_desc):
        return "FAIL", "OUTPUT NOT FOUND AT PLC ADDRESS", None
    return "FAIL", "IO MISSING IN PLC", None


def _compare_descriptions(gen_desc, plc_desc):
    """Compare two non-empty descriptions by device class and numbers."""
    gen_cls = extract_device_class(gen_desc)
    plc_cls = extract_device_class(plc_desc)
    gen_nums = extract_numbers(gen_desc)
    plc_nums = extract_numbers(plc_desc)

    if gen_cls and not plc_cls:
        # A PLC description that is a short-form engineered name matching the
        # assignment one is accepted.
        if has_shared_core_id(gen_desc, plc_desc):
            return "PASS", "PLC DESCRIPTION SHORT FORM"
        return "FAIL", "PLC SIGNAL TYPE / STATUS MISMATCH"
    if gen_cls != plc_cls:
        return "FAIL", f"DEVICE TYPE MISMATCH — IO_ASS={gen_cls}, PLC={plc_cls}"
    if gen_nums != plc_nums:
        return "REVIEW", "DEVICE NUMBER MISMATCH"
    # NOTE(review): the source this was restored from had lost its
    # indentation; this fall-through is read as the final branch of the
    # class/number chain. Confirm that PASS rows should carry this reason.
    return "PASS", "DESC PARTIAL / DIFFERENT"


def _apply_result_colors(output_xlsx):
    """Colour every data row of the saved report according to its Result."""
    wb = load_workbook(output_xlsx)
    ws = wb.active

    result_cell = next((c for c in ws[1] if c.value == "Result"), None)
    # Without data rows the range below would end before it starts.
    if result_cell is not None and ws.max_row >= 2:
        last_letter = ws.cell(row=1, column=ws.max_column).column_letter
        target = f"A2:{last_letter}{ws.max_row}"
        for label, color in _RESULT_FILLS.items():
            fill = PatternFill(start_color=color, end_color=color, fill_type="solid")
            ws.conditional_formatting.add(
                target,
                FormulaRule(
                    formula=[f"${result_cell.column_letter}2=\"{label}\""],
                    fill=fill,
                ),
            )

    wb.save(output_xlsx)


def build_report(plc, plc_all_points, plc_tag_sections, gen, output_xlsx):
    """Compare the generated assignment with the PLC export and write a report.

    Two passes produce the rows:

    1. Every generated point is looked up in the PLC records by its
       ``(device, section, channel)`` key and classified as PASS, OK, REVIEW
       or FAIL with a reason.
    2. Every described PLC point that the first pass did not cover, and that
       is not an implicit APF/SIO member, is added as REVIEW
       "Missing in assignment".

    Because the section is part of the lookup key, a matched PLC record
    always has the same section as the generated one.

    Args:
        plc: PLC records keyed by ``(device, section, channel)``; each value
            has the keys "section", "desc" and "raw_point".
        plc_all_points: Set of every normalized point present in the PLC
            export.
        plc_tag_sections: Accepted for interface compatibility; not used.
        gen: Generated records, same shape as *plc*.
        output_xlsx: Path of the Excel workbook to write.
    """
    rows = []

    for key, gen_row in gen.items():
        device, section, channel = key
        plc_row = plc.get(key)
        gen_point = gen_row["raw_point"]
        gen_desc = gen_row["desc"]
        plc_desc = plc_row["desc"] if plc_row else ""

        if not gen_desc and not plc_desc:
            result, reason = _classify_spare(
                device, section, channel, gen_point, plc_row, plc_all_points
            )
        elif not gen_desc:
            result, reason = "FAIL", "IO MISSING IN ASSINGNMENT"
        elif not plc_desc:
            result, reason, alt_row = _classify_missing_in_plc(plc, key, gen_desc)
            if alt_row is not None:
                # Report the point that was actually found, together with
                # its own description.
                plc_row = alt_row
                plc_desc = alt_row["desc"]
        elif io_path_mismatch(gen_point, plc_row["raw_point"]):
            # Strict IO path match rule (all devices)
            result, reason = "FAIL", "IO PATH FORMAT MISMATCH"
        else:
            result, reason = _compare_descriptions(gen_desc, plc_desc)

        rows.append({
            "Point": gen_point,
            "Assingnment_Description": gen_desc,
            "PLC_Point": plc_row["raw_point"] if plc_row else "",
            "PLC_Description": plc_desc,
            "Result": result,
            "Reason": reason,
        })

    # -----------------------------
    # PLC -> GEN (missing in assignment)
    # -----------------------------
    gen_points = build_gen_point_set(gen)
    for key, plc_row in plc.items():
        plc_point = plc_row["raw_point"]
        plc_desc = plc_row["desc"]

        # Already covered by the first pass, either through the same key or
        # through an identical raw point.
        if key in gen or plc_point in gen_points:
            continue
        # Ignore SPARE PLC points
        if plc_desc == "":
            continue
        # Ignore implicit members (APF / SIO)
        if section_implies_members(*key):
            continue

        rows.append({
            "Point": "",
            "Assingnment_Description": "",
            "PLC_Point": plc_point,
            "PLC_Description": plc_desc,
            "Result": "REVIEW",
            "Reason": "Missing in assignment",
        })

    df = pd.DataFrame(rows, columns=_REPORT_COLUMNS)
    df.to_excel(output_xlsx, index=False)
    _apply_result_colors(output_xlsx)
# -----------------------------
# Main
# -----------------------------
def main():
    """Run the interactive comparison and print an overall verdict.

    The user picks the PLC export and the generated IO file from the current
    directory. The report is written to
    ``<subsystem>_verification_report_<timestamp>.xlsx`` and then read back;
    the run counts as failed when any row has the Result "FAIL".
    """
    plc_csv = select_csv_file("Select PLC export file")
    gen_csv = select_csv_file("Select Generated IO file")
    print(f"\nComparing:\n PLC → {plc_csv}\n GEN → {gen_csv}\n")

    plc, plc_all_points, plc_tag_sections = load_plc_csv(plc_csv)
    gen = load_generated_csv(gen_csv)

    subsystem = get_subsystem_from_generated_csv(gen_csv)
    # The subsystem comes straight from a CSV cell; replace characters that
    # are not allowed in file names so the report cannot land in another
    # directory or fail to be created.
    safe_subsystem = re.sub(r'[\\/:*?"<>|]+', "_", subsystem) or "UNKNOWN"
    run_ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = f"{safe_subsystem}_verification_report_{run_ts}.xlsx"

    build_report(plc, plc_all_points, plc_tag_sections, gen, output_file)

    df = pd.read_excel(output_file)
    # A report without any row may come back without a "Result" column.
    has_fail = "Result" in df.columns and (df["Result"] == "FAIL").any()
    if has_fail:
        print(f"🔴 FAIL — see {output_file}")
    else:
        print(f"🟢 PASS — see {output_file}")
if __name__ == "__main__":
main()