Scripts/SCADA/network_topology/generate_network_topology.py
2025-12-06 19:16:58 +04:00

698 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Script to generate Ignition MCM view JSON from CSV input.
CSV format: DPM,DPM_IP,Name,IP
"""
import csv
import json
import sys
import re
from typing import List, Dict, Any
from collections import defaultdict
def create_plc_device(mcm_name: str, x: float, y: float) -> Dict[str, Any]:
"""Create PLC device component."""
return {
"type": "ia.display.view",
"version": 0,
"props": {
"path": "Windows/Tabs/Enternet Windows/Components/Device",
"params": {
"name": mcm_name,
"type": "PLC"
}
},
"meta": {
"name": "PLC"
},
"position": {
"x": x,
"y": y,
"height": 0.1032,
"width": 0.0522
},
"custom": {},
"propConfig": {
"props.params.tag": {
"binding": {
"config": {
"fallbackDelay": 2.5,
"mode": "indirect",
"references": {
"fc": "{session.custom.fc}"
},
"tagPath": f"[{{fc}}_SCADA_TAG_PROVIDER]System/{mcm_name}/Rack/Communication_Faulted"
},
"transforms": [
{
"expression": "coalesce({value},{view.params.forceFaultStatus},\"comm\")",
"type": "expression"
},
{
"fallback": False,
"inputType": "scalar",
"mappings": [
{
"input": True,
"output": True
},
{
"input": "comm",
"output": "comm"
}
],
"outputType": "scalar",
"type": "map"
}
],
"type": "tag"
}
}
}
}
def create_plc_switch(x: float, y: float, down: bool = True, up: bool = True,
left: bool = False, right: bool = False, name: str = "PLC_Switch") -> Dict[str, Any]:
"""Create PLC Switch component."""
return {
"type": "ia.display.view",
"version": 0,
"props": {
"path": "Windows/Tabs/Enternet Windows/Components/PLC_Switch",
"params": {
"down": down,
"left": left,
"right": right,
"up": up
}
},
"meta": {
"name": name
},
"position": {
"x": x,
"y": y,
"height": 0.1032,
"width": 0.0522
},
"custom": {}
}
def create_switch(x: float, y: float, down: bool = True, up: bool = True,
left: bool = False, right: bool = False, name: str = "Switch") -> Dict[str, Any]:
"""Create Switch component."""
return {
"type": "ia.display.view",
"version": 0,
"props": {
"path": "Windows/Tabs/Enternet Windows/Components/Switch",
"params": {
"down": down,
"left": left,
"right": right,
"up": up
}
},
"meta": {
"name": name
},
"position": {
"x": x,
"y": y,
"height": 0.1032,
"width": 0.0522
},
"custom": {}
}
def create_line(x: float, y: float, down: bool = True, up: bool = True,
left: bool = False, right: bool = False, name: str = "Line") -> Dict[str, Any]:
"""Create Line component."""
return {
"type": "ia.display.view",
"version": 0,
"props": {
"path": "Windows/Tabs/Enternet Windows/Components/Line",
"params": {
"down": down,
"left": left,
"right": right,
"up": up
}
},
"meta": {
"name": name
},
"position": {
"x": x,
"y": y,
"height": 0.1032,
"width": 0.0522
},
"custom": {}
}
def create_dpm_device(mcm_name: str, dpm_name: str, dpm_ip: str, x: float, y: float) -> Dict[str, Any]:
"""Create DPM device component."""
return {
"type": "ia.display.view",
"version": 0,
"props": {
"path": "Windows/Tabs/Enternet Windows/Components/Device",
"params": {
"ip": dpm_ip,
"name": dpm_name,
"type": "DPM_BLOCK"
}
},
"meta": {
"name": dpm_name
},
"position": {
"x": x,
"y": y,
"height": 0.1032,
"width": 0.0522
},
"custom": {},
"propConfig": {
"props.params.tag": {
"binding": {
"config": {
"fallbackDelay": 2.5,
"mode": "indirect",
"references": {
"fc": "{session.custom.fc}"
},
"tagPath": f"[{{fc}}_SCADA_TAG_PROVIDER]System/{mcm_name}/IO_BLOCK/DPM/{dpm_name}/Communication_Faulted"
},
"transforms": [
{
"expression": "coalesce({value},{view.params.forceFaultStatus},\"comm\")",
"type": "expression"
},
{
"fallback": False,
"inputType": "scalar",
"mappings": [
{
"input": True,
"output": True
},
{
"input": "comm",
"output": "comm"
}
],
"outputType": "scalar",
"type": "map"
}
],
"type": "tag"
}
}
}
}
def determine_device_type(device_name: str) -> str:
"""Determine device type from name."""
name_upper = device_name.upper()
if '_VFD' in name_upper:
return 'APF'
elif '_EX' in name_upper:
return 'EXTENDO'
elif '_FIO' in name_upper or '_SIO' in name_upper or '_FIOM' in name_upper:
return 'FIO_SIO'
elif '_TIPPER' in name_upper:
return 'TIPPER'
elif '_PMM' in name_upper:
return 'PMM'
elif '_ZMX' in name_upper:
return 'ZMX'
else:
return 'APF' # Default
def determine_device_category(device_name: str, device_type: str) -> str:
"""Determine device category for tag path."""
if device_type == 'APF':
return 'Conveyor/VFD'
elif device_type == 'EXTENDO':
return 'Conveyor/EXTENDO'
elif device_type == 'TIPPER':
return 'Conveyor/TIPPER'
elif device_type == 'FIO_SIO':
return 'IO_BLOCK/FIO'
elif device_type == 'PMM':
return 'PMM'
elif device_type == 'ZMX':
return 'Chute/D2C'
else:
return 'Conveyor/VFD'
def create_device(mcm_name: str, device_name: str, device_ip: str,
x: float, y: float) -> Dict[str, Any]:
"""Create a device component."""
device_type = determine_device_type(device_name)
category = determine_device_category(device_name, device_type)
# For ZMX devices, replace _ZMX* suffix with _D2C_CHUTE in the tag path
tag_device_name = device_name
if device_type == 'ZMX':
# Replace _ZMX followed by any digits with _D2C_CHUTE
tag_device_name = re.sub(r'_ZMX\d*', '_D2C_CHUTE', device_name)
return {
"type": "ia.display.view",
"version": 0,
"props": {
"path": "Windows/Tabs/Enternet Windows/Components/Device",
"params": {
"ip": device_ip,
"name": device_name,
"type": device_type
}
},
"meta": {
"name": device_name
},
"position": {
"x": x,
"y": y,
"height": 0.1032,
"width": 0.0522
},
"custom": {},
"propConfig": {
"props.params.tag": {
"binding": {
"config": {
"fallbackDelay": 2.5,
"mode": "indirect",
"references": {
"fc": "{session.custom.fc}"
},
"tagPath": f"[{{fc}}_SCADA_TAG_PROVIDER]System/{mcm_name}/{category}/{tag_device_name}/Communication_Faulted"
},
"transforms": [
{
"expression": "coalesce({value},{view.params.forceFaultStatus},\"comm\")",
"type": "expression"
},
{
"fallback": False,
"inputType": "scalar",
"mappings": [
{
"input": True,
"output": True
},
{
"input": "comm",
"output": "comm"
}
],
"outputType": "scalar",
"type": "map"
}
],
"type": "tag"
}
}
}
}
def extract_mcm_number(full_name: str) -> str:
"""Extract MCM number from full name (e.g., 'SAT9_MCM02' -> 'MCM02')."""
# Look for MCM followed by digits
match = re.search(r'MCM\d+', full_name, re.IGNORECASE)
if match:
return match.group().upper()
return full_name
def generate_mcm_view(mcm_name: str, dpm_data: Dict[str, Dict]) -> List[Dict[str, Any]]:
"""
Generate the complete MCM view structure with multi-row support.
Args:
mcm_name: Name of the MCM (e.g., "MCM02")
dpm_data: Dictionary mapping DPM names to their info and devices
Returns:
List containing the root container with all children
"""
children = []
switch_counter = 1
# Layout constants
start_x = 0.0188
plc_y = 0.0024
plc_switch_y = 0.1028
dpm_start_y = 0.2034
vertical_switch_offset = 0.0966 # Distance from DPM to vertical switch between DPMs
base_dpm_spacing = 0.1974 # Base vertical spacing between DPMs
horizontal_spacing = 0.0527 # Horizontal spacing between devices
device_below_switch = 0.0997 # Distance from switch to device below it
row_height = 0.20 # Height for each row (switch + device + gap)
# Maximum devices per row (fits in 1920px width)
max_devices_per_row = 17
# Add PLC at the top
children.append(create_plc_device(mcm_name, start_x, plc_y))
# Add PLC Switch connecting to DPMs
children.append(create_plc_switch(start_x, plc_switch_y, down=True, up=True))
# Track cumulative Y offset for variable DPM spacing
cumulative_y = dpm_start_y
# Add DPMs with their devices
for dpm_idx, (dpm_name, dpm_info) in enumerate(dpm_data.items()):
dpm_y = cumulative_y
# Add DPM
children.append(create_dpm_device(
mcm_name,
dpm_name,
dpm_info['dpm_ip'],
start_x,
dpm_y
))
# Add devices horizontally from this DPM (with multi-row support)
devices = dpm_info.get('devices', [])
num_rows = max(1, (len(devices) + max_devices_per_row - 1) // max_devices_per_row) if devices else 1
# If multiple rows, create vertical chain from DPM (Lines + Switch)
# Position vertical chain at DPM location to avoid overlap with first row
row_connector_x = start_x
if num_rows > 1:
# Calculate position of last device in first row
first_row_device_count = min(max_devices_per_row, len(devices))
last_device_pos_in_row = first_row_device_count - 1
first_row_last_device_x = start_x + ((last_device_pos_in_row + 1) * horizontal_spacing)
# Add 3 Lines connecting last device of first row to vertical chain
first_row_switch_y = dpm_y # Y position of first row switches
first_row_device_y = first_row_switch_y + device_below_switch # Y position of first row devices
second_row_switch_y = dpm_y + row_height # Y position of second row switches
# Position lines to the right - full spacing away (same distance as between switches)
lines_x = first_row_last_device_x + horizontal_spacing
# Line 1: At same height as first row switch, connects left to switch and down
line1_y = first_row_switch_y
children.append(create_line(
lines_x,
line1_y,
down=True,
up=False,
left=True,
right=False,
name="FirstRowLine1"
))
# Line 2: At same height as first row devices, vertical line up and down
line2_y = first_row_device_y
children.append(create_line(
lines_x,
line2_y,
down=True,
up=True,
left=False,
right=False,
name="FirstRowLine2"
))
# Line 3: At same height as second row switches, connects up and left to vertical chain
line3_y = second_row_switch_y
children.append(create_line(
lines_x,
line3_y,
down=False,
up=True,
left=True,
right=False,
name="FirstRowLine3"
))
# Create vertical connectors: Line - Switch - Line pattern
# Need 3 connectors for proper connection: Line, Switch (branches), Line
num_vertical_connectors = 3 if num_rows == 2 else (num_rows - 1) * 2 + 1
for v_idx in range(num_vertical_connectors):
v_y = dpm_y + ((v_idx + 1) * row_height / 2)
# Determine which row this connector branches to (if any)
connects_to_row = None
if num_rows == 2 and v_idx == 1:
connects_to_row = 1 # Second connector (Switch) connects to second row
elif num_rows > 2:
# For more rows, connect at appropriate intervals
for r in range(1, num_rows):
if v_idx == (r * 2 - 1):
connects_to_row = r
break
# All connectors connect upward and downward in the chain
has_up = True
has_down = True
# Pattern: Line (idx 0), Switch (idx 1), Line (idx 2)
if v_idx == 1:
# Middle is Switch (vertical only, no branching)
children.append(create_switch(
row_connector_x,
v_y,
down=has_down,
up=has_up,
left=False,
right=False, # No branching
name=f"Switch{switch_counter}"
))
switch_counter += 1
else:
# First and third are Lines
line_num = 1 if v_idx == 0 else 2
children.append(create_line(
row_connector_x,
v_y,
down=has_down,
up=has_up,
left=False,
right=False, # Lines don't branch
name=f"VerticalLine{line_num}"
))
for dev_idx, device in enumerate(devices):
# Calculate which row and position in row
row_num = dev_idx // max_devices_per_row
pos_in_row = dev_idx % max_devices_per_row
# Calculate position
if row_num == 0:
# First row: left to right, starting after DPM
device_x = start_x + ((pos_in_row + 1) * horizontal_spacing)
else:
# Additional rows: right to left, starting from first row's last switch position
# Start at same X as last switch of first row and go backwards (leftward)
if num_rows > 1:
device_x = first_row_last_device_x - (pos_in_row * horizontal_spacing)
else:
device_x = start_x + ((pos_in_row + 1) * horizontal_spacing)
switch_y = dpm_y + (row_num * row_height)
device_y = switch_y + device_below_switch
# Determine switch connections
is_first_in_row = (pos_in_row == 0)
is_last_in_row = (pos_in_row == max_devices_per_row - 1) or (dev_idx == len(devices) - 1)
is_last_device_of_first_row = (row_num == 0 and is_last_in_row and num_rows > 1)
# Switch connections
if row_num == 0:
# First row: left to right
connects_left = True
connects_right = (not is_last_in_row) or (num_rows > 1) # Last switch connects right if multiple rows
else:
# Additional rows: right to left (reversed)
connects_right = True
connects_left = not is_last_in_row
# Add switch above device
children.append(create_switch(
device_x,
switch_y,
down=True,
up=False,
left=connects_left,
right=connects_right,
name=f"Switch{switch_counter}"
))
switch_counter += 1
# Add device
children.append(create_device(
mcm_name,
device['name'],
device['ip'],
device_x,
device_y
))
# Calculate total height used by this DPM including all rows
if devices and num_rows > 1:
# Multi-row: With vertical connectors, height includes the vertical Line/Switch chain
num_vertical_connectors = 3 if num_rows == 2 else (num_rows - 1) * 2 + 1
# Vertical connectors start at (v_idx + 1) * row_height / 2
# Last connector position will be where next DPM connects
dpm_total_height = ((num_vertical_connectors + 1) * row_height / 2)
cumulative_y += dpm_total_height
else:
# Single row or no devices - use original simple spacing
# Just add base DPM spacing, vertical switch will be added separately
cumulative_y += base_dpm_spacing
# Add vertical switch below DPM to connect to next DPM (except for the last DPM)
# Only needed if single row or no devices - vertical chain already handles multi-row
if dpm_idx < len(dpm_data) - 1 and (num_rows == 1 or not devices):
switch_y = dpm_y + vertical_switch_offset
children.append(create_switch(
start_x,
switch_y,
down=True,
up=True,
left=False,
right=False,
name=f"Switch{switch_counter}"
))
switch_counter += 1
# Create root container
root = {
"type": "ia.container.coord",
"version": 0,
"props": {
"mode": "percent"
},
"meta": {
"name": mcm_name
},
"position": {},
"custom": {},
"children": children
}
return [root]
def read_csv_file(csv_path: str) -> Dict[str, Dict]:
"""
Read CSV file and extract DPM and device information.
CSV format: DPM,DPM_IP,Name,IP
Returns:
Dictionary mapping DPM names to their info and devices
"""
dpm_data = defaultdict(lambda: {'dpm_ip': None, 'devices': []})
with open(csv_path, 'r', encoding='utf-8-sig') as f: # utf-8-sig handles BOM
reader = csv.DictReader(f)
# Debug: print available columns
first_row = True
for row in reader:
if first_row:
print(f"CSV columns found: {list(row.keys())}")
first_row = False
# Handle potential column name variations
dpm_key = 'DPM' if 'DPM' in row else next((k for k in row.keys() if 'DPM' in k.upper()), None)
dpm_ip_key = 'DPM_IP' if 'DPM_IP' in row else next((k for k in row.keys() if 'DPM_IP' in k.upper() or 'DPM IP' in k.upper()), None)
name_key = 'Name' if 'Name' in row else next((k for k in row.keys() if k.upper() == 'NAME'), None)
ip_key = 'IP' if 'IP' in row else next((k for k in row.keys() if k.upper() == 'IP' and k != dpm_ip_key), None)
if not dpm_key or not dpm_ip_key:
print(f"Warning: Could not find DPM columns. Available: {list(row.keys())}")
continue
dpm_name = row[dpm_key].strip()
dpm_ip = row[dpm_ip_key].strip()
# Skip empty rows
if not dpm_name or not dpm_ip:
continue
# Set DPM IP if not already set
if dpm_data[dpm_name]['dpm_ip'] is None:
dpm_data[dpm_name]['dpm_ip'] = dpm_ip
# Add device if name and IP are provided
if name_key and ip_key:
device_name = row[name_key].strip()
device_ip = row[ip_key].strip()
if device_name and device_ip:
dpm_data[dpm_name]['devices'].append({
'name': device_name,
'ip': device_ip
})
return dict(dpm_data)
def main():
if len(sys.argv) < 3:
print("Usage: python generate_network_topology.py <csv_file> <mcm_name> [output_file]")
print("\nCSV format: DPM,DPM_IP,Name,IP")
print("Example: python generate_network_topology.py dpms.csv SAT9_MCM02 output.json")
sys.exit(1)
csv_file = sys.argv[1]
mcm_name_input = sys.argv[2]
# Extract just the MCM number (e.g., "SAT9_MCM02" -> "MCM02")
mcm_name = extract_mcm_number(mcm_name_input)
output_file = sys.argv[3] if len(sys.argv) > 3 else f"{mcm_name}_view.json"
print(f"Processing {csv_file}...")
print(f"MCM Name: {mcm_name}")
# Read CSV
dpm_data = read_csv_file(csv_file)
# Count total devices
total_devices = sum(len(dpm_info['devices']) for dpm_info in dpm_data.values())
# Generate view
view = generate_mcm_view(mcm_name, dpm_data)
# Write output
with open(output_file, 'w') as f:
json.dump(view, f, indent=2)
print(f"\n✓ Generated view for {mcm_name}")
print(f" - {len(dpm_data)} DPMs")
print(f" - {total_devices} total devices")
print(f"✓ Output written to: {output_file}")
if __name__ == "__main__":
main()