import os, json, sys # ====================================================== # Helper Function: State Resolver # ====================================================== def get_device_state(value, tagPath): up = tagPath.upper() # === Base state dictionary === state_mappings = { 0: "Closed", 1: "Actuated", 2: "Communication Faulted", 3: "Conveyor Running In Maintenance Mode", 4: "Disabled", 5: "Disconnected", 6: "Stopped", 7: "Enabled Not Running", 8: "Encoder Fault", 9: "Energy Management", 10: "ESTOP Was Actuated", 11: "EStopped", 12: "EStopped Locally", 13: "Extended Faulted", 14: "Full", 15: "Gaylord Start Pressed", 16: "Jam Fault", 17: "Jammed", 18: "Loading Allowed", 19: "Loading Not Allowed", 20: "Low Air Pressure Fault Was Present", 21: "Maintenance Mode", 22: "Conveyor Stopped In Maintenance Mode", 23: "Motor Faulted", 24: "Motor Was Faulted", 25: "Normal", 26: "Off Inactive", 27: "Open", 28: "PLC Ready To Run", 29: "Package Release Pressed", 30: "Power Branch Was Faulted", 31: "Pressed", 32: "Ready To Receive", 33: "Running", 34: "Started", 35: "Stopped", 36: "System Started", 37: "Unknown", 38: "VFD Fault", 39: "Conveyor Running In Power Saving Mode", 40: "Conveyor Jogging In Maintenance Mode", 41: "VFD Reset Required", 42: "Jam Reset Push Button Pressed", 43: "Start Push Button Pressed", 44: "Stop Push Button Pressed", 45: "No Container", 46: "Ready To Be Enabled", 47: "Half Full", 48: "Enabled", 49: "Tipper Faulted", 50: "OK", 51: "Disconnected", 52: "Faulted", 53: "Faulted/Disconnect", 54: "Diverting" } # === TPE (Tracking Photoeye) === if "/TPE/" in up: if value == 0: return "Blocked" elif value == 27: return "Clear" elif value == 17: return "Jammed" else: return state_mappings.get(value, "Offline") # === Single Photoeyes (PE1, PE2) === if up.endswith(("PE1", "PE2")): if value == 4: return "Clear" else: return "Blocked" # === Prox Sensors (PRX1, PRX2) === if up.endswith(("PRX1", "PRX2")): if value: return "Inactive" else: return "Actuated" # === Beacons (BCN) === if "/BEACON" in up: if value == 0: return "Off" elif value == 1: return "Cleared / Reset Required" else: return "Active" # === Default === return state_mappings.get(value, "Offline") # ====================================================== # Helper Function: Read One Device (multi or single state) # ====================================================== def read_device_status(tagPath, provider, dev_name): """ Reads the appropriate state tag(s) for a given device and returns a list of {Device, Status} dictionaries. Handles multi-state (SS), VFD, PRX, PE, EN, etc. """ rows = [] try: up = tagPath.upper() # === Case 0: SS (Start/Stop Station) === if up.endswith("SS") or "/SS/" in up: for sub in ("Start", "Stop"): sub_path = provider + tagPath + "/" + sub + "/State" try: result = system.tag.readBlocking([sub_path])[0] if result.quality.isGood(): status_value = get_device_state(result.value, tagPath) else: status_value = "Offline" except: status_value = "Offline" rows.append({ "Device": "{} ({})".format(dev_name, sub), "Status": status_value }) return rows # handled fully # === Case 1: VFD / Conveyor === if "/VFD/" in up: path = provider + tagPath + "/Drive/Lenze" # === Case 2: Chute sensors (PE / PRX) === elif up.endswith(("PE1", "PE2", "PRX1", "PRX2")) and "/TPE/" not in up: path = provider + tagPath # === Case 3: Chute EN === elif up.endswith("EN") and "/CHUTE/" in up: path = provider + tagPath + "_State" # === Case 4: Default === else: path = provider + tagPath + "/State" try: result = system.tag.readBlocking([path])[0] if result.quality.isGood(): status_value = get_device_state(result.value, tagPath) else: status_value = "Offline" except: status_value = "Offline" rows.append({ "Device": dev_name, "Status": status_value }) except Exception as e: system.perspective.print("Error reading device status for %s: %s" % (dev_name, e)) return rows # ====================================================== # Helper Function: Single Device Reader (for Docked Device View) # ====================================================== def get_single_device_status(self, tagPath): """ Reads a single device tag (used for docked device views). Returns a single readable status string (e.g. "Running", "Blocked", etc.) """ try: up = tagPath.upper() provider = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]" if up.endswith("SS") or "/SS/" in up: states = [] for sub in ("Start", "Stop"): sub_path = provider + tagPath + "/" + sub + "/State" try: result = system.tag.readBlocking([sub_path])[0] if result.quality.isGood(): states.append("{}: {}".format(sub, get_device_state(result.value, tagPath))) else: states.append("{}: Unknown".format(sub)) except: states.append("{}: Unknown".format(sub)) return " | ".join(states) # === VFD === if "/VFD/" in up: path = provider + tagPath + "/Drive/Lenze" # === Sensors === elif up.endswith(("PE1", "PE2", "PRX1", "PRX2")) and "/TPE/" not in up: path = provider + tagPath # === EN === elif up.endswith("EN") and "/CHUTE/" in up: path = provider + tagPath + "_State" else: path = provider + tagPath + "/State" result = system.tag.readBlocking([path])[0] if result.quality.isGood(): return get_device_state(result.value, tagPath) else: return "Offline" except Exception as e: system.perspective.print("Error reading single device status for %s: %s" % (tagPath, e)) return "Offline" # ====================================================== # Device Mapping Builder # ====================================================== global_device_mapping = {} def build_device_mapping(full_tag_path): """ Builds global_device_mapping for devices under the same PLC and parent device. Adds support for: - Chute FIOM devices (e.g. S03_CH109_FIOM_1 when clicking S03_CH109) - Shared JR and PE devices used by multiple chutes (e.g. S03_1_JR1, S03_1_LRPE1) """ system.perspective.print(full_tag_path) global global_device_mapping global_device_mapping.clear() try: path_parts = full_tag_path.split("/") plc_name = path_parts[1] if len(path_parts) > 1 else path_parts[0] clicked_name = path_parts[-1] if len(path_parts) > 0 else "" # --- Clean clicked name --- if "_VFD" in clicked_name: clicked_name = clicked_name.split("_VFD")[0] project_name = system.util.getProjectName() base_path = ( os.getcwd().replace("\\", "/") + "/data/projects/" + project_name + "/com.inductiveautomation.perspective/Views/autStand/Detailed_Views/MCM-Views" ) if not os.path.exists(base_path): system.perspective.print("Path not found: " + base_path) return {} # --- Detect if this is a Chute --- is_chute = "/CHUTE/" in full_tag_path.upper() for view_folder in os.listdir(base_path): json_file = os.path.join(base_path, view_folder, "view.json") if not os.path.isfile(json_file): continue try: with open(json_file, "r") as fh: view_json = json.load(fh) except Exception: continue root_children = (view_json.get("root") or {}).get("children") or [] if not root_children: continue container = root_children[0] children = container.get("children") or [] for child in children: props = child.get("props") or {} params = props.get("params") or {} tag_props = params.get("tagProps") if isinstance(tag_props, list) and len(tag_props) > 0: tag_prop = str(tag_props[0]) parts = tag_prop.split("/") if len(parts) > 1 and parts[1] == plc_name: dev_name = parts[-1] if len(parts) > 3 and parts[-2] == clicked_name: dev_name = clicked_name + "_" + parts[-1] else: dev_name = parts[-1] prefix = clicked_name + "_" # === 🟢 NEW: Chute FIOM match === if is_chute and dev_name.startswith(clicked_name + "_"): global_device_mapping[dev_name] = { "tagPath": tag_prop, "zone": view_folder } continue # === Default inclusion === if dev_name.startswith(prefix) or (len(parts) > 3 and parts[-2] == clicked_name): global_device_mapping[dev_name] = { "tagPath": tag_prop, "zone": view_folder } # === Special Case: JR Buttons === elif "/JR/" in tag_prop.upper(): try: jr_parts = tag_prop.split("/JR/") if len(jr_parts) > 1: sub_path = jr_parts[1] if sub_path.startswith(clicked_name + "_JR"): dev_name = sub_path.split("/")[0] global_device_mapping[dev_name] = { "tagPath": tag_prop, "zone": view_folder } except: pass shared_jr_pe_map = { "S03_CH101": ["S03_1_JR1", "S03_1_LRPE1"], "S03_CH103": ["S03_1_JR1", "S03_1_LRPE1"], "S03_CH105": ["S03_1_JR1", "S03_1_LRPE1"], "S03_CH107": ["S03_1_JR3", "S03_1_LRPE3"], "S03_CH108": ["S03_1_JR4", "S03_1_LRPE4"], "S03_CH109": ["S03_1_JR3", "S03_1_LRPE3"], "S03_CH110": ["S03_1_JR4", "S03_1_LRPE4"], "S03_CH111": ["S03_1_JR3", "S03_1_LRPE3"], "S03_CH112": ["S03_1_JR2", "S03_1_LRPE2"], "S03_CH113": ["S03_1_JR5", "S03_1_LRPE5"], "S03_CH114": ["S03_1_JR6", "S03_1_LRPE6"], "S03_CH115": ["S03_1_JR5", "S03_1_LRPE5"], "S03_CH116": ["S03_1_JR6", "S03_1_LRPE6"], "S03_CH117": ["S03_1_JR5", "S03_1_LRPE5"], "S03_CH118": ["S03_1_JR6", "S03_1_LRPE6"], "S03_CH119": ["S03_1_JR7", "S03_1_LRPE7"], "S03_CH120": ["S03_1_JR8", "S03_1_LRPE8"], "S03_CH121": ["S03_1_JR7", "S03_1_LRPE7"], "S03_CH122": ["S03_1_JR8", "S03_1_LRPE8"], "S03_CH123": ["S03_1_JR7", "S03_1_LRPE7"], "S03_CH124": ["S03_1_JR8", "S03_1_LRPE8"], } shared_fiom_map = { "NCS1_1": ["S03_1_FIOM_5", "S03_1_FIOM_9", "S03_1_FIOM_1", "S03_1_FIOM_2","S03_1_FIOM_3","S03_1_FIOM_4", "S03_1_FIOM_6","S03_1_FIOM_7", "S03_1_FIOM_8"], } if clicked_name in shared_jr_pe_map: extra_devices = shared_jr_pe_map[clicked_name] for dev in extra_devices: try: # Base tag (for PE) base_tag = "System/MCM02/Station/Chute_JR/" + dev # JR subtag (for JR button) jr_tag = base_tag + "/JR" if dev.endswith("JR1") else base_tag for tag_candidate in [base_tag, jr_tag]: global_device_mapping[dev] = { "tagPath": tag_candidate, "zone": "Chute_JR" } except Exception as ex: system.perspective.print("Error adding JR/PE for {}: {}".format(clicked_name, ex)) if clicked_name in shared_fiom_map: for dev in shared_fiom_map[clicked_name]: tag_path = "System/{}/IO_Block/FIO/{}".format(plc_name, dev) global_device_mapping[dev] = { "tagPath": tag_path, "zone": "FIO" } return global_device_mapping except Exception as e: whid = "unknown" try: whid = system.tag.readBlocking("Configuration/FC")[0].value except: pass logger = system.util.getLogger("%s-build_device_mapping" % whid) exc_type, exc_obj, tb = sys.exc_info() logger.error("Error at line %s: %s" % (tb.tb_lineno, exc_obj)) return {} # ====================================================== # Device Table Builder # ====================================================== def build_device_table(self): rows = [] try: for dev_name, info in global_device_mapping.items(): tagPath = info.get("tagPath", "") if not tagPath: continue provider = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]" rows.extend(read_device_status(tagPath, provider, dev_name)) return rows except Exception as e: system.perspective.print("Error building device table: %s" % e) return [] # ====================================================== # Get All Tags for Clicked Device # ====================================================== def getAllTags(self, tagPath, section="all"): """ Reads all tags under a UDT instance (recursively) and returns a list of dictionaries: [ {"Name": "State", "OPC Path": "System/MCM01/...", "Value": "Running"}, ... ] """ rows = [] try: providerPath = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]" driveFolderName = "Drive" # === Utility: read a single atomic tag === def readSingleTag(path, prefix=""): try: result = system.tag.readBlocking([providerPath + path])[0] value = str(result.value) if result.quality.isGood() else "Offline" except: value = "Offline" displayName = prefix + path.split("/")[-1] if prefix else path.split("/")[-1] rows.append({ "Name": displayName, "OPC Path": path, "Value": value }) # === Utility: recursive browse === def browseRecursive(basePath, prefix=""): children = system.tag.browse(providerPath + basePath).getResults() for child in children: tagType = str(child.get("tagType", "")) name = str(child.get("name", "")) fullPath = str(child.get("fullPath", "")) if fullPath.startswith("[") and "]" in fullPath: fullPath = fullPath.split("]", 1)[1] # --- Conveyor filter (skip Drive folder) --- if section == "conveyor" and name == driveFolderName: continue if tagType == "Folder": # --- Skip JR subfolder if current device is LRPE --- if name.upper() == "JR" and "_JR" in basePath.upper(): continue newPrefix = prefix + name + "/" if prefix else name + "/" browseRecursive(basePath + "/" + name, newPrefix) elif tagType == "AtomicTag": readSingleTag(fullPath, prefix) # === MAIN ENTRY POINT === # --- Case 1: VFD --- if section == "vfd": if tagPath.endswith("/Drive"): browseRecursive(tagPath) else: browseRecursive(tagPath + "/" + driveFolderName) # --- Case 2: Flat EN_ tags (Chutes) --- elif tagPath.upper().endswith("/EN"): parentPath = "/".join(tagPath.split("/")[:-1]) children = system.tag.browse(providerPath + parentPath).getResults() for child in children: tagType = str(child.get("tagType", "")) name = str(child.get("name", "")) if tagType == "AtomicTag" and name.upper().startswith("EN_"): fullPath = str(child.get("fullPath", "")) if fullPath.startswith("[") and "]" in fullPath: fullPath = fullPath.split("]", 1)[1] readSingleTag(fullPath) # --- Case 3: Single Sensors (PE/PRX) --- elif tagPath.upper().endswith(("PE1", "PE2", "PRX1", "PRX2")) and "/TPE/" not in tagPath.upper(): readSingleTag(tagPath) # --- Case 4: Default / Fallback --- else: browseResult = system.tag.browse(providerPath + tagPath).getResults() if not browseResult: # Possibly a struct-style UDT (like some TPEs) try: result = system.tag.readBlocking([providerPath + tagPath])[0] value = result.value # === Expand STRUCT === if isinstance(value, dict): def flattenStruct(struct, base=""): for k, v in struct.items(): newName = base + "/" + k if base else k if isinstance(v, dict): flattenStruct(v, newName) else: rows.append({ "Name": newName, "OPC Path": tagPath + "/" + newName, "Value": str(v) }) flattenStruct(value) else: # Not a struct, read normally readSingleTag(tagPath) except Exception as ex: system.perspective.print("Fallback read failed for {}: {}".format(tagPath, ex)) else: # Normal browse case browseRecursive(tagPath) return rows except Exception as e: system.perspective.print("Error in getAllTags: {}".format(e)) return []