309 lines
11 KiB
Python

import os, json, sys
# Module-level device cache: cleared and refilled by build_device_mapping(),
# then read by build_device_table(). Keys are device names; each value is a
# dict with "tagPath" and "zone" entries.
global_device_mapping = {}
def _iter_view_tag_props(view_json):
    """Yield str(tagProps[0]) for each component under the view's first root child.

    Nodes that do not have the expected dict/list shape are skipped so that a
    single odd view.json cannot abort the scan of the remaining views.
    """
    if not isinstance(view_json, dict):
        return
    root = view_json.get("root")
    root_children = root.get("children") if isinstance(root, dict) else None
    if not isinstance(root_children, list) or not root_children:
        return
    # One level deeper: root -> children[0] (the container) -> its children.
    container = root_children[0]
    if not isinstance(container, dict):
        return
    children = container.get("children")
    if not isinstance(children, list):
        return
    for child in children:
        if not isinstance(child, dict):
            continue
        props = child.get("props")
        params = props.get("params") if isinstance(props, dict) else None
        tag_props = params.get("tagProps") if isinstance(params, dict) else None
        if isinstance(tag_props, list) and tag_props:
            yield str(tag_props[0])


def build_device_mapping(full_tag_path):
    """
    Rebuild global_device_mapping with the children of the clicked device.

    The PLC name is segment 1 of the "/"-separated path (segment 0 when the
    path has only one segment). The clicked device name is the last segment,
    cut at the first "_VFD". Every <MCM-Views>/<folder>/view.json of the
    current project is scanned; for each component directly under the first
    child of the view root, the first entry of props.params.tagProps is
    recorded when:

    - segment 1 of that tag path equals the PLC name, and
    - its last segment starts with clicked_name + "_", or its parent segment
      equals clicked_name (the key is then clicked_name + "_" + last segment).

    Args:
        full_tag_path: "/"-separated tag path of the clicked device.

    Returns:
        global_device_mapping, as {device name: {"tagPath": ..., "zone": view
        folder name}}, on success. An empty dict when the views directory
        does not exist or an error occurred; the global mapping is left empty
        in both cases.
    """
    global global_device_mapping
    global_device_mapping.clear()
    try:
        # Parse PLC and clicked device.
        path_parts = full_tag_path.split("/")
        plc_name = path_parts[1] if len(path_parts) > 1 else path_parts[0]
        clicked_name = path_parts[-1].split("_VFD", 1)[0]
        prefix = clicked_name + "_"

        project_name = system.util.getProjectName()
        base_path = (
            os.getcwd().replace("\\", "/")
            + "/data/projects/"
            + project_name
            + "/com.inductiveautomation.perspective/Views/autStand/Detailed_Views/MCM-Views"
        )
        if not os.path.exists(base_path):
            system.perspective.print("Path not found: " + base_path)
            return {}

        # Collect locally and publish at the end, so a failure half-way
        # through never leaves a partially filled global behind.
        found = {}
        for view_folder in os.listdir(base_path):
            json_file = os.path.join(base_path, view_folder, "view.json")
            if not os.path.isfile(json_file):
                continue
            try:
                with open(json_file, "r") as fh:
                    view_json = json.load(fh)
            except Exception:
                # Unreadable or invalid JSON: skip this view only.
                continue

            for tag_prop in _iter_view_tag_props(view_json):
                parts = tag_prop.split("/")
                if len(parts) < 2 or parts[1] != plc_name:
                    continue
                dev_name = parts[-1]
                if len(parts) > 3 and parts[-2] == clicked_name:
                    # Tag nested directly under the clicked device.
                    dev_name = prefix + dev_name
                    system.perspective.print(dev_name)
                elif not dev_name.startswith(prefix):
                    # ONLY include devices that are children of clicked_name.
                    continue
                found[dev_name] = {
                    "tagPath": tag_prop,
                    "zone": view_folder
                }

        global_device_mapping.update(found)
        return global_device_mapping
    except Exception as e:
        # Grab the line number before anything else: on a Python 2 runtime the
        # nested try/except below can replace what sys.exc_info() reports.
        tb = sys.exc_info()[2]
        line_no = tb.tb_lineno
        del tb  # do not keep the traceback (and its frames) alive
        whid = "unknown"
        try:
            whid = system.tag.readBlocking(["Configuration/FC"])[0].value
        except:
            # Best effort only: the logger name falls back to "unknown".
            pass
        logger = system.util.getLogger("%s-build_device_mapping" % whid)
        logger.error("Error at line %s: %s" % (line_no, e))
        return {}
# Display text for each numeric value of a device's STATE tag.
_DEVICE_STATE_NAMES = {
    0: "Closed",
    1: "Actuated",
    2: "Communication Faulted",
    3: "Conveyor Running In Maintenance Mode",
    4: "Disabled",
    5: "Disconnected",
    6: "Stopped",
    7: "Enabled Not Running",
    8: "Encoder Fault",
    9: "Energy Management",
    10: "ESTOP Was Actuated",
    11: "EStopped",
    12: "EStopped Locally",
    13: "Extended Faulted",
    14: "Full",
    15: "Gaylord Start Pressed",
    16: "Jam Fault",
    17: "Jammed",
    18: "Loading Allowed",
    19: "Loading Not Allowed",
    20: "Low Air Pressure Fault Was Present",
    21: "Maintenance Mode",
    22: "Conveyor Stopped In Maintenance Mode",
    23: "Motor Faulted",
    24: "Motor Was Faulted",
    25: "Normal",
    26: "Off Inactive",
    27: "Open",
    28: "PLC Ready To Run",
    29: "Package Release Pressed",
    30: "Power Branch Was Faulted",
    31: "Pressed",
    32: "Ready To Receive",
    33: "Running",
    34: "Started",
    35: "Stopped",
    36: "System Started",
    37: "Unknown",
    38: "VFD Fault",
    39: "Conveyor Running In Power Saving Mode",
    40: "Conveyor Jogging In Maintenance Mode",
    41: "VFD Reset Required",
    42: "Jam Reset Push Button Pressed",
    43: "Start Push Button Pressed",
    44: "Stop Push Button Pressed",
    45: "No Container",
    46: "Ready To Be Enabled",
    47: "Half Full",
    48: "Enabled",
    49: "Tipper Faulted"
}


def build_device_table(self):
    """
    Convert global_device_mapping into table rows.

    For every mapped device the tag
    "[<fc>_SCADA_TAG_PROVIDER]<tagPath>/STATE" is read, where <fc> is
    self.session.custom.fc, and its value is translated through
    _DEVICE_STATE_NAMES. All STATE tags are fetched with a single
    system.tag.readBlocking call instead of one blocking call per device.

    Args:
        self: object exposing session.custom.fc (used to build the tag
            provider name).

    Returns:
        A list of {"Device": name, "Status": text} dicts in mapping order.
        Status is "Unknown" when the read fails or the value has no entry in
        the state table, and "" for an entry without a tagPath. An empty list
        is returned when the mapping is empty or an unexpected error occurs.
    """
    rows = []
    try:
        entries = [
            (dev_name, info.get("tagPath", ""))
            for dev_name, info in global_device_mapping.items()
        ]
        if not entries:
            return rows

        provider = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]"
        tag_paths = [tag_path for _, tag_path in entries if tag_path]

        results = []
        if tag_paths:
            try:
                results = system.tag.readBlocking(
                    [provider + tag_path + "/STATE" for tag_path in tag_paths]
                )
            except:
                # NOTE(review): bare except kept from the original per-tag read.
                # Presumably this runs on Ignition's Jython, where Java
                # exceptions raised by system.* may not be caught by
                # `except Exception`; confirm before narrowing.
                # A failed batch leaves every device as "Unknown".
                results = []

        # readBlocking returns one result per requested path, in order.
        status_by_tag_path = {}
        for tag_path, result in zip(tag_paths, results):
            try:
                status = _DEVICE_STATE_NAMES.get(result.value, "Unknown")
            except:
                # e.g. an unhashable value
                status = "Unknown"
            status_by_tag_path[tag_path] = status

        for dev_name, tag_path in entries:
            if tag_path:
                status_value = status_by_tag_path.get(tag_path, "Unknown")
            else:
                # No tag to read: keep the empty status used for such entries.
                status_value = ""
            rows.append({
                'Device': dev_name,
                'Status': status_value
            })
        return rows
    except Exception as e:
        system.perspective.print("Error building device table: %s" % e)
        return []  # Return empty list on error
def getAllTags(self, tagPath, section="all"):
    """
    Collect the tags below tagPath and return them as table rows.

    All paths are resolved against "[<fc>_SCADA_TAG_PROVIDER]", where <fc> is
    self.session.custom.fc. The branch taken depends on the arguments:

    - section == "vfd": browse only "<tagPath>/Drive", recursively.
    - tagPath ends with "/EN" (case-insensitive): list the atomic tags of
      the parent path whose name starts with "EN_".
    - tagPath ends with PE1, PE2, PRX1 or PRX2 and does not contain "/TPE/":
      read tagPath itself as a single tag.
    - otherwise: browse tagPath recursively; with section == "conveyor" any
      child named "Drive" is skipped. If the browse is empty, tagPath is read
      directly and a dict value is flattened into one row per leaf, while a
      non-dict value becomes a single row.

    Recursion descends into children whose tagType is "Folder"; children of
    type "AtomicTag" become rows; other types are ignored. The atomic tags
    found are read with one system.tag.readBlocking call after browsing.

    Args:
        self: object exposing session.custom.fc.
        tagPath: tag path without the provider prefix.
        section: "all" (default), "vfd" or "conveyor".

    Returns:
        A list of {"Name", "OPC Path", "Value"} dicts. "Value" is the string
        form of the tag value, or "Unknown" when the read fails or the
        quality is not good. An empty list is returned on unexpected errors.
    """
    rows = []
    try:
        providerPath = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]"
        driveFolderName = "Drive"
        # (display name, provider-less path) of atomic tags, in row order.
        pending = []

        # === Utility: drop a leading "[provider]" from a full path ===
        def stripProvider(fullPath):
            if fullPath.startswith("[") and "]" in fullPath:
                return fullPath.split("]", 1)[1]
            return fullPath

        # === Utility: remember an atomic tag for the batched read ===
        def queueTag(path, prefix=""):
            pending.append((prefix + path.split("/")[-1], path))

        def browse(path):
            return system.tag.browse(providerPath + path).getResults()

        # === Utility: recursive browse ===
        def browseRecursive(basePath, prefix="", children=None):
            # `children` lets the caller hand in an already fetched browse
            # result so the same path is not browsed twice.
            if children is None:
                children = browse(basePath)
            for child in children:
                tagType = str(child.get("tagType", ""))
                name = str(child.get("name", ""))
                # --- Conveyor filter (skip Drive folder) ---
                if section == "conveyor" and name == driveFolderName:
                    continue
                if tagType == "Folder":
                    browseRecursive(basePath + "/" + name, prefix + name + "/")
                elif tagType == "AtomicTag":
                    queueTag(stripProvider(str(child.get("fullPath", ""))), prefix)

        # === MAIN ENTRY POINT ===
        if section == "vfd":
            # Browse only inside Drive folder
            browseRecursive(tagPath + "/" + driveFolderName)
        elif tagPath.upper().endswith("/EN"):
            # --- Handle flat EN_ tags ---
            parentPath = "/".join(tagPath.split("/")[:-1])
            for child in browse(parentPath):
                tagType = str(child.get("tagType", ""))
                name = str(child.get("name", ""))
                if tagType == "AtomicTag" and name.upper().startswith("EN_"):
                    queueTag(stripProvider(str(child.get("fullPath", ""))))
        elif tagPath.upper().endswith(("PE1", "PE2", "PRX1", "PRX2")) and "/TPE/" not in tagPath.upper():
            # --- Single sensors ---
            queueTag(tagPath)
        else:
            # --- Default path ---
            browseResult = browse(tagPath)
            if browseResult:
                # Normal case: browse folder/UDT structure
                browseRecursive(tagPath, children=browseResult)
            else:
                # Possibly a struct-style UDT (like some TPEs)
                system.perspective.print("Empty browse for {}, checking struct value...".format(tagPath))
                try:
                    result = system.tag.readBlocking([providerPath + tagPath])[0]
                    value = result.value
                    # If we got a STRUCT, expand it into sub-rows
                    if isinstance(value, dict):
                        system.perspective.print("Detected STRUCT value, expanding {}".format(tagPath))

                        def flattenStruct(struct, base=""):
                            for k, v in struct.items():
                                newName = base + "/" + k if base else k
                                if isinstance(v, dict):
                                    flattenStruct(v, newName)
                                else:
                                    rows.append({
                                        "Name": newName,
                                        "OPC Path": tagPath + "/" + newName,
                                        "Value": str(v)
                                    })

                        flattenStruct(value)
                    else:
                        # Not a struct, just read it normally
                        queueTag(tagPath)
                except Exception as ex:
                    system.perspective.print("Fallback read failed for {}: {}".format(tagPath, ex))

        # === Batched read of every queued atomic tag ===
        if pending:
            try:
                results = system.tag.readBlocking([providerPath + p for _, p in pending])
            except:
                # NOTE(review): bare except kept from the original per-tag read.
                # Presumably this runs on Ignition's Jython, where Java
                # exceptions raised by system.* may not be caught by
                # `except Exception`; confirm before narrowing.
                results = None
            for index, (displayName, path) in enumerate(pending):
                value = "Unknown"
                if results is not None:
                    try:
                        result = results[index]
                        if result.quality.isGood():
                            value = str(result.value)
                    except:
                        value = "Unknown"
                rows.append({
                    "Name": displayName,
                    "OPC Path": path,
                    "Value": value
                })
        return rows
    except Exception as e:
        system.perspective.print("Error in getAllTags: {}".format(e))
        return []