scada_device_layout/update_scada_names.py
2026-03-20 17:46:47 +04:00

1975 lines
88 KiB
Python

import json
import re
import os
import sys
# Base Paths
BASE_OLD = r"C:\Users\beka.makharadze\Desktop\MTN6\OldDetailedViews"
BASE_NEW = r"C:\Program Files\Inductive Automation\Ignition\data\projects\MTN6_SCADA\com.inductiveautomation.perspective\views\DetailedView"
# Binding Configurations
STATE_BINDING_TEMPLATE = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/State",
"references": {
"0": "{this.%s.name}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},999)",
"type": "expression"
},
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": 0, "output": "Closed"},
{"input": 1, "output": "Actuated"},
{"input": 2, "output": "Communication Faulted"},
{"input": 3, "output": "Running In Maintenance Mode"},
{"input": 4, "output": "Disabled"},
{"input": 5, "output": "Disconnected"},
{"input": 6, "output": "Stopped"},
{"input": 7, "output": "Enabled Not Running"},
{"input": 8, "output": "Encoder Fault"},
{"input": 9, "output": "Energy Management"},
{"input": 10, "output": "ESTOP Was Actuated"},
{"input": 11, "output": "EStopped"},
{"input": 12, "output": "EStopped Locally"},
{"input": 13, "output": "Extended Faulted"},
{"input": 14, "output": "Full"},
{"input": 15, "output": "Gaylord Start Pressed"},
{"input": 16, "output": "Jam Fault"},
{"input": 17, "output": "Jammed"},
{"input": 18, "output": "Loading Allowed"},
{"input": 19, "output": "Loading Not Allowed"},
{"input": 20, "output": "Low Air Pressure Fault Was Present"},
{"input": 21, "output": "Maintenance Mode"},
{"input": 22, "output": "Stopped In Maintenance Mode"},
{"input": 23, "output": "Motor Faulted"},
{"input": 24, "output": "Motor Was Faulted"},
{"input": 25, "output": "Normal"},
{"input": 26, "output": "Off Inactive"},
{"input": 27, "output": "Open"},
{"input": 28, "output": "PLC Ready To Run"},
{"input": 29, "output": "Package Release Pressed"},
{"input": 30, "output": "Power Branch Was Faulted"},
{"input": 31, "output": "Pressed"},
{"input": 32, "output": "Ready To Receive"},
{"input": 33, "output": "Running"},
{"input": 34, "output": "Started"},
{"input": 35, "output": "Stopped"},
{"input": 36, "output": "System Started"},
{"input": 37, "output": "Unknown"},
{"input": 38, "output": "VFD Fault"},
{"input": 39, "output": "In Power Saving Mode"},
{"input": 40, "output": "Jogging In Maintenance Mode"},
{"input": 41, "output": "VFD Reset Required"},
{"input": 42, "output": "Jam Reset Push Button Pressed"},
{"input": 43, "output": "Start Push Button Pressed"},
{"input": 44, "output": "Stop Push Button Pressed"},
{"input": 45, "output": "No Container"},
{"input": 46, "output": "Ready To Be Enabled"},
{"input": 47, "output": "Half Full"},
{"input": 48, "output": "Enabled"},
{"input": 49, "output": "Tipper Faulted"},
{"input": 50, "output": "Diverted"},
{"input": 51, "output": "Retracted"},
{"input": 52, "output": "Diverting"},
{"input": 53, "output": "Network Node Faulted (Field IO)"},
{"input": 66, "output": "Inch And Store Mode"}
],
"fallback": "Unknown",
"type": "map"
}
]
}
}
PRIORITY_BINDING_TEMPLATE = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Priority",
"references": {
"0": "{this.%s.name}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},0)",
"type": "expression"
},
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": 0, "output": "No Active Alarms"},
{"input": 1, "output": "High"},
{"input": 2, "output": "Medium"},
{"input": 3, "output": "Low"},
{"input": 4, "output": "Diagnostic"}
],
"fallback": "No Active Alarms",
"type": "map"
}
]
}
}
COLOR_BINDING_TEMPLATE = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Color",
"references": {
"0": "{this.%s.name}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},999)",
"type": "expression"
},
{
"inputType": "scalar",
"outputType": "color",
"mappings": [
{"input": 0, "output": "#C2C2C2"},
{"input": 1, "output": "#FF0000"},
{"input": 2, "output": "#FFA500"},
{"input": 3, "output": "#0008FF"},
{"input": 4, "output": "#00FF00"},
{"input": 5, "output": "#FFF700"},
{"input": 6, "output": "#87CEEB"},
{"input": 7, "output": "#90EE90"},
{"input": 8, "output": "#964B00"},
{"input": 9, "output": "#FFFFFF"},
{"input": 10, "output": "#000000"},
{"input": 11, "output": "#8B0000"},
{"input": 12, "output": "#808080"},
{"input": 13, "output": "#8B8000"},
{"input": 14, "output": "#006400"},
{"input": 15, "output": "#90FF90"},
{"input": 16, "output": "#00008B"},
{"input": 17, "output": "#FF7276"},
{"input": 18, "output": "#556B2F"},
{"input": 19, "output": "#B43434"},
{"input": 20, "output": "#4682B4"},
{"input": 21, "output": "#FFD700"}
],
"fallback": "#000000",
"type": "map"
}
]
}
}
INDUCTION_COLOR_SCRIPT = """ if value is None:
return '#000000'
# Priority-based status determination (lower number = higher priority)
active_status = 'default'
# Highest-priority checks first
if value.get('awInduction', 1) == 0:
active_status = 'inactive'
elif value.get('Common_Error', False):
active_status = 'common_error'
elif value.get('In_Test_Mode', False):
active_status = 'test_mode'
elif value.get('Jammed', False):
active_status = 'jammed'
elif value.get('Energy_Saving', False):
active_status = 'energy_saving'
elif value.get('Running', False):
active_status = 'running'
elif value.get('Starting', False) or value.get('Stopping', False):
active_status = 'transitional'
elif value.get('Stopped', False) or value.get('Disabled', False):
active_status = 'inactive'
elif value.get('Blocked', False):
active_status = 'blocked'
# Dictionary-based switch (cleaner than elif chain)
color_map = {
'common_error': '#FF0000',
'inactive': '#C2C2C2',
'blocked': '#C2C2C2',
'running': '#00FF00',
'transitional': '#90FF90',
'jammed': '#FF8C00',
'energy_saving': '#87CEFA',
'test_mode': '#AC5F00',
'default': '#000000'
}
# Return color using dictionary lookup (switch-like behavior)
return color_map.get(active_status, '#000000')"""
INDUCTION_PRIORITY_SCRIPT = """
data = dict(value) if value else {}
if data.get('Common_Error') or data.get('Jammed') :
return "High"
elif data.get('Blocked'):
return "High"
elif data.get('Test_Mode'):
return "Medium"
elif data.get('Disabled'):
return "Low"
elif data.get('Starting') or data.get('Stopping') or data.get('Running') or data.get('Energy_Saving') or data.get('Stopped') or data.get('Disabled') == False:
return "No Active Alarms"
return "No Active Alarms" """
INDUCTION_STATE_SCRIPT = """ data = dict(value) if value else {}
if data.get('awInduction', 1) == 0:
return 'Not Running'
elif data.get('Common_Error'):
return 'Common Error'
elif data.get('Jammed'):
return 'Jammed'
elif data.get('Test_Mode'):
return 'In Test Mode'
elif data.get('Energy_Saving'):
return 'Energy Saving'
elif data.get('Running'):
return 'Running'
elif data.get('Starting'):
return 'Starting'
elif data.get('Stopping'):
return 'Stopping'
elif data.get('Stopped'):
return 'Stopped'
elif data.get('Disabled') == False:
return 'Enabled'
elif data.get('Blocked'):
return 'Blocked'
else:
return 'Not Running' """
CONVEYOR_COLOR_SCRIPT = """ if value is None:
return '#000000'
# Determine conveyor status (priority order)
active_status = 'default'
if value.get('E_Stop', False):
active_status = 'e_stop'
elif value.get('Technical_Fault', False):
active_status = 'technical_fault'
elif value.get('Operational_Fault', False):
active_status = 'operational_fault'
elif value.get('Dieback', False):
active_status = 'dieback'
elif value.get('Running', False):
active_status = 'running'
elif value.get('Energy_Saving', False):
active_status = 'energy_saving'
elif value.get('Stopped', False):
active_status = 'stopped'
elif value.get('awConveyor', 1) == 0:
active_status = 'stopped'
# Color mapping
color_map = {
'e_stop': '#FF0000', # Red — Emergency stop
'technical_fault': '#FF0000', # Red— Technical fault
'operational_fault': '#FF8C00',# Orange — Operational fault
'dieback': '#90FF90', # Light green — Dieback (line full)
'running': '#00FF00', # Green — Running
'energy_saving': '#87CEFA', # Light blue — Energy saving
'stopped': '#C2C2C2', # Dark gray — Inactive
'default': '#000000' # Black — Default
}
return color_map.get(active_status, '#000000')"""
CONVEYOR_PRIORITY_SCRIPT = """ data = dict(value) if value else {}
if data.get('E_Stop') or data.get('Technical_Fault') or data.get('Operational_Fault'):
return "High"
elif data.get('Running') or data.get('Energy_Saving') or data.get('Stopped') or data.get('Dieback'):
return "No Active Alarms"
return "No Active Alarms" """
CONVEYOR_STATE_SCRIPT = """
data = dict(value) if value else {}
if data.get('E_Stop'):
return 'Estopped'
elif data.get('Technical_Fault'):
return 'Faulted'
elif data.get('Operational_Fault'):
return 'Jammed'
elif data.get('Dieback'):
return 'Dieback'
elif data.get('Running'):
return 'Running'
elif data.get('Energy_Saving'):
return 'Energy Saving'
elif data.get('Stopped'):
return 'Stopped'
elif data.get('awConveyor', 1) == 0:
return 'Not Running'
else:
return 'Not Running'"""
# Chute scripts for Chute_{nnn} elements
CHUTE_COLOR_SCRIPT = """ data = dict(value) if value else {}
if data.get("Jammed"):
return "#FFA500"
elif data.get("Full"):
return "#0000FF"
elif data.get("Half_Full"):
return "#FFFF00"
elif data.get("No_Container"):
return "#C2C2C2"
elif data.get("Blocked_By_Operator"):
return "#C2C2C2"
elif data.get("Blocked_From_SCADA"):
return "#C2C2C2"
elif data.get("Disabled") == False:
return "#00FF00"
else:
return "#000000" """
CHUTE_PRIORITY_SCRIPT = """ data = dict(value) if value else {}
if data.get("Jammed"):
return "High"
elif data.get("Full") or data.get("Half_Full") or data.get("No_Container") or data.get("Blocked_From_SCADA") or data.get("Blocked_By_Operator"):
return "Low"
elif data.get("Disabled") == False:
return "No Active Alarms"
return "No Active Alarms" """
CHUTE_STATE_SCRIPT = """ data = dict(value) if value else {}
if data.get("Jammed"):
return "Jammed"
elif data.get("Full"):
return "Full"
elif data.get("Half_Full"):
return "Half Full"
elif data.get("No_Container"):
return "No Container"
elif data.get("Blocked_By_Operator"):
return "Blocked By Operator"
elif data.get("Blocked_From_SCADA"):
return "Blocked From SCADA"
elif data.get("Disabled") == False:
return "Enabled"
else:
return "Inactive" """
# Sorter scripts
SORTER_COLOR_SCRIPT = """ if value is None:
return '#000000'
active_status = 'default'
if value.get('awInduction', 1) == 0:
active_status = 'inactive'
elif value.get('Common_Error', False):
active_status = 'common_error'
elif value.get('In_Test_Mode', False):
active_status = 'test_mode'
elif value.get('Jammed', False):
active_status = 'jammed'
elif value.get('Energy_Saving', False):
active_status = 'energy_saving'
elif value.get('Running', False):
active_status = 'running'
elif value.get('Starting', False) or value.get('Stopping', False):
active_status = 'transitional'
elif value.get('Stopped', False) or value.get('Disabled', False):
active_status = 'inactive'
elif value.get('Blocked', False):
active_status = 'blocked'
color_map = {
'common_error': '#FF0000',
'inactive': '#C2C2C2',
'blocked': '#C2C2C2',
'running': '#00FF00',
'transitional': '#90FF90',
'jammed': '#FF8C00',
'energy_saving': '#87CEFA',
'test_mode': '#AC5F00',
'default': '#000000'
}
return color_map.get(active_status, '#000000') """
SORTER_PRIORITY_SCRIPT = """ data = dict(value) if value else {}
if data.get('Common_Error'):
return "High"
elif data.get('Test_Mode') or data.get('Discharge_Test_Mode') or data.get('Lamp_Test_Mode'):
return "Medium"
elif data.get('Disabled'):
return "Low"
elif data.get('Starting') or data.get('Stopping') or data.get('Running') or data.get('Energy_Saving') or data.get('Stopped'):
return "No Active Alarms"
return "No Active Alarms" """
SORTER_STATE_SCRIPT = """ data = dict(value) if value else {}
if data.get('wSorter', 1) == 0:
return 'Not Running'
if data.get('Common_Error'):
return 'Common Error'
elif data.get('Test_Mode'):
return 'Test Mode'
elif data.get('Discharge_Test_Mode'):
return 'Discharge Test Mode'
elif data.get('Lamp_Test_Mode'):
return 'Lamp Test Mode'
elif data.get('Energy_Saving'):
return 'Energy Saving'
elif data.get('Running'):
return 'Running'
elif data.get('Starting'):
return 'Starting'
elif data.get('Stopping'):
return 'Stopping'
elif data.get('Stopped'):
return 'Stopped'
elif data.get('Disabled'):
return 'Disabled'
elif data.get('Blocked'):
return 'Blocked'
return 'Not Running' """
INDUCTION_BINDING_TEMPLATE = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}",
"references": {
"0": "{this.%s.name}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"code": "",
"type": "script"
}
]
}
}
DISPLAY_BINDING_TEMPLATE = {
"binding": {
"type": "expr",
"config": {
"expression": ""
},
"transforms": [
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{
"input": False,
"output": "none"
}
],
"fallback": "block",
"type": "map"
}
]
}
}
INDUCTION_DATA = {
"Induction_0": {
"d": "m 0,0 h 53.89 l -0.88,0.88 13.47,13.47 32.13,-32.13 -14.08,-14.08 -12.87,12.87 h -71.71 z",
"transform": "translate(1411.47,855.11) translate(-0.01,-0.05)"
},
"Induction_1": {
"d": "m 0,0 h 53.89 l -0.88,0.88 13.47,13.47 32.13,-32.13 -14.08,-14.08 -12.87,12.87 h -71.71 z",
"transform": "translate(1413.24,784.98) translate(-0.01,-0.05)"
},
"Induction_14": {
"d": "m 0,0 h -53.89 l 0.88,0.88 -13.47,13.47 -32.13,-32.13 14.08,-14.08 12.87,12.87 h 71.71 z",
"transform": "translate(1600.55,564.1) translate(-0.01,-0.05)"
},
"Induction_17": {
"d": "m 0,0 h -53.89 l 0.88,0.88 -13.47,13.47 -32.13,-32.13 14.08,-14.08 12.87,12.87 h 71.71 z",
"transform": "translate(1487.17,484.1) translate(-0.01,-0.05)"
}
}
def extract_mcm_id(folder_name):
"""
Extract MCM ID from folder name (e.g., "MCM04" from "MCM04 North Bulk Inbound...")
Uses first 5 characters or extracts MCM pattern (MCM followed by digits)
"""
# Try to extract MCM pattern (MCM followed by digits)
match = re.search(r'(MCM\d+)', folder_name, re.IGNORECASE)
if match:
return match.group(1).upper()
# Fallback: use first 5 characters
return folder_name[:5].upper()
def load_json(filepath):
try:
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error decoding JSON from {filepath}: {e}")
return None
def build_tag_map(view_data):
tag_map = {}
def traverse(node):
if isinstance(node, dict):
name = node.get('meta', {}).get('name')
tag_props = node.get('props', {}).get('params', {}).get('tagProps')
if tag_props and isinstance(tag_props, list) and len(tag_props) > 0:
tag_path = tag_props[0]
if tag_path:
# Map based on the tag path leaf name (ID extraction)
# e.g. System/MCM01/Conveyor/VFD/UL10_2_VFD1 -> UL10_2_VFD1
parts = tag_path.split('/')
if parts:
key = parts[-1]
tag_map[key] = {
'name': tag_path,
}
if name and tag_props and isinstance(tag_props, list) and len(tag_props) > 0:
tag_map[name] = {
'name': tag_props[0],
}
if 'root' in node:
traverse(node['root'])
if 'children' in node and isinstance(node['children'], list):
for child in node['children']:
traverse(child)
elif isinstance(node, list):
for item in node:
traverse(item)
traverse(view_data)
return tag_map
def find_svg_component(node):
if isinstance(node, dict):
if node.get('type') == 'ia.shapes.svg':
return node
# Check children
if 'children' in node and isinstance(node['children'], list):
for child in node['children']:
found = find_svg_component(child)
if found: return found
# Check root
if 'root' in node:
found = find_svg_component(node['root'])
if found: return found
return None
def update_scada_svg(scada_view_data, tag_map, system_name):
count_updated = 0
prop_config = {}
# scada_view_data is now a DICT (Full View)
svg_comp = find_svg_component(scada_view_data)
if svg_comp:
# Clear existing propConfig to remove old/nested bindings
svg_comp['propConfig'] = {}
prop_config = svg_comp['propConfig']
if 'props' in svg_comp and 'elements' in svg_comp['props']:
traverse_elements(svg_comp['props']['elements'], "props.elements", prop_config, tag_map, is_top_level=True, system_name=system_name)
count_updated = len(prop_config) // 3
return count_updated
def traverse_elements(elements, current_path, prop_config, tag_map, is_top_level, system_name):
for i, el in enumerate(elements):
element_path = f"{current_path}[{i}]"
# Cleanup nested properties if they exist
if not is_top_level:
for key in ['state', 'priority', 'color']:
if key in el:
del el[key]
# Only process bindings for TOP LEVEL items
if is_top_level:
# Skip defs
if el.get('type') == 'defs' or el.get('name') == 'defs1':
continue
# Update Name Logic
name = el.get('name')
el_id = el.get('id')
identifier = el_id if el_id else name
if identifier:
# Photoeye
if re.search(r'_(TPE|PE|JPE|FPE|LPE|SPAU)\d*$', identifier):
target_name = identifier
new_name = f"System/{system_name}/PE/{target_name}"
if el.get('name') != new_name:
el['name'] = new_name
# EXTENDO
elif re.search(r'_EX\d+$', identifier):
target_name = identifier
new_name = f"System/{system_name}/Conveyor/EXTENDO/{target_name}"
if el.get('name') != new_name:
el['name'] = new_name
# VFD (handles both _VFD and _VFD1, _VFD2, etc.)
elif re.search(r'_VFD\d*$', identifier):
target_name = identifier
new_name = f"System/{system_name}/Conveyor/VFD/{target_name}"
if el.get('name') != new_name:
el['name'] = new_name
# Chute (Chute_001, Chute_002, etc.)
elif re.search(r'^Chute_\d+$', identifier):
target_name = identifier
new_name = f"System/SMC/Chute/{target_name}"
if el.get('name') != new_name:
el['name'] = new_name
# Sorter (any element with "sorter" in name/id)
elif 'sorter' in identifier.lower():
new_name = "System/SMC/Sorter"
if el.get('name') != new_name:
el['name'] = new_name
# FIOH (Field IO Hub) - _FIOH1, _FIOH2, etc.
elif re.search(r'_FIOH\d+$', identifier):
target_name = identifier
new_name = f"System/{system_name}/IO_BLOCK/HUB/{target_name}"
if el.get('name') != new_name:
el['name'] = new_name
# SIO (Serial IO) - _SIO1, _SIO2, etc.
elif re.search(r'_SIO\d+$', identifier):
target_name = identifier
new_name = f"System/{system_name}/IO_BLOCK/SIO/{target_name}"
if el.get('name') != new_name:
el['name'] = new_name
# Tag Map
else:
lookup_name = identifier
if identifier.endswith('_Assembly'):
lookup_name = identifier[:-9]
if lookup_name in tag_map:
tag_data = tag_map[lookup_name]
# Skip tag map update for PEs and VFDs (they're handled explicitly above)
if not re.search(r'_(TPE|PE|JPE|FPE|LPE|SPAU)\d*$', identifier) and not re.search(r'_VFD\d*$', identifier):
new_tag = tag_data['name']
if el.get('name') != new_tag:
el['name'] = new_tag
# Refresh name variable to get the updated value (after tag_map updates)
name = el.get('name')
# Determine check name (strip _Assembly for pattern matching)
check_name = identifier
if identifier.endswith('_Assembly'):
check_name = identifier[:-9]
# Convert Induction elements from use to path - DISABLED (keep original structure)
# if identifier in INDUCTION_DATA:
# el['type'] = 'path'
# el['d'] = INDUCTION_DATA[identifier]['d']
# el['transform'] = INDUCTION_DATA[identifier]['transform']
# el['fill'] = {"paint": "#ffffff", "opacity": "1"}
# el['stroke'] = {"paint": "#000000", "dasharray": "none", "opacity": "1", "width": "1"}
# for key in ['x', 'y', 'href']:
# if key in el:
# del key[key]
# 2. Add Bindings based on Type
# Helper to check pattern in both check_name (ID) and name
def matches_pattern(pattern):
"""Check if pattern matches in either check_name or name"""
return re.search(pattern, check_name) or (name and re.search(pattern, name))
def name_contains(substring):
"""Check if name contains a substring"""
return name and substring in name
def name_or_id_startswith(prefix):
"""Check if name or ID starts with prefix"""
name_part = name.split('/')[-1] if name else ""
return check_name.startswith(prefix) or name_part.startswith(prefix)
def name_or_id_endswith(suffix):
"""Check if name or ID ends with suffix"""
return check_name.endswith(suffix) or (name and name.endswith(suffix))
# Helper to create binding
def create_binding(template, subpath=""):
b = json.loads(json.dumps(template))
ref_key = list(b['binding']['config']['references'].keys())[0]
b['binding']['config']['references'][ref_key] = b['binding']['config']['references'][ref_key] % element_path
# If subpath provided, insert it into tagPath
# Template tagPath: "[{fc}_SCADA_TAG_PROVIDER]{0}/State" (or /Color, /Priority)
# We want to replace "/State" with "/Subpath/State" if subpath is "Start"
if subpath:
tp = b['binding']['config']['tagPath']
# Split by last slash
base, suffix = tp.rsplit('/', 1)
b['binding']['config']['tagPath'] = f"{base}/{subpath}/{suffix}"
return b
# Special FL_{n}CH_PE1 elements (starts with FL and ends with _1CH_PE1, _2CH_PE1, _3CH_PE1, etc.)
# Check both ID and name
is_fl_ch_pe1 = name_or_id_startswith('FL') and matches_pattern(r'_\d+CH_PE1$')
if is_fl_ch_pe1:
# Generate name2 by adding "J" before "PE1" in the name
name2 = name.replace('_PE1', '_JPE1') if name else ""
el['name2'] = name2
el['full'] = False
el['jammed'] = False
# Full binding (uses name)
prop_config[f"{element_path}.full"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Full",
"references": {
"0": f"{{this.{element_path}.name}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},false)",
"type": "expression"
}
]
}
}
# Jammed binding (uses name2)
prop_config[f"{element_path}.jammed"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Jammed",
"references": {
"0": f"{{this.{element_path}.name2}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},false)",
"type": "expression"
}
]
}
}
# Color tag binding with /Full path and expression transform
color_expr = f'''if(
!isGood({{value}}),
"#000000",
if({{this.{element_path}.full}} && {{this.{element_path}.jammed}},
"#FFA500",
if({{this.{element_path}.jammed}},
"#FFA500",
if({{this.{element_path}.full}} ,
"#0008FF",
"#FFFFFF"
)
)
)
)'''
prop_config[f"{element_path}.color"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Full",
"references": {
"0": f"{{this.{element_path}.name}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": color_expr,
"type": "expression"
}
]
}
}
# Priority tag binding with /Full path and expression transform
priority_expr = f'''if(
!isGood({{value}}),
"No Active Alarms",
if({{this.{element_path}.full}} && {{this.{element_path}.jammed}},
"High",
if({{this.{element_path}.jammed}},
"High",
if({{this.{element_path}.full}} ,
"Low",
"No Active Alarms"
)
)
)
)'''
prop_config[f"{element_path}.priority"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Full",
"references": {
"0": f"{{this.{element_path}.name}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": priority_expr,
"type": "expression"
}
]
}
}
# State tag binding with /Full path and expression transform
state_expr = f'''if(
!isGood({{value}}),
"Unknown",
if({{this.{element_path}.full}} && {{this.{element_path}.jammed}},
"Jammed and Full",
if({{this.{element_path}.jammed}},
"Jammed",
if({{this.{element_path}.full}} ,
"Full",
"Normal"
)
)
)
)'''
prop_config[f"{element_path}.state"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Full",
"references": {
"0": f"{{this.{element_path}.name}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": state_expr,
"type": "expression"
}
]
}
}
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_SS\d+$'):
# Special SS Buttons: Start/Stop properties
# Start
prop_config[f"{element_path}.start_state"] = create_binding(STATE_BINDING_TEMPLATE, "Start")
prop_config[f"{element_path}.start_priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "Start")
prop_config[f"{element_path}.start_color"] = create_binding(COLOR_BINDING_TEMPLATE, "Start")
# Stop
prop_config[f"{element_path}.stop_state"] = create_binding(STATE_BINDING_TEMPLATE, "Stop")
prop_config[f"{element_path}.stop_priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "Stop")
prop_config[f"{element_path}.stop_color"] = create_binding(COLOR_BINDING_TEMPLATE, "Stop")
add_display_binding("show_buttons", is_ss=True)
el['start_state'] = "Normal"
el['start_priority'] = "No Active Alarms"
el['start_color'] = "#006400"
el['stop_state'] = "Normal"
el['stop_priority'] = "No Active Alarms"
el['stop_color'] = "#B43434"
elif matches_pattern(r'_S\d+$') and name_contains('/DIV/'):
# S buttons with DIV in name -> /Enable/ tagPath
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE, "Enable")
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "Enable")
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE, "Enable")
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_S\d+$'):
# S buttons -> /Start/ tagPath
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE, "Start")
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "Start")
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE, "Start")
add_display_binding("show_buttons", include_color=True)
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_JR\d+$') and name_contains('Chute_JR'):
# JR buttons with Chute_JR in name -> /Chute_JR/ tagPath
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE, "Chute_JR")
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "Chute_JR")
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE, "Chute_JR")
add_display_binding("show_buttons", include_color=True)
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_JR\d+$'):
# JR buttons -> /JR/ tagPath
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE, "JR")
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "JR")
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE, "JR")
add_display_binding("show_buttons", include_color=True)
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_GS\d+$') and name_contains('/Chute/NC/'):
# GS buttons with /Chute/NC/ in name -> /Enable/ tagPath
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE, "Enable")
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "Enable")
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE, "Enable")
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_GS\d+$'):
# GS buttons -> /Commands/bBlockHost1 tagPath with custom mappings
# Color binding
prop_config[f"{element_path}.color"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Commands/bBlockHost1",
"references": {
"0": f"{{this.{element_path}.name}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},\"err\")",
"type": "expression"
},
{
"inputType": "scalar",
"outputType": "color",
"mappings": [
{"input": True, "output": "#00FF00"},
{"input": False, "output": "#00A700"}
],
"fallback": "#000000",
"type": "map"
}
]
}
}
# Priority binding
prop_config[f"{element_path}.priority"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Commands/bBlockHost1",
"references": {
"0": f"{{this.{element_path}.name}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},\"err\")",
"type": "expression"
},
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": True, "output": "Low"},
{"input": False, "output": "No Active Alarms"}
],
"fallback": "No Active Alarms",
"type": "map"
}
]
}
}
# State binding
prop_config[f"{element_path}.state"] = {
"binding": {
"type": "tag",
"config": {
"mode": "indirect",
"tagPath": "[{fc}_SCADA_TAG_PROVIDER]{0}/Commands/bBlockHost1",
"references": {
"0": f"{{this.{element_path}.name}}",
"fc": "{session.custom.fc}"
},
"fallbackDelay": 2.5
},
"transforms": [
{
"expression": "coalesce({value},\"err\")",
"type": "expression"
},
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": True, "output": "Enable Pressed"},
{"input": False, "output": "Enable Not Pressed"}
],
"fallback": "Unknown",
"type": "map"
}
]
}
}
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_PR\d+$'):
# PR buttons -> /PR/ tagPath
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE, "PR")
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE, "PR")
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE, "PR")
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'Induction_?\d+$'):
# Induction logic: Script Transforms on Base Tag Path
# Color
c_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(c_bind['binding']['config']['references'].keys())[0]
c_bind['binding']['config']['references'][ref_key] = c_bind['binding']['config']['references'][ref_key] % element_path
c_bind['binding']['transforms'][0]['code'] = INDUCTION_COLOR_SCRIPT
prop_config[f"{element_path}.color"] = c_bind
# Priority
p_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(p_bind['binding']['config']['references'].keys())[0]
p_bind['binding']['config']['references'][ref_key] = p_bind['binding']['config']['references'][ref_key] % element_path
p_bind['binding']['transforms'][0]['code'] = INDUCTION_PRIORITY_SCRIPT
prop_config[f"{element_path}.priority"] = p_bind
# State
s_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(s_bind['binding']['config']['references'].keys())[0]
s_bind['binding']['config']['references'][ref_key] = s_bind['binding']['config']['references'][ref_key] % element_path
s_bind['binding']['transforms'][0]['code'] = INDUCTION_STATE_SCRIPT
prop_config[f"{element_path}.state"] = s_bind
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif (name_contains('/CMC/Conveyors/') or (tag_map.get(check_name) and '/CMC/Conveyors/' in tag_map[check_name]['name'])) and not matches_pattern(r'_(VFD|EX)\d*$'):
# Special Conveyor logic (System/CMC/Conveyors/...)
# Color
c_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(c_bind['binding']['config']['references'].keys())[0]
c_bind['binding']['config']['references'][ref_key] = c_bind['binding']['config']['references'][ref_key] % element_path
c_bind['binding']['transforms'][0]['code'] = CONVEYOR_COLOR_SCRIPT
prop_config[f"{element_path}.color"] = c_bind
# Priority
p_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(p_bind['binding']['config']['references'].keys())[0]
p_bind['binding']['config']['references'][ref_key] = p_bind['binding']['config']['references'][ref_key] % element_path
p_bind['binding']['transforms'][0]['code'] = CONVEYOR_PRIORITY_SCRIPT
prop_config[f"{element_path}.priority"] = p_bind
# State
s_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(s_bind['binding']['config']['references'].keys())[0]
s_bind['binding']['config']['references'][ref_key] = s_bind['binding']['config']['references'][ref_key] % element_path
s_bind['binding']['transforms'][0]['code'] = CONVEYOR_STATE_SCRIPT
prop_config[f"{element_path}.state"] = s_bind
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif el_id and 'CH' in el_id.upper() and name and 'CH' in name.upper() and not re.search(r'_(FPE|JPE|TPE|PE|SPAU|LPE)\d+', el_id) and not re.search(r'^Chute_\d+$', el_id):
# CH elements where both ID and name contain "CH" (but ID has no PE pattern): chute bindings
# Excludes Chute_{nnn} elements which have their own handling
# Uses standard color expression coalesce({value},999)
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE)
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE)
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE)
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif el_id and 'CH' in el_id.upper() and name and (re.search(r'_(FPE|JPE|TPE|PE|SPAU|LPE)\d+', name) or '/PE/' in name) and not re.search(r'^Chute_\d+$', el_id):
# CH (Chute) elements with PE-related names
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE)
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE)
# If ID also has PE pattern, use coalesce({value},0); otherwise use standard coalesce({value},999)
if re.search(r'_(FPE|JPE|TPE|PE|SPAU|LPE)\d+', el_id):
c_bind = create_binding(COLOR_BINDING_TEMPLATE)
c_bind['binding']['transforms'][0]['expression'] = "coalesce({value},0)"
prop_config[f"{element_path}.color"] = c_bind
else:
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE)
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'_(TPE|JPE|FPE|PE|SPAU|LPE)\d+$') or matches_pattern(r'_DPM\d+$'):
# PE and DPM: Color expression coalesce({value},0)
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE)
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE)
c_bind = create_binding(COLOR_BINDING_TEMPLATE)
c_bind['binding']['transforms'][0]['expression'] = "coalesce({value},0)"
prop_config[f"{element_path}.color"] = c_bind
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'Chute_\d+'):
# Chute elements: Script-based bindings with base tag path (no sub-property)
# Uses INDUCTION_BINDING_TEMPLATE which has tagPath without /State, /Priority, /Color
# Color
c_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(c_bind['binding']['config']['references'].keys())[0]
c_bind['binding']['config']['references'][ref_key] = c_bind['binding']['config']['references'][ref_key] % element_path
c_bind['binding']['transforms'][0]['code'] = CHUTE_COLOR_SCRIPT
prop_config[f"{element_path}.color"] = c_bind
# Priority
p_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(p_bind['binding']['config']['references'].keys())[0]
p_bind['binding']['config']['references'][ref_key] = p_bind['binding']['config']['references'][ref_key] % element_path
p_bind['binding']['transforms'][0]['code'] = CHUTE_PRIORITY_SCRIPT
prop_config[f"{element_path}.priority"] = p_bind
# State
s_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(s_bind['binding']['config']['references'].keys())[0]
s_bind['binding']['config']['references'][ref_key] = s_bind['binding']['config']['references'][ref_key] % element_path
s_bind['binding']['transforms'][0]['code'] = CHUTE_STATE_SCRIPT
prop_config[f"{element_path}.state"] = s_bind
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif check_name and 'sorter' in check_name.lower():
# Sorter elements: Script-based bindings with base tag path (no sub-property)
# Color
c_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(c_bind['binding']['config']['references'].keys())[0]
c_bind['binding']['config']['references'][ref_key] = c_bind['binding']['config']['references'][ref_key] % element_path
c_bind['binding']['transforms'][0]['code'] = SORTER_COLOR_SCRIPT
prop_config[f"{element_path}.color"] = c_bind
# Priority
p_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(p_bind['binding']['config']['references'].keys())[0]
p_bind['binding']['config']['references'][ref_key] = p_bind['binding']['config']['references'][ref_key] % element_path
p_bind['binding']['transforms'][0]['code'] = SORTER_PRIORITY_SCRIPT
prop_config[f"{element_path}.priority"] = p_bind
# State
s_bind = json.loads(json.dumps(INDUCTION_BINDING_TEMPLATE))
ref_key = list(s_bind['binding']['config']['references'].keys())[0]
s_bind['binding']['config']['references'][ref_key] = s_bind['binding']['config']['references'][ref_key] % element_path
s_bind['binding']['transforms'][0]['code'] = SORTER_STATE_SCRIPT
prop_config[f"{element_path}.state"] = s_bind
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
elif matches_pattern(r'^MCM\d+$'):
# MCM elements: Color expression coalesce({value},0)
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE)
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE)
c_bind = create_binding(COLOR_BINDING_TEMPLATE)
c_bind['binding']['transforms'][0]['expression'] = "coalesce({value},0)"
prop_config[f"{element_path}.color"] = c_bind
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
else:
# Generic Devices
prop_config[f"{element_path}.state"] = create_binding(STATE_BINDING_TEMPLATE)
prop_config[f"{element_path}.priority"] = create_binding(PRIORITY_BINDING_TEMPLATE)
prop_config[f"{element_path}.color"] = create_binding(COLOR_BINDING_TEMPLATE)
el['state'] = "value"
el['priority'] = "value"
el['color'] = "value"
# 3. Add Specific Inner Bindings (Style/Paint bindings) AND Display Bindings
# Helper to check if a property path exists in the element
def property_exists(target_subpath):
"""Check if the property path exists in the element structure"""
parts = target_subpath.split('.')
current = el
for part in parts:
# Handle array access like elements[0]
if '[' in part:
key, index_str = part.split('[')
index = int(index_str.rstrip(']'))
if key not in current or not isinstance(current[key], list):
return False
if index >= len(current[key]):
return False
current = current[key][index]
else:
if part not in current:
return False
current = current[part]
# Check if the final property exists and has a value
return current is not None
# Define helper to add property binding (only if property exists)
def add_prop_binding(target_subpath, source_prop="color"):
if not property_exists(target_subpath):
return # Skip if property doesn't exist
binding_key = f"{element_path}.{target_subpath}"
prop_config[binding_key] = {
"binding": {
"type": "property",
"config": {
"path": f"this.{element_path}.{source_prop}"
}
}
}
# Helper to add display binding
def add_display_binding(flag_name, priority_prop="priority", is_ss=False, include_color=False):
d_bind = json.loads(json.dumps(DISPLAY_BINDING_TEMPLATE))
# Construct expression
path_to_elements = f"this.{element_path}"
path_to_priority = f"{{{path_to_elements}.{priority_prop}}}"
path_to_color = f"{{{path_to_elements}.color}}"
path_to_flag = f"{{session.custom.alarm_filter.{flag_name}}}"
S_STATES = ["ESTOP Was Actuated", "Jammed", "Enabled"]
if is_ss:
path_to_start_state = f"{{{path_to_elements}.start_state}}"
path_to_stop_color = f"{{{path_to_elements}.stop_color}}"
path_to_start_priority = f"{{{path_to_elements}.start_priority}}"
path_to_stop_priority = f"{{{path_to_elements}.stop_priority}}"
state_checks = " || ".join(f"({path_to_start_state} = '{s}')" for s in S_STATES)
expression = (
f"{state_checks} || "
f"({path_to_stop_color} = '#FF0000') || "
f"({path_to_start_priority} = 'High' || {path_to_stop_priority} = 'High') || "
f"{path_to_flag}"
)
elif include_color:
path_to_state = f"{{{path_to_elements}.state}}"
state_checks = " || ".join(f"({path_to_state} = '{s}')" for s in S_STATES)
expression = f"{state_checks} || ({path_to_priority} = 'High') || {path_to_flag}"
else:
expression = f"({path_to_priority} = 'High') || {path_to_flag}"
d_bind['binding']['config']['expression'] = expression
prop_config[f"{element_path}.style.display"] = d_bind
# Ensure style object exists and has display property
if 'style' not in el:
el['style'] = {"classes": ""}
el['style']['display'] = "block"
# Special FL_{n}CH_PE1 elements
if is_fl_ch_pe1:
# Check if ID contains PE pattern
id_has_pe = el_id and re.search(r'PE\d*$', el_id)
if id_has_pe:
# ID has PE - add fill.paint directly and style.display binding
add_prop_binding("fill.paint")
# Custom style.display with show_pes
display_expr = f"({{this.{element_path}.priority}} = 'High') || {{session.custom.alarm_filter.show_pes}}"
prop_config[f"{element_path}.style.display"] = {
"binding": {
"type": "expr",
"config": {
"expression": display_expr
},
"transforms": [
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": False, "output": "none"}
],
"fallback": "block",
"type": "map"
}
]
}
}
if 'style' not in el:
el['style'] = {"classes": ""}
el['style']['display'] = "block"
else:
# ID has no PE but name has PE - apply VFD-like handling
# Apply fill, stroke to elements[0] and font properties to elements[1]
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Add elements[0].fill.paint binding (no style.display binding)
add_prop_binding("elements[0].fill.paint")
elif matches_pattern(r'_EPC\d+$'):
add_prop_binding("elements[0].stroke.paint")
add_prop_binding("elements[3].fill.paint")
add_display_binding("show_safety")
elif name and 'Diverter' in name:
# Diverter elements - bind elements[1], [2], [3] fill.paint and custom style.display
add_prop_binding("elements[1].fill.paint")
add_prop_binding("elements[2].fill.paint")
add_prop_binding("elements[3].fill.paint")
# Custom style.display for element[1]: hide when state is "Diverted", show otherwise
prop_config[f"{element_path}.elements[1].style.display"] = {
"binding": {
"type": "property",
"config": {
"path": f"this.{element_path}.state"
},
"transforms": [
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": "Diverted", "output": "none"}
],
"fallback": "block",
"type": "map"
}
]
}
}
# Custom style.display for element[2]: show when state is "Diverted", hide otherwise
prop_config[f"{element_path}.elements[2].style.display"] = {
"binding": {
"type": "property",
"config": {
"path": f"this.{element_path}.state"
},
"transforms": [
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": "Diverted", "output": "block"}
],
"fallback": "none",
"type": "map"
}
]
}
}
# Custom style.display for element[3]: show when state is "Diverted", hide otherwise
prop_config[f"{element_path}.elements[3].style.display"] = {
"binding": {
"type": "property",
"config": {
"path": f"this.{element_path}.state"
},
"transforms": [
{
"inputType": "scalar",
"outputType": "scalar",
"mappings": [
{"input": "Diverted", "output": "block"}
],
"fallback": "none",
"type": "map"
}
]
}
}
# Main element display binding
add_display_binding("show_gateways")
# Set default stroke and fill for elements[0]
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
elif matches_pattern(r'_S\d+$') and name_contains('/DIV/'):
# S buttons with DIV in name
add_prop_binding("elements[1].fill.paint")
add_display_binding("show_buttons", include_color=True)
elif matches_pattern(r'_S\d+$'):
add_prop_binding("elements[1].fill.paint")
add_display_binding("show_buttons", include_color=True)
elif matches_pattern(r'_(JR|GS|PR)\d+$'):
add_prop_binding("elements[1].fill.paint")
add_display_binding("show_buttons")
elif matches_pattern(r'_SS\d+$'):
add_prop_binding("elements[1].fill.paint", "start_color")
add_prop_binding("elements[2].fill.paint", "stop_color")
add_display_binding("show_buttons", is_ss=True)
elif matches_pattern(r'_VFD\d*$'):
# VFD pattern: matches _VFD, _VFD1, _VFD2, etc.
# Apply fill, stroke to elements[0] and font properties to elements[1]
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Display binding removed for VFD
elif matches_pattern(r'_ST\d+$'):
# ST pattern: matches _ST1, _ST2, etc. in ID or name
# Apply fill, stroke to elements[0] and font properties to elements[1]
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Display binding removed for ST
elif matches_pattern(r'_DPM\d+$'):
add_prop_binding("elements[0].fill.paint")
add_display_binding("show_gateways")
# CH elements where both ID and name contain "CH" (but ID has no PE pattern)
# Excludes Chute_{nnn} elements which have their own handling
elif el_id and 'CH' in el_id.upper() and name and 'CH' in name.upper() and not re.search(r'_(FPE|JPE|TPE|PE|SPAU|LPE)\d+', el_id) and not re.search(r'^Chute_\d+$', el_id):
# Both ID and name contain "CH" but ID has no PE pattern - apply chute bindings like VFD
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
if 'paint' not in el['elements'][0]['fill']:
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Display binding removed for CH
# Special handling for CH (Chute) elements with PE-related names
# Excludes Chute_{nnn} elements which have their own handling
elif el_id and 'CH' in el_id.upper() and name and (re.search(r'_(FPE|JPE|TPE|PE|SPAU|LPE)\d+', name) or '/PE/' in name) and not re.search(r'^Chute_\d+$', el_id):
# ID contains "CH" and name is PE-related (has PE pattern or /PE/ path)
# Apply bindings like VFD
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
if 'paint' not in el['elements'][0]['fill']:
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Determine display binding:
# - If ID also has PE pattern (like FL1018_3CH_PE1), use show_pes
# - If ID has only CH (like PS11_5CH), no display binding
if re.search(r'_(FPE|JPE|TPE|PE|SPAU|LPE)\d+', el_id):
add_display_binding("show_pes")
# else: Display binding removed for CH
elif matches_pattern(r'_(TPE|JPE|FPE|PE|SPAU|LPE)\d+$') or name_contains('/PE/'):
# PE element (by ID pattern or by name containing /PE/)
add_prop_binding("fill.paint")
add_display_binding("show_pes")
elif matches_pattern(r'_PMM\d+$'):
add_prop_binding("elements[2].fill.paint")
add_display_binding("show_gateways")
elif matches_pattern(r'_FIO\d+$'):
add_prop_binding("fill.paint")
add_display_binding("show_fio")
# Apply stroke directly to element
if 'stroke' not in el:
el['stroke'] = {}
el['stroke']['paint'] = "#000000"
el['stroke']['width'] = "1"
elif matches_pattern(r'_EX\d+$'):
# Apply fill, stroke to elements[0] and font properties to elements[1]
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Display binding removed for EX
elif matches_pattern(r'Induction_?\d+$'):
# Induction pattern: matches Induction_1, Induction1, etc.
# Apply fill, stroke to elements[0] and font properties to elements[1]
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Display binding removed for Induction
elif matches_pattern(r'^IND\d+[A-Za-z]*(-\d+)?$'):
# IND pattern: matches IND1, IND1A, IND4A-4, etc.
# Apply fill, stroke to elements[0] and font properties to elements[1]
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Display binding removed for IND
elif matches_pattern(r'Chute_\d+') or name_contains('/SMC/Chute/') or (el_id and el_id.startswith('S01')):
# Chute elements: Apply VFD-like handling with elements[0].fill.paint and text formatting
if 'elements' in el and isinstance(el['elements'], list) and len(el['elements']) > 0:
# Ensure elements[0] has fill.paint BEFORE adding binding
if 'fill' not in el['elements'][0]:
el['elements'][0]['fill'] = {}
el['elements'][0]['fill']['paint'] = "#aaaaaa"
# Apply stroke to elements[0]
if 'stroke' not in el['elements'][0]:
el['elements'][0]['stroke'] = {}
el['elements'][0]['stroke']['paint'] = "#000000"
el['elements'][0]['stroke']['width'] = "1"
# Apply font properties to elements[1]
if len(el['elements']) > 1:
el['elements'][1]['fontSize'] = 10
el['elements'][1]['fontFamily'] = "Arial"
el['elements'][1]['fontWeight'] = "bold"
el['elements'][1]['textAnchor'] = "middle"
# Now add bindings after property exists
add_prop_binding("elements[0].fill.paint")
# Display binding removed for Chute
elif check_name and 'sorter' in check_name.lower():
# Sorter elements: fill.paint bound to color
add_prop_binding("fill.paint")
# Display binding removed for Sorter
elif matches_pattern(r'_FIOH\d+$'):
# FIOH (Field IO Hub) elements: fill.paint bound to color
add_prop_binding("fill.paint")
add_display_binding("show_fio")
# Apply stroke directly to element
if 'stroke' not in el:
el['stroke'] = {}
el['stroke']['paint'] = "#000000"
el['stroke']['width'] = "1"
elif matches_pattern(r'_SIO\d+$'):
# SIO (Serial IO) elements: fill.paint bound to color
add_prop_binding("fill.paint")
add_display_binding("show_gateways")
# Apply stroke directly to element
if 'stroke' not in el:
el['stroke'] = {}
el['stroke']['paint'] = "#000000"
el['stroke']['width'] = "1"
elif matches_pattern(r'^MCM\d+$'):
add_prop_binding("elements[1].fill.paint")
add_display_binding("show_gateways")
elif name_contains('/CMC/Conveyors/') or (tag_map.get(check_name) and '/CMC/Conveyors/' in tag_map[check_name]['name']):
add_prop_binding("fill.paint")
# Display binding removed for CMC/Conveyors
# Recurse into nested elements ONLY for cleanup (is_top_level=False)
if 'elements' in el and isinstance(el['elements'], list):
traverse_elements(el['elements'], f"{element_path}.elements", prop_config, tag_map, is_top_level=False, system_name=system_name)
def process_system(system_id, view_name=None):
print(f"Searching for system: {system_id}")
if view_name:
print(f"Filtering for views containing: '{view_name}'")
# 1. Find matching folder(s) in Detailed-Views (Target)
target_folders = []
if os.path.exists(BASE_NEW):
for item in os.listdir(BASE_NEW):
item_path = os.path.join(BASE_NEW, item)
if item.startswith(system_id) and os.path.isdir(item_path):
# If view_name specified, filter by it
if view_name is None or view_name.lower() in item.lower():
target_folders.append(item)
if not target_folders:
print(f"Error: Could not find folder(s) starting with '{system_id}' in {BASE_NEW}")
if view_name:
print(f" (with name containing '{view_name}')")
return
print(f"\nFound {len(target_folders)} matching view(s):")
for i, folder in enumerate(target_folders, 1):
print(f" {i}. {folder}")
# 2. Process each target folder
success_count = 0
for target_folder_name in target_folders:
print(f"\n{'='*60}")
print(f"Processing folder: {target_folder_name}")
print(f"{'='*60}")
# Find matching folder in DetailedOldViews (Source)
# We first try exact name match
source_folder_name = None
if os.path.exists(BASE_OLD):
if os.path.exists(os.path.join(BASE_OLD, target_folder_name)):
source_folder_name = target_folder_name
else:
# Try to find best match - prefer folders that contain key parts of target name
target_parts = set(target_folder_name.lower().split())
best_match = None
best_score = 0
for item in os.listdir(BASE_OLD):
item_path = os.path.join(BASE_OLD, item)
if item.startswith(system_id) and os.path.isdir(item_path):
# Calculate match score based on common words
item_parts = set(item.lower().split())
common_parts = target_parts.intersection(item_parts)
score = len(common_parts)
# Prefer exact match
if item == target_folder_name:
source_folder_name = item
break
# Otherwise track best match
elif score > best_score:
best_score = score
best_match = item
# Use best match if no exact match found
if not source_folder_name and best_match:
source_folder_name = best_match
if not source_folder_name:
print(f"⚠ Warning: Could not find matching folder in {BASE_OLD} for '{target_folder_name}'")
print(f" Skipping this folder...")
continue
print(f" Source (Old): {source_folder_name}")
print(f" Target (New): {target_folder_name}")
# Extract MCM ID from folder name (e.g., "MCM04" from "MCM04 North Bulk Inbound...")
mcm_id = extract_mcm_id(target_folder_name)
print(f" Extracted MCM ID: {mcm_id}")
view_json_path = os.path.join(BASE_OLD, source_folder_name, "view.json")
scada_view_path = os.path.join(BASE_NEW, target_folder_name, "view.json")
# Create temporary copies to avoid permission issues if direct write fails
temp_source = f"{mcm_id}_source.json"
temp_target = f"{mcm_id}_target_{target_folder_name.replace(' ', '_')}.json"
try:
# We try to read directly first
print(f"Loading View JSON: {view_json_path}...")
view_data = load_json(view_json_path)
if not view_data:
print(f" ✗ Failed to load source view.json")
continue
print("Building tag map...")
tag_map = build_tag_map(view_data)
print(f"Loading SCADA SVG View: {scada_view_path}...")
scada_view_data = load_json(scada_view_path)
if not scada_view_data:
print(f" ✗ Failed to load target view.json")
continue
# Use extracted MCM ID instead of full folder name
print(f"Using System Name: {mcm_id}")
print("Updating SCADA SVG Bindings...")
updated_count = update_scada_svg(scada_view_data, tag_map, mcm_id)
print(f"Generated bindings for {updated_count} elements.")
# Write to temporary file first
with open(temp_target, 'w', encoding='utf-8') as f:
json.dump(scada_view_data, f, indent=2)
print(f"Successfully processed to {temp_target}")
# Now try to copy back using os.system/shell copy if direct write might fail
# But wait, Python write might fail due to permissions?
# Let's try writing directly to destination first.
try:
with open(scada_view_path, 'w', encoding='utf-8') as f:
json.dump(scada_view_data, f, indent=2)
print(f"✓ Saved directly to {scada_view_path}")
success_count += 1
except PermissionError:
print("Direct write failed (Permission denied). Attempting shell copy...")
import shutil
shutil.copy2(temp_target, scada_view_path)
print("✓ Shell copy completed.")
success_count += 1
except Exception as e:
print(f"✗ An error occurred processing {target_folder_name}: {e}")
import traceback
traceback.print_exc()
finally:
# Cleanup temp
if os.path.exists(temp_target):
os.remove(temp_target)
# Print summary
print(f"\n{'='*60}")
print(f"Summary: {success_count}/{len(target_folders)} view(s) processed successfully")
print(f"{'='*60}")
def main():
if len(sys.argv) < 2:
print("Usage: python update_scada_names.py <MCM_ID> [view_name]")
print("Example: python update_scada_names.py MCM02")
print("Example: python update_scada_names.py MCM02 'Fluid Inbound Upper'")
print("\nIf view_name is not specified, all views starting with MCM_ID will be processed.")
return
system_id = sys.argv[1]
view_name = sys.argv[2] if len(sys.argv) > 2 else None
if view_name:
print(f"Processing specific view containing: '{view_name}'")
process_system(system_id, view_name)
if __name__ == '__main__':
main()