#!/usr/bin/env python3
"""
TL70 Beacon Module Boilerplate Model
====================================
Model for TL70 Pro with IO-Link beacon modules.
Supports configuring module name, parent module, port address, and basic segment settings.
"""
from typing import Dict, Optional, Tuple
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
import os
@dataclass
class TL70BeaconConfig:
    """Settings describing a single TL70 beacon instance.

    Only ``name`` is required; every other field has a default.

    Attributes:
        name: Module name (e.g., "BEACON1").
        parent_module: Parent IO-Link master module.
        parent_port_id: Port on the IO-Link master (always 4 for IOLM modules).
        port_address: IO-Link port address.
        inhibited: Inhibited flag for the module.
        major_fault: Major-fault flag for the module.
        application_tag: Application specific tag (up to 29 chars).
        segment_N_color: Basic color of segment N, N = 1..6 (0-15).
        segment_N_flash_rate: Flash rate of segment N, N = 1..6 (0-15).
        operating_mode: 0=Basic, 1=Run, 2=Level, 3=Gauge, 4=Advanced.
    """

    # --- Identity and wiring -------------------------------------------
    name: str
    parent_module: str = "IOLM1"
    parent_port_id: str = "4"
    port_address: str = "0"

    # --- Module status flags -------------------------------------------
    inhibited: bool = False
    major_fault: bool = False

    # --- Application specific tag --------------------------------------
    application_tag: str = "***"

    # --- Per-segment basic settings (color, then flash rate) -----------
    segment_1_color: int = 0
    segment_1_flash_rate: int = 0
    segment_2_color: int = 0
    segment_2_flash_rate: int = 0
    segment_3_color: int = 9  # the only segment with a non-zero default color
    segment_3_flash_rate: int = 0
    segment_4_color: int = 0
    segment_4_flash_rate: int = 0
    segment_5_color: int = 0
    segment_5_flash_rate: int = 0
    segment_6_color: int = 0
    segment_6_flash_rate: int = 0

    # --- Operating mode ------------------------------------------------
    operating_mode: int = 1
class TL70BeaconGenerator:
    """Generator for TL70 beacon module XML.

    Workflow: construct with a config, call ``load_boilerplate()`` to parse
    the template, ``apply_updates()`` to write the config into the tree, then
    ``save()`` (file) or ``get_xml_string()`` (string). ``from_mapping`` and
    ``from_excel`` perform the first three steps in one call.
    """

    # Size of the Application_specific_Tag string field; shorter tags are
    # padded with "$00" up to this many characters.
    _APP_TAG_MAX_LEN = 29
    # Number of beacon segments handled by the segment update methods.
    _SEGMENT_COUNT = 6

    def __init__(self, config: "TL70BeaconConfig"):
        """Store the config and resolve the boilerplate path (nothing is read yet)."""
        self.config = config
        # Use project-specific boilerplate directory if set, otherwise default
        boilerplate_dir = os.environ.get('MCM_BOILERPLATE_DIR', 'boilerplate')
        self.boilerplate_path = os.path.join(boilerplate_dir, "TL70_Module.L5X")
        self.tree = None
        self.root = None

    def load_boilerplate(self):
        """Load the TL70 boilerplate template.

        Raises:
            FileNotFoundError: if the boilerplate file does not exist.
        """
        if not os.path.exists(self.boilerplate_path):
            raise FileNotFoundError(f"Boilerplate file not found: {self.boilerplate_path}")
        self.tree = ET.parse(self.boilerplate_path)
        self.root = self.tree.getroot()

    def _find_target_module(self):
        """Return the ``Module`` element marked ``Use='Target'``, or None."""
        return self.root.find(".//Module[@Use='Target']")

    def _set_output_member(self, member_name: str, value: str):
        """Set ``Value`` on the named OutputTag DataValueMember, if present."""
        member = self.root.find(f".//OutputTag//DataValueMember[@Name='{member_name}']")
        if member is not None:
            member.set("Value", value)

    def update_module_name(self):
        """Update the module name on the root element and the target module."""
        self.root.set("TargetName", self.config.name)
        module = self._find_target_module()
        if module is not None:
            module.set("Name", self.config.name)

    def update_parent_module(self):
        """Update parent module references.

        ``config.parent_port_id`` is deliberately ignored: for IO-Link Master
        modules (M12DR) the parent port is always 4 (the IO-Link port).
        """
        module = self._find_target_module()
        if module is not None:
            module.set("ParentModule", self.config.parent_module)
            module.set("ParentModPortId", "4")

    def update_port_address(self):
        """Update the IO-Link port address."""
        port = self.root.find(".//Port[@Type='IO-Link']")
        if port is not None:
            port.set("Address", self.config.port_address)

    def update_inhibited_status(self):
        """Update the Inhibited and MajorFault flags as lowercase 'true'/'false'."""
        module = self._find_target_module()
        if module is not None:
            module.set("Inhibited", str(self.config.inhibited).lower())
            module.set("MajorFault", str(self.config.major_fault).lower())

    def update_application_tag(self):
        """Update the application specific tag (LEN and DATA members).

        The tag is truncated to the 29-character field size so that LEN and
        the "$00" padding stay consistent. Only the Decorated data is
        updated; the L5K-format blob in the ConfigTag is left untouched.
        """
        base = ".//StructureMember[@Name='Application_specific_Tag']"
        app_tag_len = self.root.find(f"{base}//DataValueMember[@Name='LEN']")
        app_tag_data = self.root.find(f"{base}//DataValueMember[@Name='DATA']")
        if app_tag_len is None or app_tag_data is None:
            return

        tag = self.config.application_tag[:self._APP_TAG_MAX_LEN]
        app_tag_len.set("Value", str(len(tag)))
        padded_tag = tag + "$00" * (self._APP_TAG_MAX_LEN - len(tag))
        # Plain text here; save() re-wraps DATA members in CDATA.
        app_tag_data.text = f"'{padded_tag}'"

    def update_segment_configuration(self):
        """Update basic color and flash rate in each Segment_N_Config structure."""
        for segment_num in range(1, self._SEGMENT_COUNT + 1):
            base = f".//StructureMember[@Name='Segment_{segment_num}_Config']"
            settings = (
                ("Basic_Color", getattr(self.config, f"segment_{segment_num}_color")),
                ("Basic_Flash_Rate", getattr(self.config, f"segment_{segment_num}_flash_rate")),
            )
            for suffix, value in settings:
                elem = self.root.find(
                    f"{base}//DataValueMember[@Name='Segment_{segment_num}_Settings_{suffix}']"
                )
                if elem is not None:
                    elem.set("Value", str(value))

    def update_output_tag_segment_data(self):
        """Update the output tag segment data.

        Only Color_1 of segments 1-3 carries a configured value; every other
        segment output field (and Audible) is set to 0.
        """
        zeroed_suffixes = (
            "Color_2", "Color_1_Intensity", "Color_2_Intensity",
            "Pulse_Pattern", "Speed", "Animation_Type",
        )
        configured_color_1 = {
            1: self.config.segment_1_color,
            2: self.config.segment_2_color,
            3: self.config.segment_3_color,
        }
        for segment_num in range(1, self._SEGMENT_COUNT + 1):
            for suffix in zeroed_suffixes:
                self._set_output_member(f"Segment_{segment_num}_{suffix}", "0")
            # Segments 4-6 have no configured Color_1 and fall back to 0.
            color_1 = configured_color_1.get(segment_num, 0)
            self._set_output_member(f"Segment_{segment_num}_Color_1", str(color_1))
        self._set_output_member("Audible", "0")

    def update_export_date(self):
        """Update the export date to the current local time."""
        export_date = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
        self.root.set("ExportDate", export_date)

    def apply_updates(self):
        """Apply all updates to the boilerplate."""
        self.update_module_name()
        self.update_parent_module()
        self.update_port_address()
        self.update_inhibited_status()
        self.update_application_tag()
        self.update_segment_configuration()
        self.update_output_tag_segment_data()
        self.update_export_date()

    def save(self, output_path: str):
        """Save the updated module to ``output_path`` as UTF-8 XML.

        ElementTree drops CDATA sections when parsing, so the text of
        ``<Data Format="L5K">`` elements and ``DATA`` members is re-wrapped
        in CDATA before writing.

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        if self.tree is None:
            raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")

        import re

        xml_string = ET.tostring(self.root, encoding='unicode')

        # NOTE(review): these tag patterns, the CDATA wrapper and the XML
        # declaration below were reconstructed after the literals lost their
        # angle-bracket content; confirm against a Studio 5000 export.
        l5k_pattern = r'(<Data Format="L5K">)(\s*\[.*?\]|\s*\(.*?\))\s*(</Data>)'
        data_pattern = r'(<DataValueMember Name="DATA"[^>]*>)([^<]*)(</DataValueMember>)'

        def replace_with_cdata(match):
            opening_tag = match.group(1)
            data_content = match.group(2).strip()
            closing_tag = match.group(3)
            # Keep the payload, wrapped in CDATA on its own line.
            return f'{opening_tag}\n<![CDATA[{data_content}]]>\n{closing_tag}'

        for pattern in (l5k_pattern, data_pattern):
            xml_string = re.sub(pattern, replace_with_cdata, xml_string, flags=re.DOTALL | re.MULTILINE)

        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n')
            f.write(xml_string)

    def get_xml_string(self) -> str:
        """Get the XML as a string (no CDATA re-wrapping, no XML declaration).

        Raises:
            RuntimeError: if no boilerplate has been loaded.
        """
        if self.tree is None:
            raise RuntimeError("No boilerplate loaded. Call load_boilerplate() first.")
        return ET.tostring(self.root, encoding='unicode')

    # ------------------------------------------------------------------
    # Convenience helper used by EnhancedMCMGenerator's factory dispatch.
    # ------------------------------------------------------------------
    @classmethod
    def from_mapping(cls, mapping: Dict[str, str]) -> "TL70BeaconGenerator":
        """Create and fully configure a beacon generator from the Excel-derived
        `beacon_modules` entry (a plain dict). The structure expected is the one
        produced in EnhancedMCMGenerator._organize_modules_by_type().

        Required keys: "name", "parent_module", "parent_port_id",
        "port_address", "application_tag". "description" is optional and
        only used to pick the segment colors.

        Raises:
            KeyError: if a required key is missing.
            FileNotFoundError: if the boilerplate file does not exist.
        """
        description = mapping.get("description", "").upper()
        segment_1_color, segment_2_color, segment_3_color = _determine_segment_colors(description)
        cfg = TL70BeaconConfig(
            name=mapping["name"],
            parent_module=mapping["parent_module"],
            parent_port_id=mapping["parent_port_id"],
            port_address=mapping["port_address"],
            application_tag=mapping["application_tag"],
            segment_1_color=segment_1_color,
            segment_2_color=segment_2_color,
            segment_3_color=segment_3_color,
        )
        gen = cls(cfg)
        gen.load_boilerplate()
        gen.apply_updates()
        return gen

    @classmethod
    def from_excel(cls, module_data, *, parent_module: str = "IOLM1", port_address: str = "0") -> "TL70BeaconGenerator":
        """Create, configure, and return a generator using Excel data.

        The description that drives segment colors is taken from
        ``module_data.description`` when non-empty; otherwise from the first
        IO mapping with a non-empty ``desb``, falling back to the last
        non-empty mapping ``description`` seen before that.

        Raises:
            FileNotFoundError: if the boilerplate file does not exist.
        """
        description = getattr(module_data, 'description', None) or ""
        if not description:
            for mapping in getattr(module_data, 'io_mappings', None) or ():
                desb = getattr(mapping, 'desb', None)
                if desb:
                    description = desb
                    break
                # getattr keeps a mapping without `description` from raising.
                mapping_description = getattr(mapping, 'description', None)
                if mapping_description:
                    description = mapping_description

        segment_1_color, segment_2_color, segment_3_color = _determine_segment_colors(description.upper())
        cfg = TL70BeaconConfig(
            name=module_data.tagname,
            parent_module=parent_module,
            parent_port_id="4",  # Always use port 4 for IO-Link
            port_address=port_address,
            application_tag="***",  # Default application tag
            segment_1_color=segment_1_color,
            segment_2_color=segment_2_color,
            segment_3_color=segment_3_color,
        )
        gen = cls(cfg)
        gen.load_boilerplate()
        gen.apply_updates()
        return gen
def create_tl70_beacon(name: str, parent_module: str = "IOLM1", parent_port_id: str = "4",
                       port_address: str = "0", application_tag: str = "***",
                       segment_1_color: int = 0, segment_1_flash_rate: int = 0,
                       segment_2_color: int = 0, segment_2_flash_rate: int = 0,
                       segment_3_color: int = 9, segment_3_flash_rate: int = 0,
                       segment_4_color: int = 0, segment_4_flash_rate: int = 0,
                       segment_5_color: int = 0, segment_5_flash_rate: int = 0,
                       segment_6_color: int = 0, segment_6_flash_rate: int = 0,
                       operating_mode: int = 1) -> TL70BeaconConfig:
    """Build a TL70BeaconConfig from keyword-style arguments.

    Every argument is forwarded unchanged to the config field of the same
    name. ``inhibited`` and ``major_fault`` are not exposed here and keep
    their dataclass defaults.
    """
    # Per-segment settings are grouped separately from the wiring fields.
    segment_settings = {
        "segment_1_color": segment_1_color,
        "segment_1_flash_rate": segment_1_flash_rate,
        "segment_2_color": segment_2_color,
        "segment_2_flash_rate": segment_2_flash_rate,
        "segment_3_color": segment_3_color,
        "segment_3_flash_rate": segment_3_flash_rate,
        "segment_4_color": segment_4_color,
        "segment_4_flash_rate": segment_4_flash_rate,
        "segment_5_color": segment_5_color,
        "segment_5_flash_rate": segment_5_flash_rate,
        "segment_6_color": segment_6_color,
        "segment_6_flash_rate": segment_6_flash_rate,
    }
    return TL70BeaconConfig(
        name=name,
        parent_module=parent_module,
        parent_port_id=parent_port_id,
        port_address=port_address,
        application_tag=application_tag,
        operating_mode=operating_mode,
        **segment_settings,
    )
# Color constants for easy reference
class TL70Colors:
    """Color constants for TL70 beacon segments.

    Each name maps to the L5X value written for a segment color:

        0 GREEN      1 RED        2 COLOR_2 (unknown color)
        3 AMBER      4 COLOR_4 (unknown color)
        5 MAGENTA    6 CYAN       7 WHITE
        8 CUSTOM_1   9 BLUE

    Colors 10-15 are additional custom colors and have no named constant.
    """

    # Names are listed in L5X value order, so position == value.
    (
        GREEN,
        RED,
        COLOR_2,
        AMBER,
        COLOR_4,
        MAGENTA,
        CYAN,
        WHITE,
        CUSTOM_1,
        BLUE,
    ) = range(10)
# Flash rate constants
class TL70FlashRates:
    """Flash rate constants for TL70 beacon segments.

    0 STEADY, 1 SLOW, 2 MEDIUM, 3 FAST. Additional rates 4-15 are available
    but have no named constant.
    """

    # Names are listed in value order, so position == value.
    STEADY, SLOW, MEDIUM, FAST = range(4)
def _determine_segment_colors(description: str) -> Tuple[int, int, int]:
"""Determine TL70 beacon segment colors based on DESB description patterns.
Args:
description: DESB description text to analyze
Returns:
Tuple of (segment_1_color, segment_2_color, segment_3_color)
Logic:
- If "3" in DESB: segment 1=0 (GREEN), segment 2=3 (AMBER), segment 3=9 (BLUE)
- If "2" in DESB: segment 1=0 (GREEN), segment 2=9 (BLUE), segment 3=0 (GREEN)
- Default: segment 1=0 (GREEN), segment 2=0 (GREEN), segment 3=9 (BLUE)
"""
description = description.upper()
if "3" in description:
return (0, 3, 9) # segment 1=0 (GREEN), segment 2=3 (AMBER), segment 3=9 (BLUE)
elif "2" in description:
return (0, 9, 0) # segment 1=0 (GREEN), segment 2=9 (BLUE), segment 3=0 (GREEN)
else:
return (0, 0, 9) # Default: segment 1=0 (GREEN), segment 2=0 (GREEN), segment 3=9 (BLUE)
# Example usage
if __name__ == "__main__":
    # ------------------------------------------------------------------
    # Example 1: beacon configured by hand.
    # ------------------------------------------------------------------
    manual_config = create_tl70_beacon(
        name="BEACON1",
        parent_module="IOLM1",
        parent_port_id="4",
        port_address="0",
        application_tag="STATUS_BEACON",
        segment_1_color=TL70Colors.GREEN,
        segment_1_flash_rate=TL70FlashRates.STEADY,
        segment_3_color=TL70Colors.RED,
        segment_3_flash_rate=TL70FlashRates.SLOW,
    )
    manual_generator = TL70BeaconGenerator(manual_config)
    manual_generator.load_boilerplate()
    manual_generator.apply_updates()
    manual_generator.save("generated/BEACON1.L5X")

    print(f"Generated TL70 beacon module: {manual_config.name}")
    print(f"Parent module: {manual_config.parent_module}")
    print(f"Port: {manual_config.parent_port_id}")
    print(f"Address: {manual_config.port_address}")

    # ------------------------------------------------------------------
    # Example 2: show what the description-based color detection returns.
    # ------------------------------------------------------------------
    print("\n--- Testing automatic segment color detection ---")

    three_segment_colors = _determine_segment_colors("BEACON WITH 3 SEGMENTS")
    print(f"Description with '3': Segments = {three_segment_colors} (Segment 1=GREEN, Segment 2=AMBER, Segment 3=BLUE)")

    two_segment_colors = _determine_segment_colors("BEACON WITH 2 SEGMENTS")
    print(f"Description with '2': Segments = {two_segment_colors} (Segment 1=GREEN, Segment 2=BLUE, Segment 3=GREEN)")

    fallback_colors = _determine_segment_colors("BEACON STATUS")
    print(f"Default description: Segments = {fallback_colors} (Segment 1=GREEN, Segment 2=GREEN, Segment 3=BLUE)")

    # ------------------------------------------------------------------
    # Example 3: beacon whose segment colors come from the detection rule
    # for a description containing "3".
    # ------------------------------------------------------------------
    detected = _determine_segment_colors("BEACON 3")
    auto_config = create_tl70_beacon(
        name="BEACON2",
        parent_module="PDP_FIO1",
        port_address="2",
        application_tag="AUTO_3_SEG",
        segment_1_color=detected[0],
        segment_2_color=detected[1],
        segment_3_color=detected[2],
    )
    auto_generator = TL70BeaconGenerator(auto_config)
    auto_generator.load_boilerplate()
    auto_generator.apply_updates()
    auto_generator.save("generated/BEACON2.L5X")

    print(f"\nGenerated auto-configured beacon: {auto_config.name} with segments {(detected[0], detected[1], detected[2])}")