#!/usr/bin/env python3
"""
Script to generate Ignition MCM view JSON from CSV input.

CSV format: DPM,DPM_IP,Name,IP
"""
|
|
|
|
import csv
|
|
import json
|
|
import sys
|
|
import re
|
|
from typing import List, Dict, Any
|
|
from collections import defaultdict
|
|
|
|
|
|
def create_plc_device(mcm_name: str, x: float, y: float) -> Dict[str, Any]:
    """Build the PLC device component for an MCM view.

    Args:
        mcm_name: MCM name; passed to the embedded view as ``name`` and
            inserted into the bound tag path.
        x: Value stored as the component's ``position.x``.
        y: Value stored as the component's ``position.y``.

    Returns:
        An ``ia.display.view`` component dictionary whose ``props.params.tag``
        carries a tag binding on the MCM's ``Rack/Communication_Faulted`` tag.
    """
    # "{fc}" is emitted literally; the binding's "references" entry supplies it.
    fault_tag_path = f"[{{fc}}_SCADA_TAG_PROVIDER]System/{mcm_name}/Rack/Communication_Faulted"

    binding_config = {
        "fallbackDelay": 2.5,
        "mode": "indirect",
        "references": {
            "fc": "{session.custom.fc}",
        },
        "tagPath": fault_tag_path,
    }

    # Step 1: coalesce the tag value with the forced status and the literal "comm".
    coalesce_step = {
        "expression": "coalesce({value},{view.params.forceFaultStatus},\"comm\")",
        "type": "expression",
    }

    # Step 2: map transform that passes True and "comm" through, with a False fallback.
    map_step = {
        "fallback": False,
        "inputType": "scalar",
        "mappings": [
            {"input": True, "output": True},
            {"input": "comm", "output": "comm"},
        ],
        "outputType": "scalar",
        "type": "map",
    }

    tag_binding = {
        "config": binding_config,
        "transforms": [coalesce_step, map_step],
        "type": "tag",
    }

    component: Dict[str, Any] = {
        "type": "ia.display.view",
        "version": 0,
    }
    component["props"] = {
        "path": "Windows/Tabs/Enternet Windows/Components/Device",
        "params": {"name": mcm_name, "type": "PLC"},
    }
    component["meta"] = {"name": "PLC"}
    component["position"] = {"x": x, "y": y, "height": 0.1032, "width": 0.0522}
    component["custom"] = {}
    component["propConfig"] = {"props.params.tag": {"binding": tag_binding}}
    return component
|
|
|
|
|
|
def create_plc_switch(x: float, y: float, down: bool = True, up: bool = True,
                      left: bool = False, right: bool = False, name: str = "PLC_Switch") -> Dict[str, Any]:
    """Build a PLC_Switch component.

    Args:
        x: Value stored as ``position.x``.
        y: Value stored as ``position.y``.
        down: Passed to the embedded view as its ``down`` parameter.
        up: Passed to the embedded view as its ``up`` parameter.
        left: Passed to the embedded view as its ``left`` parameter.
        right: Passed to the embedded view as its ``right`` parameter.
        name: Component name written to ``meta.name``.

    Returns:
        An ``ia.display.view`` component dictionary pointing at the
        PLC_Switch view.
    """
    direction_flags = {
        "down": down,
        "left": left,
        "right": right,
        "up": up,
    }
    placement = {
        "x": x,
        "y": y,
        "height": 0.1032,
        "width": 0.0522,
    }

    component: Dict[str, Any] = {"type": "ia.display.view", "version": 0}
    component["props"] = {
        "path": "Windows/Tabs/Enternet Windows/Components/PLC_Switch",
        "params": direction_flags,
    }
    component["meta"] = {"name": name}
    component["position"] = placement
    component["custom"] = {}
    return component
|
|
|
|
|
|
def create_switch(x: float, y: float, down: bool = True, up: bool = True,
                  left: bool = False, right: bool = False, name: str = "Switch") -> Dict[str, Any]:
    """Build a Switch component.

    Args:
        x: Value stored as ``position.x``.
        y: Value stored as ``position.y``.
        down: Passed to the embedded view as its ``down`` parameter.
        up: Passed to the embedded view as its ``up`` parameter.
        left: Passed to the embedded view as its ``left`` parameter.
        right: Passed to the embedded view as its ``right`` parameter.
        name: Component name written to ``meta.name``.

    Returns:
        An ``ia.display.view`` component dictionary pointing at the Switch view.
    """
    view_props = {
        "path": "Windows/Tabs/Enternet Windows/Components/Switch",
        "params": {"down": down, "left": left, "right": right, "up": up},
    }
    placement = {"x": x, "y": y, "height": 0.1032, "width": 0.0522}

    return {
        "type": "ia.display.view",
        "version": 0,
        "props": view_props,
        "meta": {"name": name},
        "position": placement,
        "custom": {},
    }
|
|
|
|
|
|
def create_line(x: float, y: float, down: bool = True, up: bool = True,
                left: bool = False, right: bool = False, name: str = "Line") -> Dict[str, Any]:
    """Build a Line component.

    Args:
        x: Value stored as ``position.x``.
        y: Value stored as ``position.y``.
        down: Passed to the embedded view as its ``down`` parameter.
        up: Passed to the embedded view as its ``up`` parameter.
        left: Passed to the embedded view as its ``left`` parameter.
        right: Passed to the embedded view as its ``right`` parameter.
        name: Component name written to ``meta.name``.

    Returns:
        An ``ia.display.view`` component dictionary pointing at the Line view.
    """
    component: Dict[str, Any] = dict(type="ia.display.view", version=0)
    component["props"] = dict(
        path="Windows/Tabs/Enternet Windows/Components/Line",
        params=dict(down=down, left=left, right=right, up=up),
    )
    component["meta"] = dict(name=name)
    component["position"] = dict(x=x, y=y, height=0.1032, width=0.0522)
    component["custom"] = {}
    return component
|
|
|
|
|
|
def create_dpm_device(mcm_name: str, dpm_name: str, dpm_ip: str, x: float, y: float) -> Dict[str, Any]:
    """Build the device component that represents one DPM.

    Args:
        mcm_name: MCM name inserted into the bound tag path.
        dpm_name: DPM name; used as the view's ``name`` parameter, as
            ``meta.name`` and inside the tag path.
        dpm_ip: Passed to the embedded view as its ``ip`` parameter.
        x: Value stored as ``position.x``.
        y: Value stored as ``position.y``.

    Returns:
        An ``ia.display.view`` component dictionary of type ``DPM_BLOCK`` whose
        ``props.params.tag`` carries a tag binding on the DPM's
        ``Communication_Faulted`` tag.
    """
    # "{fc}" is emitted literally; the binding's "references" entry supplies it.
    fault_tag_path = (
        f"[{{fc}}_SCADA_TAG_PROVIDER]System/{mcm_name}/IO_BLOCK/DPM/{dpm_name}/Communication_Faulted"
    )

    # Expression transform followed by a map transform with a False fallback.
    transform_chain = [
        {
            "expression": "coalesce({value},{view.params.forceFaultStatus},\"comm\")",
            "type": "expression",
        },
        {
            "fallback": False,
            "inputType": "scalar",
            "mappings": [
                {"input": True, "output": True},
                {"input": "comm", "output": "comm"},
            ],
            "outputType": "scalar",
            "type": "map",
        },
    ]

    tag_binding = {
        "config": {
            "fallbackDelay": 2.5,
            "mode": "indirect",
            "references": {"fc": "{session.custom.fc}"},
            "tagPath": fault_tag_path,
        },
        "transforms": transform_chain,
        "type": "tag",
    }

    view_params = {"ip": dpm_ip, "name": dpm_name, "type": "DPM_BLOCK"}

    return {
        "type": "ia.display.view",
        "version": 0,
        "props": {
            "path": "Windows/Tabs/Enternet Windows/Components/Device",
            "params": view_params,
        },
        "meta": {"name": dpm_name},
        "position": {"x": x, "y": y, "height": 0.1032, "width": 0.0522},
        "custom": {},
        "propConfig": {"props.params.tag": {"binding": tag_binding}},
    }
|
|
|
|
|
|
def determine_device_type(device_name: str) -> str:
    """Classify a device by the marker substrings found in its name.

    Matching is case-insensitive and the rules are tried in order, so the
    first rule with a marker present in the name decides the type.

    Args:
        device_name: Device name as read from the input.

    Returns:
        One of 'APF', 'EXTENDO', 'FIO_SIO', 'TIPPER', 'PMM' or 'ZMX';
        'APF' when no marker is found.
    """
    upper_name = device_name.upper()

    # (markers, resulting type) - order matters: earlier rules win.
    rules = (
        (('_VFD',), 'APF'),
        (('_EX',), 'EXTENDO'),
        (('_FIO', '_SIO', '_FIOM'), 'FIO_SIO'),
        (('_TIPPER',), 'TIPPER'),
        (('_PMM',), 'PMM'),
        (('_ZMX',), 'ZMX'),
    )
    for markers, resolved_type in rules:
        if any(marker in upper_name for marker in markers):
            return resolved_type

    return 'APF'  # Default
|
|
|
|
|
|
def determine_device_category(device_name: str, device_type: str) -> str:
    """Return the tag-path folder that corresponds to a device type.

    Args:
        device_name: Not used by the lookup; kept in the signature for callers.
        device_type: Type string, e.g. as returned by ``determine_device_type``.

    Returns:
        The category path segment; 'Conveyor/VFD' for unrecognised types.
    """
    category_by_type = {
        'APF': 'Conveyor/VFD',
        'EXTENDO': 'Conveyor/EXTENDO',
        'TIPPER': 'Conveyor/TIPPER',
        'FIO_SIO': 'IO_BLOCK/FIO',
        'PMM': 'PMM',
        'ZMX': 'Chute/D2C',
    }
    return category_by_type.get(device_type, 'Conveyor/VFD')
|
|
|
|
|
|
def create_device(mcm_name: str, device_name: str, device_ip: str,
                  x: float, y: float) -> Dict[str, Any]:
    """Create a device component.

    The device type and tag-path category are derived from the device name
    via ``determine_device_type`` and ``determine_device_category``.

    Args:
        mcm_name: MCM name inserted into the bound tag path.
        device_name: Device name; used as the view's ``name`` parameter and
            as ``meta.name`` (always unmodified).
        device_ip: Passed to the embedded view as its ``ip`` parameter.
        x: Value stored as ``position.x``.
        y: Value stored as ``position.y``.

    Returns:
        An ``ia.display.view`` component dictionary whose ``props.params.tag``
        carries a tag binding on the device's ``Communication_Faulted`` tag.
    """
    device_type = determine_device_type(device_name)
    category = determine_device_category(device_name, device_type)

    # For ZMX devices, replace _ZMX* suffix with _D2C_CHUTE in the tag path.
    # Type detection upper-cases the name, so the substitution must ignore
    # case as well; otherwise a name such as "s01_zmx1" would be classified
    # as ZMX yet keep its "_zmx1" suffix in the tag path.
    tag_device_name = device_name
    if device_type == 'ZMX':
        # Replace _ZMX followed by any digits with _D2C_CHUTE
        tag_device_name = re.sub(r'_ZMX\d*', '_D2C_CHUTE', device_name, flags=re.IGNORECASE)

    # "{fc}" is emitted literally; the binding's "references" entry supplies it.
    fault_tag_path = (
        f"[{{fc}}_SCADA_TAG_PROVIDER]System/{mcm_name}/{category}/{tag_device_name}/Communication_Faulted"
    )

    return {
        "type": "ia.display.view",
        "version": 0,
        "props": {
            "path": "Windows/Tabs/Enternet Windows/Components/Device",
            "params": {
                "ip": device_ip,
                "name": device_name,
                "type": device_type,
            },
        },
        "meta": {
            "name": device_name,
        },
        "position": {
            "x": x,
            "y": y,
            "height": 0.1032,
            "width": 0.0522,
        },
        "custom": {},
        "propConfig": {
            "props.params.tag": {
                "binding": {
                    "config": {
                        "fallbackDelay": 2.5,
                        "mode": "indirect",
                        "references": {
                            "fc": "{session.custom.fc}",
                        },
                        "tagPath": fault_tag_path,
                    },
                    # Expression transform followed by a map transform with a False fallback.
                    "transforms": [
                        {
                            "expression": "coalesce({value},{view.params.forceFaultStatus},\"comm\")",
                            "type": "expression",
                        },
                        {
                            "fallback": False,
                            "inputType": "scalar",
                            "mappings": [
                                {
                                    "input": True,
                                    "output": True,
                                },
                                {
                                    "input": "comm",
                                    "output": "comm",
                                },
                            ],
                            "outputType": "scalar",
                            "type": "map",
                        },
                    ],
                    "type": "tag",
                },
            },
        },
    }
|
|
|
|
|
|
def extract_mcm_number(full_name: str) -> str:
    """Return the upper-cased 'MCM<digits>' token of a name (e.g. 'SAT9_MCM02' -> 'MCM02').

    The search is case-insensitive and takes the first occurrence. When the
    name contains no such token it is returned unchanged.
    """
    found = re.search(r'MCM\d+', full_name, re.IGNORECASE)
    return found.group().upper() if found else full_name
|
|
|
|
|
|
def generate_mcm_view(mcm_name: str, dpm_data: Dict[str, Dict]) -> List[Dict[str, Any]]:
    """
    Generate the complete MCM view structure with multi-row support.

    Children are emitted in this order: the PLC, the PLC switch, then one
    group per DPM in the iteration order of ``dpm_data``. Each DPM group holds
    the DPM itself, followed (only when its devices need more than one row) by
    three wrap-around lines and a vertical Line/Switch chain, followed by a
    switch + device pair per device, and finally (single-row DPMs that are not
    the last one) a vertical switch leading to the next DPM.

    Args:
        mcm_name: Name of the MCM (e.g., "MCM02")
        dpm_data: Dictionary mapping DPM names to their info and devices.
            Each value must provide 'dpm_ip' and may provide 'devices', a
            list of dicts with 'name' and 'ip' keys.

    Returns:
        List containing the root container with all children
    """
    children = []
    switch_counter = 1  # Suffix for the generated "Switch<N>" component names

    # Layout constants
    start_x = 0.0188
    plc_y = 0.0024
    plc_switch_y = 0.1028
    dpm_start_y = 0.2034
    vertical_switch_offset = 0.0966  # Distance from DPM to vertical switch between DPMs
    base_dpm_spacing = 0.1974  # Base vertical spacing between DPMs
    horizontal_spacing = 0.0527  # Horizontal spacing between devices
    device_below_switch = 0.0997  # Distance from switch to device below it
    row_height = 0.20  # Height for each row (switch + device + gap)

    # Maximum devices per row (fits in 1920px width)
    max_devices_per_row = 17

    # Add PLC at the top
    children.append(create_plc_device(mcm_name, start_x, plc_y))

    # Add PLC Switch connecting to DPMs
    children.append(create_plc_switch(start_x, plc_switch_y, down=True, up=True))

    # Track cumulative Y offset for variable DPM spacing
    cumulative_y = dpm_start_y
    last_dpm_idx = len(dpm_data) - 1

    # Add DPMs with their devices
    for dpm_idx, (dpm_name, dpm_info) in enumerate(dpm_data.items()):
        dpm_y = cumulative_y

        # Add DPM
        children.append(create_dpm_device(
            mcm_name,
            dpm_name,
            dpm_info['dpm_ip'],
            start_x,
            dpm_y,
        ))

        # Devices are laid out horizontally from this DPM, wrapping into
        # additional rows after max_devices_per_row (ceiling division; an
        # empty device list still counts as one row).
        devices = dpm_info.get('devices', [])
        device_count = len(devices)
        num_rows = max(1, (device_count + max_devices_per_row - 1) // max_devices_per_row)
        multi_row = num_rows > 1

        # One connector every half row_height below the DPM
        # (3 for two rows, 5 for three rows, ...).
        num_vertical_connectors = (num_rows - 1) * 2 + 1

        # X of the last switch/device of the first row; only needed when wrapping.
        first_row_last_device_x = None

        if multi_row:
            first_row_device_count = min(max_devices_per_row, device_count)
            first_row_last_device_x = start_x + (first_row_device_count * horizontal_spacing)

            first_row_switch_y = dpm_y  # Y position of first row switches
            first_row_device_y = first_row_switch_y + device_below_switch  # Y position of first row devices
            second_row_switch_y = dpm_y + row_height  # Y position of second row switches

            # Wrap-around lines sit one full spacing to the right of the first row's last switch.
            # NOTE(review): only the wrap from row 1 to row 2 is drawn; rows 3+ are placed
            # but nothing in this function links them to the previous row - confirm intended.
            # NOTE(review): these three names are fixed, so they repeat for every
            # multi-row DPM in the same container - confirm duplicates are acceptable.
            lines_x = first_row_last_device_x + horizontal_spacing

            # Line 1: At same height as first row switch, connects left to switch and down
            children.append(create_line(
                lines_x,
                first_row_switch_y,
                down=True,
                up=False,
                left=True,
                right=False,
                name="FirstRowLine1",
            ))

            # Line 2: At same height as first row devices, vertical line up and down
            children.append(create_line(
                lines_x,
                first_row_device_y,
                down=True,
                up=True,
                left=False,
                right=False,
                name="FirstRowLine2",
            ))

            # Line 3: At same height as second row switches, connects up and left
            children.append(create_line(
                lines_x,
                second_row_switch_y,
                down=False,
                up=True,
                left=True,
                right=False,
                name="FirstRowLine3",
            ))

            # Vertical chain below the DPM (at the DPM's X): Line, Switch, Line, [Line, ...].
            # Every connector links up and down and never branches sideways.
            for v_idx in range(num_vertical_connectors):
                v_y = dpm_y + ((v_idx + 1) * row_height / 2)

                if v_idx == 1:
                    # Second connector is the Switch (vertical only, no branching)
                    children.append(create_switch(
                        start_x,
                        v_y,
                        down=True,
                        up=True,
                        left=False,
                        right=False,
                        name=f"Switch{switch_counter}",
                    ))
                    switch_counter += 1
                else:
                    # All other connectors are Lines; every one after the
                    # first is named "VerticalLine2".
                    line_num = 1 if v_idx == 0 else 2
                    children.append(create_line(
                        start_x,
                        v_y,
                        down=True,
                        up=True,
                        left=False,
                        right=False,
                        name=f"VerticalLine{line_num}",
                    ))

        for dev_idx, device in enumerate(devices):
            # Calculate which row and position in row
            row_num, pos_in_row = divmod(dev_idx, max_devices_per_row)

            is_last_in_row = (pos_in_row == max_devices_per_row - 1) or (dev_idx == device_count - 1)

            if row_num == 0:
                # First row: left to right, starting after DPM
                device_x = start_x + ((pos_in_row + 1) * horizontal_spacing)
                connects_left = True
                # Last switch connects right only if the layout wraps to another row
                connects_right = (not is_last_in_row) or multi_row
            else:
                # Additional rows (only reachable when multi_row): right to left,
                # starting at the X of the first row's last switch.
                device_x = first_row_last_device_x - (pos_in_row * horizontal_spacing)
                connects_right = True
                connects_left = not is_last_in_row

            switch_y = dpm_y + (row_num * row_height)
            device_y = switch_y + device_below_switch

            # Add switch above device
            children.append(create_switch(
                device_x,
                switch_y,
                down=True,
                up=False,
                left=connects_left,
                right=connects_right,
                name=f"Switch{switch_counter}",
            ))
            switch_counter += 1

            # Add device
            children.append(create_device(
                mcm_name,
                device['name'],
                device['ip'],
                device_x,
                device_y,
            ))

        if multi_row:
            # Multi-row: the next DPM starts half a row below the last vertical
            # connector, which is why no extra switch is added for this DPM.
            dpm_total_height = ((num_vertical_connectors + 1) * row_height / 2)
            cumulative_y += dpm_total_height
        else:
            # Single row or no devices - use original simple spacing
            cumulative_y += base_dpm_spacing

            # Add vertical switch below DPM to connect to next DPM (except for the last DPM)
            if dpm_idx < last_dpm_idx:
                children.append(create_switch(
                    start_x,
                    dpm_y + vertical_switch_offset,
                    down=True,
                    up=True,
                    left=False,
                    right=False,
                    name=f"Switch{switch_counter}",
                ))
                switch_counter += 1

    # Create root container
    root = {
        "type": "ia.container.coord",
        "version": 0,
        "props": {
            "mode": "percent",
        },
        "meta": {
            "name": mcm_name,
        },
        "position": {},
        "custom": {},
        "children": children,
    }

    return [root]
|
|
|
|
|
|
def read_csv_file(csv_path: str) -> Dict[str, Dict]:
    """
    Read CSV file and extract DPM and device information.

    CSV format: DPM,DPM_IP,Name,IP

    Columns are resolved once from the header row. Exact header names are
    preferred; otherwise a case-insensitive match is used (a header containing
    'DPM_IP' or 'DPM IP' for the DPM address, any other header containing
    'DPM' for the DPM name, 'NAME' and 'IP' for the device columns).

    Rows without a DPM name or DPM IP are skipped. Rows shorter than the
    header are tolerated: missing cells are treated as empty, so such a row
    can still register its DPM without adding a device.

    Args:
        csv_path: Path of the CSV file to read (UTF-8, optional BOM).

    Returns:
        Dictionary mapping DPM names to their info and devices:
        ``{dpm_name: {'dpm_ip': str, 'devices': [{'name': str, 'ip': str}]}}``.
        The first DPM IP seen for a DPM name is kept. Empty when the DPM
        columns cannot be found.
    """
    def pick_column(fieldnames, exact, matches, exclude=None):
        """Return `exact` if it is a header, else the first header accepted by `matches`."""
        if exact in fieldnames:
            return exact
        for candidate in fieldnames:
            if candidate != exclude and matches(candidate.strip().upper()):
                return candidate
        return None

    def cell(row, key):
        """Return the stripped text of a cell; '' when the row has no value for it."""
        # DictReader fills cells missing from a short row with None.
        return (row.get(key) or '').strip()

    dpm_data: Dict[str, Dict] = {}

    # utf-8-sig handles BOM; newline='' is what the csv module expects so that
    # newlines embedded in quoted fields are parsed correctly.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        reader = csv.DictReader(f)

        # Handle potential column name variations (resolved once, not per row).
        # Header names are always strings; the None key DictReader adds to
        # over-long rows never appears here.
        fieldnames = list(reader.fieldnames or [])
        dpm_ip_key = pick_column(
            fieldnames, 'DPM_IP', lambda name: 'DPM_IP' in name or 'DPM IP' in name)
        # The DPM-name fallback must not pick up the DPM IP column.
        dpm_key = pick_column(
            fieldnames, 'DPM', lambda name: 'DPM' in name, exclude=dpm_ip_key)
        name_key = pick_column(fieldnames, 'Name', lambda name: name == 'NAME')
        ip_key = pick_column(
            fieldnames, 'IP', lambda name: name == 'IP', exclude=dpm_ip_key)

        for row_index, row in enumerate(reader):
            if row_index == 0:
                # Debug: print available columns
                print(f"CSV columns found: {list(row.keys())}")

                if not dpm_key or not dpm_ip_key:
                    # No row can be used without these columns: warn once and stop.
                    print(f"Warning: Could not find DPM columns. Available: {list(row.keys())}")
                    break

            dpm_name = cell(row, dpm_key)
            dpm_ip = cell(row, dpm_ip_key)

            # Skip empty rows
            if not dpm_name or not dpm_ip:
                continue

            # Create the DPM entry on first sight; its IP is not overwritten later
            entry = dpm_data.setdefault(dpm_name, {'dpm_ip': dpm_ip, 'devices': []})

            # Add device if name and IP are provided
            if name_key and ip_key:
                device_name = cell(row, name_key)
                device_ip = cell(row, ip_key)

                if device_name and device_ip:
                    entry['devices'].append({
                        'name': device_name,
                        'ip': device_ip,
                    })

    return dpm_data
|
|
|
|
|
|
def main():
    """Command-line entry point: read the CSV, build the MCM view, write it as JSON.

    Expects ``<csv_file> <mcm_name> [output_file]`` in ``sys.argv``. With fewer
    than two arguments it prints usage help and exits with status 1. When no
    output file is given, ``<MCM name>_view.json`` is used.
    """
    argv = sys.argv
    if len(argv) < 3:
        usage_lines = (
            "Usage: python generate_network_topology.py <csv_file> <mcm_name> [output_file]",
            "\nCSV format: DPM,DPM_IP,Name,IP",
            "Example: python generate_network_topology.py dpms.csv SAT9_MCM02 output.json",
        )
        for usage_line in usage_lines:
            print(usage_line)
        sys.exit(1)

    csv_file, mcm_name_input = argv[1], argv[2]

    # Extract just the MCM number (e.g., "SAT9_MCM02" -> "MCM02")
    mcm_name = extract_mcm_number(mcm_name_input)
    if len(argv) > 3:
        output_file = argv[3]
    else:
        output_file = f"{mcm_name}_view.json"

    print(f"Processing {csv_file}...")
    print(f"MCM Name: {mcm_name}")

    # Read CSV
    dpm_data = read_csv_file(csv_file)

    # Count total devices
    total_devices = 0
    for dpm_info in dpm_data.values():
        total_devices += len(dpm_info['devices'])

    # Generate view
    view = generate_mcm_view(mcm_name, dpm_data)

    # Write output
    with open(output_file, 'w') as out_stream:
        json.dump(view, out_stream, indent=2)

    dpm_count = len(dpm_data)
    print(f"\n✓ Generated view for {mcm_name}")
    print(f" - {dpm_count} DPMs")
    print(f" - {total_devices} total devices")
    print(f"✓ Output written to: {output_file}")
|
|
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|