565 lines
22 KiB
Python
565 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
M12DR Module Boilerplate Model
|
|
==============================
|
|
|
|
Model for M12DR (5032-8IOLM12DR/A) modules with support for different configurations.
|
|
Supports PalletBuildMaster and D2CMaster variants.
|
|
"""
|
|
|
|
from typing import Dict, Optional, TYPE_CHECKING
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import os
|
|
import re
|
|
|
|
if TYPE_CHECKING:
|
|
from excel_data_processor import ModuleData, IOPathMapping
|
|
|
|
from .mcm_pattern_utils import get_parent_for_m12dr, check_and_register_en4tr_for_device
|
|
|
|
|
|
@dataclass
|
|
class M12DRModuleConfig:
|
|
"""Configuration for an M12DR module instance."""
|
|
name: str # Module name (e.g., "PalletBuildMaster1")
|
|
variant: str # Module variant: "PalletBuildMaster" or "D2CMaster"
|
|
ip_address: str = "192.168.1.1"
|
|
parent_module: str = "SLOT1_EN4TR"
|
|
parent_port_id: str = "2"
|
|
inhibited: bool = False
|
|
major_fault: bool = False
|
|
input_comments: Optional[Dict[str, str]] = None # Key: operand (e.g., ".IOLINK00"), Value: comment
|
|
output_comments: Optional[Dict[str, str]] = None # Key: operand (e.g., ".PT07"), Value: comment
|
|
|
|
|
|
class M12DRModuleGenerator:
|
|
"""Generator for M12DR module XML with different variant support."""
|
|
|
|
# Mapping of variants to default boilerplate filenames (fallback)
|
|
VARIANT_BOILERPLATE_MAP = {
|
|
"PalletBuildMaster": "PalletBuildMaster_Module.L5X",
|
|
"D2CMaster": "D2CMaster_Module.L5X",
|
|
"PDP_FIO": "PDP_FIO_Module.L5X",
|
|
"Sorter": "Sorter_FIOM_Module.L5X",
|
|
"Sorter_Solenoid": "Sorter_Solenoid_FIO_Module.L5X",
|
|
"FIOM2_Master": "FIOM2_Master_Module.L5X",
|
|
#"UL_FIO": "PDP_FIO_Module.L5X", # Fallback to PDP_FIO if specific UL boilerplate not found
|
|
#"FIO_GENERIC": "PDP_FIO_Module.L5X" # Fallback to PDP_FIO for generic FIO modules
|
|
}
|
|
|
|
def __init__(self, config: M12DRModuleConfig):
|
|
self.config = config
|
|
|
|
# Determine the correct boilerplate file
|
|
self.boilerplate_filename = self._determine_boilerplate_filename()
|
|
# Use project-specific boilerplate directory if set, otherwise default
|
|
boilerplate_dir = os.environ.get('MCM_BOILERPLATE_DIR', 'boilerplate')
|
|
self.boilerplate_path = os.path.join(boilerplate_dir, self.boilerplate_filename)
|
|
self.tree = None
|
|
self.root = None
|
|
|
|
# Cache for operand patterns extracted from boilerplate
|
|
self._operand_patterns = None
|
|
|
|
def _determine_boilerplate_filename(self) -> str:
|
|
"""Determine the boilerplate filename to use.
|
|
|
|
Priority:
|
|
1. Check for module-specific boilerplate: {module_name}_Module.L5X
|
|
2. Fall back to variant-based boilerplate
|
|
"""
|
|
# First, try module-specific boilerplate
|
|
module_specific_filename = f"{self.config.name}_Module.L5X"
|
|
# Use project-specific boilerplate directory if set, otherwise default
|
|
boilerplate_dir = os.environ.get('MCM_BOILERPLATE_DIR', 'boilerplate')
|
|
module_specific_path = os.path.join(boilerplate_dir, module_specific_filename)
|
|
|
|
if os.path.exists(module_specific_path):
|
|
print(f" {self.config.name} (FIO {self.config.variant}): Using module-specific boilerplate {module_specific_filename}")
|
|
return module_specific_filename
|
|
|
|
# Fall back to variant-based boilerplate
|
|
if self.config.variant not in self.VARIANT_BOILERPLATE_MAP:
|
|
raise ValueError(f"Unsupported variant: {self.config.variant}. Supported variants: {list(self.VARIANT_BOILERPLATE_MAP.keys())}")
|
|
|
|
fallback_filename = self.VARIANT_BOILERPLATE_MAP[self.config.variant]
|
|
print(f" {self.config.name} (FIO {self.config.variant}): Using variant boilerplate {fallback_filename}")
|
|
return fallback_filename
|
|
|
|
def load_boilerplate(self):
|
|
"""Load the appropriate boilerplate template based on variant."""
|
|
if not os.path.exists(self.boilerplate_path):
|
|
raise FileNotFoundError(f"Boilerplate file not found: {self.boilerplate_path}")
|
|
|
|
self.tree = ET.parse(self.boilerplate_path)
|
|
self.root = self.tree.getroot()
|
|
|
|
def _extract_operand_patterns(self) -> Dict[str, str]:
|
|
"""Extract operand patterns from the boilerplate XML structure.
|
|
|
|
Returns dict mapping terminal number to operand format:
|
|
e.g., {"04": ".PT04.DATA", "00": ".IOLINK00"}
|
|
"""
|
|
if self._operand_patterns is not None:
|
|
return self._operand_patterns
|
|
|
|
patterns = {}
|
|
|
|
# Find InputTag and OutputTag structures
|
|
input_tag = self.root.find(".//Connection[@Name='Data']/InputTag")
|
|
output_tag = self.root.find(".//Connection[@Name='Data']/OutputTag")
|
|
|
|
# Get variant-specific formatting rules
|
|
use_data_suffix = self._should_use_data_suffix()
|
|
|
|
# Process InputTag structure
|
|
if input_tag is not None:
|
|
structure = input_tag.find(".//Structure")
|
|
if structure is not None:
|
|
self._extract_patterns_from_structure(structure, patterns, use_data_suffix)
|
|
|
|
# Process OutputTag structure
|
|
if output_tag is not None:
|
|
structure = output_tag.find(".//Structure")
|
|
if structure is not None:
|
|
self._extract_patterns_from_structure(structure, patterns, use_data_suffix)
|
|
|
|
self._operand_patterns = patterns
|
|
return patterns
|
|
|
|
def _should_use_data_suffix(self) -> bool:
|
|
"""Determine if this variant should use .DATA suffix for digital I/O based on variant type."""
|
|
# PDP_FIO uses .DATA suffix for digital I/O
|
|
return True
|
|
|
|
def _extract_patterns_from_structure(self, structure: ET.Element, patterns: Dict[str, str], use_data_suffix: bool):
|
|
"""Extract operand patterns from a Structure element."""
|
|
for member in structure.findall("StructureMember"):
|
|
name = member.get("Name")
|
|
data_type = member.get("DataType")
|
|
|
|
if not name or not data_type:
|
|
continue
|
|
|
|
# Handle IOLink channels
|
|
if name.startswith("IOLink") and "IOL" in data_type:
|
|
# Extract number from IOLink00, IOLink02, etc.
|
|
number_match = re.search(r'IOLink(\d+)', name)
|
|
if number_match:
|
|
number = number_match.group(1)
|
|
patterns[number] = f".IOLINK{number}"
|
|
|
|
# Handle Point channels (digital I/O)
|
|
elif name.startswith("Pt") and ("DI" in data_type or "DO" in data_type):
|
|
# Extract number from Pt04, Pt05, etc.
|
|
number_match = re.search(r'Pt(\d+)', name)
|
|
if number_match:
|
|
number = number_match.group(1)
|
|
if use_data_suffix:
|
|
patterns[number] = f".PT{number}.DATA"
|
|
else:
|
|
patterns[number] = f".PT{number}"
|
|
|
|
def _convert_terminal_to_operand_dynamic(self, terminal: str, signal: str) -> str:
|
|
"""Convert terminal and signal to operand using patterns from boilerplate.
|
|
|
|
Args:
|
|
terminal: Terminal like "IO4", "IO00", etc.
|
|
signal: Signal type like "I", "O", "IOLINK", "SPARE"
|
|
|
|
Returns:
|
|
Operand string like ".PT04.DATA", ".IOLINK00", etc.
|
|
"""
|
|
if not terminal or not signal:
|
|
return ""
|
|
|
|
# Get operand patterns from boilerplate
|
|
patterns = self._extract_operand_patterns()
|
|
|
|
terminal = terminal.upper().strip()
|
|
signal = signal.upper().strip()
|
|
|
|
# Extract number from terminal (IO4 -> 04, IO00 -> 00, etc.)
|
|
if terminal.startswith("IO"):
|
|
try:
|
|
terminal_num = terminal[2:] # Remove "IO" prefix
|
|
# Pad to 2 digits if needed
|
|
terminal_num = terminal_num.zfill(2)
|
|
|
|
# For IOLINK signals, look for IOLINK pattern
|
|
if signal == "IOLINK":
|
|
iolink_operand = f".IOLINK{terminal_num}"
|
|
if terminal_num in patterns and patterns[terminal_num] == iolink_operand:
|
|
return iolink_operand
|
|
|
|
# For digital I/O signals, look for PT pattern
|
|
if signal in ("I", "O", "SPARE"):
|
|
# Check if we have a pattern for this terminal
|
|
if terminal_num in patterns:
|
|
return patterns[terminal_num]
|
|
# Fallback to .PT format if no pattern found
|
|
return f".PT{terminal_num}.DATA"
|
|
|
|
except (ValueError, IndexError):
|
|
return ""
|
|
|
|
# Handle direct operand format (starts with .)
|
|
if terminal.startswith('.'):
|
|
return terminal
|
|
|
|
return ""
|
|
|
|
def update_module_name(self):
|
|
"""Update the module name throughout the XML."""
|
|
# Update in root attributes
|
|
self.root.set("TargetName", self.config.name)
|
|
|
|
# Update Module element
|
|
module = self.root.find(".//Module[@Use='Target']")
|
|
if module is not None:
|
|
module.set("Name", self.config.name)
|
|
|
|
def update_ip_address(self):
|
|
"""Update the IP address in the Ethernet port."""
|
|
port = self.root.find(".//Port[@Type='Ethernet']")
|
|
if port is not None:
|
|
port.set("Address", self.config.ip_address)
|
|
|
|
def update_parent_module(self):
|
|
"""Update parent module references."""
|
|
module = self.root.find(".//Module[@Use='Target']")
|
|
if module is not None:
|
|
module.set("ParentModule", self.config.parent_module)
|
|
module.set("ParentModPortId", self.config.parent_port_id)
|
|
|
|
def update_major_revision(self):
|
|
"""Update the Major revision to 1."""
|
|
module = self.root.find(".//Module[@Use='Target']")
|
|
if module is not None:
|
|
module.set("Major", "1")
|
|
|
|
def update_input_comments(self):
|
|
"""Update input tag comments."""
|
|
input_tag = self.root.find(".//Connection[@Name='Data']/InputTag")
|
|
if input_tag is not None and self.config.input_comments:
|
|
# Find or create Comments section
|
|
comments_section = input_tag.find("Comments")
|
|
if comments_section is None:
|
|
# Create Comments section as the first child
|
|
comments_section = ET.Element("Comments")
|
|
input_tag.insert(0, comments_section)
|
|
else:
|
|
# Clear existing comments
|
|
comments_section.clear()
|
|
|
|
# Add new comments
|
|
for operand, comment_text in self.config.input_comments.items():
|
|
comment = ET.SubElement(comments_section, "Comment")
|
|
comment.set("Operand", operand)
|
|
comment.text = comment_text
|
|
|
|
def update_output_comments(self):
|
|
"""Update output tag comments."""
|
|
output_tag = self.root.find(".//Connection[@Name='Data']/OutputTag")
|
|
if output_tag is not None and self.config.output_comments:
|
|
# Find or create Comments section
|
|
comments_section = output_tag.find("Comments")
|
|
if comments_section is None:
|
|
# Create Comments section as the first child
|
|
comments_section = ET.Element("Comments")
|
|
output_tag.insert(0, comments_section)
|
|
else:
|
|
# Clear existing comments
|
|
comments_section.clear()
|
|
|
|
# Add new comments
|
|
for operand, comment_text in self.config.output_comments.items():
|
|
comment = ET.SubElement(comments_section, "Comment")
|
|
comment.set("Operand", operand)
|
|
comment.text = comment_text
|
|
|
|
def update_export_date(self):
|
|
"""Update the export date to current time."""
|
|
export_date = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
|
|
self.root.set("ExportDate", export_date)
|
|
|
|
def apply_updates(self):
|
|
"""Apply all updates to the boilerplate."""
|
|
self.update_module_name()
|
|
self.update_ip_address()
|
|
self.update_parent_module()
|
|
self.update_major_revision()
|
|
self.update_input_comments()
|
|
self.update_output_comments()
|
|
self.update_export_date()
|
|
|
|
def save(self, output_path: str):
|
|
"""Save the updated module to file with proper formatting."""
|
|
if self.tree is None:
|
|
raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")
|
|
|
|
# Add proper indentation to the XML
|
|
self._indent_xml(self.root)
|
|
|
|
# Save with proper formatting
|
|
self.tree.write(output_path, encoding='UTF-8', xml_declaration=True)
|
|
|
|
def _indent_xml(self, elem, level=0):
|
|
"""Add proper indentation to XML elements for readable output."""
|
|
indent = "\n" + level * " "
|
|
if len(elem):
|
|
if not elem.text or not elem.text.strip():
|
|
elem.text = indent + " "
|
|
if not elem.tail or not elem.tail.strip():
|
|
elem.tail = indent
|
|
for child in elem:
|
|
self._indent_xml(child, level + 1)
|
|
if not child.tail or not child.tail.strip():
|
|
child.tail = indent
|
|
else:
|
|
if level and (not elem.tail or not elem.tail.strip()):
|
|
elem.tail = indent
|
|
|
|
def get_xml_string(self) -> str:
|
|
"""Get the XML as a string."""
|
|
if self.tree is None:
|
|
raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")
|
|
|
|
return ET.tostring(self.root, encoding='unicode')
|
|
|
|
# ------------------------------------------------------------------
|
|
# High-level helper for generator refactor
|
|
# ------------------------------------------------------------------
|
|
|
|
@classmethod
|
|
def from_excel(
|
|
cls,
|
|
module_data: "ModuleData",
|
|
*,
|
|
parent_module: str = None,
|
|
parent_port_id: str = "2",
|
|
) -> "M12DRModuleGenerator":
|
|
"""Create, configure, and return a generator using only Excel data.
|
|
|
|
The calling code can then directly access ``generator.root`` or save
|
|
the file. It fully replaces the manual logic previously present in
|
|
EnhancedMCMGenerator._add_iolm_modules.
|
|
"""
|
|
|
|
variant = _determine_variant(module_data)
|
|
|
|
# Determine parent module using pattern matching if not explicitly provided
|
|
if parent_module is None:
|
|
has_ip = bool(module_data.ip_address and module_data.ip_address.strip())
|
|
parent_module = check_and_register_en4tr_for_device(module_data.tagname, has_ip_address=has_ip)
|
|
|
|
# Create generator to access operand patterns
|
|
config = M12DRModuleConfig(
|
|
name=module_data.tagname,
|
|
variant=variant,
|
|
ip_address=module_data.ip_address or "192.168.1.1",
|
|
parent_module=parent_module,
|
|
parent_port_id=parent_port_id,
|
|
)
|
|
|
|
gen = cls(config)
|
|
gen.load_boilerplate()
|
|
|
|
# Now process comments using dynamic operand conversion
|
|
input_comments: Dict[str, str] = {}
|
|
output_comments: Dict[str, str] = {}
|
|
|
|
for m in module_data.io_mappings:
|
|
operand = gen._convert_terminal_to_operand_dynamic(m.terminal, m.signal)
|
|
if not operand:
|
|
continue
|
|
comment_text = "SPARE" if m.description and m.description.upper() == "SPARE" else m.description
|
|
if not comment_text:
|
|
continue
|
|
if _is_output_signal(m.signal, m.io_path):
|
|
output_comments[operand] = comment_text
|
|
else:
|
|
input_comments[operand] = comment_text
|
|
|
|
# Update config with processed comments
|
|
gen.config.input_comments = input_comments if input_comments else None
|
|
gen.config.output_comments = output_comments if output_comments else None
|
|
|
|
gen.apply_updates()
|
|
return gen
|
|
|
|
|
|
def create_m12dr_module(name: str, variant: str, ip_address: str = "192.168.1.1",
|
|
parent_module: str = "SLOT1_EN4TR", parent_port_id: str = "2",
|
|
input_comments: Optional[Dict[str, str]] = None,
|
|
output_comments: Optional[Dict[str, str]] = None) -> M12DRModuleConfig:
|
|
"""Factory function to create an M12DR module configuration.
|
|
|
|
Note: input_comments and output_comments default to None (no comments).
|
|
Use the get_*_default_*_comments() helper functions if you want default templates.
|
|
"""
|
|
return M12DRModuleConfig(
|
|
name=name,
|
|
variant=variant,
|
|
ip_address=ip_address,
|
|
parent_module=parent_module,
|
|
parent_port_id=parent_port_id,
|
|
input_comments=input_comments,
|
|
output_comments=output_comments
|
|
)
|
|
|
|
|
|
# Helper functions to get default comment structures for PDP_FIO variant
|
|
def get_pdp_fio_default_input_comments() -> Dict[str, str]:
|
|
"""Get default input comments for PDP_FIO variant."""
|
|
return {
|
|
".PT00.DATA": "Input 1",
|
|
".PT01.DATA": "Input 2",
|
|
".PT02.DATA": "Input 3",
|
|
".PT03.DATA": "Input 4",
|
|
".PT04.DATA": "Input 5",
|
|
".PT06.DATA": "Input 6",
|
|
".PT08.DATA": "Input 7",
|
|
".PT09.DATA": "Input 8",
|
|
".PT10.DATA": "Input 9",
|
|
".PT11.DATA": "Input 10",
|
|
".PT12.DATA": "Input 11",
|
|
".IOLINK14": "Smart Device"
|
|
}
|
|
|
|
|
|
def get_pdp_fio_default_output_comments() -> Dict[str, str]:
|
|
"""Get default output comments for PDP_FIO variant."""
|
|
return {
|
|
".PT05.DATA": "Output 1",
|
|
".PT07.DATA": "Output 2",
|
|
".PT13.DATA": "Output 3"
|
|
}
|
|
|
|
|
|
# --------------------------------------------------------------------------------------
|
|
# Utility helpers (ported from EnhancedMCMGenerator for 100 % behaviour parity)
|
|
# --------------------------------------------------------------------------------------
|
|
|
|
|
|
def _determine_variant(module_data: "ModuleData") -> str:
|
|
"""Determine M12DR variant based on module name and FIOH patterns.
|
|
|
|
Logic:
|
|
1. If module name matches specific FIOM patterns (VS01A_FIOM*, VS01C_FIOM*), use FIOM2_Master variant
|
|
2. If module name starts with S0 and is FIO9-FIO19, use Sorter_Solenoid variant
|
|
3. If module name starts with S0, use Sorter variant
|
|
4. If module name contains "PDP", use PDP_FIO variant
|
|
5. If FIOH is found in IO4 or IO12 descriptions, use PalletBuildMaster
|
|
6. Otherwise, use D2CMaster
|
|
"""
|
|
module_name = module_data.tagname.upper()
|
|
|
|
# Check for specific FIOM pattern modules first
|
|
if _is_fiom_master_pattern(module_name):
|
|
return "FIOM2_Master"
|
|
|
|
# Check for Sorter FIO modules first (S0 prefix detection)
|
|
if module_name.startswith("S0") and "FIO" in module_name:
|
|
# Check if this is a solenoid FIO (FIO9-FIO19)
|
|
if _is_solenoid_fio_pattern(module_name):
|
|
return "Sorter_Solenoid"
|
|
else:
|
|
return "Sorter"
|
|
|
|
# Check if module name contains PDP
|
|
if "PDP" in module_name:
|
|
return "PDP_FIO"
|
|
|
|
# Check for FIOH in IO4 or IO12 descriptions
|
|
terminal_desc: Dict[str, str] = {}
|
|
for m in module_data.io_mappings:
|
|
if m.terminal and m.description:
|
|
terminal_desc[m.terminal.upper()] = m.description.upper()
|
|
|
|
io12 = terminal_desc.get("IO12", "")
|
|
io4 = terminal_desc.get("IO4", "")
|
|
|
|
# If any of the IO4/IO12 terminals contains FIOH, then choose PalletBuildMaster
|
|
if any("FIOH" in t for t in (io12, io4)):
|
|
return "PalletBuildMaster"
|
|
|
|
# Default to D2CMaster for all other cases
|
|
return "D2CMaster"
|
|
|
|
|
|
def _is_fiom_master_pattern(module_name: str) -> bool:
|
|
"""Check if module name matches specific FIOM patterns that should use FIOM2_Master boilerplate.
|
|
|
|
|
|
"""
|
|
# Specific module names that should use FIOM2_Master boilerplate
|
|
fiom_master_modules = {
|
|
#"VS01A_FIOM13", "VS01A_FIOM21", "VS01A_FIOM5", "VS01C_FIOM18"
|
|
|
|
}
|
|
return module_name in fiom_master_modules
|
|
|
|
|
|
def _is_master_pattern(module_name: str) -> bool:
|
|
"""Check if module name matches master patterns like FIO2M1, FIO3M, etc."""
|
|
import re
|
|
# Match patterns like FIO2M1, FIO3M, etc.
|
|
master_pattern = re.compile(r'^.*FIO\d+M\d*$')
|
|
return bool(master_pattern.match(module_name))
|
|
|
|
|
|
def _is_solenoid_fio_pattern(module_name: str) -> bool:
|
|
"""Check if module name matches solenoid FIO patterns (FIO9-FIO19)."""
|
|
import re
|
|
# Extract FIO number from patterns like S02_1_FIO9, S02_2_FIO19, etc.
|
|
fio_match = re.search(r'FIO(\d+)', module_name)
|
|
if fio_match:
|
|
fio_number = int(fio_match.group(1))
|
|
# Check if FIO number is between 9 and 19 (inclusive)
|
|
return 9 <= fio_number <= 19
|
|
return False
|
|
|
|
|
|
def _is_output_signal(signal: str, io_path: str) -> bool:
|
|
if not signal:
|
|
return False
|
|
s = signal.upper().strip()
|
|
if s == "O":
|
|
return True
|
|
if s in ("I", "IOLINK"):
|
|
return False
|
|
if io_path and ":O." in io_path.upper():
|
|
return True
|
|
return False
|
|
|
|
# --------------------------------------------------------------------------------------
|
|
|
|
# Example usage
|
|
if __name__ == "__main__":
|
|
# Example: Create a PDP_FIO module
|
|
pdp_fio_config = create_m12dr_module(
|
|
name="PDP_FIO1",
|
|
variant="PDP_FIO",
|
|
ip_address="123.121.231.231",
|
|
input_comments={
|
|
".PT00.DATA": "Emergency Stop",
|
|
".PT01.DATA": "Start Button",
|
|
".PT02.DATA": "Reset Button",
|
|
".IOLINK14": "Smart Sensor"
|
|
},
|
|
output_comments={
|
|
".PT05.DATA": "Status Light",
|
|
".PT07.DATA": "Warning Light",
|
|
".PT13.DATA": "Alarm Horn"
|
|
}
|
|
)
|
|
|
|
generator = M12DRModuleGenerator(pdp_fio_config)
|
|
generator.load_boilerplate()
|
|
generator.apply_updates()
|
|
generator.save("generated/PDP_FIO1.L5X")
|
|
|
|
print(f"Generated PDP_FIO module: {pdp_fio_config.name}") |