368 lines
15 KiB
Python
368 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
APF Module Boilerplate Model
============================

Model for APF (Armor PowerFlex) modules with support for different horsepower ratings.
Supports 1, 2, 3, 5, 7.5, and 10 HP variants.
"""
|
|
|
|
import os
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional, Tuple, TYPE_CHECKING

from .mcm_pattern_utils import get_parent_for_apf

if TYPE_CHECKING:
    from excel_data_processor import ModuleData
|
|
|
|
|
|
@dataclass
class APFModuleConfig:
    """Configuration for an APF module instance.

    Only ``name`` and ``hp`` are required; every other field has a default.
    The four ``*_names`` dictionaries map a channel index to the comment text
    written for that channel; ``None`` (or an empty dict) leaves the
    boilerplate's existing comments untouched.
    """

    name: str  # Module name (e.g., "APF1")
    hp: str  # Horsepower rating: "1", "2", "3", "5", "7_5", or "10"
    ip_address: str = "192.168.1.10"  # Written to the Ethernet port's Address
    parent_module: str = "SLOT2_EN4TR"  # Written to the module's ParentModule
    parent_port_id: str = "2"  # Written to the module's ParentModPortId
    # NOTE(review): the next four fields are not read anywhere in this module;
    # presumably they are consumed by other code - confirm before relying on them.
    inhibited: bool = False
    major_fault: bool = False
    safety_network: str = "16#0000_4c14_03e7_33a8"
    safety_enabled: bool = True
    # Standard inputs: indices 0-3 address IN_0..IN_3, 4 and up address IO_0...
    input_device_names: Optional[Dict[int, str]] = None
    # Standard outputs: index N addresses IO_N.
    output_device_names: Optional[Dict[int, str]] = None
    # Safety inputs: index N addresses the INnnDATA operand.
    safety_input_names: Optional[Dict[int, str]] = None
    # Safety outputs: index N addresses the OUTnnOUTPUT operand.
    safety_output_names: Optional[Dict[int, str]] = None
|
|
|
|
|
|
class APFModuleGenerator:
    """Generator for APF module XML with different HP support.

    Typical use: construct with a configuration, call ``load_boilerplate()``
    to parse the HP-specific template, ``apply_updates()`` to stamp the
    configuration into the tree, then ``save()`` or ``get_xml_string()``.
    ``from_excel()`` performs the first three steps in one call.
    """

    # Mapping of HP values to boilerplate filenames
    HP_BOILERPLATE_MAP = {
        "1": "APF_Module_1_HP.L5X",
        "2": "APF_Module_2_HP.L5X",
        "3": "APF_Module_3_HP.L5X",
        "5": "APF_Module_5_HP.L5X",
        "7_5": "APF_Module_7_5_HP.L5X",
        "7.5": "APF_Module_7_5_HP.L5X",  # Allow both formats
        "10": "APF_Module_10_HP.L5X",
    }

    def __init__(self, config: 'APFModuleConfig'):
        """Select the boilerplate file matching ``config.hp``.

        Note that ``config.hp`` is normalised in place: "7.5" becomes "7_5".

        Raises:
            ValueError: if ``config.hp`` is not a supported rating.
        """
        self.config = config
        # Normalize HP value
        if self.config.hp == "7.5":
            self.config.hp = "7_5"

        # Determine the correct boilerplate file
        if self.config.hp not in self.HP_BOILERPLATE_MAP:
            raise ValueError(f"Unsupported HP value: {self.config.hp}. Supported values: 1, 2, 3, 5, 7.5 (or 7_5), 10")

        self.boilerplate_filename = self.HP_BOILERPLATE_MAP[self.config.hp]
        # Use project-specific boilerplate directory if set, otherwise default
        boilerplate_dir = os.environ.get('MCM_BOILERPLATE_DIR', 'boilerplate')
        self.boilerplate_path = os.path.join(boilerplate_dir, self.boilerplate_filename)
        self.tree: Optional[ET.ElementTree] = None
        self.root: Optional[ET.Element] = None

    def _require_loaded(self) -> None:
        """Raise RuntimeError when no XML tree is available to update."""
        if self.root is None:
            raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")

    def load_boilerplate(self):
        """Load the appropriate boilerplate template based on HP rating.

        Raises:
            FileNotFoundError: if the selected boilerplate file does not exist.
        """
        if not os.path.exists(self.boilerplate_path):
            raise FileNotFoundError(f"Boilerplate file not found: {self.boilerplate_path}")

        self.tree = ET.parse(self.boilerplate_path)
        self.root = self.tree.getroot()

    def update_module_name(self):
        """Update the module name on the root and on the target Module element."""
        self._require_loaded()
        # Update in root attributes
        self.root.set("TargetName", self.config.name)

        # Update Module element (silently skipped when the template has none)
        module = self.root.find(".//Module[@Use='Target']")
        if module is not None:
            module.set("Name", self.config.name)

    def update_ip_address(self):
        """Update the IP address in the first Ethernet port, if one exists."""
        self._require_loaded()
        port = self.root.find(".//Port[@Type='Ethernet']")
        if port is not None:
            port.set("Address", self.config.ip_address)

    def update_parent_module(self):
        """Update the parent module name and port id on the target Module."""
        self._require_loaded()
        module = self.root.find(".//Module[@Use='Target']")
        if module is not None:
            module.set("ParentModule", self.config.parent_module)
            module.set("ParentModPortId", self.config.parent_port_id)

    @staticmethod
    def _standard_input_operand(index: int) -> str:
        """Return the operand for a standard input index.

        Indices 0-3 are IN_0..IN_3; higher indices are the IO channels,
        numbered from IO_0 (index 4 -> IO_0, index 5 -> IO_1).
        """
        if index < 4:
            return f".IN_{index}"
        return f".IO_{index - 4}"

    def _rewrite_comments(self, xpath: str, names: Optional[Dict[int, str]], operand_for) -> None:
        """Replace the children of the Comments element at ``xpath``.

        Does nothing when ``names`` is empty/None or the element is missing,
        so the boilerplate's own comments survive in those cases.
        ``operand_for`` maps an index to the Comment's Operand attribute.
        """
        if not names:
            return
        comments = self.root.find(xpath)
        if comments is None:
            return

        # Clear existing comments (Element.clear also resets attributes/text)
        comments.clear()
        for index, name in names.items():
            comment = ET.SubElement(comments, "Comment")
            comment.set("Operand", operand_for(index))
            comment.text = name

    def update_comments(self):
        """Update comments for standard and safety inputs and outputs."""
        self._require_loaded()

        # Standard connection input comments
        self._rewrite_comments(
            ".//Connection[@Name='A_Standard_Rev2']/InputTag/Comments",
            self.config.input_device_names,
            self._standard_input_operand,
        )
        # Standard connection output comments
        self._rewrite_comments(
            ".//Connection[@Name='A_Standard_Rev2']/OutputTag/Comments",
            self.config.output_device_names,
            lambda index: f".IO_{index}",
        )
        # Safety operands carry a two-digit channel number (IN00DATA, IN10DATA);
        # zero-pad rather than prefixing "0" so indices >= 10 stay two digits.
        self._rewrite_comments(
            ".//Connection[@Name='D_Safety_Input']/InputTag/Comments",
            self.config.safety_input_names,
            lambda index: f".IN{int(index):02d}DATA",
        )
        self._rewrite_comments(
            ".//Connection[@Name='C_Safety_Output']/OutputTag/Comments",
            self.config.safety_output_names,
            lambda index: f".OUT{int(index):02d}OUTPUT",
        )

    def update_export_date(self):
        """Update the export date to current local time."""
        self._require_loaded()
        export_date = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
        self.root.set("ExportDate", export_date)

    def apply_updates(self):
        """Apply all updates to the boilerplate.

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        self._require_loaded()
        self.update_module_name()
        self.update_ip_address()
        self.update_parent_module()
        self.update_comments()
        self.update_export_date()

    def save(self, output_path: str):
        """Save the updated module to ``output_path`` as UTF-8 XML.

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        if self.tree is None:
            raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")

        # Save with an XML declaration
        self.tree.write(output_path, encoding='UTF-8', xml_declaration=True)

    def get_xml_string(self) -> str:
        """Get the XML as a string.

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        if self.tree is None:
            raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")

        return ET.tostring(self.root, encoding='unicode')

    # ------------------------------------------------------------------
    # Convenience helpers for EnhancedMCMGenerator refactor
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_comment_dictionaries(module_data: 'ModuleData') -> Tuple[
        Dict[int, str], Dict[int, str], Dict[int, str], Dict[int, str]
    ]:
        """Translate the raw Excel `ModuleData` into the comment dictionaries
        expected by APFModuleGenerator.update_comments().

        Each entry of ``module_data.io_mappings`` is read for ``description``
        and ``io_path``. Paths look like ``<module>:<channel>.<terminal>``;
        entries without a description, without ``:`` or ``.`` in the path, or
        with an unparsable index are skipped.

        Returns:
            ``(input_device_names, output_device_names, safety_input_names,
            safety_output_names)``, each mapping a channel index to a comment.

        The logic is a verbatim copy of what previously lived in
        EnhancedMCMGenerator._add_apf_modules so that behaviour stays
        identical after the refactor.
        """
        input_device_names: Dict[int, str] = {}
        output_device_names: Dict[int, str] = {}
        safety_input_names: Dict[int, str] = {}
        safety_output_names: Dict[int, str] = {}

        for io_mapping in module_data.io_mappings:
            if not io_mapping.description:
                continue

            # Handle SPARE entries explicitly (normalises case to "SPARE")
            comment = (
                "SPARE" if io_mapping.description.upper() == "SPARE" else io_mapping.description
            )

            io_path = io_mapping.io_path
            if not io_path or ":" not in io_path:
                # Skip malformed IO_PATH strings (keeps old behaviour)
                continue

            path_parts = io_path.split(":", 1)[1]
            if "." not in path_parts:
                continue

            channel, terminal = path_parts.split(".", 1)
            channel_upper = channel.upper()

            if channel_upper == "I" and terminal.startswith("In_"):
                # Standard input: I.In_0, I.In_1, ...
                try:
                    index = int(terminal.split("_")[1])
                    input_device_names[index] = comment
                except (ValueError, IndexError):
                    continue

            elif terminal.startswith("IO_"):
                # IO channel: I.IO_0, O.IO_1, ...
                try:
                    index = int(terminal.split("_")[1])
                except (ValueError, IndexError):
                    continue

                if channel_upper == "O":
                    output_device_names[index] = comment
                elif channel_upper == "I":
                    # For inputs IO channels start at index 4
                    input_device_names[index + 4] = comment

            elif channel_upper == "SI":
                # Safety input: SI.In00Data, SI.In01Data, ...
                if terminal.startswith("In") and terminal.endswith("Data"):
                    try:
                        index = int(terminal[2:-4])  # extract NN from InNNData
                        safety_input_names[index] = comment
                    except ValueError:
                        continue

            elif channel_upper == "SO":
                # Safety output: SO.Out00Output, ...
                if terminal.startswith("Out") and terminal.endswith("Output"):
                    try:
                        index = int(terminal[3:-6])  # extract NN from OutNNOutput
                        safety_output_names[index] = comment
                    except ValueError:
                        continue

            # Any other variants are ignored (same as before)

        return (
            input_device_names,
            output_device_names,
            safety_input_names,
            safety_output_names,
        )

    @classmethod
    def from_excel(
        cls,
        module_data: 'ModuleData',
        hp: str,
        *,
        ip_address: str = "",
        parent_module: Optional[str] = None,
        parent_port_id: str = "2",
    ) -> 'APFModuleGenerator':
        """Factory that builds a fully-configured generator directly from
        ExcelDataProcessor.ModuleData.

        It returns an *instance* (already loaded and updated) so callers can
        access .root or save it immediately.

        Args:
            module_data: source of the tag name, IP address and IO mappings.
            hp: horsepower rating, as accepted by the constructor.
            ip_address: overrides ``module_data.ip_address`` when non-empty;
                "192.168.1.10" is used when both are empty.
            parent_module: explicit parent; when None it is derived with
                ``get_parent_for_apf`` from the tag name and IP presence.
            parent_port_id: parent port id written to the module.

        Raises:
            TypeError: if ``module_data`` is not a ``ModuleData`` instance.
            ValueError: if ``hp`` is unsupported.
            FileNotFoundError: if the boilerplate file is missing.
        """
        from excel_data_processor import ModuleData  # local import to avoid cycle at top level

        if not isinstance(module_data, ModuleData):
            raise TypeError("module_data must be an Excel ModuleData instance")

        # Determine parent module using pattern matching if not explicitly provided
        if parent_module is None:
            has_ip = bool(module_data.ip_address and module_data.ip_address.strip())
            parent_module = get_parent_for_apf(module_data.tagname, has_ip_address=has_ip)

        (
            input_device_names,
            output_device_names,
            safety_input_names,
            safety_output_names,
        ) = cls._extract_comment_dictionaries(module_data)

        # Empty dictionaries are passed as None so existing comments are kept
        config = create_apf_module(
            name=module_data.tagname,
            hp=hp,
            ip_address=ip_address or module_data.ip_address or "192.168.1.10",
            parent_module=parent_module,
            parent_port_id=parent_port_id,
            input_device_names=input_device_names if input_device_names else None,
            output_device_names=output_device_names if output_device_names else None,
            safety_input_names=safety_input_names if safety_input_names else None,
            safety_output_names=safety_output_names if safety_output_names else None,
        )

        generator = cls(config)
        generator.load_boilerplate()
        generator.apply_updates()
        return generator
|
|
|
|
|
|
def create_apf_module(name: str, hp: str, ip_address: str = "192.168.1.10",
                      parent_module: str = "SLOT2_EN4TR", parent_port_id: str = "2",
                      input_device_names: Optional[Dict[int, str]] = None,
                      output_device_names: Optional[Dict[int, str]] = None,
                      safety_input_names: Optional[Dict[int, str]] = None,
                      safety_output_names: Optional[Dict[int, str]] = None) -> 'APFModuleConfig':
    """Factory function to create an APF module configuration.

    Every argument is forwarded unchanged to the ``APFModuleConfig`` field of
    the same name. Fields not exposed here (``inhibited``, ``major_fault``,
    ``safety_network``, ``safety_enabled``) keep their dataclass defaults.

    Returns:
        The populated ``APFModuleConfig``.
    """
    return APFModuleConfig(
        name=name,
        hp=hp,
        ip_address=ip_address,
        parent_module=parent_module,
        parent_port_id=parent_port_id,
        input_device_names=input_device_names,
        output_device_names=output_device_names,
        safety_input_names=safety_input_names,
        safety_output_names=safety_output_names,
    )
|
|
|
|
|
|
# Example usage
|
|
if __name__ == "__main__":
    # Example: Create a 5 HP APF module
    config = create_apf_module(
        name="APF1_5HP",
        hp="5",  # Specify the horsepower
        ip_address="192.168.1.10",
        input_device_names={
            0: "E-Stop",
            1: "Start PB",
            2: "Stop PB",
            3: "Reset",
            4: "Speed Ref",
            5: "Enable",
        },
    )

    generator = APFModuleGenerator(config)
    generator.load_boilerplate()
    generator.apply_updates()

    # tree.write() does not create missing directories, so make sure the
    # output folder exists before saving.
    output_path = "generated/APF1_5HP.L5X"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    generator.save(output_path)

    print(f"Generated {config.hp} HP APF module: {config.name}")