2025-08-26 21:29:02 +04:00

828 lines
33 KiB
Python

#!/usr/bin/env python3
"""
Turck Hub Module Boilerplate Model
==================================
Model for Turck Hub (TBIL-M1-16DXP) modules with support for different configurations.
Supports Chute_Load, Chute_Chute, Load_Chute, and PDP_FIOH variants.
Important Constraints:
- Port addresses must be even numbers only (0, 2, 4, 6, 8, 10, 12, 14)
- Maximum port address is 14
"""
from typing import Dict, Optional, List
from typing import TYPE_CHECKING
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
import os
if TYPE_CHECKING:
from excel_data_processor import ModuleData, IOPathMapping
@dataclass
class TurckHubModuleConfig:
"""Configuration for a Turck Hub module instance."""
name: str # Module name (e.g., "Chute_Load_Hub1")
variant: str # Module variant: "Chute_Load", "Chute_Chute", "Load_Chute", or "PDP_FIOH"
parent_module: str = "D2CMaster"
parent_port_id: str = "4"
port_address: str = "2" # Port address: Must be even number (0, 2, 4, 6, 8, 10, 12, 14)
inhibited: bool = False
major_fault: bool = False
input_comments: Optional[Dict[str, str]] = None # Key: operand (e.g., ".PROCESSDATAIN.CONNECTOR_4_A_PIN_4"), Value: comment
output_comments: Optional[Dict[str, str]] = None # Key: operand (e.g., ".PROCESSDATAOUT.CONNECTOR_3_B_PIN_2"), Value: comment
def __post_init__(self):
"""Validate configuration after initialization."""
self._validate_port_address()
def _validate_port_address(self):
"""Validate that port address is an even number between 0-14."""
try:
addr = int(self.port_address)
if addr < 0 or addr > 14:
raise ValueError(f"Port address must be between 0-14, got: {addr}")
if addr % 2 != 0:
raise ValueError(f"Port address must be even number, got: {addr}")
except ValueError as e:
if "invalid literal" in str(e):
raise ValueError(f"Port address must be a valid integer, got: '{self.port_address}'")
raise
class TurckHubModuleGenerator:
"""Generator for Turck Hub module XML with different variant support.
Note: Port addresses must be even numbers between 0-14 (0, 2, 4, 6, 8, 10, 12, 14).
"""
# Mapping of variants to boilerplate filenames
VARIANT_BOILERPLATE_MAP = {
"Chute_Load": "Chute_Load_Hub_Module.L5X",
"Chute_Chute": "Chute_Chute_Hub_Module.L5X",
"Load_Chute": "Load_Chute_Hub_Module.L5X",
"PDP_FIOH": "PDP_FIOH_Module.L5X",
"FL_Hub": "FL_Hub_Module.L5X",
"Sorter": "Sorter_FIOH_Module.L5X"
}
# Default port addresses for each variant (FIOH must be on 6 or 14)
VARIANT_PORT_ADDRESSES = {
"Chute_Load": "6", # Fixed: was "2", now proper FIOH address
"Chute_Chute": "6", # Fixed: was "0", now proper FIOH address
"Load_Chute": "14", # Fixed: was "8", now proper FIOH address
"PDP_FIOH": "6", # PDP FIOH modules default to address 6
"FL_Hub": "6", # FL Hub modules default to address 6
"Sorter": "4" # Sorter FIOH modules default to address 4
}
def __init__(self, config: TurckHubModuleConfig):
self.config = config
# Determine the correct boilerplate file
self.boilerplate_filename = self._determine_boilerplate_filename()
# Use project-specific boilerplate directory if set, otherwise default
boilerplate_dir = os.environ.get('MCM_BOILERPLATE_DIR', 'boilerplate')
self.boilerplate_path = os.path.join(boilerplate_dir, self.boilerplate_filename)
# Set default port address if not specified
if not self.config.port_address:
self.config.port_address = self.VARIANT_PORT_ADDRESSES[self.config.variant]
self.tree = None
self.root = None
def _determine_boilerplate_filename(self) -> str:
"""Determine the boilerplate filename to use.
Priority:
1. If "FL" is in the module name, use FL_Hub_Module.L5X
2. Check for module-specific boilerplate: {module_name}_Module.L5X
3. Fall back to variant-based boilerplate
"""
# First, check if "FL" is in the module name
if "FL" in self.config.name.upper():
fl_hub_filename = "FL_Hub_Module.L5X"
print(f" {self.config.name}: Detected 'FL' in name, using FL_Hub boilerplate {fl_hub_filename}")
return fl_hub_filename
# Second, try module-specific boilerplate
module_specific_filename = f"{self.config.name}_Module.L5X"
# Use project-specific boilerplate directory if set, otherwise default
boilerplate_dir = os.environ.get('MCM_BOILERPLATE_DIR', 'boilerplate')
module_specific_path = os.path.join(boilerplate_dir, module_specific_filename)
if os.path.exists(module_specific_path):
print(f" {self.config.name} (FIOH {self.config.variant}): Using module-specific boilerplate {module_specific_filename}")
return module_specific_filename
# Fall back to variant-based boilerplate
if self.config.variant not in self.VARIANT_BOILERPLATE_MAP:
raise ValueError(f"Unsupported variant: {self.config.variant}. Supported variants: {list(self.VARIANT_BOILERPLATE_MAP.keys())}")
fallback_filename = self.VARIANT_BOILERPLATE_MAP[self.config.variant]
print(f" {self.config.name} (FIOH {self.config.variant}): Using variant boilerplate {fallback_filename}")
return fallback_filename
def load_boilerplate(self):
"""Load the appropriate boilerplate template based on variant."""
if not os.path.exists(self.boilerplate_path):
raise FileNotFoundError(f"Boilerplate file not found: {self.boilerplate_path}")
self.tree = ET.parse(self.boilerplate_path)
self.root = self.tree.getroot()
def update_module_name(self):
"""Update the module name throughout the XML."""
# Update in root attributes
self.root.set("TargetName", self.config.name)
# Update Module element
module = self.root.find(".//Module[@Use='Target']")
if module is not None:
module.set("Name", self.config.name)
def update_parent_module(self):
"""Update parent module references."""
module = self.root.find(".//Module[@Use='Target']")
if module is not None:
module.set("ParentModule", self.config.parent_module)
module.set("ParentModPortId", self.config.parent_port_id)
def update_port_address(self):
"""Update the port address."""
port = self.root.find(".//Port[@Type='IO-Link']")
if port is not None:
port.set("Address", self.config.port_address)
else:
print(f" ERROR: Could not find IO-Link port for {self.config.name}")
def update_inhibited_status(self):
"""Update the inhibited status."""
module = self.root.find(".//Module[@Use='Target']")
if module is not None:
module.set("Inhibited", "true" if self.config.inhibited else "false")
module.set("MajorFault", "true" if self.config.major_fault else "false")
def update_input_comments(self):
"""Update input tag comments."""
if self.config.input_comments:
input_tag = self.root.find(".//Connection[@Name='_2004250069802D0028802D005304']/InputTag")
if input_tag is not None:
# Find or create Comments section
input_comments = input_tag.find("Comments")
if input_comments is None:
# Create Comments section as the first child
input_comments = ET.Element("Comments")
input_tag.insert(0, input_comments)
else:
# Clear existing comments
input_comments.clear()
# Add new comments
for operand, comment_text in self.config.input_comments.items():
comment = ET.SubElement(input_comments, "Comment")
comment.set("Operand", operand)
comment.text = comment_text
def update_output_comments(self):
"""Update output tag comments."""
if self.config.output_comments:
output_tag = self.root.find(".//Connection[@Name='_2004250069802D0028802D005304']/OutputTag")
if output_tag is not None:
# Find or create Comments section
output_comments = output_tag.find("Comments")
if output_comments is None:
# Create Comments section as the first child
output_comments = ET.Element("Comments")
output_tag.insert(0, output_comments)
else:
# Clear existing comments
output_comments.clear()
# Add new comments
for operand, comment_text in self.config.output_comments.items():
comment = ET.SubElement(output_comments, "Comment")
comment.set("Operand", operand)
comment.text = comment_text
def update_export_date(self):
"""Update the export date to current time."""
export_date = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
self.root.set("ExportDate", export_date)
def apply_updates(self):
"""Apply all updates to the boilerplate."""
self.update_module_name()
self.update_parent_module()
self.update_port_address()
self.update_inhibited_status()
self.update_input_comments()
self.update_output_comments()
self.update_export_date()
def save(self, output_path: str):
"""Save the updated module to file, preserving CDATA sections."""
if self.tree is None:
raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")
# Read the original boilerplate file to preserve formatting and CDATA
with open(self.boilerplate_path, 'r', encoding='utf-8') as f:
original_content = f.read()
# Apply our updates by doing string replacements on the original content
updated_content = self._apply_updates_to_content(original_content)
# Write the updated content
with open(output_path, 'w', encoding='utf-8') as f:
f.write(updated_content)
def _apply_updates_to_content(self, content: str) -> str:
"""Apply updates to the original XML content via string replacement."""
import re
# Update TargetName in root element
content = re.sub(
r'TargetName="[^"]*"',
f'TargetName="{self.config.name}"',
content
)
# Update ExportDate
export_date = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
content = re.sub(
r'ExportDate="[^"]*"',
f'ExportDate="{export_date}"',
content
)
# Update Module Name
content = re.sub(
r'<Module Use="Target" Name="[^"]*"',
f'<Module Use="Target" Name="{self.config.name}"',
content
)
# Update ParentModule and ParentModPortId
content = re.sub(
r'ParentModule="[^"]*"',
f'ParentModule="{self.config.parent_module}"',
content
)
content = re.sub(
r'ParentModPortId="[^"]*"',
f'ParentModPortId="{self.config.parent_port_id}"',
content
)
# Update Port Address
content = re.sub(
r'<Port Id="2" Address="[^"]*"',
f'<Port Id="2" Address="{self.config.port_address}"',
content
)
# Update Inhibited and MajorFault status
content = re.sub(
r'Inhibited="[^"]*"',
f'Inhibited="{"true" if self.config.inhibited else "false"}"',
content
)
content = re.sub(
r'MajorFault="[^"]*"',
f'MajorFault="{"true" if self.config.major_fault else "false"}"',
content
)
# Update comments sections
if self.config.input_comments:
content = self._update_input_comments_in_content(content)
if self.config.output_comments:
content = self._update_output_comments_in_content(content)
return content
def _update_input_comments_in_content(self, content: str) -> str:
"""Update input comments in the content string."""
import re
# First try to find existing Comments section
pattern = r'(<InputTag[^>]*>\s*<Comments>)(.*?)(</Comments>)'
def replace_comments(match):
start = match.group(1)
end = match.group(3)
# Determine base indentation from existing block
m_indent = re.search(r"\n(\s*)<", start)
base_indent = m_indent.group(1) if m_indent else " " # 12 spaces as fallback
# Build comments exactly like boiler-plate
pieces = []
for operand, txt in self.config.input_comments.items():
pieces.extend([
f"{base_indent}<Comment Operand=\"{operand}\">",
f"{base_indent} <![CDATA[{txt}]]>",
f"{base_indent}</Comment>"
])
return f"{start}\n" + "\n".join(pieces) + f"\n{base_indent}{end}"
# Try to replace existing Comments section
new_content = re.sub(pattern, replace_comments, content, flags=re.DOTALL)
# If no replacement was made, we need to create the Comments section
if new_content == content:
# Find InputTag and insert Comments section
input_tag_pattern = r'(<InputTag[^>]*>)(\s*<Data)'
def insert_comments(match):
tag_start = match.group(1)
data_start = match.group(2)
# Determine indentation
m_indent = re.search(r"\n(\s*)<Data", data_start)
base_indent = m_indent.group(1) if m_indent else " "
# Build comments section
pieces = [f"{tag_start}", f"{base_indent}<Comments>"]
for operand, txt in self.config.input_comments.items():
pieces.extend([
f"{base_indent}<Comment Operand=\"{operand}\">",
f"{base_indent} <![CDATA[{txt}]]>",
f"{base_indent}</Comment>"
])
pieces.append(f"{base_indent}</Comments>")
return "\n".join(pieces) + data_start
new_content = re.sub(input_tag_pattern, insert_comments, content, flags=re.DOTALL)
return new_content
def _update_output_comments_in_content(self, content: str) -> str:
"""Update output comments in the content string."""
import re
# First try to find existing Comments section
pattern = r'(<OutputTag[^>]*>\s*<Comments>)(.*?)(</Comments>)'
def replace_comments(match):
start = match.group(1)
end = match.group(3)
import re
m_indent = re.search(r"\n(\s*)<", start)
base_indent = m_indent.group(1) if m_indent else " "
pieces = []
for operand, txt in self.config.output_comments.items():
pieces.extend([
f"{base_indent}<Comment Operand=\"{operand}\">",
f"{base_indent} <![CDATA[{txt}]]>",
f"{base_indent}</Comment>"
])
return f"{start}\n" + "\n".join(pieces) + f"\n{base_indent}{end}"
# Try to replace existing Comments section
new_content = re.sub(pattern, replace_comments, content, flags=re.DOTALL)
# If no replacement was made, we need to create the Comments section
if new_content == content:
# Find OutputTag and insert Comments section
output_tag_pattern = r'(<OutputTag[^>]*>)(\s*<Data)'
def insert_comments(match):
tag_start = match.group(1)
data_start = match.group(2)
# Determine indentation
m_indent = re.search(r"\n(\s*)<Data", data_start)
base_indent = m_indent.group(1) if m_indent else " "
# Build comments section
pieces = [f"{tag_start}", f"{base_indent}<Comments>"]
for operand, txt in self.config.output_comments.items():
pieces.extend([
f"{base_indent}<Comment Operand=\"{operand}\">",
f"{base_indent} <![CDATA[{txt}]]>",
f"{base_indent}</Comment>"
])
pieces.append(f"{base_indent}</Comments>")
return "\n".join(pieces) + data_start
new_content = re.sub(output_tag_pattern, insert_comments, content, flags=re.DOTALL)
return new_content
def get_xml_string(self) -> str:
"""Get the XML as a string."""
if self.tree is None:
raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")
return ET.tostring(self.root, encoding='unicode')
# ------------------------------------------------------------------
# Factory helper for EnhancedMCMGenerator refactor
# ------------------------------------------------------------------
@classmethod
def from_excel(cls, module_data: "ModuleData") -> "TurckHubModuleGenerator":
"""Create, configure, and return generator directly from Excel data."""
variant = _determine_variant(module_data)
parent_module, parent_port_id, port_address = _parent_info(module_data)
input_comments, output_comments = _extract_comments(module_data)
cfg = create_turck_hub_module(
name=module_data.tagname,
variant=variant,
parent_module=parent_module,
parent_port_id=parent_port_id,
port_address=port_address,
input_comments=input_comments if input_comments else None,
output_comments=output_comments if output_comments else None,
)
gen = cls(cfg)
gen.load_boilerplate()
gen.apply_updates()
return gen
def get_valid_port_addresses() -> List[str]:
"""Get list of valid port addresses for Turck Hub modules.
Returns:
List of valid port addresses: ['0', '2', '4', '6', '8', '10', '12', '14']
"""
return [str(i) for i in range(0, 15, 2)]
def create_turck_hub_module(name: str, variant: str, parent_module: str = "D2CMaster",
parent_port_id: str = "4", port_address: str = None,
input_comments: Optional[Dict[str, str]] = None,
output_comments: Optional[Dict[str, str]] = None,
inhibited: bool = False, major_fault: bool = False) -> TurckHubModuleConfig:
"""Factory function to create a Turck Hub module configuration.
Args:
name: Module name
variant: One of "Chute_Load", "Chute_Chute", "Load_Chute", "PDP_FIOH"
parent_module: Parent module name
parent_port_id: Parent module port ID
port_address: IO-Link port address - must be even number (0, 2, 4, 6, 8, 10, 12, 14)
If None, uses variant default
input_comments: Dict of input tag comments
output_comments: Dict of output tag comments
inhibited: Whether module starts inhibited
major_fault: Whether module starts with major fault
Returns:
TurckHubModuleConfig instance
Raises:
ValueError: If port_address is not a valid even number between 0-14
"""
return TurckHubModuleConfig(
name=name,
variant=variant,
parent_module=parent_module,
parent_port_id=parent_port_id,
port_address=port_address,
input_comments=input_comments,
output_comments=output_comments,
inhibited=inhibited,
major_fault=major_fault
)
# Helper functions to get default comment structures for each variant
def get_chute_load_default_input_comments() -> Dict[str, str]:
"""Get default input comments for Chute_Load variant."""
return {
".PROCESSDATAIN.CONNECTOR_4_A_PIN_4": "PE",
".PROCESSDATAIN.CONNECTOR_3_A_PIN_4": "PB In",
".PROCESSDATAIN.CONNECTOR_2_A_PIN_4": "PE 50",
".PROCESSDATAIN.CONNECTOR_6_A_PIN_4": "PB In",
".PROCESSDATAIN.CONNECTOR_5_A_PIN_4": "PE"
}
def get_chute_load_default_output_comments() -> Dict[str, str]:
"""Get default output comments for Chute_Load variant."""
return {
".PROCESSDATAOUT.CONNECTOR_3_B_PIN_2": "PB LT Out",
".PROCESSDATAOUT.CONNECTOR_1_B_PIN_2": "Beacon Segment2",
".PROCESSDATAOUT.CONNECTOR_1_A_PIN_4": "Beacon Segment1",
".PROCESSDATAOUT.CONNECTOR_8_A_PIN_4": "Sol",
".PROCESSDATAOUT.CONNECTOR_7_B_PIN_2": "Beacon Segment2",
".PROCESSDATAOUT.CONNECTOR_7_A_PIN_4": "Beacon Segment1"
}
def get_chute_chute_default_input_comments() -> Dict[str, str]:
"""Get default input comments for Chute_Chute variant."""
return {
".PROCESSDATAIN.CONNECTOR_4_A_PIN_4": "PE 100",
".PROCESSDATAIN.CONNECTOR_3_A_PIN_4": "PE 100",
".PROCESSDATAIN.CONNECTOR_2_A_PIN_4": "PE 50",
".PROCESSDATAIN.CONNECTOR_1_A_PIN_4": "PE 50",
".PROCESSDATAIN.CONNECTOR_6_A_PIN_4": "PB In",
".PROCESSDATAIN.CONNECTOR_5_A_PIN_4": "PB In"
}
def get_chute_chute_default_output_comments() -> Dict[str, str]:
"""Get default output comments for Chute_Chute variant."""
return {
".PROCESSDATAOUT.CONNECTOR_8_A_PIN_4": "Sol",
".PROCESSDATAOUT.CONNECTOR_7_A_PIN_4": "Sol"
}
def get_load_chute_default_input_comments() -> Dict[str, str]:
"""Get default input comments for Load_Chute variant."""
return {
".PROCESSDATAIN.CONNECTOR_4_A_PIN_4": "PB In",
".PROCESSDATAIN.CONNECTOR_3_A_PIN_4": "PE 100",
".PROCESSDATAIN.CONNECTOR_1_A_PIN_4": "PE 50",
".PROCESSDATAIN.CONNECTOR_6_A_PIN_4": "PE",
".PROCESSDATAIN.CONNECTOR_5_A_PIN_4": "PB In"
}
def get_load_chute_default_output_comments() -> Dict[str, str]:
"""Get default output comments for Load_Chute variant."""
return {
".PROCESSDATAOUT.CONNECTOR_4_B_PIN_2": "PB Out",
".PROCESSDATAOUT.CONNECTOR_2_B_PIN_2": "Beacon Segment2",
".PROCESSDATAOUT.CONNECTOR_2_A_PIN_4": "Beacon Segment1",
".PROCESSDATAOUT.CONNECTOR_8_B_PIN_2": "Beacon Segment2",
".PROCESSDATAOUT.CONNECTOR_8_A_PIN_4": "Beacon Segment1",
".PROCESSDATAOUT.CONNECTOR_7_A_PIN_4": "Sol"
}
def get_pdp_fioh_default_input_comments() -> Dict[str, str]:
"""Get default input comments for PDP_FIOH variant."""
return {
".PROCESSDATAIN.CONNECTOR_1_A_PIN_4": "Circuit Breaker 1",
".PROCESSDATAIN.CONNECTOR_1_B_PIN_2": "Circuit Breaker 2",
".PROCESSDATAIN.CONNECTOR_2_A_PIN_4": "Circuit Breaker 3",
".PROCESSDATAIN.CONNECTOR_2_B_PIN_2": "Circuit Breaker 4",
".PROCESSDATAIN.CONNECTOR_3_A_PIN_4": "Circuit Breaker 5",
".PROCESSDATAIN.CONNECTOR_3_B_PIN_2": "Circuit Breaker 6",
".PROCESSDATAIN.CONNECTOR_4_A_PIN_4": "Circuit Breaker 7",
".PROCESSDATAIN.CONNECTOR_4_B_PIN_2": "Circuit Breaker 8",
".PROCESSDATAIN.CONNECTOR_5_A_PIN_4": "Circuit Breaker 9",
".PROCESSDATAIN.CONNECTOR_5_B_PIN_2": "Circuit Breaker 10",
".PROCESSDATAIN.CONNECTOR_6_A_PIN_4": "Circuit Breaker 11",
".PROCESSDATAIN.CONNECTOR_6_B_PIN_2": "Circuit Breaker 12",
".PROCESSDATAIN.CONNECTOR_7_A_PIN_4": "Circuit Breaker 13",
".PROCESSDATAIN.CONNECTOR_7_B_PIN_2": "Circuit Breaker 14",
".PROCESSDATAIN.CONNECTOR_8_A_PIN_4": "Circuit Breaker 15",
".PROCESSDATAIN.CONNECTOR_8_B_PIN_2": "Circuit Breaker 16"
}
# --------------------------------------------------------------------------------
# Helper logic migrated from EnhancedMCMGenerator for behaviour parity
# --------------------------------------------------------------------------------
def _determine_variant(module_data: "ModuleData") -> str:
"""Determine Turck hub variant based on module name and DESC patterns."""
module_name = module_data.tagname.upper()
# Check for Sorter FIOH modules first (S0 prefix detection)
# Handle patterns like S0_FIO1H2, S0_FIO2H3, etc.
if (module_name.startswith("S0") or module_name.startswith("VS")) and ("FIOH" in module_name or _is_hub_pattern(module_name)):
return "Sorter"
# Check for PDP FIOH modules (name-based detection)
if "PDP" in module_name and "FIOH" in module_name:
return "PDP_FIOH"
# Build terminal to description mapping
terminal_desc: Dict[str, str] = {}
for m in module_data.io_mappings:
if m.terminal and m.description:
terminal_desc[m.terminal.upper()] = m.description.upper()
io4 = terminal_desc.get("IO4", "")
io10 = terminal_desc.get("IO10", "")
io11 = terminal_desc.get("IO11", "")
io12 = terminal_desc.get("IO12", "")
# Check for Chute_Load variant: JR1 in IO11 or IO10
if "JR1" in io11 or "JR1" in io10:
return "Chute_Load"
# Check for Chute_Chute variant: PR1 in IO4 or IO12
if "PR1" in io4 or "PR1" in io12:
return "Chute_Chute"
# Check for Load_Chute variant (different from Chute_Load)
# This might need additional logic - for now keeping as separate case
# You may need to specify the exact criteria for Load_Chute
# Default to Chute_Chute as fallback for other cases
return "Chute_Chute"
def _is_hub_pattern(module_name: str) -> bool:
"""Check if module name matches hub patterns like FIO1H2, FIO2H3, FIOH1, etc."""
import re
# Match patterns like FIO1H2, FIO2H3, FIOH1, etc.
hub_pattern = re.compile(r'^.*FIO(\d+)?H\d*$')
return bool(hub_pattern.match(module_name))
def _extract_comments(module_data: "ModuleData") -> tuple[Dict[str, str], Dict[str, str]]:
input_comments: Dict[str, str] = {}
output_comments: Dict[str, str] = {}
for m in module_data.io_mappings:
if not (m.io_path and m.description):
continue
comment_text = m.description
# For PDP FIOH modules, map IO terminals to connector operands
if module_data.tagname.upper().find("PDP") != -1 and module_data.tagname.upper().find("FIOH1") != -1:
operand = _map_pdp_io_to_connector(m.terminal, m.signal)
if operand:
if m.signal.upper() == "I":
input_comments[operand] = comment_text
elif m.signal.upper() == "O":
output_comments[operand] = comment_text
continue
# Original logic for other variants
if ":" in m.io_path:
_, io_part = m.io_path.split(":", 1)
else:
io_part = m.io_path # fallback
io_part_up = io_part.upper()
if "I.PROCESSDATAIN." in io_part_up:
connector = io_part_up.split("I.PROCESSDATAIN.", 1)[1]
input_comments[f".PROCESSDATAIN.{connector}"] = comment_text
elif "O.PROCESSDATAOUT." in io_part_up:
connector = io_part_up.split("O.PROCESSDATAOUT.", 1)[1]
output_comments[f".PROCESSDATAOUT.{connector}"] = comment_text
else:
# Handle path without colon by checking substrings
if "I.PROCESSDATAIN." in io_part_up:
connector = io_part_up.split("I.PROCESSDATAIN.", 1)[1]
input_comments[f".PROCESSDATAIN.{connector}"] = comment_text
elif "O.PROCESSDATAOUT." in io_part_up:
connector = io_part_up.split("O.PROCESSDATAOUT.", 1)[1]
output_comments[f".PROCESSDATAOUT.{connector}"] = comment_text
return input_comments, output_comments
def _map_pdp_io_to_connector(terminal: str, signal: str) -> str:
"""Map PDP FIOH IO terminal to connector operand.
Examples:
- IO00 -> .PROCESSDATAIN.CONNECTOR_1_A_PIN_4
- IO01 -> .PROCESSDATAIN.CONNECTOR_1_B_PIN_2
- IO02 -> .PROCESSDATAIN.CONNECTOR_2_A_PIN_4
- etc.
"""
if not terminal.upper().startswith("IO"):
return ""
try:
io_num = int(terminal.upper().replace("IO", ""))
except ValueError:
return ""
# Calculate connector number (1-based)
connector_num = (io_num // 2) + 1
# Determine pin type based on even/odd
if io_num % 2 == 0:
pin_type = "A_PIN_4"
else:
pin_type = "B_PIN_2"
# Build operand
if signal.upper() == "I":
return f".PROCESSDATAIN.CONNECTOR_{connector_num}_{pin_type}"
elif signal.upper() == "O":
return f".PROCESSDATAOUT.CONNECTOR_{connector_num}_{pin_type}"
else:
return ""
def _parent_info(module_data: "ModuleData") -> tuple[str, str, str]:
"""Mimic EnhancedMCMGenerator._get_parent_info for a single FIOH module."""
# Extract port address from terminal - use the actual terminal number
term = module_data.terminal.upper() if module_data.terminal else ""
if term.startswith("IO"):
# Extract the numeric part directly (IO4 -> 4, IO12 -> 12, IO14 -> 14)
try:
port_address = term[2:] # Get everything after "IO"
except:
port_address = "4" # Default to channel 4 if parsing fails
else:
port_address = "4" # Default to channel 4 if not an IO terminal
# The parent_module should be set from Excel data (e.g., "PDP1_FIO1")
parent_module = module_data.parent_module
if not parent_module:
raise ValueError(f"FIOH module {module_data.tagname} missing parent module information")
# For FIOH modules connected via IO-Link, always use port 4 (the IO-Link port on M12DR)
# The port_address is different - it's the address on the IO-Link network
parent_port_id = "4"
return parent_module, parent_port_id, port_address
# Example usage
if __name__ == "__main__":
# Example: Create a Chute_Load hub module
chute_load_config = create_turck_hub_module(
name="Chute_Load_Hub1",
variant="Chute_Load",
parent_module="D2CMaster",
parent_port_id="4",
input_comments={
".PROCESSDATAIN.CONNECTOR_4_A_PIN_4": "Emergency Stop",
".PROCESSDATAIN.CONNECTOR_3_A_PIN_4": "Reset Button",
".PROCESSDATAIN.CONNECTOR_2_A_PIN_4": "Photo Eye 1",
".PROCESSDATAIN.CONNECTOR_6_A_PIN_4": "Photo Eye 2"
},
output_comments={
".PROCESSDATAOUT.CONNECTOR_3_B_PIN_2": "Status Light",
".PROCESSDATAOUT.CONNECTOR_1_B_PIN_2": "Warning Beacon",
".PROCESSDATAOUT.CONNECTOR_8_A_PIN_4": "Conveyor Motor"
}
)
generator = TurckHubModuleGenerator(chute_load_config)
generator.load_boilerplate()
generator.apply_updates()
generator.save("generated_projects/Chute_Load_Hub1.L5X")
# Example: Create a Chute_Chute hub module with defaults
chute_chute_config = create_turck_hub_module(
name="Chute_Chute_Hub2",
variant="Chute_Chute",
input_comments=get_chute_chute_default_input_comments(),
output_comments=get_chute_chute_default_output_comments()
)
generator2 = TurckHubModuleGenerator(chute_chute_config)
generator2.load_boilerplate()
generator2.apply_updates()
generator2.save("generated_projects/Chute_Chute_Hub2.L5X")
# Example: Create a Load_Chute hub module
load_chute_config = create_turck_hub_module(
name="Load_Chute_Hub3",
variant="Load_Chute",
parent_module="IOLMMaster1",
parent_port_id="2",
input_comments=get_load_chute_default_input_comments(),
output_comments=get_load_chute_default_output_comments()
)
generator3 = TurckHubModuleGenerator(load_chute_config)
generator3.load_boilerplate()
generator3.apply_updates()
generator3.save("generated_projects/Load_Chute_Hub3.L5X")
# Example: Create a PDP_FIOH hub module
pdp_fioh_config = create_turck_hub_module(
name="PDP1_FIOH1",
variant="PDP_FIOH",
parent_module="SLOT2_EN4TR",
parent_port_id="2",
input_comments=get_pdp_fioh_default_input_comments()
)
generator4 = TurckHubModuleGenerator(pdp_fioh_config)
generator4.load_boilerplate()
generator4.apply_updates()
generator4.save("generated_projects/PDP1_FIOH1.L5X")
print(f"Generated Chute_Load hub module: {chute_load_config.name}")
print(f"Generated Chute_Chute hub module: {chute_chute_config.name}")
print(f"Generated Load_Chute hub module: {load_chute_config.name}")
print(f"Generated PDP_FIOH hub module: {pdp_fioh_config.name}")