#!/usr/bin/env python3
"""
Streamlined PLC Generation Workflow
===================================

This script demonstrates the complete streamlined generation workflow that:
1. Uses DESC_IP data extraction for safety devices
2. Generates MainProgram and SafetyProgram with proper routines
3. Creates IO Tree configuration with modules
4. Integrates everything into a complete L5X file with proper CDATA wrapping
5. Compiles to ACD format

Usage:
    python streamlined_generation.py --excel-file DESC_IP_MERGED.xlsx --project-name MTN6_MCM01_UL1_UL3
"""

import argparse
import os
import re
import shutil
import subprocess
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional
from xml.sax.saxutils import unescape

import pandas as pd  # Added for safety_gen.loader.rst

# Force unbuffered output
sys.stdout = sys.__stdout__
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(line_buffering=True)

# Import zones configuration
from zones_config import ZONES_CONFIGS, DEFAULT_ZONES

# Set UTF-8 encoding for Windows
if sys.platform == 'win32':
    os.environ['PYTHONIOENCODING'] = 'utf-8'


def _safe_print(text: str) -> None:
    """Print *text*, dropping non-ASCII characters if the console rejects them."""
    try:
        print(text)
    except UnicodeEncodeError:
        # Handle Unicode characters that can't be displayed in Windows console
        print(text.encode('ascii', 'ignore').decode('ascii'))


def run_plc_data_generator(excel_file: str) -> bool:
    """Run the PLC Data Generator to create DESC_IP_MERGED.xlsx.

    The generator's ``main.py`` is executed from inside the
    "PLC Data Generator" directory; on success the resulting
    DESC_IP_MERGED.xlsx is copied back to the directory the workflow was
    started from.

    Args:
        excel_file: Path to the input workbook, either absolute, relative to
            the workflow root, or already prefixed with "PLC Data Generator/".

    Returns:
        True if the generator exited with code 0 and produced
        DESC_IP_MERGED.xlsx, False otherwise.
    """
    print("=" * 60)
    print("PHASE 0: PLC Data Generation")
    print("=" * 60)

    generator_prefix = "PLC Data Generator/"
    if os.path.isabs(excel_file):
        # An absolute path is valid from any working directory; prefixing it
        # with "../" would produce a path that does not exist.
        excel_file_arg = excel_file
    elif excel_file.startswith(generator_prefix):
        # Already inside the generator directory: drop the prefix.
        excel_file_arg = excel_file[len(generator_prefix):]
    else:
        # Relative to the root: go up one level from the generator directory.
        excel_file_arg = f"../{excel_file}"

    orig_dir = os.getcwd()
    try:
        os.chdir("PLC Data Generator")

        cmd = [sys.executable, "main.py", excel_file_arg]
        print(f"Processing Excel file: {excel_file_arg}")
        result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')

        if result.returncode != 0:
            print("PLC Data Generator failed:")
            if result.stdout:
                print(result.stdout)
            if result.stderr:
                print(f"Error: {result.stderr}")
            return False

        # Success - print output
        if result.stdout:
            print(result.stdout)

        output_file = Path("DESC_IP_MERGED.xlsx")
        if not output_file.exists():
            print("ERROR: DESC_IP_MERGED.xlsx was not created")
            return False

        # Copy it to the root directory for other phases
        shutil.copy2(output_file, Path(orig_dir) / "DESC_IP_MERGED.xlsx")
        print("Created DESC_IP_MERGED.xlsx in root directory")
        return True
    finally:
        os.chdir(orig_dir)


def run_routines_generator(excel_file: str, use_desc_ip: bool = True, zones_dict=None) -> dict:
    """Run the Routines Generator to create program files.

    Args:
        excel_file: Path to Excel file. Not read by this function; the
            generators are always pointed at DESC_IP_MERGED.xlsx.
        use_desc_ip: Whether to use DESC_IP extraction mode.
        zones_dict: Dictionary-based zones data. If None, uses default.

    Returns:
        Mapping with the keys 'safety_program', 'main_program', 'tags_csv'
        and 'tag_mapping', each a path below "Routines Generator" relative to
        the workflow root. The 'tags_csv' file is not produced by this
        function; only its expected location is reported.
    """
    print("\n" + "=" * 60)
    print("PHASE 1: Routines Generation")
    print("=" * 60)

    if zones_dict is None:
        zones_dict = DEFAULT_ZONES
        print("Using default zones configuration (MCM04)")

    orig_dir = os.getcwd()
    routines_dir = Path("Routines Generator")
    os.chdir(routines_dir)
    try:
        # Always use dictionary-based zones instead of Excel
        print("Using dictionary-based zones data...")
        print(f"Number of zones: {len(zones_dict)}")
        for zone in zones_dict:
            print(f" - {zone['name']}: ", end="")
            if zone['start'] and zone['stop']:
                print(f"{zone['start']} to {zone['stop']}", end="")
            else:
                print("(No range)", end="")
            if zone['interlock']:
                print(f" [Interlock: {zone['interlock']}]")
            else:
                print()

        # The generator package is imported by path, so both its root and its
        # src directory must be importable. Skip entries already present so
        # repeated calls do not grow sys.path.
        for extra_path in (os.getcwd(), os.path.join(os.getcwd(), 'src')):
            if extra_path not in sys.path:
                sys.path.insert(0, extra_path)

        # Copy DESC_IP_MERGED.xlsx from parent directory if it exists there
        parent_desc_ip = Path(orig_dir) / "DESC_IP_MERGED.xlsx"
        local_desc_ip = Path("DESC_IP_MERGED.xlsx")
        if parent_desc_ip.exists():
            shutil.copy2(parent_desc_ip, local_desc_ip)
            print("Copied DESC_IP_MERGED.xlsx to Routines Generator directory")

        from src.generators.safety_program import LimitedSafetyProgramGenerator
        from src.generators.main_program import LimitedMainProgramGenerator

        # Create generators with zones_dict
        safety_gen = LimitedSafetyProgramGenerator(
            "DESC_IP_MERGED.xlsx",
            use_desc_ip_extraction=use_desc_ip,
            zones_dict=zones_dict,
        )
        main_gen = LimitedMainProgramGenerator(
            "DESC_IP_MERGED.xlsx",
            use_desc_ip_extraction=use_desc_ip,
            zones_dict=zones_dict,
        )

        # Generate files
        safety_gen.write('SafetyProgram_Limited.L5X')
        print("[SUCCESS] Generated SafetyProgram_Limited.L5X")
        main_gen.write('MainProgram_Limited.L5X')
        print("[SUCCESS] Generated MainProgram_Limited.L5X")
        sys.stdout.flush()

        # CSV and tag mapping generation would need zones_dict support too,
        # so it is skipped for now.
        print("Note: CSV and tag mapping generation skipped when using dictionary zones")
        sys.stdout.flush()

        # Generate safety tag mapping even with dictionary zones
        print("Generating safety tag mapping...")
        sys.stdout.flush()
        from src.writers.mapping_writer import create_safety_tag_mapping

        # Collect safety tags from RST data
        safety_tags = {"MCM_S_PB"}  # Static MCM tag
        rst_df = safety_gen.loader.rst
        for desca in rst_df['DESCA']:
            # Blank cells arrive as NaN and numeric cells as numbers; only
            # string descriptors can name a push-button tag.
            if not isinstance(desca, str):
                continue
            if 'S1_PB' in desca or 'S2_PB' in desca or desca.endswith('SPB'):
                safety_tags.add(desca)

        # We don't need beacon mappings in the safety tag map
        beacon_sft_mappings = set()

        mapping_file = 'SafetyTagMapping_Limited.txt'
        create_safety_tag_mapping(safety_tags, set(), beacon_sft_mappings, mapping_file)
        print(f"[SUCCESS] Generated {mapping_file}")

        # Return paths to generated files
        return {
            'safety_program': routines_dir / 'SafetyProgram_Limited.L5X',
            'main_program': routines_dir / 'MainProgram_Limited.L5X',
            'tags_csv': routines_dir / 'MTN6_MCM01_Controller_Tags_Limited.CSV',
            'tag_mapping': routines_dir / 'SafetyTagMapping_Limited.txt',
        }
    finally:
        os.chdir(orig_dir)


def run_io_tree_generator(excel_file: str, project_name: str) -> Optional[Path]:
    """Run the IO Tree Configuration Generator to create module configuration.

    Args:
        excel_file: Unused; the generator is always fed DESC_IP_MERGED.xlsx
            from the workflow root.
        project_name: Project name passed to the generator and used as the
            stem of the expected output file.

    Returns:
        Absolute path to ``generated_projects/<project_name>.L5X`` inside the
        generator directory, or None if DESC_IP_MERGED.xlsx is missing or the
        generator exits with a non-zero code.
    """
    print("\n" + "=" * 60)
    print("PHASE 2: IO Tree Configuration")
    print("=" * 60)

    orig_dir = os.getcwd()
    io_tree_dir = Path("IO Tree Configuration Generator")
    os.chdir(io_tree_dir)
    try:
        # Reuse of a previously generated project file is currently disabled;
        # the configuration is regenerated on every run.
        print("No existing IO tree found, attempting to generate...")

        # Use DESC_IP_MERGED.xlsx which has the DESC_IP sheet
        desc_ip_file = Path(orig_dir) / "DESC_IP_MERGED.xlsx"
        if not desc_ip_file.exists():
            print(f"[ERROR] DESC_IP_MERGED.xlsx not found at {desc_ip_file}")
            print(" Cannot generate IO tree without DESC_IP data")
            return None

        # enhanced_mcm_generator expects positional args
        cmd = [sys.executable, "enhanced_mcm_generator.py", str(desc_ip_file), project_name]

        # stderr is captured but deliberately not echoed, to avoid confusion
        result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
        if result.returncode != 0:
            print("[ERROR] IO Tree generation failed")
            print(" This is expected if the required Excel file is not available")
            return None

        _safe_print(result.stdout)

        # Return absolute path
        return Path(os.getcwd()) / f"generated_projects/{project_name}.L5X"
    finally:
        os.chdir(orig_dir)


def _merge_controller_tags(controller_elem, main_root) -> None:
    """Copy controller-scope tags from the MainProgram L5X into the controller."""
    main_controller = main_root.find(".//Controller[@Use='Context']")
    if main_controller is None:
        return
    main_tags = main_controller.find("Tags")
    if main_tags is None or len(main_tags) == 0:
        return

    controller_tags = controller_elem.find("Tags")
    if controller_tags is None:
        controller_tags = ET.Element("Tags")
        # Tags should come after AddOnInstructionDefinitions but before
        # Programs; whichever of the two is met first decides the position.
        insert_index = None
        for i, child in enumerate(controller_elem):
            if child.tag == "AddOnInstructionDefinitions":
                insert_index = i + 1
                break
            if child.tag == "Programs":
                insert_index = i
                break
        if insert_index is not None:
            controller_elem.insert(insert_index, controller_tags)
        else:
            controller_elem.append(controller_tags)
        print("[SUCCESS] Created Tags section in controller")
    elif len(controller_tags) == 0:
        print("[SUCCESS] Found empty Tags section - will populate with tags")

    tag_count = 0
    existing_tag_names = {tag.get('Name') for tag in controller_tags.findall('Tag')}
    for tag in main_tags:
        tag_name = tag.get('Name')
        if tag_name in existing_tag_names:
            continue
        # Clone the tag to avoid moving it from the source tree
        controller_tags.append(ET.fromstring(ET.tostring(tag, encoding='unicode')))
        # Remember the name so a duplicate inside main_tags is not added twice
        existing_tag_names.add(tag_name)
        tag_count += 1

    if tag_count > 0:
        print(f"[SUCCESS] Integrated {tag_count} controller tags from MainProgram")
    else:
        print("[SUCCESS] No new tags to add (all tags already exist)")


def _ensure_safety_task(controller_elem) -> None:
    """Add a SafetyTask scheduling SafetyProgram unless one already exists."""
    tasks_elem = controller_elem.find("Tasks")
    if tasks_elem is None:
        tasks_elem = ET.SubElement(controller_elem, "Tasks")

    if tasks_elem.find("Task[@Name='SafetyTask']") is not None:
        return

    safety_task = ET.SubElement(tasks_elem, "Task")
    for attr_name, attr_value in (
        ("Name", "SafetyTask"),
        ("Type", "PERIODIC"),
        ("Rate", "20"),
        ("Priority", "10"),
        ("Watchdog", "20"),
        ("DisableUpdateOutputs", "false"),
        ("InhibitTask", "false"),
        ("Class", "Safety"),
    ):
        safety_task.set(attr_name, attr_value)

    sched_programs = ET.SubElement(safety_task, "ScheduledPrograms")
    sched_prog = ET.SubElement(sched_programs, "ScheduledProgram")
    sched_prog.set("Name", "SafetyProgram")
    print("[SUCCESS] Added SafetyTask")


def _apply_safety_tag_map(controller_elem, tag_mapping_file) -> None:
    """Write the contents of the tag mapping file into SafetyInfo/SafetyTagMap."""
    tag_mapping_file = Path(tag_mapping_file)
    if not tag_mapping_file.exists():
        return

    with open(tag_mapping_file, 'r') as f:
        mappings = f.read().strip()

    safety_info = controller_elem.find("SafetyInfo")
    if safety_info is None:
        # Previously skipped silently; the mapping has nowhere to go.
        print("[WARNING] No SafetyInfo element in controller; safety tag mappings not added")
        return

    safety_tag_map = safety_info.find("SafetyTagMap")
    if safety_tag_map is None:
        safety_tag_map = ET.SubElement(safety_info, "SafetyTagMap")
    safety_tag_map.text = mappings
    print(f"[SUCCESS] Added safety tag mappings: {mappings}")


def integrate_programs_into_controller(controller_file, routines_files, project_name=None):
    """Integrate MainProgram and SafetyProgram into the controller L5X.

    Args:
        controller_file: Path (string) of the base controller L5X file.
        routines_files: Mapping as returned by ``run_routines_generator``;
            the keys 'safety_program', 'main_program' and 'tag_mapping' are
            read.
        project_name: If given, the result is written to
            ``<dir of controller_file>/<project_name>.L5X``; otherwise
            ``_Complete`` is appended to the controller file's stem.

    Returns:
        Path (string) of the written L5X file, or None if the controller file
        contains no ``Controller`` element.
    """
    print("\n============================================================")
    print("PHASE 3: Program Integration")
    print("============================================================")

    controller_file = str(controller_file)
    root = ET.parse(controller_file).getroot()
    controller_elem = root.find(".//Controller")
    if controller_elem is None:
        print(f"[ERROR] No Controller element found in {controller_file}")
        return None

    # Find or create Programs section
    programs_elem = controller_elem.find("Programs")
    if programs_elem is None:
        programs_elem = ET.SubElement(controller_elem, "Programs")

    # SafetyProgram: replace any existing one wholesale
    safety_root = ET.parse(routines_files['safety_program']).getroot()
    safety_program_new = safety_root.find(".//Program[@Name='SafetyProgram']")
    if safety_program_new is not None:
        existing_safety = programs_elem.find("Program[@Name='SafetyProgram']")
        if existing_safety is not None:
            programs_elem.remove(existing_safety)
        programs_elem.append(safety_program_new)
        print("[SUCCESS] Integrated SafetyProgram with routines")

    # MainProgram: keep the existing program, swap only its Routines section
    main_root = ET.parse(routines_files['main_program']).getroot()
    main_program_new = main_root.find(".//Program[@Name='MainProgram']")
    if main_program_new is not None:
        existing_main = programs_elem.find("Program[@Name='MainProgram']")
        if existing_main is None:
            print("[WARNING] MainProgram not found in controller; generated routines were not integrated")
        else:
            existing_routines = existing_main.find("Routines")
            new_routines = main_program_new.find("Routines")
            if existing_routines is not None and new_routines is not None:
                existing_main.remove(existing_routines)
                existing_main.append(new_routines)
                print("[SUCCESS] Updated MainProgram with safety routines")

    _merge_controller_tags(controller_elem, main_root)
    _ensure_safety_task(controller_elem)
    _apply_safety_tag_map(controller_elem, routines_files['tag_mapping'])

    # Decide where to save the integrated file
    if project_name:
        output_dir = os.path.dirname(controller_file)
        output_file = os.path.join(output_dir, f"{project_name}.L5X")
    else:
        # Touch only the extension, not an '.L5X' occurring elsewhere in the path
        base, ext = os.path.splitext(controller_file)
        if ext == '.L5X':
            output_file = f"{base}_Complete.L5X"
        else:
            output_file = controller_file.replace('.L5X', '_Complete.L5X')

    # ET.tostring(encoding='unicode') emits no XML declaration, so one is
    # prepended by hand.
    # NOTE(review): the exact declaration literal was lost in the source as
    # received; this is the form Studio 5000 exports use - confirm.
    xml_str = ET.tostring(root, encoding='unicode')
    full_xml = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n' + xml_str
    full_xml = apply_cdata_wrapping(full_xml)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(full_xml)

    print(f"\n[SUCCESS] Created integrated L5X file: {output_file}")
    return output_file


def _cdata_section(escaped_text: str) -> str:
    """Return *escaped_text* as a CDATA section holding the unescaped text."""
    # ElementTree serialises text with &amp;/&lt;/&gt; entities. Entities are
    # not interpreted inside CDATA, so they must be turned back into the raw
    # characters before wrapping.
    raw_text = unescape(escaped_text)
    # A literal "]]>" would terminate the section early; split it across two.
    return '<![CDATA[' + raw_text.replace(']]>', ']]]]><![CDATA[>') + ']]>'


def apply_cdata_wrapping(xml_str: str) -> str:
    """Apply CDATA wrapping to L5K data and RLL Text content.

    NOTE(review): the element names inside the three patterns below were
    stripped from the source as received and have been reconstructed from the
    accompanying comments ("L5K data blocks", "DataValueMember DATA",
    "RLL Text content") - confirm against a reference L5X export.

    Args:
        xml_str: Serialised L5X document.

    Returns:
        The document with matching element contents wrapped in CDATA sections
        and laid out by ``format_xml_indentation``.
    """
    # Pattern for L5K data blocks - match any content between tags
    l5k_pattern = r'(<Data Format="L5K">)\s*([^<]+)\s*(</Data>)'
    # Pattern for DataValueMember DATA
    data_pattern = r'(<DataValueMember Name="DATA"[^>]*>)([^<]*)(</DataValueMember>)'
    # Pattern for RLL Text content
    text_pattern = r'(<Text>)(.*?)(</Text>)'

    def wrap_l5k(match):
        start, content, end = match.group(1), match.group(2), match.group(3)
        # Only wrap if there is content; "[^<]+" already guarantees the
        # content cannot contain an existing CDATA section.
        if content.strip():
            return f"{start}\n{_cdata_section(content)}\n{end}"
        return match.group(0)

    def wrap_data(match):
        start, content, end = match.group(1), match.group(2), match.group(3)
        return f"{start}{_cdata_section(content)}{end}"

    def wrap_text(match):
        start, content, end = match.group(1), match.group(2), match.group(3)
        clean_content = content.strip()
        # Leave empty and already-wrapped content untouched
        if not clean_content or '<![CDATA[' in clean_content:
            return match.group(0)
        return f"{start}\n{_cdata_section(clean_content)}\n{end}"

    # Apply all CDATA wrappings
    xml_str = re.sub(l5k_pattern, wrap_l5k, xml_str, flags=re.DOTALL)
    xml_str = re.sub(data_pattern, wrap_data, xml_str, flags=re.DOTALL)
    xml_str = re.sub(text_pattern, wrap_text, xml_str, flags=re.DOTALL)

    # Format the XML layout
    return format_xml_indentation(xml_str)


def format_xml_indentation(xml_str: str) -> str:
    """Apply Rockwell-style XML formatting.

    Whitespace between adjacent tags is normalised to a single newline so
    every element starts on its own line. CDATA sections, and the whitespace
    directly around them, are left exactly as they are.

    NOTE(review): the original implementation collapsed inter-tag whitespace
    and then re-inserted newlines from a per-tag replacement table; that
    table was not recoverable from the source as received. This version
    applies the same layout to every tag boundary - confirm the output
    against a Studio 5000 export.

    Args:
        xml_str: Serialised L5X document, possibly containing CDATA sections.

    Returns:
        The re-laid-out document.
    """
    # Splitting with a capturing group keeps the CDATA sections at the odd
    # indices; only the even (markup) pieces are reformatted, so raw '<' and
    # '>' characters inside CDATA can never be mistaken for tag boundaries.
    pieces = re.split(r'(<!\[CDATA\[.*?\]\]>)', xml_str, flags=re.DOTALL)
    for index in range(0, len(pieces), 2):
        pieces[index] = re.sub(r'>\s*<', '>\n<', pieces[index])
    return ''.join(pieces)


def compile_to_acd(l5x_file: str) -> Optional[str]:
    """Compile the L5X file to ACD format.

    Runs ``l5x_to_acd.py`` from the "L5X2ACD Compiler" directory and echoes
    its stdout/stderr.

    Args:
        l5x_file: Path of the L5X file, absolute or relative to the current
            working directory.

    Returns:
        The L5X path with its ``.L5X`` extension replaced by ``.ACD`` when
        the compiler exits with code 0; None if the L5X file is missing or
        the compiler fails.
    """
    print("\n" + "=" * 60)
    print("PHASE 4: L5X to ACD Compilation")
    print("=" * 60)

    orig_dir = os.getcwd()
    compiler_dir = Path("L5X2ACD Compiler")
    os.chdir(compiler_dir)
    try:
        # Resolve against the original directory (a no-op for absolute paths)
        l5x_path = Path(orig_dir) / l5x_file
        if not l5x_path.exists():
            print(f"L5X file not found: {l5x_path}")
            return None

        in_venv = hasattr(sys, 'real_prefix') or (
            hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix
        )
        python_exe = sys.executable
        if not in_venv:
            # Not in venv, try to find venv Python
            venv_python = Path(orig_dir) / "venv" / "Scripts" / "python.exe"
            if venv_python.exists():
                python_exe = str(venv_python)
            else:
                print("[WARNING] Virtual environment not found, using system Python")

        cmd = [python_exe, "l5x_to_acd.py", str(l5x_path)]
        print(f"Running compiler command: {' '.join(cmd)}")
        print(f"L5X file path: {l5x_path}")
        result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')

        # Always print stdout for detailed compiler output
        if result.stdout:
            print("\nCompiler Output:")
            _safe_print(result.stdout)

        # Print stderr if there's any error output
        if result.stderr:
            print("\nCompiler Errors:")
            _safe_print(result.stderr)

        print(f"\nCompiler exit code: {result.returncode}")
        if result.returncode != 0:
            print(f"[ERROR] Compilation failed with exit code {result.returncode}")
            if result.returncode == 1:
                print(" This indicates recoverable failures - check output above for details")
            elif result.returncode == 2:
                print(" This indicates critical non-recoverable failures")
            elif result.returncode == 3:
                print(" This indicates a fatal error in the compiler")
            return None

        # Swap only the extension; a plain str.replace would also rewrite an
        # '.L5X' that happens to appear in a directory name.
        l5x_text = str(l5x_file)
        base, ext = os.path.splitext(l5x_text)
        if ext == '.L5X':
            return base + '.ACD'
        return l5x_text.replace('.L5X', '.ACD')
    finally:
        os.chdir(orig_dir)


def _select_zones(project_name: str):
    """Return the zones configuration whose key appears in *project_name*."""
    for zone_key in ("MCM01", "MCM04", "MCM05"):
        if zone_key in project_name:
            print(f"Selected {zone_key} zones configuration.")
            return ZONES_CONFIGS.get(zone_key, DEFAULT_ZONES)
    print("Project name does not match MCM01, MCM04, or MCM05, using default zones.")
    return DEFAULT_ZONES


def main():
    """Main workflow execution.

    Returns:
        Process exit code: 0 on success, 1 if any phase fails.
    """
    parser = argparse.ArgumentParser(description='Streamlined PLC Generation Workflow')
    parser.add_argument('--excel-file', default='DESC_IP_MERGED.xlsx', help='Input Excel file')
    parser.add_argument('--project-name', default='MTN6_MCM01_UL1_UL3', help='Project name')
    parser.add_argument('--desc-ip-mode', action='store_true', default=True,
                        help='Use DESC_IP extraction mode')
    # store_true with default=True can never yield False; this companion flag
    # makes it possible to switch the mode off without changing the default.
    parser.add_argument('--no-desc-ip-mode', dest='desc_ip_mode', action='store_false',
                        help='Disable DESC_IP extraction mode')
    parser.add_argument('--compile-acd', action='store_true', help='Compile to ACD format')
    args = parser.parse_args()

    print("STREAMLINED PLC GENERATION WORKFLOW")
    print("===================================")
    print(f"Excel File: {args.excel_file}")
    print(f"Project Name: {args.project_name}")
    print(f"DESC_IP Mode: {args.desc_ip_mode}")
    print()

    # Select the correct zones configuration based on project name
    zones_to_use = _select_zones(args.project_name)

    # Phase 0: Run PLC Data Generator
    if not run_plc_data_generator(args.excel_file):
        print("\n[ERROR] Failed to generate DESC_IP_MERGED.xlsx")
        return 1

    # Phase 1: Generate routines
    routines_files = run_routines_generator(
        args.excel_file, args.desc_ip_mode, zones_dict=zones_to_use
    )
    if not routines_files:
        print("Failed to generate routines")
        return 1

    # Phase 2: Generate IO tree configuration
    controller_file = run_io_tree_generator(args.excel_file, args.project_name)
    if not controller_file:
        print("\n[ERROR] Failed to obtain IO tree configuration")
        print(" Cannot continue without a base controller L5X file")
        return 1

    # Phase 3: Integrate programs
    complete_file = integrate_programs_into_controller(
        str(controller_file), routines_files, args.project_name
    )
    if not complete_file:
        print("Failed to integrate programs")
        return 1

    # Phase 4: Compile to ACD (optional)
    if args.compile_acd:
        acd_file = compile_to_acd(complete_file)
        if acd_file:
            print(f"\n[SUCCESS] Successfully compiled to ACD: {acd_file}")
        else:
            print("\n[ERROR] Failed to compile to ACD")
            return 1

    print("\n" + "=" * 60)
    print("WORKFLOW COMPLETED SUCCESSFULLY")
    print("=" * 60)
    print(f"Final L5X file: {complete_file}")
    return 0


if __name__ == "__main__":
    sys.exit(main())