224 lines
7.5 KiB
Plaintext
224 lines
7.5 KiB
Plaintext
import os, json, sys
|
|
|
|
# Module-level mapping of device name -> {"tagPath": ..., "zone": ...}.
# Cleared and refilled by build_device_mapping(); read by build_device_table().
global_device_mapping = {}
|
|
|
|
def build_device_mapping(full_tag_path):
    """
    Rebuild global_device_mapping for the children of a clicked device.

    Scans every "<view folder>/view.json" under the project's MCM-Views
    directory and looks at the children of the first child of each view's
    root. A child is recorded when the first entry of its
    props.params.tagProps list:
      - has the same PLC segment (index 1 of the "/"-split path) as
        full_tag_path, and
      - ends in a device name that starts with clicked_name + "_", where
        clicked_name is the last segment of full_tag_path.

    Views that cannot be read, parsed, or that do not have the expected
    dict/list structure are skipped.

    Args:
        full_tag_path (str): Tag path of the clicked device, split on "/".

    Returns:
        dict: global_device_mapping itself, mapping device name ->
            {"tagPath": <tagProps[0] as str>, "zone": <view folder name>}.
            A new empty dict is returned when the views directory is missing
            or an unexpected error occurs; in the error case the global
            mapping is cleared as well so both stay consistent.
    """
    global global_device_mapping
    global_device_mapping.clear()

    try:
        # Parse PLC and clicked device
        path_parts = full_tag_path.split("/")
        plc_name = path_parts[1] if len(path_parts) > 1 else path_parts[0]
        clicked_name = path_parts[-1]
        # ONLY devices that are children of clicked_name are included
        child_prefix = clicked_name + "_"

        project_name = system.util.getProjectName()
        base_path = (
            os.getcwd().replace("\\", "/")
            + "/data/projects/"
            + project_name
            + "/com.inductiveautomation.perspective/Views/autStand/Detailed_Views/MCM-Views"
        )

        if not os.path.exists(base_path):
            system.perspective.print("Path not found: " + base_path)
            return {}

        # loop through all view folders
        for view_folder in os.listdir(base_path):
            json_file = os.path.join(base_path, view_folder, "view.json")
            if not os.path.isfile(json_file):
                continue

            try:
                with open(json_file, "r") as fh:
                    view_json = json.load(fh)
            except Exception:
                # Best effort: an unreadable or invalid view is ignored
                continue

            for tag_prop in _first_tag_props(view_json):
                parts = tag_prop.split("/")
                if len(parts) <= 1 or parts[1] != plc_name:
                    continue

                dev_name = parts[-1]
                if dev_name.startswith(child_prefix):
                    global_device_mapping[dev_name] = {
                        "tagPath": tag_prop,
                        "zone": view_folder,
                    }

        return global_device_mapping

    except Exception:
        # Capture the failure first: on Python 2 / Jython the nested
        # try/except below can replace what sys.exc_info() reports.
        exc_type, exc_obj, tb = sys.exc_info()
        line_no = tb.tb_lineno if tb is not None else "?"

        # Do not leave a half-built mapping behind while returning {}
        global_device_mapping.clear()

        whid = "unknown"
        try:
            whid = system.tag.readBlocking(["Configuration/FC"])[0].value
        except:
            # Best effort only: keep "unknown" in the logger name.
            # NOTE(review): bare except kept from the original; presumably
            # needed so Java exceptions from the Ignition API are caught too.
            pass

        logger = system.util.getLogger("%s-build_device_mapping" % whid)
        logger.error("Error at line %s: %s" % (line_no, exc_obj))
        return {}


def _first_tag_props(view_json):
    """Yield str(tagProps[0]) for each child of root.children[0] that has a non-empty tagProps list."""
    if not isinstance(view_json, dict):
        return
    root = view_json.get("root")
    if not isinstance(root, dict):
        return
    root_children = root.get("children")
    if not isinstance(root_children, list) or not root_children:
        return

    # go one level deeper: root -> children[0] (coordinateContainer) -> its children
    container = root_children[0]
    if not isinstance(container, dict):
        return
    children = container.get("children")
    if not isinstance(children, list):
        return

    for child in children:
        if not isinstance(child, dict):
            continue
        props = child.get("props")
        params = props.get("params") if isinstance(props, dict) else None
        tag_props = params.get("tagProps") if isinstance(params, dict) else None
        if isinstance(tag_props, list) and len(tag_props) > 0:
            yield str(tag_props[0])
|
|
|
|
def build_device_table(self):
    """
    Convert global_device_mapping into a dataset with columns Device, Status.

    For every device with a non-empty "tagPath", the tag
    "[<fc>_SCADA_TAG_PROVIDER]<tagPath>/STATE" is read and its value is
    translated through the state table below. All STATE tags are read in a
    single system.tag.readBlocking call instead of one blocking call per
    device.

    Args:
        self: Object exposing session.custom.fc, which is used to build the
            tag provider name.

    Returns:
        Dataset: One row per device in global_device_mapping. Status is the
            mapped state text, "Unknown" when the value is not in the table
            or the read fails, and "" for a device without a tag path.
            An empty dataset with the same headers is returned if building
            the table fails.
    """
    headers = ["Device", "Status"]
    state_mappings = {
        0: "Closed",
        1: "Actuated",
        2: "Communication Faulted",
        3: "Conveyor Running In Maintenance Mode",
        4: "Disabled",
        5: "Disconnected",
        6: "Stopped",
        7: "Enabled Not Running",
        8: "Encoder Fault",
        9: "Energy Management",
        10: "ESTOP Was Actuated",
        11: "EStopped",
        12: "EStopped Locally",
        13: "Extended Faulted",
        14: "Full",
        15: "Gaylord Start Pressed",
        16: "Jam Fault",
        17: "Jammed",
        18: "Loading Allowed",
        19: "Loading Not Allowed",
        20: "Low Air Pressure Fault Was Present",
        21: "Maintenance Mode",
        22: "Conveyor Stopped In Maintenance Mode",
        23: "Motor Faulted",
        24: "Motor Was Faulted",
        25: "Normal",
        26: "Off Inactive",
        27: "Open",
        28: "PLC Ready To Run",
        29: "Package Release Pressed",
        30: "Power Branch Was Faulted",
        31: "Pressed",
        32: "Ready To Receive",
        33: "Running",
        34: "Started",
        35: "Stopped",
        36: "System Started",
        37: "Unknown",
        38: "VFD Fault",
        39: "Conveyor Running In Power Saving Mode",
        40: "Conveyor Jogging In Maintenance Mode",
        41: "VFD Reset Required",
        42: "Jam Reset Push Button Pressed",
        43: "Start Push Button Pressed",
        44: "Stop Push Button Pressed",
        45: "No Container",
        46: "Ready To Be Enabled",
        47: "Half Full",
        48: "Enabled",
        49: "Tipper Faulted"
    }

    try:
        # Same provider for every device, so build it once
        provider = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]"

        rows = []
        pending = []  # (index into rows, full STATE tag path) for devices to read
        for dev_name, info in global_device_mapping.items():
            tag_path = info.get("tagPath", "")
            # Devices without a tag path keep an empty status; nothing to read
            rows.append([dev_name, ""])
            if tag_path:
                pending.append((len(rows) - 1, provider + tag_path + "/STATE"))

        if pending:
            try:
                results = system.tag.readBlocking([path for _, path in pending])
            except:
                # NOTE(review): bare except kept from the original; presumably
                # needed so Java exceptions from the Ignition API are caught too.
                results = None

            for position, (row_index, _) in enumerate(pending):
                try:
                    status_value = state_mappings.get(results[position].value, "Unknown")
                except:
                    # Failed read (results is None) or unusable value
                    status_value = "Unknown"
                rows[row_index][1] = status_value

        return system.dataset.toDataSet(headers, rows)

    except Exception as e:
        system.perspective.print("Error building device table: %s" % e)
        return system.dataset.toDataSet(headers, [])
|
|
|
|
def getAllTags(self, tagPath):
    """
    Read all atomic tags under a UDT instance (recursing into folders) and
    return them as a dataset.

    The tree under "[<fc>_SCADA_TAG_PROVIDER]<tagPath>" is browsed first;
    every browse result whose tagType is "AtomicTag" becomes a row, and every
    "Folder" is browsed recursively. All collected tags are then read in a
    single system.tag.readBlocking call.

    Args:
        self: Object exposing session.custom.fc, which is used to build the
            tag provider name.
        tagPath (str): Full path to the clicked device instance
            (e.g., System/MCM01/Photoeyes/TPE/PS3_1_TPE1)

    Returns:
        Dataset: Columns ["Name", "OPC Path", "Value"]. Name is the tag name
            prefixed with its folder path relative to tagPath, OPC Path is
            the browse fullPath with the "[provider]" part stripped, and
            Value is str(value) for good-quality reads or "Unknown"
            otherwise. An empty dataset with the same headers is returned if
            browsing or building the dataset fails.
    """
    headers = ["Name", "OPC Path", "Value"]

    try:
        providerPath = "[" + self.session.custom.fc + "_SCADA_TAG_PROVIDER]"
        found = []  # (display name, provider-less full path) in browse order

        def browseRecursive(basePath, prefix=""):
            children = system.tag.browse(providerPath + basePath, filter={}).getResults()
            for child in children:
                tagType = str(child.get("tagType", ""))
                name = str(child.get("name", ""))

                if tagType == "AtomicTag":
                    fullPath = str(child.get("fullPath", ""))
                    # Strip provider name
                    if fullPath.startswith("[") and "]" in fullPath:
                        fullPath = fullPath.split("]", 1)[1]
                    # prefix is "" at the top level, so this is the plain
                    # name there and a folder-style path inside folders
                    found.append((prefix + name, fullPath))

                elif tagType == "Folder":
                    # Dive deeper into folder
                    # NOTE(review): other container types (e.g. nested UDT
                    # instances) are not descended into - confirm intended.
                    browseRecursive(basePath + "/" + name, prefix=prefix + name + "/")

        # Start recursion
        browseRecursive(tagPath)

        rows = []
        if found:
            try:
                results = system.tag.readBlocking(
                    [providerPath + fullPath for _, fullPath in found]
                )
            except:
                # NOTE(review): bare except kept from the original; presumably
                # needed so Java exceptions from the Ignition API are caught too.
                results = None

            for index, (displayName, fullPath) in enumerate(found):
                value = "Unknown"
                try:
                    result = results[index]
                    if result.quality.isGood():
                        value = str(result.value)
                except:
                    # Failed read (results is None) or unconvertible value
                    value = "Unknown"
                rows.append([displayName, fullPath, value])

        return system.dataset.toDataSet(headers, rows)

    except Exception as e:
        system.perspective.print("Error in getAllTags: {}".format(e))
        # Keep the documented return type on failure as well
        return system.dataset.toDataSet(headers, [])