359 lines
15 KiB
Python
359 lines
15 KiB
Python
"""Base Generator Classes using Template Method Pattern.
|
|
|
|
Provides structured, extensible base classes for PLC program generation
|
|
using XML Builder and Plugin systems.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import xml.etree.ElementTree as ET
|
|
from abc import ABC, abstractmethod
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
from .config import GeneratorConfig
|
|
from .data_loader import DataLoader
|
|
from .xml_builder import L5XBuilder, L5XBuilderFactory
|
|
from .plugin_system import RoutineManager, RoutineContext
|
|
from .logging_config import get_logger
|
|
|
|
class BaseGenerator(ABC):
|
|
"""Base class implementing the Template Method pattern for PLC generation."""
|
|
|
|
def __init__(self, config: GeneratorConfig, data_loader: DataLoader,
|
|
xml_builder_factory: L5XBuilderFactory):
|
|
self.config = config
|
|
self.data_loader = data_loader
|
|
self.xml_builder_factory = xml_builder_factory
|
|
self.logger = get_logger(self.__class__.__name__)
|
|
|
|
# Will be set during generation
|
|
self.builder: Optional[L5XBuilder] = None
|
|
self.routine_manager: Optional[RoutineManager] = None
|
|
# Metrics for summary logging
|
|
self.metrics: Dict[str, Any] = {}
|
|
|
|
def generate(self) -> ET.Element:
|
|
"""Template method for generating PLC programs.
|
|
|
|
This defines the algorithm structure while allowing subclasses
|
|
to customize specific steps.
|
|
"""
|
|
self.logger.info(f"Starting {self.__class__.__name__} generation")
|
|
|
|
try:
|
|
# Step 1: Create XML structure
|
|
self.builder = self.create_xml_structure()
|
|
|
|
# Step 2: Add controller-level elements
|
|
self.add_controller_elements()
|
|
|
|
# Step 3: Create routine context and manager
|
|
self.setup_routine_manager()
|
|
|
|
# Step 4: Generate routines
|
|
self.generate_routines()
|
|
|
|
# Step 5: Add program-level elements
|
|
self.add_program_elements()
|
|
|
|
# Step 6: Finalize
|
|
self.finalize_generation()
|
|
|
|
xml_tree = self.builder.build()
|
|
self.logger.info(f"Successfully completed {self.__class__.__name__} generation")
|
|
return xml_tree
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to generate {self.__class__.__name__}: {e}")
|
|
raise
|
|
|
|
@abstractmethod
|
|
def create_xml_structure(self) -> L5XBuilder:
|
|
"""Create the basic XML structure for this program type."""
|
|
pass
|
|
|
|
def add_controller_elements(self) -> None:
|
|
"""Add controller-level elements. Override if needed."""
|
|
pass
|
|
|
|
def setup_routine_manager(self) -> None:
|
|
"""Set up the routine manager with proper context."""
|
|
if not self.builder:
|
|
raise ValueError("XML builder must be created first")
|
|
|
|
context = RoutineContext(
|
|
data_loader=self.data_loader,
|
|
config=self.config,
|
|
routines_element=self.builder.get_routines_section(),
|
|
program_element=self.builder.get_program_element(),
|
|
metadata=self.get_context_metadata()
|
|
)
|
|
|
|
from .plugin_system import RoutineManager, get_default_registry
|
|
self.routine_manager = RoutineManager(context, get_default_registry())
|
|
|
|
@abstractmethod
|
|
def generate_routines(self) -> None:
|
|
"""Generate the routines for this program type."""
|
|
pass
|
|
|
|
def add_program_elements(self) -> None:
|
|
"""Add program-level elements. Override if needed."""
|
|
pass
|
|
|
|
def finalize_generation(self) -> None:
|
|
"""Finalize the generation process. Override if needed."""
|
|
# Default: log a concise program summary
|
|
try:
|
|
self._log_program_summary()
|
|
except Exception:
|
|
# Summary is best-effort; do not break generation
|
|
return
|
|
|
|
def _log_program_summary(self) -> None:
|
|
if not self.builder:
|
|
return
|
|
routines_el = self.builder.get_routines_section()
|
|
program_el = self.builder.get_program_element()
|
|
# Collect routines
|
|
routines = [r.get('Name', '') for r in routines_el.findall('Routine')]
|
|
routines = [r for r in routines if r]
|
|
routines.sort()
|
|
if routines:
|
|
self.logger.info(f"Routines created ({len(routines)}): {', '.join(routines)}", stage="summary")
|
|
# For each routine, log rung count and one example rung text (first non-empty)
|
|
for r in routines_el.findall('Routine'):
|
|
name = r.get('Name', '')
|
|
rll = r.find('RLLContent')
|
|
if rll is None:
|
|
continue
|
|
rungs = rll.findall('Rung')
|
|
rung_count = len(rungs)
|
|
example = ''
|
|
for rung in rungs:
|
|
txt = rung.find('Text')
|
|
if txt is not None and (txt.text or '').strip():
|
|
example = (txt.text or '').strip()
|
|
break
|
|
if example:
|
|
# Keep example on one line, clipped
|
|
snippet = example.replace('\n', ' ')
|
|
if len(snippet) > 160:
|
|
snippet = snippet[:157] + '...'
|
|
self.logger.info(f"- {name}: {rung_count} rungs | example: {snippet}", stage="summary")
|
|
else:
|
|
self.logger.info(f"- {name}: {rung_count} rungs", stage="summary")
|
|
|
|
def get_context_metadata(self) -> Dict[str, Any]:
|
|
"""Get metadata for routine context. Override to add custom metadata."""
|
|
return {
|
|
'generator_type': self.__class__.__name__,
|
|
'config': self.config,
|
|
'excel_file': str(self.data_loader.excel_path)
|
|
}
|
|
|
|
def build_xml_string(self) -> str:
|
|
"""Build and return the formatted XML string."""
|
|
if not self.builder:
|
|
raise ValueError("Generation must be completed first")
|
|
return self.builder.build_xml_string()
|
|
|
|
def write(self, output_file: str | Path) -> None:
|
|
"""Write the generated XML to a file."""
|
|
if not self.builder:
|
|
# Generate if not already done
|
|
self.generate()
|
|
self.builder.write_to_file(output_file)
|
|
self.logger.info(f"Written {self.__class__.__name__} to {output_file}")
|
|
|
|
class SafetyProgramGenerator(BaseGenerator):
|
|
"""Template Method implementation for SafetyProgram generation."""
|
|
|
|
def create_xml_structure(self) -> L5XBuilder:
|
|
"""Create SafetyProgram XML structure."""
|
|
self.logger.debug("Creating SafetyProgram XML structure")
|
|
return self.xml_builder_factory.create_safety_program_builder("SafetyProgram")
|
|
|
|
def generate_routines(self) -> None:
|
|
"""Generate safety routines."""
|
|
if not self.routine_manager:
|
|
raise ValueError("Routine manager must be set up first")
|
|
|
|
self.logger.info("Generating safety routines...")
|
|
|
|
# If a config-driven routine plan is provided, use only entries targeting SafetyProgram
|
|
if getattr(self.config, 'routine_plan', None):
|
|
results = {}
|
|
for entry in sorted(
|
|
[e for e in self.config.routine_plan if e.enabled and e.program == 'SafetyProgram'],
|
|
key=lambda e: e.order
|
|
):
|
|
# enrich context with per-routine params/filters
|
|
self.routine_manager.context.metadata['params'] = entry.params or {}
|
|
self.routine_manager.context.metadata['filters'] = self.config.filters.for_routine(entry.name)
|
|
results[entry.name] = self.routine_manager.generate_routine(entry.plugin)
|
|
self.logger.info(f"Safety routine generation results: {results}")
|
|
return
|
|
|
|
# Fallback: default fixed set
|
|
safety_routines = ['inputs', 'outputs', 'resets', 'estops', 'safety_tag_mapping']
|
|
results = {name: self.routine_manager.generate_routine(name) for name in safety_routines}
|
|
self.logger.info(f"Safety routine generation results: {results}")
|
|
|
|
def write(self, output_file: str | Path) -> None:
|
|
"""Write the generated XML to a file and create safety tag mapping file."""
|
|
# First write the safety XML file
|
|
super().write(output_file)
|
|
|
|
# Then write the safety tag mapping file in the same directory
|
|
output_path = Path(output_file)
|
|
output_dir = output_path.parent
|
|
|
|
try:
|
|
from .writers.safety_tag_mapping_writer import write_safety_tag_mapping_file
|
|
write_safety_tag_mapping_file(self.data_loader, output_dir)
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to write safety tag mapping file: {e}")
|
|
|
|
def add_program_elements(self) -> None:
|
|
"""Add safety-specific program elements."""
|
|
if not self.builder:
|
|
raise ValueError("XML builder must be created first")
|
|
|
|
# Add safety signatures
|
|
self.builder.add_safety_signatures()
|
|
|
|
# Add safety tag map if needed
|
|
self._add_safety_tag_map()
|
|
|
|
# Zones routine is now handled through routine_plan configuration
|
|
# No longer adding zones routine here to avoid duplication
|
|
|
|
# Ensure a MainRoutine exists and references generated safety routines
|
|
# The ProgramAttributes set MainRoutineName to config.routines.main_routine_name
|
|
# so we must create that routine here.
|
|
program_el = self.builder.get_program_element()
|
|
routines_el = self.builder.get_routines_section()
|
|
|
|
from .config import get_config
|
|
cfg_local = get_config()
|
|
main_routine_name = cfg_local.routines.main_routine_name
|
|
import xml.etree.ElementTree as ET
|
|
|
|
# Create MainRoutine only if it does not already exist
|
|
if not any(r.get('Name') == main_routine_name for r in routines_el.findall('Routine')):
|
|
routine = ET.SubElement(routines_el, 'Routine', Name=main_routine_name, Type='RLL')
|
|
rll_content = ET.SubElement(routine, 'RLLContent')
|
|
rung = ET.SubElement(rll_content, 'Rung', Number='0', Type='N')
|
|
text = ET.SubElement(rung, 'Text')
|
|
nm = cfg_local.routines.name_map
|
|
# Ensure ESTOPS before ZONES so MCM_EPB_DCS_CTRL exists before ZONES references .O1
|
|
calls = [
|
|
nm.get('inputs', 'R010_INPUTS'),
|
|
nm.get('outputs', 'R011_OUTPUTS'),
|
|
nm.get('resets', 'R012_RESETS'),
|
|
nm.get('estops', 'R020_ESTOPS'),
|
|
nm.get('zones', 'R030_ZONES'),
|
|
]
|
|
text.text = '[' + ' ,'.join(f'JSR({c},0)' for c in calls) + ' ];'
|
|
|
|
def _add_safety_tag_map(self) -> None:
|
|
"""Add safety tag map to the program."""
|
|
program_element = self.builder.get_program_element()
|
|
|
|
# Get safety tags from data
|
|
safety_tags = self.data_loader.safety_tags_from_pb
|
|
|
|
if safety_tags:
|
|
from .utils.safety_tag_map import create_safety_tag_map
|
|
create_safety_tag_map(program_element, safety_tags, set())
|
|
self.logger.debug(f"Added safety tag map with {len(safety_tags)} tags")
|
|
|
|
class MainProgramGenerator(BaseGenerator):
|
|
"""Template Method implementation for MainProgram generation."""
|
|
|
|
def create_xml_structure(self) -> L5XBuilder:
|
|
"""Create MainProgram XML structure."""
|
|
self.logger.debug("Creating MainProgram XML structure")
|
|
return self.xml_builder_factory.create_main_program_builder("MainProgram")
|
|
|
|
def add_controller_elements(self) -> None:
|
|
"""Add controller tags for MainProgram."""
|
|
if not self.builder:
|
|
raise ValueError("XML builder must be created first")
|
|
|
|
self.logger.debug("Adding controller tags...")
|
|
|
|
# Generate and add controller tags
|
|
from .writers.xml_tag_writer import create_limited_tag_xml_elements
|
|
tag_elements = create_limited_tag_xml_elements(
|
|
self.data_loader.excel_path,
|
|
data_loader=self.data_loader
|
|
)
|
|
|
|
self.builder.add_controller_tags(tag_elements)
|
|
self.logger.info(f"Added {len(tag_elements)} controller tags")
|
|
|
|
def generate_routines(self) -> None:
|
|
"""Generate main program routines."""
|
|
if not self.routine_manager:
|
|
raise ValueError("Routine manager must be set up first")
|
|
|
|
self.logger.info("Generating main program routines...")
|
|
|
|
# Config-driven plan if available (MainProgram only)
|
|
if getattr(self.config, 'routine_plan', None):
|
|
results = {}
|
|
# Sort entries by order, but always run main_routine last so it can JSR to generated routines
|
|
entries = sorted(
|
|
[e for e in self.config.routine_plan if e.enabled and e.program == 'MainProgram'],
|
|
key=lambda e: e.order
|
|
)
|
|
non_main = [e for e in entries if getattr(e, 'plugin', '') != 'main_routine' and getattr(e, 'name', '') != 'main_routine']
|
|
mains = [e for e in entries if getattr(e, 'plugin', '') == 'main_routine' or getattr(e, 'name', '') == 'main_routine']
|
|
|
|
def _run(entry):
|
|
self.routine_manager.context.metadata['params'] = entry.params or {}
|
|
self.routine_manager.context.metadata['filters'] = self.config.filters.for_routine(entry.name)
|
|
results[entry.name] = self.routine_manager.generate_routine(entry.plugin)
|
|
|
|
for entry in non_main:
|
|
_run(entry)
|
|
for entry in mains:
|
|
_run(entry)
|
|
|
|
self.logger.info(f"Main routine generation results: {results}")
|
|
return
|
|
|
|
# Fallback default set (ensure MainRoutine is generated last so it can JSR to routines that exist)
|
|
main_routines = ['safety_tag_map', 'rack', 'estop_check', 'main_routine']
|
|
results = {name: self.routine_manager.generate_routine(name) for name in main_routines}
|
|
self.logger.info(f"Main routine generation results: {results}")
|
|
|
|
|
|
class LegacyCompatibilityMixin:
|
|
"""Mixin to provide backward compatibility with legacy generators."""
|
|
|
|
def __init__(self, excel_path: str | Path, zones_dict: Optional[List[Dict[str, str]]] = None):
|
|
"""Legacy constructor interface."""
|
|
# This will be mixed with the new generators to maintain compatibility
|
|
from .config import get_config
|
|
from .data_loader import DataLoader
|
|
from .xml_builder import L5XBuilderFactory
|
|
|
|
config = get_config()
|
|
data_loader = DataLoader(excel_path=excel_path, zones_dict=zones_dict)
|
|
xml_builder_factory = L5XBuilderFactory(config)
|
|
|
|
# Call the parent constructor
|
|
super().__init__(config, data_loader, xml_builder_factory)
|
|
|
|
# Backward compatible generator classes
|
|
class ModernSafetyProgramGenerator(LegacyCompatibilityMixin, SafetyProgramGenerator):
|
|
"""Modern SafetyProgram generator with legacy interface."""
|
|
pass
|
|
|
|
class ModernMainProgramGenerator(LegacyCompatibilityMixin, MainProgramGenerator):
|
|
"""Modern MainProgram generator with legacy interface."""
|
|
pass |