542 lines
16 KiB
Plaintext
542 lines
16 KiB
Plaintext
import os, json, sys
|
|
|
|
# ======================================================
|
|
# Helper Function: State Resolver
|
|
# ======================================================
|
|
def get_device_state(value, tagPath):
|
|
up = tagPath.upper()
|
|
|
|
# === Base state dictionary ===
|
|
state_mappings = {
|
|
0: "Closed",
|
|
1: "Actuated",
|
|
2: "Communication Faulted",
|
|
3: "Conveyor Running In Maintenance Mode",
|
|
4: "Disabled",
|
|
5: "Disconnected",
|
|
6: "Stopped",
|
|
7: "Enabled Not Running",
|
|
8: "Encoder Fault",
|
|
9: "Energy Management",
|
|
10: "ESTOP Was Actuated",
|
|
11: "EStopped",
|
|
12: "EStopped Locally",
|
|
13: "Extended Faulted",
|
|
14: "Full",
|
|
15: "Gaylord Start Pressed",
|
|
16: "Jam Fault",
|
|
17: "Jammed",
|
|
18: "Loading Allowed",
|
|
19: "Loading Not Allowed",
|
|
20: "Low Air Pressure Fault Was Present",
|
|
21: "Maintenance Mode",
|
|
22: "Conveyor Stopped In Maintenance Mode",
|
|
23: "Motor Faulted",
|
|
24: "Motor Was Faulted",
|
|
25: "Normal",
|
|
26: "Off Inactive",
|
|
27: "Open",
|
|
28: "PLC Ready To Run",
|
|
29: "Package Release Pressed",
|
|
30: "Power Branch Was Faulted",
|
|
31: "Pressed",
|
|
32: "Ready To Receive",
|
|
33: "Running",
|
|
34: "Started",
|
|
35: "Stopped",
|
|
36: "System Started",
|
|
37: "Unknown",
|
|
38: "VFD Fault",
|
|
39: "Conveyor Running In Power Saving Mode",
|
|
40: "Conveyor Jogging In Maintenance Mode",
|
|
41: "VFD Reset Required",
|
|
42: "Jam Reset Push Button Pressed",
|
|
43: "Start Push Button Pressed",
|
|
44: "Stop Push Button Pressed",
|
|
45: "No Container",
|
|
46: "Ready To Be Enabled",
|
|
47: "Half Full",
|
|
48: "Enabled",
|
|
49: "Tipper Faulted",
|
|
50: "OK",
|
|
51: "Disconnected",
|
|
52: "Faulted",
|
|
53: "Faulted/Disconnect",
|
|
54: "Diverting"
|
|
}
|
|
|
|
# === TPE (Tracking Photoeye) ===
|
|
if "/TPE/" in up:
|
|
if value == 0:
|
|
return "Blocked"
|
|
elif value == 27:
|
|
return "Clear"
|
|
elif value == 17:
|
|
return "Jammed"
|
|
else:
|
|
return state_mappings.get(value, "Offline")
|
|
|
|
# === Single Photoeyes (PE1, PE2) ===
|
|
if up.endswith(("PE1", "PE2")):
|
|
if value == 4:
|
|
return "Clear"
|
|
else:
|
|
return "Blocked"
|
|
|
|
# === Prox Sensors (PRX1, PRX2) ===
|
|
if up.endswith(("PRX1", "PRX2")):
|
|
if value:
|
|
return "Inactive"
|
|
else:
|
|
return "Actuated"
|
|
|
|
# === Beacons (BCN) ===
|
|
if "/BEACON" in up:
|
|
if value == 0:
|
|
return "Off"
|
|
elif value == 1:
|
|
return "Cleared / Reset Required"
|
|
else:
|
|
return "Active"
|
|
|
|
# === Default ===
|
|
return state_mappings.get(value, "Offline")
|
|
|
|
|
|
# ======================================================
|
|
# Helper Function: Read One Device (multi or single state)
|
|
# ======================================================
|
|
def read_device_status(tagPath, provider, dev_name):
|
|
"""
|
|
Reads the appropriate state tag(s) for a given device and returns
|
|
a list of {Device, Status} dictionaries.
|
|
Handles multi-state (SS), VFD, PRX, PE, EN, etc.
|
|
"""
|
|
rows = []
|
|
try:
|
|
up = tagPath.upper()
|
|
|
|
# === Case 0: SS (Start/Stop Station) ===
|
|
if up.endswith("SS") or "/SS/" in up:
|
|
for sub in ("Start", "Stop"):
|
|
sub_path = provider + tagPath + "/" + sub + "/State"
|
|
try:
|
|
result = system.tag.readBlocking([sub_path])[0]
|
|
if result.quality.isGood():
|
|
status_value = get_device_state(result.value, tagPath)
|
|
else:
|
|
status_value = "Offline"
|
|
except:
|
|
status_value = "Offline"
|
|
|
|
rows.append({
|
|
"Device": "{} ({})".format(dev_name, sub),
|
|
"Status": status_value
|
|
})
|
|
return rows # handled fully
|
|
|
|
# === Case 1: VFD / Conveyor ===
|
|
if "/VFD/" in up:
|
|
path = provider + tagPath + "/Drive/Lenze"
|
|
|
|
# === Case 2: Chute sensors (PE / PRX) ===
|
|
elif up.endswith(("PE1", "PE2", "PRX1", "PRX2")) and "/TPE/" not in up:
|
|
path = provider + tagPath
|
|
|
|
# === Case 3: Chute EN ===
|
|
elif up.endswith("EN") and "/CHUTE/" in up:
|
|
path = provider + tagPath + "_State"
|
|
|
|
# === Case 4: Default ===
|
|
else:
|
|
path = provider + tagPath + "/State"
|
|
|
|
try:
|
|
result = system.tag.readBlocking([path])[0]
|
|
if result.quality.isGood():
|
|
status_value = get_device_state(result.value, tagPath)
|
|
else:
|
|
status_value = "Offline"
|
|
except:
|
|
status_value = "Offline"
|
|
|
|
rows.append({
|
|
"Device": dev_name,
|
|
"Status": status_value
|
|
})
|
|
|
|
except Exception as e:
|
|
system.perspective.print("Error reading device status for %s: %s" % (dev_name, e))
|
|
|
|
return rows
|
|
|
|
|
|
# ======================================================
|
|
# Helper Function: Single Device Reader (for Docked Device View)
|
|
# ======================================================
|
|
def get_single_device_status(self, tagPath):
|
|
"""
|
|
Reads a single device tag (used for docked device views).
|
|
Returns a single readable status string (e.g. "Running", "Blocked", etc.)
|
|
"""
|
|
try:
|
|
up = tagPath.upper()
|
|
provider = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]"
|
|
|
|
if up.endswith("SS") or "/SS/" in up:
|
|
states = []
|
|
for sub in ("Start", "Stop"):
|
|
sub_path = provider + tagPath + "/" + sub + "/State"
|
|
try:
|
|
result = system.tag.readBlocking([sub_path])[0]
|
|
if result.quality.isGood():
|
|
states.append("{}: {}".format(sub, get_device_state(result.value, tagPath)))
|
|
else:
|
|
states.append("{}: Unknown".format(sub))
|
|
except:
|
|
states.append("{}: Unknown".format(sub))
|
|
return " | ".join(states)
|
|
|
|
# === VFD ===
|
|
if "/VFD/" in up:
|
|
path = provider + tagPath + "/Drive/Lenze"
|
|
# === Sensors ===
|
|
elif up.endswith(("PE1", "PE2", "PRX1", "PRX2")) and "/TPE/" not in up:
|
|
path = provider + tagPath
|
|
# === EN ===
|
|
elif up.endswith("EN") and "/CHUTE/" in up:
|
|
path = provider + tagPath + "_State"
|
|
else:
|
|
path = provider + tagPath + "/State"
|
|
|
|
result = system.tag.readBlocking([path])[0]
|
|
if result.quality.isGood():
|
|
return get_device_state(result.value, tagPath)
|
|
else:
|
|
return "Offline"
|
|
|
|
except Exception as e:
|
|
system.perspective.print("Error reading single device status for %s: %s" % (tagPath, e))
|
|
return "Offline"
|
|
|
|
|
|
# ======================================================
|
|
# Device Mapping Builder
|
|
# ======================================================
|
|
global_device_mapping = {}
|
|
|
|
def build_device_mapping(full_tag_path):
|
|
"""
|
|
Builds global_device_mapping for devices under the same PLC and parent device.
|
|
Adds support for:
|
|
- Chute FIOM devices (e.g. S03_CH109_FIOM_1 when clicking S03_CH109)
|
|
- Shared JR and PE devices used by multiple chutes (e.g. S03_1_JR1, S03_1_LRPE1)
|
|
"""
|
|
system.perspective.print(full_tag_path)
|
|
global global_device_mapping
|
|
global_device_mapping.clear()
|
|
|
|
try:
|
|
path_parts = full_tag_path.split("/")
|
|
plc_name = path_parts[1] if len(path_parts) > 1 else path_parts[0]
|
|
clicked_name = path_parts[-1] if len(path_parts) > 0 else ""
|
|
|
|
# --- Clean clicked name ---
|
|
if "_VFD" in clicked_name:
|
|
clicked_name = clicked_name.split("_VFD")[0]
|
|
|
|
project_name = system.util.getProjectName()
|
|
base_path = (
|
|
os.getcwd().replace("\\", "/")
|
|
+ "/data/projects/"
|
|
+ project_name
|
|
+ "/com.inductiveautomation.perspective/Views/autStand/Detailed_Views/MCM-Views"
|
|
)
|
|
|
|
if not os.path.exists(base_path):
|
|
system.perspective.print("Path not found: " + base_path)
|
|
return {}
|
|
|
|
# --- Detect if this is a Chute ---
|
|
is_chute = "/CHUTE/" in full_tag_path.upper()
|
|
|
|
for view_folder in os.listdir(base_path):
|
|
json_file = os.path.join(base_path, view_folder, "view.json")
|
|
if not os.path.isfile(json_file):
|
|
continue
|
|
|
|
try:
|
|
with open(json_file, "r") as fh:
|
|
view_json = json.load(fh)
|
|
except Exception:
|
|
continue
|
|
|
|
root_children = (view_json.get("root") or {}).get("children") or []
|
|
if not root_children:
|
|
continue
|
|
|
|
container = root_children[0]
|
|
children = container.get("children") or []
|
|
|
|
for child in children:
|
|
props = child.get("props") or {}
|
|
params = props.get("params") or {}
|
|
tag_props = params.get("tagProps")
|
|
|
|
if isinstance(tag_props, list) and len(tag_props) > 0:
|
|
tag_prop = str(tag_props[0])
|
|
parts = tag_prop.split("/")
|
|
|
|
if len(parts) > 1 and parts[1] == plc_name:
|
|
dev_name = parts[-1]
|
|
if len(parts) > 3 and parts[-2] == clicked_name:
|
|
dev_name = clicked_name + "_" + parts[-1]
|
|
else:
|
|
dev_name = parts[-1]
|
|
|
|
prefix = clicked_name + "_"
|
|
|
|
# === 🟢 NEW: Chute FIOM match ===
|
|
if is_chute and dev_name.startswith(clicked_name + "_"):
|
|
global_device_mapping[dev_name] = {
|
|
"tagPath": tag_prop,
|
|
"zone": view_folder
|
|
}
|
|
continue
|
|
|
|
# === Default inclusion ===
|
|
if dev_name.startswith(prefix) or (len(parts) > 3 and parts[-2] == clicked_name):
|
|
global_device_mapping[dev_name] = {
|
|
"tagPath": tag_prop,
|
|
"zone": view_folder
|
|
}
|
|
|
|
# === Special Case: JR Buttons ===
|
|
elif "/JR/" in tag_prop.upper():
|
|
try:
|
|
jr_parts = tag_prop.split("/JR/")
|
|
if len(jr_parts) > 1:
|
|
sub_path = jr_parts[1]
|
|
if sub_path.startswith(clicked_name + "_JR"):
|
|
dev_name = sub_path.split("/")[0]
|
|
global_device_mapping[dev_name] = {
|
|
"tagPath": tag_prop,
|
|
"zone": view_folder
|
|
}
|
|
except:
|
|
pass
|
|
|
|
shared_jr_pe_map = {
|
|
"S03_CH101": ["S03_1_JR1", "S03_1_LRPE1"],
|
|
"S03_CH103": ["S03_1_JR1", "S03_1_LRPE1"],
|
|
"S03_CH105": ["S03_1_JR1", "S03_1_LRPE1"],
|
|
"S03_CH107": ["S03_1_JR3", "S03_1_LRPE3"],
|
|
"S03_CH108": ["S03_1_JR4", "S03_1_LRPE4"],
|
|
"S03_CH109": ["S03_1_JR3", "S03_1_LRPE3"],
|
|
"S03_CH110": ["S03_1_JR4", "S03_1_LRPE4"],
|
|
"S03_CH111": ["S03_1_JR3", "S03_1_LRPE3"],
|
|
"S03_CH112": ["S03_1_JR2", "S03_1_LRPE2"],
|
|
"S03_CH113": ["S03_1_JR5", "S03_1_LRPE5"],
|
|
"S03_CH114": ["S03_1_JR6", "S03_1_LRPE6"],
|
|
"S03_CH115": ["S03_1_JR5", "S03_1_LRPE5"],
|
|
"S03_CH116": ["S03_1_JR6", "S03_1_LRPE6"],
|
|
"S03_CH117": ["S03_1_JR5", "S03_1_LRPE5"],
|
|
"S03_CH118": ["S03_1_JR6", "S03_1_LRPE6"],
|
|
"S03_CH119": ["S03_1_JR7", "S03_1_LRPE7"],
|
|
"S03_CH120": ["S03_1_JR8", "S03_1_LRPE8"],
|
|
"S03_CH121": ["S03_1_JR7", "S03_1_LRPE7"],
|
|
"S03_CH122": ["S03_1_JR8", "S03_1_LRPE8"],
|
|
"S03_CH123": ["S03_1_JR7", "S03_1_LRPE7"],
|
|
"S03_CH124": ["S03_1_JR8", "S03_1_LRPE8"],
|
|
}
|
|
shared_fiom_map = {
|
|
"NCS1_1": ["S03_1_FIOM_5", "S03_1_FIOM_9", "S03_1_FIOM_1", "S03_1_FIOM_2","S03_1_FIOM_3","S03_1_FIOM_4", "S03_1_FIOM_6","S03_1_FIOM_7", "S03_1_FIOM_8"],
|
|
}
|
|
|
|
if clicked_name in shared_jr_pe_map:
|
|
extra_devices = shared_jr_pe_map[clicked_name]
|
|
for dev in extra_devices:
|
|
try:
|
|
# Base tag (for PE)
|
|
base_tag = "System/MCM02/Station/Chute_JR/" + dev
|
|
# JR subtag (for JR button)
|
|
jr_tag = base_tag + "/JR" if dev.endswith("JR1") else base_tag
|
|
|
|
for tag_candidate in [base_tag, jr_tag]:
|
|
global_device_mapping[dev] = {
|
|
"tagPath": tag_candidate,
|
|
"zone": "Chute_JR"
|
|
}
|
|
except Exception as ex:
|
|
system.perspective.print("Error adding JR/PE for {}: {}".format(clicked_name, ex))
|
|
|
|
if clicked_name in shared_fiom_map:
|
|
for dev in shared_fiom_map[clicked_name]:
|
|
tag_path = "System/{}/IO_Block/FIO/{}".format(plc_name, dev)
|
|
global_device_mapping[dev] = {
|
|
"tagPath": tag_path,
|
|
"zone": "FIO"
|
|
}
|
|
|
|
return global_device_mapping
|
|
|
|
except Exception as e:
|
|
whid = "unknown"
|
|
try:
|
|
whid = system.tag.readBlocking("Configuration/FC")[0].value
|
|
except:
|
|
pass
|
|
logger = system.util.getLogger("%s-build_device_mapping" % whid)
|
|
exc_type, exc_obj, tb = sys.exc_info()
|
|
logger.error("Error at line %s: %s" % (tb.tb_lineno, exc_obj))
|
|
return {}
|
|
|
|
|
|
|
|
|
|
# ======================================================
|
|
# Device Table Builder
|
|
# ======================================================
|
|
def build_device_table(self):
|
|
rows = []
|
|
try:
|
|
for dev_name, info in global_device_mapping.items():
|
|
tagPath = info.get("tagPath", "")
|
|
if not tagPath:
|
|
continue
|
|
|
|
provider = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]"
|
|
rows.extend(read_device_status(tagPath, provider, dev_name))
|
|
|
|
return rows
|
|
|
|
except Exception as e:
|
|
system.perspective.print("Error building device table: %s" % e)
|
|
return []
|
|
|
|
# ======================================================
|
|
# Get All Tags for Clicked Device
|
|
# ======================================================
|
|
def getAllTags(self, tagPath, section="all"):
|
|
"""
|
|
Reads all tags under a UDT instance (recursively)
|
|
and returns a list of dictionaries:
|
|
[
|
|
{"Name": "State", "OPC Path": "System/MCM01/...", "Value": "Running"},
|
|
...
|
|
]
|
|
"""
|
|
rows = []
|
|
try:
|
|
providerPath = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]"
|
|
driveFolderName = "Drive"
|
|
|
|
# === Utility: read a single atomic tag ===
|
|
def readSingleTag(path, prefix=""):
|
|
try:
|
|
result = system.tag.readBlocking([providerPath + path])[0]
|
|
value = str(result.value) if result.quality.isGood() else "Offline"
|
|
except:
|
|
value = "Offline"
|
|
|
|
displayName = prefix + path.split("/")[-1] if prefix else path.split("/")[-1]
|
|
rows.append({
|
|
"Name": displayName,
|
|
"OPC Path": path,
|
|
"Value": value
|
|
})
|
|
|
|
# === Utility: recursive browse ===
|
|
def browseRecursive(basePath, prefix=""):
|
|
children = system.tag.browse(providerPath + basePath).getResults()
|
|
for child in children:
|
|
tagType = str(child.get("tagType", ""))
|
|
name = str(child.get("name", ""))
|
|
fullPath = str(child.get("fullPath", ""))
|
|
|
|
if fullPath.startswith("[") and "]" in fullPath:
|
|
fullPath = fullPath.split("]", 1)[1]
|
|
|
|
# --- Conveyor filter (skip Drive folder) ---
|
|
if section == "conveyor" and name == driveFolderName:
|
|
continue
|
|
|
|
if tagType == "Folder":
|
|
# --- Skip JR subfolder if current device is LRPE ---
|
|
if name.upper() == "JR" and "_JR" in basePath.upper():
|
|
continue
|
|
newPrefix = prefix + name + "/" if prefix else name + "/"
|
|
browseRecursive(basePath + "/" + name, newPrefix)
|
|
elif tagType == "AtomicTag":
|
|
readSingleTag(fullPath, prefix)
|
|
|
|
# === MAIN ENTRY POINT ===
|
|
|
|
# --- Case 1: VFD ---
|
|
if section == "vfd":
|
|
if tagPath.endswith("/Drive"):
|
|
browseRecursive(tagPath)
|
|
else:
|
|
browseRecursive(tagPath + "/" + driveFolderName)
|
|
|
|
# --- Case 2: Flat EN_ tags (Chutes) ---
|
|
elif tagPath.upper().endswith("/EN"):
|
|
parentPath = "/".join(tagPath.split("/")[:-1])
|
|
children = system.tag.browse(providerPath + parentPath).getResults()
|
|
for child in children:
|
|
tagType = str(child.get("tagType", ""))
|
|
name = str(child.get("name", ""))
|
|
if tagType == "AtomicTag" and name.upper().startswith("EN_"):
|
|
fullPath = str(child.get("fullPath", ""))
|
|
if fullPath.startswith("[") and "]" in fullPath:
|
|
fullPath = fullPath.split("]", 1)[1]
|
|
readSingleTag(fullPath)
|
|
|
|
# --- Case 3: Single Sensors (PE/PRX) ---
|
|
elif tagPath.upper().endswith(("PE1", "PE2", "PRX1", "PRX2")) and "/TPE/" not in tagPath.upper():
|
|
readSingleTag(tagPath)
|
|
|
|
# --- Case 4: Default / Fallback ---
|
|
else:
|
|
browseResult = system.tag.browse(providerPath + tagPath).getResults()
|
|
|
|
if not browseResult:
|
|
# Possibly a struct-style UDT (like some TPEs)
|
|
try:
|
|
result = system.tag.readBlocking([providerPath + tagPath])[0]
|
|
value = result.value
|
|
|
|
# === Expand STRUCT ===
|
|
if isinstance(value, dict):
|
|
|
|
def flattenStruct(struct, base=""):
|
|
for k, v in struct.items():
|
|
newName = base + "/" + k if base else k
|
|
if isinstance(v, dict):
|
|
flattenStruct(v, newName)
|
|
else:
|
|
rows.append({
|
|
"Name": newName,
|
|
"OPC Path": tagPath + "/" + newName,
|
|
"Value": str(v)
|
|
})
|
|
|
|
flattenStruct(value)
|
|
|
|
else:
|
|
# Not a struct, read normally
|
|
readSingleTag(tagPath)
|
|
|
|
except Exception as ex:
|
|
system.perspective.print("Fallback read failed for {}: {}".format(tagPath, ex))
|
|
|
|
else:
|
|
# Normal browse case
|
|
browseRecursive(tagPath)
|
|
|
|
return rows
|
|
|
|
except Exception as e:
|
|
system.perspective.print("Error in getAllTags: {}".format(e))
|
|
return []
|