2025-08-05 14:38:54 +04:00

577 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""ControllerBuilder
====================
Responsible for building the static parts of a 1756-L83ES controller project:
• Creates the base controller boilerplate (Local slot, etc.)
• Injects fixed chassis modules (EN4TR, IB16, OB16E, IB16S)
• Provides a hook (`get_modules_section`) so external callers can append additional
<Module> elements (e.g. Excel-driven families).
• Finalises the project (programs/tasks, auxiliary sections, export date) and
writes it to an L5X file while preserving CDATA blocks.
The class intentionally keeps **zero** knowledge about Excel or individual
module families beyond the fixed chassis modules; that logic stays in
`EnhancedMCMGenerator` (or higher-level orchestrators).
"""
from __future__ import annotations
import os
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Optional
# Base controller & fixed slot models
from models.l83es_boilerplate_model import (
create_l83es_controller,
L83ESControllerGenerator,
)
from models.en4tr_boilerplate_model import (
create_en4tr_module,
EN4TRModuleGenerator,
)
from models.ib16_boilerplate_model import (
create_ib16_module,
IB16ModuleGenerator,
)
from models.ob16e_boilerplate_model import (
create_ob16e_module,
OB16EModuleGenerator,
)
from models.ib16s_boilerplate_model import (
create_ib16s_module,
IB16SModuleGenerator,
)
def validate_export_date(export_date_str: str, tolerance_seconds: float = 0.0) -> bool:
    """Validate that an ExportDate is well-formed and not in the future.

    Args:
        export_date_str: ExportDate string in Rockwell format
            (e.g., "Wed Jul 03 11:47:56 2024").
        tolerance_seconds: How far ahead of the current local time the date
            may lie and still be accepted. Defaults to 0, i.e. any date later
            than "now" is rejected.

    Returns:
        True if the string parses and is not further in the future than the
        tolerance allows; False otherwise (a WARNING is printed in that case).
    """
    # Keep the try body minimal: only the parse can raise ValueError.
    try:
        parsed_date = datetime.strptime(export_date_str, "%a %b %d %H:%M:%S %Y")
    except ValueError as e:
        print(f"WARNING: Invalid ExportDate format: {export_date_str} - {e}")
        return False

    current_date = datetime.now()
    # Reject dates ahead of the clock, allowing the optional processing buffer.
    if (parsed_date - current_date).total_seconds() > tolerance_seconds:
        print(f"WARNING: Future ExportDate detected: {export_date_str}")
        print(f" Current time: {current_date.strftime('%a %b %d %H:%M:%S %Y')}")
        return False
    return True
class ControllerBuilder:
    """Constructs the controller XML tree and offers an API for callers to
    attach extra modules before the project is saved.

    Construction loads the L83ES boilerplate, applies controller-level
    attributes and inserts the fixed chassis modules. Callers may then append
    modules via `get_modules_section`, embed generated tags/programs, and
    finally call `finalise_and_save` to write the L5X file.
    """

    # XPath of the controller element every helper operates on.
    _CONTROLLER_XPATH = ".//Controller[@Use='Target']"

    def __init__(self, controller_name: str, skip_chassis_modules: bool = False):
        """Build the base controller tree.

        Args:
            controller_name: Name written to the project and controller elements.
            skip_chassis_modules: When True, the IB16/OB16E/IB16S modules are
                not added (the EN4TR module and Local tweaks still are).
        """
        self.controller_name = controller_name
        self.skip_chassis_modules = skip_chassis_modules

        # 1. Build base controller from boilerplate
        controller_cfg = create_l83es_controller(controller_name)
        gen = L83ESControllerGenerator(controller_cfg)
        gen.load_boilerplate()
        gen.apply_updates()
        self.tree: ET.ElementTree = gen.tree
        self.root: ET.Element = gen.root

        # 2. Apply controller-level attributes
        self._configure_controller_settings()

        # 3. Insert fixed chassis/IO modules
        self._add_fixed_modules()

    # ---------------------------------------------------------------------
    # Public helpers
    # ---------------------------------------------------------------------
    def get_modules_section(self) -> ET.Element:
        """Expose the <Modules> container so external code can append modules.

        Raises:
            ValueError: If the target controller or its <Modules> child is missing.
        """
        controller = self._target_controller()
        if controller is None:
            raise ValueError("Controller element not found this should never happen")
        modules = controller.find("Modules")
        if modules is None:
            raise ValueError("<Modules> section missing builder initialisation failed")
        return modules

    def finalise_and_save(self, filename: str):
        """Complete remaining sections and write the finished L5X file.

        Args:
            filename: Destination path of the L5X file.

        Raises:
            ValueError: If the freshly generated ExportDate fails validation.
        """
        # Add logical program/task scaffolding only once at the end so that any
        # Excel-added AOIs etc. (if present) appear *before* Programs, matching
        # the reference file ordering.
        self._add_programs_and_tasks()
        self._configure_additional_elements()

        # Update export date, Rockwell format: "Wed Jul 03 11:47:56 2024"
        export_date = datetime.now().strftime("%a %b %d %H:%M:%S %Y")

        # Validate the timestamp before setting it
        if not validate_export_date(export_date):
            print(f"ERROR: Refusing to set invalid ExportDate: {export_date}")
            raise ValueError(f"Invalid ExportDate generated: {export_date}")
        self.root.set("ExportDate", export_date)

        # Indent & persist
        self._save_project(self.tree, filename)

    def add_generated_tags(self, routines_generator_dir: str, zones_dict=None):
        """Add the generated tags from the Routines Generator into this controller.

        Failures while importing the generator or producing tags are reported
        as a WARNING and otherwise ignored, so the project can still be saved
        with hardware modules only.

        Args:
            routines_generator_dir: Path to the Routines Generator directory containing generated files
            zones_dict: Optional zones configuration passed through to DataLoader
        """
        controller = self._target_controller()
        if controller is None:
            print("ERROR: Controller element not found")
            return

        # Find or create Tags section
        tags_section = controller.find("Tags")
        if tags_section is None:
            tags_section = ET.SubElement(controller, "Tags")

        try:
            # Import the tag writer to get tag XML elements
            import sys

            src_dir = f"{routines_generator_dir}/src"
            # Avoid growing sys.path with duplicates on repeated calls.
            if src_dir not in sys.path:
                sys.path.append(src_dir)
            from writers.xml_tag_writer import create_limited_tag_xml_elements
            from data_loader import DataLoader

            # Create DataLoader for DESC_IP extraction
            desc_ip_file = f"{routines_generator_dir}/DESC_IP_MERGED.xlsx"
            data_loader = DataLoader(desc_ip_file, zones_dict=zones_dict)

            # Generate tag XML elements
            print(" Generating tags from DESC_IP data...")
            tag_elements = create_limited_tag_xml_elements(desc_ip_file, data_loader)

            # Add each tag element to the Tags section
            tags_added = 0
            for tag_element in tag_elements:
                tags_section.append(tag_element)
                tags_added += 1
            print(f" Successfully embedded {tags_added} tags into complete project")
        except Exception as e:
            print(f"WARNING: Failed to embed tags: {e}")
            # Continue without tags - the project will still have hardware modules

    def add_generated_programs(self, routines_generator_dir: str):
        """Add the generated programs from the Routines Generator into this controller.

        Looks for `SafetyProgram_Generated.L5X` and `MainProgram_Generated.L5X`
        in the given directory and appends their <Program> elements to the
        controller's <Programs> section. A SafetyTagMap found inside the safety
        program is moved to the controller's SafetyInfo element when present.

        Args:
            routines_generator_dir: Path to the Routines Generator directory containing generated L5X files
        """
        controller = self._target_controller()
        if controller is None:
            print("ERROR: Controller element not found")
            return

        # Find or create Programs section
        programs = controller.find("Programs")
        if programs is None:
            programs = ET.SubElement(controller, "Programs")

        # Load and embed SafetyProgram
        safety_l5x_path = f"{routines_generator_dir}/SafetyProgram_Generated.L5X"
        if os.path.exists(safety_l5x_path):
            print(f" Loading SafetyProgram from {safety_l5x_path}")
            safety_tree = ET.parse(safety_l5x_path)
            safety_program = safety_tree.find(".//Program[@Name='SafetyProgram']")
            if safety_program is not None:
                # Extract SafetyTagMap and move it to SafetyInfo before embedding the program
                safety_tag_map = safety_program.find("SafetyTagMap")
                if safety_tag_map is not None:
                    # Find SafetyInfo element in controller
                    safety_info = controller.find("SafetyInfo")
                    if safety_info is not None:
                        # Remove SafetyTagMap from program and add to SafetyInfo
                        safety_program.remove(safety_tag_map)
                        safety_info.append(safety_tag_map)
                        print(" Moved SafetyTagMap from program to SafetyInfo")
                    else:
                        print(" WARNING: SafetyInfo element not found in controller")
                programs.append(safety_program)
                print(f" Successfully embedded SafetyProgram with {self._routine_count(safety_program)} routines")
            else:
                print(" WARNING: SafetyProgram element not found in L5X file")
        else:
            print(f" WARNING: SafetyProgram not found at {safety_l5x_path}")

        # Load and embed MainProgram
        main_l5x_path = f"{routines_generator_dir}/MainProgram_Generated.L5X"
        if os.path.exists(main_l5x_path):
            print(f" Loading MainProgram from {main_l5x_path}")
            main_tree = ET.parse(main_l5x_path)
            main_program = main_tree.find(".//Program[@Name='MainProgram']")
            if main_program is not None:
                programs.append(main_program)
                print(f" Successfully embedded MainProgram with {self._routine_count(main_program)} routines")
            else:
                print(" WARNING: MainProgram element not found in L5X file")
        else:
            print(f" WARNING: MainProgram not found at {main_l5x_path}")

        if len(programs) > 0:
            print(f" Successfully embedded {len(programs)} programs into complete project")
        else:
            print(" WARNING: No programs were embedded - complete project will have hardware only")

    @staticmethod
    def _routine_count(program: ET.Element) -> int:
        """Return the number of children of the program's <Routines>, or 0 if it has none."""
        routines = program.find("Routines")
        # A program without <Routines> must not abort embedding with a TypeError.
        return len(routines) if routines is not None else 0

    def _embed_program_from_l5x(self, programs_section: ET.Element, l5x_file_path: str):
        """Extract the Program element from an L5X file and embed it into the controller.

        A program with the same name already present in `programs_section` is
        replaced. Parse and other errors are reported via print and swallowed.

        Args:
            programs_section: The <Programs> element in the controller
            l5x_file_path: Path to the L5X file containing the program to embed
        """
        try:
            # Parse the L5X file
            root = ET.parse(l5x_file_path).getroot()

            # Find the Program element
            program_element = root.find(".//Program[@Use='Target']")
            if program_element is None:
                print(f" ERROR: No Program with Use='Target' found in {l5x_file_path}")
                return

            # Remove the Use='Target' attribute since we're embedding it
            program_element.attrib.pop('Use', None)

            # Check if program already exists
            program_name = program_element.get('Name', 'UnknownProgram')
            existing = programs_section.find(f"Program[@Name='{program_name}']")
            if existing is not None:
                print(f" WARNING: Program '{program_name}' already exists, replacing it")
                programs_section.remove(existing)

            # Add the complete program to the controller
            programs_section.append(program_element)
            print(f" [SUCCESS] Embedded program '{program_name}'")

            # Count routines for verification
            routines = program_element.find("Routines")
            if routines is not None:
                routine_count = len(routines)
                print(f" [INFO] Program contains {routine_count} routines")
        except ET.ParseError as e:
            print(f" ERROR: Failed to parse L5X file {l5x_file_path}: {e}")
        except Exception as e:
            print(f" ERROR: Failed to embed program from {l5x_file_path}: {e}")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _target_controller(self) -> Optional[ET.Element]:
        """Return the <Controller Use='Target'> element, or None if absent."""
        return self.root.find(self._CONTROLLER_XPATH)

    def _configure_controller_settings(self):
        """Set the fixed root-, controller- and SafetyInfo-level attributes.

        Does nothing beyond the root attributes when no target controller is
        found. A <SafetyInfo> child is created if the controller lacks one.
        """
        root = self.root
        root.set("TargetName", self.controller_name)
        root.set("TargetType", "Controller")
        root.set("ContainsContext", "false")
        root.set(
            "ExportOptions",
            "NoRawData L5KData DecoratedData ForceProtectedEncoding AllProjDocTrans",
        )

        controller = self._target_controller()
        if controller is None:
            return

        controller_attrs = (
            ("Name", self.controller_name),
            ("ProcessorType", "1756-L83ES"),
            ("MajorRev", "36"),
            ("MinorRev", "11"),
            ("ProjectSN", "16#0000_0000"),
            ("MatchProjectToController", "false"),
            ("CanUseRPIFromProducer", "false"),
            ("InhibitAutomaticFirmwareUpdate", "0"),
            ("PassThroughConfiguration", "EnabledWithAppend"),
            ("DownloadProjectDocumentationAndExtendedProperties", "true"),
            ("DownloadProjectCustomProperties", "true"),
            ("ReportMinorOverflow", "false"),
            ("AutoDiagsEnabled", "true"),
            ("WebServerEnabled", "false"),
        )
        for key, value in controller_attrs:
            controller.set(key, value)

        safety_info = controller.find("SafetyInfo")
        if safety_info is None:
            safety_info = ET.SubElement(controller, "SafetyInfo")
        safety_attrs = (
            ("SafetyLocked", "false"),
            ("SignatureRunModeProtect", "false"),
            ("ConfigureSafetyIOAlways", "false"),
            ("SafetyLevel", "SIL2/PLd"),
        )
        for key, value in safety_attrs:
            safety_info.set(key, value)

    # ------------------------------------------------------------------
    # Fixed chassis modules
    # ------------------------------------------------------------------
    def _add_fixed_modules(self):
        """Ensure <Modules> exists and populate it with the fixed modules.

        Raises:
            ValueError: If the target controller element is missing.
        """
        controller = self._target_controller()
        if controller is None:
            raise ValueError("Controller element not found")

        # Ensure <Modules> exists right after SafetyInfo
        modules = controller.find("Modules")
        if modules is None:
            safety_info = controller.find("SafetyInfo")
            if safety_info is not None:
                idx = list(controller).index(safety_info) + 1
                modules = ET.Element("Modules")
                controller.insert(idx, modules)
            else:
                modules = ET.SubElement(controller, "Modules")

        # Local backplane tweaks
        self._configure_local_module(modules)
        self._add_en4tr_module(modules)

        # Only add chassis modules if not skipped (to avoid conflicts with Excel-driven generation)
        if not self.skip_chassis_modules:
            self._add_ib16_module(modules)
            self._add_ob16e_module(modules)
            self._add_ib16s_module(modules)

    def _configure_local_module(self, modules_section: ET.Element):
        """Set SafetyNetwork (and bus size on port 1) on the 'Local' module's ports."""
        local_module = modules_section.find("Module[@Name='Local']")
        if local_module is None:
            return
        ports = local_module.find("Ports")
        if ports is None:
            return
        for port in ports.findall("Port"):
            port_id = port.get("Id")
            if port_id == "1":
                port.set("SafetyNetwork", "16#0000_4c33_031d_8f1b")
                bus = port.find("Bus")
                if bus is None:
                    bus = ET.SubElement(port, "Bus")
                bus.set("Size", "10")
            elif port_id == "2":
                port.set("SafetyNetwork", "16#0000_4c33_031d_8f1c")

    def _attach_local_module(
        self,
        modules_section: ET.Element,
        generator,
        name: str,
        port_type: str,
        address: str,
        extra_attrs: tuple = (),
    ):
        """Generate a module from its boilerplate and attach it under 'Local'.

        Args:
            modules_section: The <Modules> element to append to.
            generator: Boilerplate generator exposing load_boilerplate(),
                apply_updates() and a `root` element.
            name: Name of the <Module> element to pick out of the generator tree.
            port_type: `Type` of the port whose Address is set.
            address: Address written to that port.
            extra_attrs: (key, value) pairs set on the module after the parent
                attributes and before the port address.
        """
        generator.load_boilerplate()
        generator.apply_updates()
        mod = generator.root.find(f".//Module[@Name='{name}']")
        if mod is None:
            # Nothing to attach; mirrors the silent skip of a missing module.
            return
        mod.set("ParentModule", "Local")
        mod.set("ParentModPortId", "1")
        for key, value in extra_attrs:
            mod.set(key, value)
        port = mod.find(f".//Port[@Type='{port_type}']")
        if port is not None:
            port.set("Address", address)
        modules_section.append(mod)

    def _add_en4tr_module(self, modules_section: ET.Element):
        """Append the SLOT2_EN4TR module with Ethernet address 11.200.1.1."""
        cfg = create_en4tr_module("SLOT2_EN4TR", self.controller_name)
        self._attach_local_module(
            modules_section,
            EN4TRModuleGenerator(cfg),
            "SLOT2_EN4TR",
            "Ethernet",
            "11.200.1.1",
        )

    def _add_ib16_module(self, modules_section: ET.Element):
        """Append the SLOT5_IB16 module at ICP address 5."""
        cfg = create_ib16_module("SLOT5_IB16", "5", "Local", "1", None)
        self._attach_local_module(
            modules_section,
            IB16ModuleGenerator(cfg),
            "SLOT5_IB16",
            "ICP",
            "5",
        )

    def _add_ob16e_module(self, modules_section: ET.Element):
        """Append the SLOT6_OB16E module at ICP address 6 with AutoDiagsEnabled."""
        cfg = create_ob16e_module("SLOT6_OB16E", "6", "Local", "1", None)
        self._attach_local_module(
            modules_section,
            OB16EModuleGenerator(cfg),
            "SLOT6_OB16E",
            "ICP",
            "6",
            extra_attrs=(("AutoDiagsEnabled", "true"),),
        )

    def _add_ib16s_module(self, modules_section: ET.Element):
        """Append the SLOT7_IB16S safety module at ICP address 7."""
        cfg = create_ib16s_module(
            "SLOT7_IB16S",
            "7",
            "Local",
            "1",
            "16#0000_4c33_031d_8f1b",
            None,
        )
        self._attach_local_module(
            modules_section,
            IB16SModuleGenerator(cfg),
            "SLOT7_IB16S",
            "ICP",
            "7",
            extra_attrs=(
                ("SafetyNetwork", "16#0000_4c33_031d_8f1b"),
                ("SafetyEnabled", "true"),
            ),
        )

    # ------------------------------------------------------------------
    # Programs, tasks, misc sections
    # ------------------------------------------------------------------
    def _add_programs_and_tasks(self):
        """Ensure <Programs> and <Tasks> exist with at least a MainProgram/MainTask.

        An empty MainProgram skeleton is created only when <Programs> has no
        children; a MainTask scheduling "MainProgram" is created only when no
        task of that name exists.
        """
        controller = self._target_controller()
        if controller is None:
            return

        programs = controller.find("Programs")
        if programs is None:
            programs = ET.SubElement(controller, "Programs")

        # Only create empty MainProgram if no programs exist yet
        # (they might have been embedded from Routines Generator)
        if len(programs) == 0:
            print(" No programs found, creating empty MainProgram skeleton")
            main_prog = ET.SubElement(programs, "Program")
            main_prog.set("Name", "MainProgram")
            main_prog.set("TestEdits", "false")
            main_prog.set("MainRoutineName", "MainRoutine")
            main_prog.set("Disabled", "false")
            main_prog.set("Class", "Standard")
            main_prog.set("UseAsFolder", "false")
            ET.SubElement(main_prog, "Tags")
            routines = ET.SubElement(main_prog, "Routines")
            main_routine = ET.SubElement(routines, "Routine")
            main_routine.set("Name", "MainRoutine")
            main_routine.set("Type", "RLL")
        else:
            print(f" Programs already exist ({len(programs)} programs found), skipping empty program creation")

        # Always ensure Tasks section exists
        tasks = controller.find("Tasks")
        if tasks is None:
            tasks = ET.SubElement(controller, "Tasks")

        if tasks.find("Task[@Name='MainTask']") is None:
            main_task = ET.SubElement(tasks, "Task")
            main_task.set("Name", "MainTask")
            main_task.set("Type", "CONTINUOUS")
            main_task.set("Priority", "10")
            main_task.set("Watchdog", "500")
            main_task.set("DisableUpdateOutputs", "false")
            main_task.set("InhibitTask", "false")
            main_task.set("Class", "Standard")
            sched = ET.SubElement(main_task, "ScheduledPrograms")
            sched_prog = ET.SubElement(sched, "ScheduledProgram")
            sched_prog.set("Name", "MainProgram")

    def _configure_additional_elements(self):
        """Create any missing auxiliary controller sections with default attributes."""
        controller = self._target_controller()
        if controller is None:
            return

        # Ensure AddOnInstructionDefinitions, Tags, DataTypes exist
        for name in ("AddOnInstructionDefinitions", "Tags", "DataTypes"):
            if controller.find(name) is not None:
                continue
            elem = ET.Element(name)
            modules = controller.find("Modules")
            if name == "DataTypes" and modules is not None:
                # DataTypes goes immediately before <Modules>.
                controller.insert(list(controller).index(modules), elem)
            else:
                controller.append(elem)

        # Other empty placeholders
        if controller.find("CST") is None:
            cst = ET.SubElement(controller, "CST")
            cst.set("MasterID", "0")

        if controller.find("WallClockTime") is None:
            wct = ET.SubElement(controller, "WallClockTime")
            wct.set("LocalTimeAdjustment", "0")
            wct.set("TimeZone", "0")

        if controller.find("Trends") is None:
            ET.SubElement(controller, "Trends")

        if controller.find("DataLogs") is None:
            ET.SubElement(controller, "DataLogs")

        if controller.find("TimeSynchronize") is None:
            ts = ET.SubElement(controller, "TimeSynchronize")
            ts.set("Priority1", "128")
            ts.set("Priority2", "128")
            ts.set("PTPEnable", "false")

        if controller.find("EthernetPorts") is None:
            ports = ET.SubElement(controller, "EthernetPorts")
            port = ET.SubElement(ports, "EthernetPort")
            port.set("Port", "1")
            port.set("Label", "1")
            port.set("PortEnabled", "true")

    # ------------------------------------------------------------------
    # Saving helpers
    # ------------------------------------------------------------------
    def _save_project(self, tree: ET.ElementTree, filename: str):
        """Serialise the tree to `filename`, re-wrapping selected text in CDATA.

        ElementTree drops CDATA markers on parse and entity-escapes text on
        output, so L5K <Data>, DATA <DataValueMember> and non-blank <Text>
        contents are unescaped and wrapped in CDATA again after serialising.

        Args:
            tree: The element tree to write (its root is indented in place).
            filename: Destination path; its parent directory is created if needed.
        """
        # Local import keeps the module-level import block untouched.
        from xml.sax.saxutils import unescape

        self._indent(tree.getroot())
        xml_str = ET.tostring(tree.getroot(), encoding="unicode")
        full_xml = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n" + xml_str

        # Re-wrap L5K and DATA blocks with CDATA (ElementTree loses them)
        l5k_pattern = r'(<Data Format="L5K">)\s*(\[.*?\]|\(.*?\))\s*(</Data>)'
        data_pattern = r'(<DataValueMember Name="DATA"[^>]*>)([^<]*)(</DataValueMember>)'
        # Add pattern for RLL Text content
        text_pattern = r'(<Text>)(.*?)(</Text>)'

        def _cdata(content: str) -> str:
            # The serialiser already turned & < > into entities; inside CDATA
            # they would be taken literally, so restore the raw characters.
            raw = unescape(content)
            # A literal "]]>" would close the section early: split it in two.
            return "<![CDATA[" + raw.replace("]]>", "]]]]><![CDATA[>") + "]]>"

        def _to_cdata(match):
            start, content, end = match.group(1), match.group(2), match.group(3)
            return f"{start}{_cdata(content)}{end}"

        def _text_to_cdata(match):
            start, content, end = match.group(1), match.group(2), match.group(3)
            if not content.strip():
                # Leave empty / whitespace-only <Text> elements untouched.
                return match.group(0)
            return f"{start}\n{_cdata(content)}\n{end}"

        full_xml = re.sub(l5k_pattern, _to_cdata, full_xml, flags=re.DOTALL)
        full_xml = re.sub(data_pattern, _to_cdata, full_xml, flags=re.DOTALL)
        # Apply CDATA wrapping to Text elements
        full_xml = re.sub(text_pattern, _text_to_cdata, full_xml, flags=re.DOTALL)

        # A bare filename has no directory part; os.makedirs("") would raise.
        out_dir = os.path.dirname(filename)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(filename, "w", encoding="utf-8") as fh:
            fh.write(full_xml)

    def _indent(self, elem: ET.Element, level: int = 0):
        """Recursively set whitespace-only text/tail so the output is indented.

        Existing non-blank text and tails are left untouched.

        Args:
            elem: Element to indent in place.
            level: Nesting depth of `elem` (0 for the root).
        """
        i = "\n" + level * " "
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = i + " "
            if not elem.tail or not elem.tail.strip():
                elem.tail = i
            for child in elem:
                self._indent(child, level + 1)
            # After the loop `child` is the last child: its tail closes the
            # parent, so it sits at the parent's indentation level.
            if not child.tail or not child.tail.strip():
                child.tail = i
        else:
            if level and (not elem.tail or not elem.tail.strip()):
                elem.tail = i