#!/usr/bin/env python3
"""
TL70 Beacon Module Boilerplate Model
====================================

Model for TL70 Pro with IO-Link beacon modules.
Supports configuring module name, parent module, port address, and basic
segment settings.
"""

from typing import Dict, Optional, Tuple
import xml.etree.ElementTree as ET
from xml.sax.saxutils import unescape as _xml_unescape
from dataclasses import dataclass
from datetime import datetime
import os
import re

# Maximum number of characters stored in the Application_specific_Tag string.
_APPLICATION_TAG_MAX_LEN = 29

# Port on the IO-Link master that the beacon is always attached to.
_IOLM_PARENT_PORT_ID = "4"

# Output-tag members that are forced to zero for every segment (1-6).
_OUTPUT_SEGMENT_ZERO_SUFFIXES = (
    "Color_2",
    "Color_1_Intensity",
    "Color_2_Intensity",
    "Pulse_Pattern",
    "Speed",
    "Animation_Type",
)

_NOT_LOADED_MESSAGE = "No boilerplate loaded. Call load_boilerplate() first."

_XML_DECLARATION = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n'

# ElementTree drops CDATA sections on parse; these patterns locate the
# elements whose text has to be re-wrapped in CDATA when the file is written.
_L5K_DATA_RE = re.compile(
    r'(<Data\s+Format="L5K"\s*>)(\s*\[.*?\]|\s*\(.*?\))\s*(</Data>)',
    re.DOTALL,
)
_STRING_DATA_RE = re.compile(
    r'(<DataValueMember Name="DATA"[^>]*>)([^<]*)(</DataValueMember>)'
)


def _wrap_in_cdata(match):
    """Return the matched element with its text wrapped in a CDATA section.

    The text captured from the serialized XML is entity-escaped; inside CDATA
    entities are not resolved, so it is un-escaped first.  A literal ``]]>``
    in the text is split across two CDATA sections so it cannot terminate the
    section early.
    """
    opening_tag, content, closing_tag = match.group(1, 2, 3)
    raw = _xml_unescape(content.strip()).replace("]]>", "]]]]><![CDATA[>")
    return f'{opening_tag}\n<![CDATA[{raw}]]>\n{closing_tag}'


@dataclass
class TL70BeaconConfig:
    """Configuration for a TL70 beacon instance."""

    name: str  # Module name (e.g., "BEACON1")
    parent_module: str = "IOLM1"  # Parent IO-Link master module
    # Port on the IO-Link master (always 4 for IOLM modules). The generator
    # writes "4" regardless of this value.
    parent_port_id: str = "4"
    port_address: str = "0"  # IO-Link port address
    inhibited: bool = False
    major_fault: bool = False
    application_tag: str = "***"  # Application specific tag (up to 29 chars)
    segment_1_color: int = 0  # Segment 1 basic color (0-15)
    segment_1_flash_rate: int = 0  # Segment 1 flash rate (0-15)
    segment_2_color: int = 0  # Segment 2 basic color (0-15)
    segment_2_flash_rate: int = 0  # Segment 2 flash rate (0-15)
    segment_3_color: int = 9  # Segment 3 basic color (0-15)
    segment_3_flash_rate: int = 0  # Segment 3 flash rate (0-15)
    segment_4_color: int = 0  # Segment 4 basic color (0-15)
    segment_4_flash_rate: int = 0  # Segment 4 flash rate (0-15)
    segment_5_color: int = 0  # Segment 5 basic color (0-15)
    segment_5_flash_rate: int = 0  # Segment 5 flash rate (0-15)
    segment_6_color: int = 0  # Segment 6 basic color (0-15)
    segment_6_flash_rate: int = 0  # Segment 6 flash rate (0-15)
    # Operating mode: 0=Basic, 1=Run, 2=Level, 3=Gauge, 4=Advanced.
    # NOTE: not currently written to the XML by TL70BeaconGenerator.
    operating_mode: int = 1


class TL70BeaconGenerator:
    """Generator for TL70 beacon module XML.

    Typical use: construct with a :class:`TL70BeaconConfig`, call
    :meth:`load_boilerplate`, then :meth:`apply_updates`, then :meth:`save`.
    """

    def __init__(self, config: TL70BeaconConfig):
        self.config = config
        # Use project-specific boilerplate directory if set, otherwise default
        boilerplate_dir = os.environ.get('MCM_BOILERPLATE_DIR', 'boilerplate')
        self.boilerplate_path = os.path.join(boilerplate_dir, "TL70_Module.L5X")
        self.tree: Optional[ET.ElementTree] = None
        self.root: Optional[ET.Element] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _require_root(self) -> ET.Element:
        """Return the loaded XML root, or raise if nothing has been loaded."""
        if self.root is None:
            raise RuntimeError(_NOT_LOADED_MESSAGE)
        return self.root

    def _find_target_module(self) -> Optional[ET.Element]:
        """Return the first ``<Module Use="Target">`` element, if any."""
        return self._require_root().find(".//Module[@Use='Target']")

    def _set_value(self, xpath: str, value) -> None:
        """Set the ``Value`` attribute of the first match; no-op if absent."""
        element = self._require_root().find(xpath)
        if element is not None:
            element.set("Value", str(value))

    # ------------------------------------------------------------------
    # Loading and updating
    # ------------------------------------------------------------------
    def load_boilerplate(self):
        """Load the TL70 boilerplate template.

        Raises:
            FileNotFoundError: if ``self.boilerplate_path`` does not exist.
        """
        if not os.path.exists(self.boilerplate_path):
            raise FileNotFoundError(f"Boilerplate file not found: {self.boilerplate_path}")

        self.tree = ET.parse(self.boilerplate_path)
        self.root = self.tree.getroot()

    def update_module_name(self):
        """Update the module name on the root and on the target module."""
        self._require_root().set("TargetName", self.config.name)

        module = self._find_target_module()
        if module is not None:
            module.set("Name", self.config.name)

    def update_parent_module(self):
        """Update parent module references.

        The parent port id is always written as "4" (the IO-Link port of the
        IO-Link master), regardless of ``config.parent_port_id``.
        """
        module = self._find_target_module()
        if module is not None:
            module.set("ParentModule", self.config.parent_module)
            module.set("ParentModPortId", _IOLM_PARENT_PORT_ID)

    def update_port_address(self):
        """Update the IO-Link port address."""
        port = self._require_root().find(".//Port[@Type='IO-Link']")
        if port is not None:
            port.set("Address", self.config.port_address)

    def update_inhibited_status(self):
        """Update the ``Inhibited`` and ``MajorFault`` module attributes."""
        module = self._find_target_module()
        if module is not None:
            module.set("Inhibited", str(self.config.inhibited).lower())
            module.set("MajorFault", str(self.config.major_fault).lower())

    def update_application_tag(self):
        """Update the application specific tag (LEN and DATA members).

        The tag is truncated to 29 characters and padded to that length with
        ``$00``.  Only the Decorated data is updated; the L5K-format config
        data is left as it is in the boilerplate.
        """
        root = self._require_root()
        base = ".//StructureMember[@Name='Application_specific_Tag']"
        app_tag_len = root.find(f"{base}//DataValueMember[@Name='LEN']")
        app_tag_data = root.find(f"{base}//DataValueMember[@Name='DATA']")
        if app_tag_len is None or app_tag_data is None:
            return

        # Longer tags would make LEN exceed the string's capacity.
        tag = self.config.application_tag[:_APPLICATION_TAG_MAX_LEN]
        app_tag_len.set("Value", str(len(tag)))

        padded_tag = tag + "$00" * (_APPLICATION_TAG_MAX_LEN - len(tag))
        # Plain text here; the CDATA wrapper is added in save().
        app_tag_data.text = f"'{padded_tag}'"

    def update_segment_configuration(self):
        """Update basic color and flash rate for config segments 1-6."""
        for segment_num in range(1, 7):
            base = (
                f".//StructureMember[@Name='Segment_{segment_num}_Config']"
                f"//DataValueMember[@Name='Segment_{segment_num}_Settings_Basic_"
            )
            self._set_value(
                f"{base}Color']",
                getattr(self.config, f"segment_{segment_num}_color"),
            )
            self._set_value(
                f"{base}Flash_Rate']",
                getattr(self.config, f"segment_{segment_num}_flash_rate"),
            )

    def update_output_tag_segment_data(self):
        """Update the output tag segment data.

        Only ``Color_1`` of segments 1-3 receives the configured color; every
        other segment member (including ``Color_1`` of segments 4-6) and
        ``Audible`` are set to 0.
        """
        def output_member(field_name: str) -> str:
            return f".//OutputTag//DataValueMember[@Name='{field_name}']"

        for segment_num in range(1, 7):
            for suffix in _OUTPUT_SEGMENT_ZERO_SUFFIXES:
                self._set_value(output_member(f"Segment_{segment_num}_{suffix}"), 0)

        for segment_num in (4, 5, 6):
            self._set_value(output_member(f"Segment_{segment_num}_Color_1"), 0)
        self._set_value(output_member("Audible"), 0)

        for segment_num in (1, 2, 3):
            self._set_value(
                output_member(f"Segment_{segment_num}_Color_1"),
                getattr(self.config, f"segment_{segment_num}_color"),
            )

    def update_export_date(self):
        """Update the export date to current local time."""
        export_date = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
        self._require_root().set("ExportDate", export_date)

    def apply_updates(self):
        """Apply all updates to the boilerplate.

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        self._require_root()
        self.update_module_name()
        self.update_parent_module()
        self.update_port_address()
        self.update_inhibited_status()
        self.update_application_tag()
        self.update_segment_configuration()
        self.update_output_tag_segment_data()
        self.update_export_date()

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------
    def save(self, output_path: str):
        """Save the updated module to ``output_path``.

        L5K data and string DATA members are re-wrapped in CDATA sections
        (ElementTree strips them on parse), an XML declaration is written
        first, and the parent directory is created when missing.

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        if self.tree is None:
            raise RuntimeError(_NOT_LOADED_MESSAGE)

        xml_string = ET.tostring(self.root, encoding='unicode')
        xml_string = _L5K_DATA_RE.sub(_wrap_in_cdata, xml_string)
        xml_string = _STRING_DATA_RE.sub(_wrap_in_cdata, xml_string)

        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(_XML_DECLARATION)
            f.write(xml_string)

    def get_xml_string(self) -> str:
        """Get the XML as a string (as serialized by ElementTree, no CDATA).

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        if self.tree is None:
            raise RuntimeError(_NOT_LOADED_MESSAGE)
        return ET.tostring(self.root, encoding='unicode')

    # ------------------------------------------------------------------
    # Convenience helper used by EnhancedMCMGenerator's factory dispatch.
    # ------------------------------------------------------------------
    @classmethod
    def from_mapping(cls, mapping: Dict[str, str]) -> "TL70BeaconGenerator":
        """Create and fully configure a beacon generator from the Excel-derived
        `beacon_modules` entry (a plain dict).

        The structure expected is the one produced in
        EnhancedMCMGenerator._organize_modules_by_type().  Required keys:
        ``name``, ``parent_module``, ``parent_port_id``, ``port_address`` and
        ``application_tag``; ``description`` is optional and may be ``None``.

        Raises:
            KeyError: if a required key is missing.
            FileNotFoundError: if the boilerplate file does not exist.
        """
        # Determine segment colors based on description
        description = str(mapping.get("description") or "").upper()
        segment_1_color, segment_2_color, segment_3_color = _determine_segment_colors(description)

        cfg = TL70BeaconConfig(
            name=mapping["name"],
            parent_module=mapping["parent_module"],
            parent_port_id=mapping["parent_port_id"],
            port_address=mapping["port_address"],
            application_tag=mapping["application_tag"],
            segment_1_color=segment_1_color,
            segment_2_color=segment_2_color,
            segment_3_color=segment_3_color,
        )
        gen = cls(cfg)
        gen.load_boilerplate()
        gen.apply_updates()
        return gen

    @classmethod
    def from_excel(cls, module_data, *, parent_module: str = "IOLM1", port_address: str = "0") -> "TL70BeaconGenerator":
        """Create, configure, and return a generator using Excel data.

        The description used for segment-color detection is taken from
        ``module_data.description`` when set.  Otherwise the IO mappings are
        scanned: the first non-empty ``desb`` wins; failing that, the last
        non-empty mapping ``description`` seen is used.

        Raises:
            FileNotFoundError: if the boilerplate file does not exist.
        """
        description = getattr(module_data, 'description', None) or ""
        if not description:
            for mapping in getattr(module_data, 'io_mappings', None) or ():
                desb = getattr(mapping, 'desb', None)
                if desb:
                    description = desb
                    break
                fallback = getattr(mapping, 'description', None)
                if fallback:
                    description = fallback

        # Determine segment colors based on DESB description
        segment_1_color, segment_2_color, segment_3_color = _determine_segment_colors(
            str(description).upper()
        )

        cfg = TL70BeaconConfig(
            name=module_data.tagname,
            parent_module=parent_module,
            parent_port_id=_IOLM_PARENT_PORT_ID,  # Always use port 4 for IO-Link
            port_address=port_address,
            application_tag="***",  # Default application tag
            segment_1_color=segment_1_color,
            segment_2_color=segment_2_color,
            segment_3_color=segment_3_color,
        )

        gen = cls(cfg)
        gen.load_boilerplate()
        gen.apply_updates()
        return gen


def create_tl70_beacon(name: str,
                       parent_module: str = "IOLM1",
                       parent_port_id: str = "4",
                       port_address: str = "0",
                       application_tag: str = "***",
                       segment_1_color: int = 0,
                       segment_1_flash_rate: int = 0,
                       segment_2_color: int = 0,
                       segment_2_flash_rate: int = 0,
                       segment_3_color: int = 9,
                       segment_3_flash_rate: int = 0,
                       segment_4_color: int = 0,
                       segment_4_flash_rate: int = 0,
                       segment_5_color: int = 0,
                       segment_5_flash_rate: int = 0,
                       segment_6_color: int = 0,
                       segment_6_flash_rate: int = 0,
                       operating_mode: int = 1,
                       inhibited: bool = False,
                       major_fault: bool = False) -> TL70BeaconConfig:
    """Factory function to create a TL70 beacon configuration.

    Every argument maps one-to-one onto the :class:`TL70BeaconConfig` field of
    the same name.
    """
    return TL70BeaconConfig(
        name=name,
        parent_module=parent_module,
        parent_port_id=parent_port_id,
        port_address=port_address,
        inhibited=inhibited,
        major_fault=major_fault,
        application_tag=application_tag,
        segment_1_color=segment_1_color,
        segment_1_flash_rate=segment_1_flash_rate,
        segment_2_color=segment_2_color,
        segment_2_flash_rate=segment_2_flash_rate,
        segment_3_color=segment_3_color,
        segment_3_flash_rate=segment_3_flash_rate,
        segment_4_color=segment_4_color,
        segment_4_flash_rate=segment_4_flash_rate,
        segment_5_color=segment_5_color,
        segment_5_flash_rate=segment_5_flash_rate,
        segment_6_color=segment_6_color,
        segment_6_flash_rate=segment_6_flash_rate,
        operating_mode=operating_mode,
    )


# Color constants for easy reference
class TL70Colors:
    """Color constants for TL70 beacon segments."""
    GREEN = 0      # L5X value 0 = GREEN
    RED = 1        # L5X value 1 = RED
    COLOR_2 = 2    # L5X value 2 = (unknown color)
    AMBER = 3      # L5X value 3 = AMBER
    COLOR_4 = 4    # L5X value 4 = (unknown color)
    MAGENTA = 5    # L5X value 5 = MAGENTA
    CYAN = 6       # L5X value 6 = CYAN
    WHITE = 7      # L5X value 7 = WHITE
    CUSTOM_1 = 8   # L5X value 8 = CUSTOM_1
    BLUE = 9       # L5X value 9 = BLUE
    # Colors 10-15 are additional custom colors


# Flash rate constants
class TL70FlashRates:
    """Flash rate constants for TL70 beacon segments."""
    STEADY = 0
    SLOW = 1
    MEDIUM = 2
    FAST = 3
    # Additional rates 4-15 available


def _determine_segment_colors(description: str) -> Tuple[int, int, int]:
    """Determine TL70 beacon segment colors based on DESB description patterns.

    Args:
        description: DESB description text to analyze.  ``None`` or an empty
            string selects the default colors.

    Returns:
        Tuple of (segment_1_color, segment_2_color, segment_3_color)

    Logic (plain substring checks, "3" is tested before "2"):
        - If "3" in DESB: segment 1=0 (GREEN), segment 2=3 (AMBER), segment 3=9 (BLUE)
        - If "2" in DESB: segment 1=0 (GREEN), segment 2=9 (BLUE), segment 3=0 (GREEN)
        - Default: segment 1=0 (GREEN), segment 2=0 (GREEN), segment 3=9 (BLUE)
    """
    description = description or ""

    if "3" in description:
        return (TL70Colors.GREEN, TL70Colors.AMBER, TL70Colors.BLUE)
    if "2" in description:
        return (TL70Colors.GREEN, TL70Colors.BLUE, TL70Colors.GREEN)
    return (TL70Colors.GREEN, TL70Colors.GREEN, TL70Colors.BLUE)


def _run_examples():
    """Run the example usage: generate two beacon modules and print results."""
    # Example 1: Create a TL70 beacon with manual configuration
    config1 = create_tl70_beacon(
        name="BEACON1",
        parent_module="IOLM1",
        parent_port_id="4",
        port_address="0",
        application_tag="STATUS_BEACON",
        segment_1_color=TL70Colors.GREEN,
        segment_1_flash_rate=TL70FlashRates.STEADY,
        segment_3_color=TL70Colors.RED,
        segment_3_flash_rate=TL70FlashRates.SLOW
    )

    generator1 = TL70BeaconGenerator(config1)
    generator1.load_boilerplate()
    generator1.apply_updates()
    generator1.save("generated/BEACON1.L5X")

    print(f"Generated TL70 beacon module: {config1.name}")
    print(f"Parent module: {config1.parent_module}")
    print(f"Port: {config1.parent_port_id}")
    print(f"Address: {config1.port_address}")

    # Example 2: Test automatic segment color detection
    print("\n--- Testing automatic segment color detection ---")

    # Test description with "3"
    colors_3 = _determine_segment_colors("BEACON WITH 3 SEGMENTS")
    print(f"Description with '3': Segments = {colors_3} (Segment 1=GREEN, Segment 2=AMBER, Segment 3=BLUE)")

    # Test description with "2"
    colors_2 = _determine_segment_colors("BEACON WITH 2 SEGMENTS")
    print(f"Description with '2': Segments = {colors_2} (Segment 1=GREEN, Segment 2=BLUE, Segment 3=GREEN)")

    # Test default
    colors_default = _determine_segment_colors("BEACON STATUS")
    print(f"Default description: Segments = {colors_default} (Segment 1=GREEN, Segment 2=GREEN, Segment 3=BLUE)")

    # Example 3: Create beacon with automatic segment detection for "3"
    config2 = create_tl70_beacon(
        name="BEACON2",
        parent_module="PDP_FIO1",
        port_address="2",
        application_tag="AUTO_3_SEG"
    )

    # Apply automatic segment detection
    seg1, seg2, seg3 = _determine_segment_colors("BEACON 3")
    config2.segment_1_color = seg1
    config2.segment_2_color = seg2
    config2.segment_3_color = seg3

    generator2 = TL70BeaconGenerator(config2)
    generator2.load_boilerplate()
    generator2.apply_updates()
    generator2.save("generated/BEACON2.L5X")

    print(f"\nGenerated auto-configured beacon: {config2.name} with segments {(seg1, seg2, seg3)}")


# Example usage
if __name__ == "__main__":
    _run_examples()