#!/usr/bin/env python3
"""
Script to generate Ignition MCM view JSON from CSV input.

CSV format: DPM,DPM_IP,Name,IP
"""

import csv
import json
import re
import sys
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Sequence

# Folder holding the embedded component views referenced by every element.
_COMPONENT_ROOT = "Windows/Tabs/Enternet Windows/Components"

# Every generated component uses the same size (the root container is
# created with mode "percent").
_COMPONENT_HEIGHT = 0.1032
_COMPONENT_WIDTH = 0.0522

# Ordered (marker, type) pairs: the first marker found in the upper-cased
# device name wins.  '_FIO' also matches names containing '_FIOM'.
_DEVICE_TYPE_MARKERS = (
    ('_VFD', 'APF'),
    ('_EX', 'EXTENDO'),
    ('_FIO', 'FIO_SIO'),
    ('_SIO', 'FIO_SIO'),
    ('_TIPPER', 'TIPPER'),
    ('_PMM', 'PMM'),
    ('_ZMX', 'ZMX'),
)
_DEFAULT_DEVICE_TYPE = 'APF'

# Tag-path folder used for each device type.
_DEVICE_CATEGORIES = {
    'APF': 'Conveyor/VFD',
    'EXTENDO': 'Conveyor/EXTENDO',
    'TIPPER': 'Conveyor/TIPPER',
    'FIO_SIO': 'IO_BLOCK/FIO',
    'PMM': 'PMM',
    'ZMX': 'Chute/D2C',
}
_DEFAULT_DEVICE_CATEGORY = 'Conveyor/VFD'

_MCM_PATTERN = re.compile(r'MCM\d+', re.IGNORECASE)
# Case-insensitive so it agrees with determine_device_type(), which
# upper-cases the name before looking for '_ZMX'.
_ZMX_SUFFIX_PATTERN = re.compile(r'_ZMX\d*', re.IGNORECASE)


def _position(x: float, y: float) -> Dict[str, float]:
    """Return the position dict shared by every generated component."""
    return {
        "x": x,
        "y": y,
        "height": _COMPONENT_HEIGHT,
        "width": _COMPONENT_WIDTH,
    }


def _comm_fault_binding(tag_subpath: str) -> Dict[str, Any]:
    """Return the propConfig binding params.tag to a Communication_Faulted tag.

    Args:
        tag_subpath: Path between ``System/`` and ``/Communication_Faulted``.
    """
    return {
        "props.params.tag": {
            "binding": {
                "config": {
                    "fallbackDelay": 2.5,
                    "mode": "indirect",
                    "references": {
                        "fc": "{session.custom.fc}"
                    },
                    "tagPath": f"[{{fc}}_SCADA_TAG_PROVIDER]System/{tag_subpath}/Communication_Faulted"
                },
                "transforms": [
                    {
                        "expression": "coalesce({value},{view.params.forceFaultStatus},\"comm\")",
                        "type": "expression"
                    },
                    {
                        "fallback": False,
                        "inputType": "scalar",
                        "mappings": [
                            {"input": True, "output": True},
                            {"input": "comm", "output": "comm"}
                        ],
                        "outputType": "scalar",
                        "type": "map"
                    }
                ],
                "type": "tag"
            }
        }
    }


def _device_view(params: Dict[str, Any], name: str, x: float, y: float,
                 tag_subpath: str) -> Dict[str, Any]:
    """Build an embedded ``Device`` view with a comm-fault tag binding."""
    return {
        "type": "ia.display.view",
        "version": 0,
        "props": {
            "path": f"{_COMPONENT_ROOT}/Device",
            "params": params
        },
        "meta": {
            "name": name
        },
        "position": _position(x, y),
        "custom": {},
        "propConfig": _comm_fault_binding(tag_subpath)
    }


def _connector_view(component: str, x: float, y: float, down: bool, up: bool,
                    left: bool, right: bool, name: str) -> Dict[str, Any]:
    """Build an embedded connector view (PLC_Switch, Switch or Line)."""
    return {
        "type": "ia.display.view",
        "version": 0,
        "props": {
            "path": f"{_COMPONENT_ROOT}/{component}",
            "params": {
                "down": down,
                "left": left,
                "right": right,
                "up": up
            }
        },
        "meta": {
            "name": name
        },
        "position": _position(x, y),
        "custom": {}
    }


def create_plc_device(mcm_name: str, x: float, y: float) -> Dict[str, Any]:
    """Create PLC device component.

    The component is named "PLC" and its tag binding points at
    ``System/<mcm_name>/Rack/Communication_Faulted``.
    """
    return _device_view(
        {"name": mcm_name, "type": "PLC"},
        "PLC",
        x,
        y,
        f"{mcm_name}/Rack",
    )


def create_plc_switch(x: float, y: float, down: bool = True, up: bool = True,
                      left: bool = False, right: bool = False,
                      name: str = "PLC_Switch") -> Dict[str, Any]:
    """Create PLC Switch component.

    The four booleans are passed through as the component's
    ``down``/``left``/``right``/``up`` params.
    """
    return _connector_view("PLC_Switch", x, y, down, up, left, right, name)


def create_switch(x: float, y: float, down: bool = True, up: bool = True,
                  left: bool = False, right: bool = False,
                  name: str = "Switch") -> Dict[str, Any]:
    """Create Switch component.

    The four booleans are passed through as the component's
    ``down``/``left``/``right``/``up`` params.
    """
    return _connector_view("Switch", x, y, down, up, left, right, name)


def create_line(x: float, y: float, down: bool = True, up: bool = True,
                left: bool = False, right: bool = False,
                name: str = "Line") -> Dict[str, Any]:
    """Create Line component.

    The four booleans are passed through as the component's
    ``down``/``left``/``right``/``up`` params.
    """
    return _connector_view("Line", x, y, down, up, left, right, name)


def create_dpm_device(mcm_name: str, dpm_name: str, dpm_ip: str,
                      x: float, y: float) -> Dict[str, Any]:
    """Create DPM device component.

    The tag binding points at
    ``System/<mcm_name>/IO_BLOCK/DPM/<dpm_name>/Communication_Faulted``.
    """
    return _device_view(
        {"ip": dpm_ip, "name": dpm_name, "type": "DPM_BLOCK"},
        dpm_name,
        x,
        y,
        f"{mcm_name}/IO_BLOCK/DPM/{dpm_name}",
    )


def determine_device_type(device_name: str) -> str:
    """Determine device type from name.

    The name is upper-cased and checked against the known markers in order;
    names matching no marker default to 'APF'.
    """
    name_upper = device_name.upper()
    for marker, device_type in _DEVICE_TYPE_MARKERS:
        if marker in name_upper:
            return device_type
    return _DEFAULT_DEVICE_TYPE


def determine_device_category(device_name: str, device_type: str) -> str:
    """Determine device category for tag path.

    Args:
        device_name: Unused; kept so existing callers keep working.
        device_type: A value returned by determine_device_type().

    Returns:
        The tag-path folder for the type, or 'Conveyor/VFD' when the type
        is not recognised.
    """
    return _DEVICE_CATEGORIES.get(device_type, _DEFAULT_DEVICE_CATEGORY)


def create_device(mcm_name: str, device_name: str, device_ip: str,
                  x: float, y: float) -> Dict[str, Any]:
    """Create a device component.

    The device type and tag-path category are derived from the device name.
    For ZMX devices the ``_ZMX<digits>`` part of the name is replaced with
    ``_D2C_CHUTE`` in the tag path only; the displayed name is unchanged.
    """
    device_type = determine_device_type(device_name)
    category = determine_device_category(device_name, device_type)

    tag_device_name = device_name
    if device_type == 'ZMX':
        tag_device_name = _ZMX_SUFFIX_PATTERN.sub('_D2C_CHUTE', device_name)

    return _device_view(
        {"ip": device_ip, "name": device_name, "type": device_type},
        device_name,
        x,
        y,
        f"{mcm_name}/{category}/{tag_device_name}",
    )


def extract_mcm_number(full_name: str) -> str:
    """Extract MCM number from full name (e.g., 'SAT9_MCM02' -> 'MCM02').

    Returns the first ``MCM<digits>`` match upper-cased, or ``full_name``
    unchanged when there is no match.
    """
    match = _MCM_PATTERN.search(full_name)
    if match:
        return match.group().upper()
    return full_name


def generate_mcm_view(mcm_name: str, dpm_data: Dict[str, Dict], *,
                      max_devices_per_row: int = 17) -> List[Dict[str, Any]]:
    """
    Generate the complete MCM view structure with multi-row support.

    Args:
        mcm_name: Name of the MCM (e.g., "MCM02")
        dpm_data: Dictionary mapping DPM names to their info and devices
        max_devices_per_row: Devices placed on one row before wrapping to
            the next row (default 17).

    Returns:
        List containing the root container with all children

    Raises:
        ValueError: If max_devices_per_row is smaller than 1.
    """
    if max_devices_per_row < 1:
        raise ValueError("max_devices_per_row must be at least 1")

    children: List[Dict[str, Any]] = []
    switch_counter = 1

    # Layout constants
    start_x = 0.0188
    plc_y = 0.0024
    plc_switch_y = 0.1028
    dpm_start_y = 0.2034
    vertical_switch_offset = 0.0966  # Distance from DPM to vertical switch between DPMs
    base_dpm_spacing = 0.1974  # Base vertical spacing between DPMs
    horizontal_spacing = 0.0527  # Horizontal spacing between devices
    device_below_switch = 0.0997  # Distance from switch to device below it
    row_height = 0.20  # Height for each row (switch + device + gap)

    # PLC at the top, followed by the PLC switch that leads down to the DPMs
    children.append(create_plc_device(mcm_name, start_x, plc_y))
    children.append(create_plc_switch(start_x, plc_switch_y, down=True, up=True))

    # DPM spacing varies with the number of rows, so track the running Y
    cumulative_y = dpm_start_y
    last_dpm_idx = len(dpm_data) - 1

    for dpm_idx, (dpm_name, dpm_info) in enumerate(dpm_data.items()):
        dpm_y = cumulative_y

        children.append(create_dpm_device(
            mcm_name, dpm_name, dpm_info['dpm_ip'], start_x, dpm_y
        ))

        devices = dpm_info.get('devices') or []
        num_rows = max(
            1,
            (len(devices) + max_devices_per_row - 1) // max_devices_per_row,
        )
        multi_row = num_rows > 1

        # Only meaningful (and only read) when multi_row is true.
        first_row_last_device_x = 0.0
        num_vertical_connectors = 0

        if multi_row:
            first_row_device_count = min(max_devices_per_row, len(devices))
            first_row_last_device_x = start_x + (first_row_device_count * horizontal_spacing)

            # Three Lines one slot to the right of the first row's last
            # switch, running from the first row down to the second row.
            # NOTE(review): these names, and the VerticalLine names below,
            # repeat for every multi-row DPM -- confirm whether the view
            # needs unique component names.
            lines_x = first_row_last_device_x + horizontal_spacing
            children.append(create_line(
                lines_x, dpm_y,
                down=True, up=False, left=True, right=False,
                name="FirstRowLine1"
            ))
            children.append(create_line(
                lines_x, dpm_y + device_below_switch,
                down=True, up=True, left=False, right=False,
                name="FirstRowLine2"
            ))
            children.append(create_line(
                lines_x, dpm_y + row_height,
                down=False, up=True, left=True, right=False,
                name="FirstRowLine3"
            ))

            # Vertical chain below the DPM, one connector every half row:
            # Line, Switch, then Lines for all remaining slots.
            num_vertical_connectors = (num_rows - 1) * 2 + 1
            for v_idx in range(num_vertical_connectors):
                v_y = dpm_y + ((v_idx + 1) * row_height / 2)
                if v_idx == 1:
                    children.append(create_switch(
                        start_x, v_y,
                        down=True, up=True, left=False, right=False,
                        name=f"Switch{switch_counter}"
                    ))
                    switch_counter += 1
                else:
                    line_num = 1 if v_idx == 0 else 2
                    children.append(create_line(
                        start_x, v_y,
                        down=True, up=True, left=False, right=False,
                        name=f"VerticalLine{line_num}"
                    ))

        last_device_idx = len(devices) - 1
        for dev_idx, device in enumerate(devices):
            row_num, pos_in_row = divmod(dev_idx, max_devices_per_row)
            is_last_in_row = (
                pos_in_row == max_devices_per_row - 1
                or dev_idx == last_device_idx
            )

            if row_num == 0:
                # First row: left to right, starting after the DPM
                device_x = start_x + ((pos_in_row + 1) * horizontal_spacing)
                connects_left = True
                # The last switch still connects right when more rows follow
                connects_right = (not is_last_in_row) or multi_row
            else:
                # Additional rows: right to left, starting under the first
                # row's last switch
                device_x = first_row_last_device_x - (pos_in_row * horizontal_spacing)
                connects_right = True
                connects_left = not is_last_in_row

            switch_y = dpm_y + (row_num * row_height)
            device_y = switch_y + device_below_switch

            # Switch above the device, then the device itself
            children.append(create_switch(
                device_x, switch_y,
                down=True, up=False,
                left=connects_left, right=connects_right,
                name=f"Switch{switch_counter}"
            ))
            switch_counter += 1

            children.append(create_device(
                mcm_name, device['name'], device['ip'], device_x, device_y
            ))

        if multi_row:
            # The vertical chain already reaches the next DPM position
            cumulative_y += ((num_vertical_connectors + 1) * row_height / 2)
        else:
            cumulative_y += base_dpm_spacing
            # Single row / no devices: one switch links to the next DPM
            # (not needed after the last DPM)
            if dpm_idx < last_dpm_idx:
                children.append(create_switch(
                    start_x, dpm_y + vertical_switch_offset,
                    down=True, up=True, left=False, right=False,
                    name=f"Switch{switch_counter}"
                ))
                switch_counter += 1

    root = {
        "type": "ia.container.coord",
        "version": 0,
        "props": {
            "mode": "percent"
        },
        "meta": {
            "name": mcm_name
        },
        "position": {},
        "custom": {},
        "children": children
    }

    return [root]


def _find_column(fieldnames: Sequence[Any], exact: str,
                 matches: Callable[[str], bool],
                 exclude: Optional[str] = None) -> Optional[str]:
    """Return the header named ``exact``, else the first header accepted by
    ``matches`` (called with the stripped, upper-cased header), else None.

    ``exclude`` names a header that must not be returned by the fallback.
    """
    if exact in fieldnames:
        return exact
    for column in fieldnames:
        if not isinstance(column, str) or column == exclude:
            continue
        if matches(column.strip().upper()):
            return column
    return None


def read_csv_file(csv_path: str) -> Dict[str, Dict]:
    """
    Read CSV file and extract DPM and device information.

    CSV format: DPM,DPM_IP,Name,IP

    Rows without a DPM name or DPM IP are skipped.  A device is recorded
    only when both its name and IP are non-empty, so a row may declare a
    DPM without adding a device.  Short rows (missing trailing cells) are
    tolerated.

    Returns:
        Dictionary mapping DPM names to their info and devices
    """
    dpm_data = defaultdict(lambda: {'dpm_ip': None, 'devices': []})

    # utf-8-sig handles BOM; newline='' is what the csv module expects
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        reader = csv.DictReader(f)
        fieldnames = list(reader.fieldnames or [])

        # Debug: print available columns
        if fieldnames:
            print(f"CSV columns found: {fieldnames}")

        # Resolve the column names once, tolerating header variations.
        # DPM_IP is resolved first so the looser "contains DPM" fallback
        # cannot pick the IP column as the DPM name column.
        dpm_ip_key = _find_column(
            fieldnames, 'DPM_IP',
            lambda header: 'DPM_IP' in header or 'DPM IP' in header,
        )
        dpm_key = _find_column(
            fieldnames, 'DPM',
            lambda header: 'DPM' in header,
            exclude=dpm_ip_key,
        )
        name_key = _find_column(
            fieldnames, 'Name', lambda header: header == 'NAME'
        )
        ip_key = _find_column(
            fieldnames, 'IP', lambda header: header == 'IP',
            exclude=dpm_ip_key,
        )

        if not dpm_key or not dpm_ip_key:
            print(f"Warning: Could not find DPM columns. Available: {fieldnames}")
            return {}

        for row in reader:
            # DictReader fills missing trailing cells with None
            dpm_name = (row.get(dpm_key) or '').strip()
            dpm_ip = (row.get(dpm_ip_key) or '').strip()

            # Skip empty rows
            if not dpm_name or not dpm_ip:
                continue

            # The first IP seen for a DPM wins
            entry = dpm_data[dpm_name]
            if entry['dpm_ip'] is None:
                entry['dpm_ip'] = dpm_ip

            # Add device if name and IP are provided
            if name_key and ip_key:
                device_name = (row.get(name_key) or '').strip()
                device_ip = (row.get(ip_key) or '').strip()
                if device_name and device_ip:
                    entry['devices'].append({
                        'name': device_name,
                        'ip': device_ip
                    })

    return dict(dpm_data)


def main() -> None:
    """Command-line entry point: read the CSV and write the view JSON."""
    if len(sys.argv) < 3:
        print("Usage: python generate_network_topology.py <csv_file> <mcm_name> [output_file]")
        print("\nCSV format: DPM,DPM_IP,Name,IP")
        print("Example: python generate_network_topology.py dpms.csv SAT9_MCM02 output.json")
        sys.exit(1)

    csv_file = sys.argv[1]
    mcm_name_input = sys.argv[2]

    # Extract just the MCM number (e.g., "SAT9_MCM02" -> "MCM02")
    mcm_name = extract_mcm_number(mcm_name_input)

    output_file = sys.argv[3] if len(sys.argv) > 3 else f"{mcm_name}_view.json"

    print(f"Processing {csv_file}...")
    print(f"MCM Name: {mcm_name}")

    # Read CSV
    try:
        dpm_data = read_csv_file(csv_file)
    except OSError as err:
        print(f"Error: could not read {csv_file}: {err}", file=sys.stderr)
        sys.exit(1)

    # Count total devices
    total_devices = sum(len(dpm_info['devices']) for dpm_info in dpm_data.values())

    # Generate view
    view = generate_mcm_view(mcm_name, dpm_data)

    # Write output
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(view, f, indent=2)

    print(f"\n✓ Generated view for {mcm_name}")
    print(f" - {len(dpm_data)} DPMs")
    print(f" - {total_devices} total devices")
    print(f"✓ Output written to: {output_file}")


if __name__ == "__main__":
    main()