"""Verify a generated IO-assignment CSV against a PLC tag export CSV.

The script asks the user to pick the two CSV files from the current
directory, compares them point-by-point in both directions
(assignment -> PLC and PLC -> assignment), and writes a color-coded
Excel report named ``<SUBSYSTEM>_verification_report_<timestamp>.xlsx``.
"""

import os
import re
from datetime import datetime

import pandas as pd
from openpyxl import load_workbook
from openpyxl.formatting.rule import FormulaRule
from openpyxl.styles import PatternFill
from openpyxl.utils import get_column_letter

# Ordered (class, keywords) table used to classify a free-text description.
# Order matters: the first class whose keyword is a substring of the
# uppercased description wins, so e.g. "TPE" is tried before the very
# generic "PE".
DEVICE_CLASSES = [
    ("SAFETY", ["E-STOP", "ESTOP", "EMERGENCY", "SSP", "PULL CORD", "EPC"]),
    ("PB", ["PB", "PUSHBUTTON"]),
    ("TPE", ["TPE", "PHOTOEYE", "PE"]),
    ("ENC", ["ENC", "ENCODER"]),
    ("VFD", ["VFD"]),
    ("DCM", ["DCM", "DIVERT"]),
    ("JDS", ["JDS", "JAM"]),
    ("DISC", ["DISC", "DISCONNECT"]),
]


# -----------------------------
# Helpers
# -----------------------------
def extract_core_ids(desc):
    """
    Pull strong identifiers like SDP1_SEND, LRP_SEND, CHUTE14_DCM_CABLE, etc.

    We keep tokens that contain '_' because they are usually engineered
    names.  Returns a (possibly empty) set of uppercased tokens.
    """
    if not desc:
        return set()
    u = desc.upper()
    # Turn common separators into spaces but keep underscores.
    u = re.sub(r"[-/()]", " ", u)
    tokens = re.split(r"\s+", u.strip())
    # Keep underscore identifiers of a meaningful length (>= 5 chars).
    return {t for t in tokens if "_" in t and len(t) >= 5}


def has_shared_core_id(gen_desc, plc_desc):
    """True if both descriptions share at least one engineered identifier."""
    return len(extract_core_ids(gen_desc) & extract_core_ids(plc_desc)) > 0


def extract_device_class(desc):
    """Classify a description via DEVICE_CLASSES; None if no keyword hits."""
    if not desc:
        return None
    u = desc.upper()
    for cls, keys in DEVICE_CLASSES:
        for k in keys:
            if k in u:
                return cls
    return None


def extract_numbers(desc):
    """Return the set of digit-run strings found in the description."""
    if not desc:
        return set()
    return set(re.findall(r"\d+", desc))


def normalize_point(p):
    """Uppercase/strip a point string; None for non-string input."""
    if not isinstance(p, str):
        return None
    return p.strip().upper()


def normalize_desc(d):
    """Strip a description; the literal 'SPARE' (any case) becomes ''."""
    if not isinstance(d, str):
        return ""
    d = d.strip()
    return "" if d.upper() == "SPARE" else d


def normalize_channel(ch):
    """Uppercase a channel and drop dots (I.PT00.DATA -> PT00DATA form)."""
    if not isinstance(ch, str):
        return None
    return ch.replace(".", "").upper()


def is_output_device(desc):
    """Heuristic: description names a physical output (horn/beacon/light)."""
    if not desc:
        return False
    u = desc.upper()
    return any(k in u for k in ["HORN", "BEACON", "LIGHT"])


def split_section_tag_name(name):
    """
    TAG row NAME looks like: PS2_4PT_APF1:SI

    Return (device, section) or (None, None).
    """
    if not isinstance(name, str):
        return None, None
    u = name.strip().upper()
    m = re.match(r"(.+?):(I|O|SI|SO)$", u)
    if not m:
        return None, None
    return m.group(1), m.group(2)


def split_point(point):
    """
    Split a full IO point into its parts, e.g.

        PS1_6PT_APF1:I.IN_0
        PS1_6PT_APF1:I.PT00.DATA

    -> (device, section, normalized channel), or (None, None, None).
    """
    m = re.match(r"(.+?):(I|O|SI|SO)\.(.+)", point)
    if not m:
        return None, None, None
    device = m.group(1)
    section = m.group(2)
    channel = normalize_channel(m.group(3))
    return device, section, channel


def get_subsystem_from_generated_csv(path):
    """
    Reads the Subsystem column (3rd column) from the generated CSV and
    returns the first non-empty value (uppercased), or "UNKNOWN".
    """
    df = pd.read_csv(path, dtype=str).fillna("")
    if df.shape[1] < 3:
        return "UNKNOWN"
    for val in df.iloc[:, 2]:
        v = str(val).strip()
        if v:
            return v.upper()
    return "UNKNOWN"


def is_apf_io_channel(device, channel):
    """True for an IO_x channel on an APF device (bidirectional point)."""
    return "_APF" in device.upper() and channel.startswith("IO")


def find_apf_io_in_plc(plc, device, channel):
    """
    APF IO_x outputs may be wired as PLC inputs.  Try locating the point
    under section I if it was not found under O.
    """
    return plc.get((device, "I", channel))


# -----------------------------
# File selection helper
# -----------------------------
def select_csv_file(prompt_text):
    """Interactively pick a .csv file from the current directory."""
    csv_files = [f for f in os.listdir(".") if f.lower().endswith(".csv")]
    if not csv_files:
        raise RuntimeError("No CSV files found in current directory.")
    print("\nAvailable CSV files:")
    for i, f in enumerate(csv_files, start=1):
        print(f"{i}) {f}")
    while True:
        choice = input(f"\n{prompt_text} (number): ").strip()
        if choice.isdigit():
            idx = int(choice) - 1
            if 0 <= idx < len(csv_files):
                return csv_files[idx]
        print("Invalid selection, try again.")


# -----------------------------
# Load PLC CSV
# -----------------------------
def load_plc_csv(path):
    """
    Load a PLC export CSV.

    The export has preamble lines before the real header row (the one
    starting with "TYPE,"), so we locate that row first and skip to it.

    Returns:
        records: {(device, section, channel): {"section", "desc", "raw_point"}}
                 built from COMMENT rows only,
        all_points: set of every normalized SPECIFIER seen anywhere,
        tag_sections: set of (device, section) pairs from TAG rows.
    """
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        lines = f.readlines()
    header_idx = None
    for i, line in enumerate(lines):
        if line.startswith("TYPE,"):
            header_idx = i
            break
    if header_idx is None:
        raise RuntimeError("Could not find TYPE header row in PLC export")

    df = pd.read_csv(path, dtype=str, skiprows=header_idx).fillna("")

    # Collect all SPECIFIER points that appear anywhere (mostly COMMENT rows).
    all_points = set()
    for val in df.get("SPECIFIER", []):
        p = normalize_point(val)
        if p:
            all_points.add(p)

    # Collect TAG sections (device, section) from the NAME column.
    tag_sections = set()
    for _, row in df[df["TYPE"] == "TAG"].iterrows():
        dev, sec = split_section_tag_name(row.get("NAME", ""))
        if dev and sec:
            tag_sections.add((dev, sec))

    # Only COMMENT rows become point records.
    df = df[df["TYPE"] == "COMMENT"]
    records = {}
    for _, row in df.iterrows():
        point = normalize_point(row.get("SPECIFIER", ""))
        desc = normalize_desc(row.get("DESCRIPTION", ""))
        device, section, channel = split_point(point)
        if not device:
            continue
        records[(device, section, channel)] = {
            "section": section,
            "desc": desc,
            "raw_point": point,
        }
    return records, all_points, tag_sections


def normalize_point_path(p):
    """
    Normalize an IO path for strict comparison.

    We keep the structure but normalize the DATA member explicitly
    (".DATA" -> "_DATA") so equivalent spellings compare equal.
    """
    if not isinstance(p, str):
        return ""
    p = p.upper().strip()
    return p.replace(".DATA", "_DATA")


def io_path_mismatch(gen_point, plc_point):
    """True if the two IO paths differ structurally after normalization."""
    return normalize_point_path(gen_point) != normalize_point_path(plc_point)


def section_implies_members(device, section, channel=None):
    """
    Determine whether a PLC section implies member existence even without
    explicit COMMENT rows (APF safety blocks, SIO PT channels).
    """
    dev = device.upper()
    sec = section.upper()
    ch = (channel or "").upper()

    # ---- APF rules ----
    if "_APF" in dev:
        # Safety sections always imply members.
        if sec in ("SI", "SO"):
            return True
        # Standard inputs.
        if sec == "I" and (ch.startswith("IN_") or ch.startswith("IO_")):
            return True
        # Standard outputs.
        if sec == "O" and ch.startswith("IO_"):
            return True
        return False

    # ---- SIO rules ----
    if "_SIO" in dev:
        return sec in ("I", "O") and ch.startswith("PT")

    # ---- Default (FIO, others) ----
    return False


# -----------------------------
# Load Generated CSV
# -----------------------------
def load_generated_csv(path):
    """
    Load the generated IO-assignment CSV (point in col 0, desc in col 1).

    Returns {(device, section, channel): {"section", "desc", "raw_point"}}.
    """
    df = pd.read_csv(path, dtype=str).fillna("")
    records = {}
    for _, row in df.iterrows():
        point = normalize_point(row.iloc[0])
        desc = normalize_desc(row.iloc[1])
        device, section, channel = split_point(point)
        if not device:
            continue
        records[(device, section, channel)] = {
            "section": section,
            "desc": desc,
            "raw_point": point,
        }
    return records


def build_gen_point_set(gen):
    """Build a set of normalized GEN raw points for fast lookup."""
    return {v["raw_point"] for v in gen.values()}


# -----------------------------
# Build Report
# -----------------------------
def build_report(plc, plc_all_points, plc_tag_sections, gen, output_xlsx):
    """
    Compare GEN points against PLC points (and vice versa) and write a
    color-coded Excel report to ``output_xlsx``.

    ``plc_tag_sections`` is currently unused but kept for interface
    compatibility with load_plc_csv's return value.

    NOTE: the "ASSINGNMENT" spelling in reasons and the
    "Assingnment_Description" column header are preserved deliberately —
    they are part of the report schema consumers may depend on.
    """
    rows = []

    # ---- GEN -> PLC pass ----
    for key, gen_row in gen.items():
        device, section, channel = key
        plc_row = plc.get(key)
        gen_desc = gen_row["desc"]
        plc_desc = plc_row["desc"] if plc_row else ""
        gen_empty = gen_desc == ""
        plc_empty = not plc_row or plc_desc == ""
        result = "PASS"
        reason = "OK"

        if gen_empty and plc_empty:
            gen_point = gen_row["raw_point"]
            section_allows_members = section_implies_members(device, section, channel)
            if plc_row is None:
                if section_allows_members:
                    # APF SI/SO safety blocks: members exist even without
                    # COMMENT rows.
                    result = "OK"
                    reason = "SPARE"
                else:
                    # For FIO / normal IO the member must exist explicitly.
                    if gen_point in plc_all_points:
                        result = "OK"
                        reason = "SPARE"
                    else:
                        result = "REVIEW"
                        reason = "Should be deleted"
            else:
                result = "OK"
                reason = "SPARE"
        elif gen_empty and not plc_empty:
            result = "FAIL"
            reason = "IO MISSING IN ASSINGNMENT"
        elif not gen_empty and plc_empty:
            # APF special case: logical output wired as PLC input (IO_x).
            if is_apf_io_channel(device, channel):
                alt_plc = find_apf_io_in_plc(plc, device, channel)
                if alt_plc:
                    plc_row = alt_plc
                    result = "REVIEW"
                    reason = "APF OUTPUT WIRED AS PLC INPUT"
                else:
                    result = "FAIL"
                    reason = "OUTPUT NOT FOUND AT PLC ADDRESS"
            else:
                if is_output_device(gen_desc):
                    result = "FAIL"
                    reason = "OUTPUT NOT FOUND AT PLC ADDRESS"
                else:
                    result = "FAIL"
                    reason = "IO MISSING IN PLC"
        elif plc_row and gen_row["section"] != plc_row["section"]:
            # APF special case: logical outputs wired as PLC inputs (IO_x).
            if (
                is_apf_io_channel(device, channel)
                and gen_row["section"] == "O"
                and plc_row["section"] == "I"
            ):
                result = "REVIEW"
                reason = f"APF OUTPUT WIRED AS PLC INPUT → {plc_row['raw_point']}"
            else:
                result = "FAIL"
                reason = f"WRONG DIRECTION — ASSINGNMENT={gen_row['section']}, PLC={plc_row['section']}"
        elif plc_row:
            # STRICT IO PATH MATCH RULE (ALL DEVICES)
            gen_point = gen_row["raw_point"]
            plc_point = plc_row["raw_point"]
            if io_path_mismatch(gen_point, plc_point):
                result = "FAIL"
                reason = "IO PATH FORMAT MISMATCH"
            else:
                if plc_desc and gen_desc:
                    gen_cls = extract_device_class(gen_desc)
                    plc_cls = extract_device_class(plc_desc)
                    gen_nums = extract_numbers(gen_desc)
                    plc_nums = extract_numbers(plc_desc)
                    if gen_cls and not plc_cls:
                        # If the PLC description is a short-form engineered
                        # name matching the assignment one, PASS.
                        if has_shared_core_id(gen_desc, plc_desc):
                            result = "PASS"
                            reason = "PLC DESCRIPTION SHORT FORM"
                        else:
                            result = "FAIL"
                            reason = "PLC SIGNAL TYPE / STATUS MISMATCH"
                    elif gen_cls != plc_cls:
                        result = "FAIL"
                        reason = f"DEVICE TYPE MISMATCH — IO_ASS={gen_cls}, PLC={plc_cls}"
                    elif gen_nums != plc_nums:
                        result = "REVIEW"
                        reason = "DEVICE NUMBER MISMATCH"
                    else:
                        reason = "DESC PARTIAL / DIFFERENT"

        rows.append({
            "Point": gen_row["raw_point"],
            "Assingnment_Description": gen_desc,
            "PLC_Point": plc_row["raw_point"] if plc_row else "",
            "PLC_Description": plc_desc,
            "Result": result,
            "Reason": reason,
        })

    # -----------------------------
    # PLC -> GEN (missing in assignment)
    # -----------------------------
    gen_points = build_gen_point_set(gen)
    for (device, section, channel), plc_row in plc.items():
        plc_point = plc_row["raw_point"]
        plc_desc = plc_row["desc"]
        # Already covered by the GEN -> PLC pass.
        if plc_point in gen_points:
            continue
        # Ignore SPARE PLC points.
        if plc_desc == "":
            continue
        # Ignore implicit members (APF / SIO).
        if section_implies_members(device, section, channel):
            continue
        # At this point: exists in PLC, not in GEN, not implicit.
        rows.append({
            "Point": "",
            "Assingnment_Description": "",
            "PLC_Point": plc_point,
            "PLC_Description": plc_desc,
            "Result": "REVIEW",
            "Reason": "Missing in assignment",
        })

    df = pd.DataFrame(rows)
    df.to_excel(output_xlsx, index=False)

    # Color-code rows by the Result column via conditional formatting.
    wb = load_workbook(output_xlsx)
    ws = wb.active
    result_col = None
    for col in range(1, ws.max_column + 1):
        if ws.cell(row=1, column=col).value == "Result":
            result_col = col
            break
    if result_col:
        green = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
        red = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
        gray = PatternFill(start_color="E7E6E6", end_color="E7E6E6", fill_type="solid")
        yellow = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
        # FIX: chr(64 + col) breaks past column 26 ("Z"); use openpyxl's
        # get_column_letter for a correct letter at any index.
        col_letter = get_column_letter(result_col)
        for value, fill in (
            ("PASS", green),
            ("FAIL", red),
            ("OK", gray),
            ("REVIEW", yellow),
        ):
            ws.conditional_formatting.add(
                f"A2:F{ws.max_row}",
                FormulaRule(formula=[f"${col_letter}2=\"{value}\""], fill=fill),
            )
    wb.save(output_xlsx)


# -----------------------------
# Main
# -----------------------------
def main():
    """Prompt for the two CSVs, build the report, and print a summary."""
    plc_csv = select_csv_file("Select PLC export file")
    gen_csv = select_csv_file("Select Generated IO file")
    print(f"\nComparing:\n PLC → {plc_csv}\n GEN → {gen_csv}\n")

    plc, plc_all_points, plc_tag_sections = load_plc_csv(plc_csv)
    gen = load_generated_csv(gen_csv)

    subsystem = get_subsystem_from_generated_csv(gen_csv)
    run_ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = f"{subsystem}_verification_report_{run_ts}.xlsx"

    build_report(plc, plc_all_points, plc_tag_sections, gen, output_file)

    df = pd.read_excel(output_file)
    # FIX: an empty report produces a DataFrame with no columns; guard so
    # the summary does not raise KeyError on "Result".
    if "Result" in df.columns and (df["Result"] == "FAIL").any():
        print(f"🔴 FAIL — see {output_file}")
    else:
        print(f"🟢 PASS — see {output_file}")


if __name__ == "__main__":
    main()