import os
import re
from datetime import datetime

import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
from openpyxl.formatting.rule import FormulaRule
from openpyxl.utils import get_column_letter

# Ordered keyword table for classifying devices from free-text descriptions.
# Classes are checked in order, so more specific keywords should come first.
DEVICE_CLASSES = [
    ("SAFETY", ["E-STOP", "ESTOP", "EMERGENCY", "SSP", "PULL CORD", "EPC"]),
    ("PB", ["PB", "PUSHBUTTON"]),
    ("TPE", ["TPE", "PHOTOEYE", "PE"]),
    ("ENC", ["ENC", "ENCODER"]),
    ("VFD", ["VFD"]),
    ("DCM", ["DCM", "DIVERT"]),
    ("JDS", ["JDS", "JAM"]),
    ("DISC", ["DISC", "DISCONNECT"]),
]

# -----------------------------
# Helpers
# -----------------------------

def extract_core_ids(desc):
    """
    Pull strong identifiers like SDP1_SEND, LRP_SEND, CHUTE14_DCM_CABLE, etc.
    Tokens containing '_' are kept because they are usually engineered names.
    """
    if not desc:
        return set()

    u = desc.upper()

    # Turn common separators into spaces but keep underscores
    u = re.sub(r"[-/()]", " ", u)

    tokens = re.split(r"\s+", u.strip())

    # Keep underscore identifiers (e.g. SDP1_SEND) of a meaningful length
    ids = {t for t in tokens if "_" in t and len(t) >= 5}
    return ids


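# Example (hypothetical description text):
#   extract_core_ids("CHUTE14_DCM_CABLE FAULT (SDP1_SEND)")
#   -> {"CHUTE14_DCM_CABLE", "SDP1_SEND"}
# Words without an underscore, and underscore tokens shorter than
# 5 characters, are dropped.
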
def has_shared_core_id(gen_desc, plc_desc):
    return len(extract_core_ids(gen_desc) & extract_core_ids(plc_desc)) > 0


def extract_device_class(desc):
    if not desc:
        return None
    u = desc.upper()
    for cls, keys in DEVICE_CLASSES:
        for k in keys:
            if k in u:
                return cls
    return None


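# Example: extract_device_class("JAM PHOTOEYE") returns "TPE", not "JDS",
# because DEVICE_CLASSES is scanned in order and "PHOTOEYE" matches first.
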
def extract_numbers(desc):
    if not desc:
        return set()
    return set(re.findall(r"\d+", desc))


def normalize_point(p):
    if not isinstance(p, str):
        return None
    return p.strip().upper()


def normalize_desc(d):
    if not isinstance(d, str):
        return ""
    d = d.strip()
    # Treat SPARE as an empty description so spare points compare as unassigned
    return "" if d.upper() == "SPARE" else d


def normalize_channel(ch):
    if not isinstance(ch, str):
        return None
    # Drop dots so channel spellings like PT00.DATA and PT00DATA compare equal
    return ch.replace(".", "").upper()


def is_output_device(desc):
    if not desc:
        return False
    u = desc.upper()
    return any(k in u for k in ["HORN", "BEACON", "LIGHT"])


def split_section_tag_name(name):
    """
    A TAG row NAME looks like: PS2_4PT_APF1:SI
    Returns (device, section) or (None, None).
    """
    if not isinstance(name, str):
        return None, None
    u = name.strip().upper()
    m = re.match(r"(.+?):(I|O|SI|SO)$", u)
    if not m:
        return None, None
    return m.group(1), m.group(2)


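# Example (name format taken from the docstring above):
#   split_section_tag_name("PS2_4PT_APF1:SI") -> ("PS2_4PT_APF1", "SI")
#   split_section_tag_name("PS2_4PT_APF1")    -> (None, None)
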
def split_point(point):
    """
    PS1_6PT_APF1:I.IN_0
    PS1_6PT_APF1:I.PT00.DATA
    -> (device, section, channel)
    """
    if not isinstance(point, str):
        return None, None, None

    m = re.match(r"(.+?):(I|O|SI|SO)\.(.+)", point)
    if not m:
        return None, None, None

    device = m.group(1)
    section = m.group(2)
    channel = normalize_channel(m.group(3))

    return device, section, channel


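# Example (point format taken from the docstring above):
#   split_point("PS1_6PT_APF1:I.PT00.DATA")
#   -> ("PS1_6PT_APF1", "I", "PT00DATA")   # the channel loses its dots
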
def get_subsystem_from_generated_csv(path):
    """
    Reads the Subsystem column (3rd column) of the generated CSV
    and returns its first non-empty value.
    """
    df = pd.read_csv(path, dtype=str).fillna("")

    if df.shape[1] < 3:
        return "UNKNOWN"

    for val in df.iloc[:, 2]:
        v = str(val).strip()
        if v:
            return v.upper()

    return "UNKNOWN"


def is_apf_io_channel(device, channel):
    return (
        "_APF" in device.upper()
        and channel.startswith("IO")
    )


def find_apf_io_in_plc(plc, device, channel):
    """
    APF IO_x outputs may be wired as PLC inputs.
    Try locating the point under I if O is missing.
    """
    return plc.get((device, "I", channel))


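# Example of the fallback (hypothetical point): a generated point
# PS1_6PT_APF1:O.IO_2 with no PLC match under O is retried as the key
# ("PS1_6PT_APF1", "I", "IO_2") before being reported as a failure.
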
# -----------------------------
# File selection helper
# -----------------------------

def select_csv_file(prompt_text):
    csv_files = [f for f in os.listdir(".") if f.lower().endswith(".csv")]

    if not csv_files:
        raise RuntimeError("No CSV files found in current directory.")

    print("\nAvailable CSV files:")
    for i, f in enumerate(csv_files, start=1):
        print(f"{i}) {f}")

    while True:
        choice = input(f"\n{prompt_text} (number): ").strip()
        if choice.isdigit():
            idx = int(choice) - 1
            if 0 <= idx < len(csv_files):
                return csv_files[idx]
        print("Invalid selection, try again.")


# -----------------------------
# Load PLC CSV
# -----------------------------

def load_plc_csv(path):
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        lines = f.readlines()

    # The export carries preamble rows; the real header is the first
    # line that starts with "TYPE,".
    header_idx = None
    for i, line in enumerate(lines):
        if line.startswith("TYPE,"):
            header_idx = i
            break

    if header_idx is None:
        raise RuntimeError("Could not find TYPE header row in PLC export")

    df = pd.read_csv(path, dtype=str, skiprows=header_idx).fillna("")

    # Collect all SPECIFIER points that appear anywhere (mostly COMMENT rows)
    all_points = set()
    for val in df.get("SPECIFIER", []):
        p = normalize_point(val)
        if p:
            all_points.add(p)

    # Collect TAG sections (device, section) from the NAME column
    tag_sections = set()
    for _, row in df[df["TYPE"] == "TAG"].iterrows():
        dev, sec = split_section_tag_name(row.get("NAME", ""))
        if dev and sec:
            tag_sections.add((dev, sec))

    # Only COMMENT rows become point records
    df = df[df["TYPE"] == "COMMENT"]

    records = {}
    for _, row in df.iterrows():
        point = normalize_point(row.get("SPECIFIER", ""))
        desc = normalize_desc(row.get("DESCRIPTION", ""))

        device, section, channel = split_point(point)
        if not device:
            continue

        key = (device, section, channel)
        records[key] = {"section": section, "desc": desc, "raw_point": point}

    return records, all_points, tag_sections


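# For reference, load_plc_csv returns three values (contents illustrative):
#   records      -> {("PS1_6PT_APF1", "I", "IN_0"):
#                       {"section": "I", "desc": "...",
#                        "raw_point": "PS1_6PT_APF1:I.IN_0"}}
#   all_points   -> {"PS1_6PT_APF1:I.IN_0", ...}
#   tag_sections -> {("PS1_6PT_APF1", "I"), ...}
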
def normalize_point_path(p):
    """
    Normalize an IO path for strict comparison.
    Structure is kept, but the DATA member is normalized explicitly.
    """
    if not isinstance(p, str):
        return ""
    p = p.upper().strip()

    # Normalize common DATA variations
    p = p.replace(".DATA", "_DATA")

    return p


def io_path_mismatch(gen_point, plc_point):
    """
    Returns True if the IO paths differ structurally after normalization.
    """
    return normalize_point_path(gen_point) != normalize_point_path(plc_point)


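# Example: both spellings of the DATA member normalize to the same path, so
#   io_path_mismatch("PS1:I.PT00.DATA", "PS1:I.PT00_DATA") -> False
# while any other structural difference in the path reports a mismatch.
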
def section_implies_members(device, section, channel=None):
    """
    Determines whether a PLC section implies member existence
    even without explicit COMMENT rows.
    """
    dev = device.upper()
    sec = section.upper()
    ch = (channel or "").upper()

    # ---- APF rules ----
    if "_APF" in dev:
        # Safety sections always imply their members
        if sec in ("SI", "SO"):
            return True

        # Standard inputs
        if sec == "I" and (ch.startswith("IN_") or ch.startswith("IO_")):
            return True

        # Standard outputs
        if sec == "O" and ch.startswith("IO_"):
            return True

        return False

    # ---- SIO rules ----
    if "_SIO" in dev:
        if sec in ("I", "O") and ch.startswith("PT"):
            return True
        return False

    # ---- Default (FIO, others) ----
    return False


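# Examples (hypothetical device names):
#   section_implies_members("PS1_6PT_APF1", "SI")         -> True
#   section_implies_members("PS2_SIO1", "I", "PT03DATA")  -> True
#   section_implies_members("PS3_FIO1", "I", "IN_0")      -> False
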
# -----------------------------
# Load Generated CSV
# -----------------------------

def load_generated_csv(path):
    df = pd.read_csv(path, dtype=str).fillna("")

    records = {}

    for _, row in df.iterrows():
        # Column 0 is the point, column 1 its description
        point = normalize_point(row.iloc[0])
        desc = normalize_desc(row.iloc[1])

        device, section, channel = split_point(point)
        if not device:
            continue

        key = (device, section, channel)
        records[key] = {
            "section": section,
            "desc": desc,
            "raw_point": point
        }

    return records


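# Expected generated-CSV layout (row below is illustrative; only column
# positions matter: col 0 = point, col 1 = description, col 2 = subsystem):
#   PS1_6PT_APF1:I.IN_0,CHUTE14 JAM PHOTOEYE,MX01
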
def build_gen_point_set(gen):
    """
    Build a set of normalized GEN raw points for fast lookup.
    """
    return {v["raw_point"] for v in gen.values()}


# -----------------------------
# Build Report
# -----------------------------

def build_report(plc, plc_all_points, plc_tag_sections, gen, output_xlsx):
    # plc_tag_sections is currently unused but kept in the signature for
    # future section-level checks.
    rows = []

    for key, gen_row in gen.items():
        device, section, channel = key
        plc_row = plc.get(key)

        gen_desc = gen_row["desc"]
        plc_desc = plc_row["desc"] if plc_row else ""

        gen_empty = gen_desc == ""
        plc_empty = not plc_row or plc_desc == ""

        result = "PASS"
        reason = "OK"

        if gen_empty and plc_empty:
            gen_point = gen_row["raw_point"]

            section_allows_members = section_implies_members(device, section, channel)

            if plc_row is None:
                if section_allows_members:
                    # APF SI/SO safety blocks: members exist even without COMMENT rows
                    result = "OK"
                    reason = "SPARE"
                else:
                    # For FIO / normal IO, the member must exist explicitly
                    if gen_point in plc_all_points:
                        result = "OK"
                        reason = "SPARE"
                    else:
                        result = "REVIEW"
                        reason = "Should be deleted"
            else:
                result = "OK"
                reason = "SPARE"

        elif gen_empty and not plc_empty:
            result = "FAIL"
            reason = "IO MISSING IN ASSIGNMENT"

        elif not gen_empty and plc_empty:

            # APF special case: logical output wired as PLC input (IO_x)
            if is_apf_io_channel(device, channel):
                alt_plc = find_apf_io_in_plc(plc, device, channel)
                if alt_plc:
                    plc_row = alt_plc
                    result = "REVIEW"
                    reason = "APF OUTPUT WIRED AS PLC INPUT"
                else:
                    result = "FAIL"
                    reason = "OUTPUT NOT FOUND AT PLC ADDRESS"
            else:
                if is_output_device(gen_desc):
                    result = "FAIL"
                    reason = "OUTPUT NOT FOUND AT PLC ADDRESS"
                else:
                    result = "FAIL"
                    reason = "IO MISSING IN PLC"

        elif plc_row and gen_row["section"] != plc_row["section"]:

            # APF special case: logical outputs wired as PLC inputs (IO_x)
            if (
                is_apf_io_channel(device, channel)
                and gen_row["section"] == "O"
                and plc_row["section"] == "I"
            ):
                result = "REVIEW"
                reason = f"APF OUTPUT WIRED AS PLC INPUT → {plc_row['raw_point']}"
            else:
                result = "FAIL"
                reason = f"WRONG DIRECTION: ASSIGNMENT={gen_row['section']}, PLC={plc_row['section']}"

        elif plc_row:
            # STRICT IO PATH MATCH RULE (ALL DEVICES)
            gen_point = gen_row["raw_point"]
            plc_point = plc_row["raw_point"]

            if io_path_mismatch(gen_point, plc_point):
                result = "FAIL"
                reason = "IO PATH FORMAT MISMATCH"
            else:
                if plc_desc and gen_desc:
                    gen_cls = extract_device_class(gen_desc)
                    plc_cls = extract_device_class(plc_desc)

                    gen_nums = extract_numbers(gen_desc)
                    plc_nums = extract_numbers(plc_desc)

                    if gen_cls and not plc_cls:
                        # A PLC description that is a short-form engineered name
                        # matching the assignment one still passes.
                        if has_shared_core_id(gen_desc, plc_desc):
                            result = "PASS"
                            reason = "PLC DESCRIPTION SHORT FORM"
                        else:
                            result = "FAIL"
                            reason = "PLC SIGNAL TYPE / STATUS MISMATCH"

                    elif gen_cls != plc_cls:
                        result = "FAIL"
                        reason = f"DEVICE TYPE MISMATCH: IO_ASS={gen_cls}, PLC={plc_cls}"

                    elif gen_nums != plc_nums:
                        result = "REVIEW"
                        reason = "DEVICE NUMBER MISMATCH"

                    else:
                        reason = "DESC PARTIAL / DIFFERENT"

        rows.append({
            "Point": gen_row["raw_point"],
            "Assignment_Description": gen_desc,
            "PLC_Point": plc_row["raw_point"] if plc_row else "",
            "PLC_Description": plc_desc,
            "Result": result,
            "Reason": reason
        })

    # -----------------------------
    # PLC → GEN (missing in assignment)
    # -----------------------------

    gen_points = build_gen_point_set(gen)

    for (device, section, channel), plc_row in plc.items():
        plc_point = plc_row["raw_point"]
        plc_desc = plc_row["desc"]

        # Already covered by the GEN → PLC pass
        if plc_point in gen_points:
            continue

        # Ignore SPARE PLC points
        if plc_desc == "":
            continue

        # Ignore implicit members (APF / SIO)
        if section_implies_members(device, section, channel):
            continue

        # At this point: exists in PLC, not in GEN, not implicit
        rows.append({
            "Point": "",
            "Assignment_Description": "",
            "PLC_Point": plc_point,
            "PLC_Description": plc_desc,
            "Result": "REVIEW",
            "Reason": "Missing in assignment"
        })

    df = pd.DataFrame(rows)
    df.to_excel(output_xlsx, index=False)

    wb = load_workbook(output_xlsx)
    ws = wb.active

    # Locate the Result column by header name
    result_col = None
    for col in range(1, ws.max_column + 1):
        if ws.cell(row=1, column=col).value == "Result":
            result_col = col
            break

    if result_col:
        green = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
        red = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
        gray = PatternFill(start_color="E7E6E6", end_color="E7E6E6", fill_type="solid")
        yellow = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")

        # get_column_letter stays correct past column Z, unlike chr(64 + col)
        col_letter = get_column_letter(result_col)

        ws.conditional_formatting.add(
            f"A2:F{ws.max_row}",
            FormulaRule(formula=[f'${col_letter}2="PASS"'], fill=green)
        )
        ws.conditional_formatting.add(
            f"A2:F{ws.max_row}",
            FormulaRule(formula=[f'${col_letter}2="FAIL"'], fill=red)
        )
        ws.conditional_formatting.add(
            f"A2:F{ws.max_row}",
            FormulaRule(formula=[f'${col_letter}2="OK"'], fill=gray)
        )
        ws.conditional_formatting.add(
            f"A2:F{ws.max_row}",
            FormulaRule(formula=[f'${col_letter}2="REVIEW"'], fill=yellow)
        )

    wb.save(output_xlsx)


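# Report colour key: PASS rows fill green, FAIL red, OK (spare) gray, and
# REVIEW yellow, driven by the conditional-formatting rules above.
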
# -----------------------------
# Main
# -----------------------------

def main():
    plc_csv = select_csv_file("Select PLC export file")
    gen_csv = select_csv_file("Select Generated IO file")

    print(f"\nComparing:\n PLC → {plc_csv}\n GEN → {gen_csv}\n")

    plc, plc_all_points, plc_tag_sections = load_plc_csv(plc_csv)
    gen = load_generated_csv(gen_csv)

    subsystem = get_subsystem_from_generated_csv(gen_csv)

    run_ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = f"{subsystem}_verification_report_{run_ts}.xlsx"

    build_report(plc, plc_all_points, plc_tag_sections, gen, output_file)

    # Re-read the report to print an overall verdict
    df = pd.read_excel(output_file)
    if (df["Result"] == "FAIL").any():
        print(f"🔴 FAIL - see {output_file}")
    else:
        print(f"🟢 PASS - see {output_file}")


if __name__ == "__main__":
    main()