#!/usr/bin/env python3 """ Complete PLC Generation Workflow Runs the entire pipeline from raw Excel to compiled ACD: 1. PLC Data Generator (raw Excel → DESC_IP_MERGED.xlsx) 2. Routines Generator (DESC_IP_MERGED.xlsx → L5X files) 3. L5X2ACD Compiler (L5X files → ACD files) """ from __future__ import annotations import sys import argparse import subprocess from pathlib import Path import io import contextlib import xml.etree.ElementTree as ET import re from datetime import datetime def get_project_paths(): """Get standardized paths for all project components.""" project_root = Path(__file__).parent.parent.resolve() return { 'project_root': project_root, 'data_generator': project_root / "PLC Data Generator", 'routines_generator': project_root / "Routines Generator", 'l5x2acd_compiler': project_root / "L5X2ACD Compiler", 'io_tree_generator': project_root / "IO Tree Configuration Generator" } def resolve_project_config_files(project_name: str, project_root: Path) -> tuple[Path, Path]: """Resolve configuration file paths based on project name. Args: project_name: Project name (e.g., 'MTN6_MCM02', 'SAT9_MCM01', 'CNO8_MCM01') project_root: Root directory of the project Returns: tuple: (generator_config_path, zones_config_path) Raises: FileNotFoundError: If required config files don't exist """ # Extract project prefix (MTN6, SAT9, CNO8, etc.) import re project_match = re.match(r'^([A-Z0-9]+)_', project_name.upper()) if not project_match: raise ValueError(f"Invalid project name format: {project_name}. Expected format: PREFIX_MCMXX") project_prefix = project_match.group(1) # Build config file paths generator_config = project_root / f"{project_prefix}_generator_config.json" zones_config = project_root / f"{project_prefix}_zones.json" # Validate files exist if not generator_config.exists(): raise FileNotFoundError(f"Generator config not found: {generator_config}") if not zones_config.exists(): raise FileNotFoundError(f"Zones config not found: {zones_config}") return generator_config, zones_config def get_available_projects(project_root: Path) -> list[str]: """Get list of available project prefixes based on config files. Args: project_root: Root directory of the project Returns: List of available project prefixes (e.g., ['MTN6', 'SAT9', 'CNO8']) """ import re projects = set() # Look for *_generator_config.json files for config_file in project_root.glob("*_generator_config.json"): match = re.match(r'^([A-Z0-9]+)_generator_config\.json$', config_file.name) if match: prefix = match.group(1) # Check if corresponding zones file exists zones_file = project_root / f"{prefix}_zones.json" if zones_file.exists(): projects.add(prefix) return sorted(list(projects)) def resolve_boilerplate_directory(project_prefix: str, io_tree_dir: Path) -> str: """Resolve the boilerplate directory based on project prefix. Args: project_prefix: Project prefix (e.g., 'MTN6', 'SAT9', 'CNO8') io_tree_dir: IO Tree Configuration Generator directory Returns: Boilerplate directory name (e.g., 'MTN6_boilerplate') """ boilerplate_dir = f"{project_prefix}_boilerplate" boilerplate_path = io_tree_dir / boilerplate_dir if boilerplate_path.exists(): return boilerplate_dir else: # Fall back to default boilerplate directory print(f"Warning: Project-specific boilerplate directory not found: {boilerplate_dir}") print(" Using default 'boilerplate' directory") return "boilerplate" def _append_log(log_path: Path | None, header: str, body: str) -> None: if not log_path: return try: log_path.parent.mkdir(parents=True, exist_ok=True) with open(log_path, 'a', encoding='utf-8') as f: f.write(f"\n=== {header} ===\n") if body: f.write(body) if not body.endswith("\n"): f.write("\n") except Exception: pass def run_plc_data_generator(raw_excel_file: Path, paths: dict, verbose: bool = False, log_file: Path | None = None) -> bool: """Run the PLC Data Generator to create DESC_IP_MERGED.xlsx.""" data_gen_dir = paths['data_generator'] data_gen_script = data_gen_dir / "main.py" if not data_gen_script.exists(): print(f"ERROR: PLC Data Generator not found at {data_gen_script}") return False if not raw_excel_file.exists(): print(f"ERROR: Raw Excel file not found at {raw_excel_file}") return False try: # Run the PLC Data Generator with the Excel file path as argument result = subprocess.run([ sys.executable, str(data_gen_script), str(raw_excel_file.resolve()) # Pass the Excel file path as argument ], cwd=data_gen_dir, capture_output=True, text=True) # Check if core processing succeeded by looking for output files # Even if there's a permission error at the end, the processing might have completed source = data_gen_dir / "DESC_IP_MERGED.xlsx" success_indicators = [ "Processing complete!" in result.stdout, "New Excel file created:" in result.stdout, source.exists() ] # Consider it successful if the essential files were created, even with permission errors if result.returncode == 0 or (any(success_indicators) and "[Errno 1] Operation not permitted" in result.stdout): if verbose and result.returncode != 0: _append_log(log_file, "Step 1 Warning", "Permission error at end of processing, core processing completed") if verbose: _append_log(log_file, "Step 1: PLC Data Generator stdout", result.stdout) _append_log(log_file, "Step 1: PLC Data Generator stderr", result.stderr) # Copy DESC_IP_MERGED.xlsx from data generator output (it already has safety sheets) dest = paths['routines_generator'] / "DESC_IP_MERGED.xlsx" if source.exists(): import shutil shutil.copy2(source, dest) return True else: return False else: if verbose: _append_log(log_file, "Step 1 Error", "Data processing failed") _append_log(log_file, "Step 1: PLC Data Generator stdout", result.stdout) _append_log(log_file, "Step 1: PLC Data Generator stderr", result.stderr) return False except Exception as e: if verbose: _append_log(log_file, "Step 1 Exception", str(e)) return False def run_routines_generator(paths: dict, project_name: str = None, ignore_estop1ok: bool = False, safety_only: bool = False, verbose: bool = False, log_file: Path | None = None, config_path: Path = None) -> bool: """Run the Routines Generator. When safety_only is True, runs safety-only generation (inputs, outputs, resets, estops, zones, estop_check, safety tag map). Otherwise runs the standard generator with DPM and other routines. Args: config_path: Path to project-specific generator config file. If None, uses default. """ routines_dir = paths['routines_generator'] try: # Use provided config path or fall back to default if config_path is None: config_path = paths['project_root'] / 'generator_config.json' excel_path = routines_dir / 'DESC_IP_MERGED.xlsx' subcmd = 'safety' if safety_only else 'all' # Build args with global flags BEFORE the subcommand cmd_args = [ sys.executable, '-m', 'src.unified_cli', '--config', str(config_path), '--excel-file', str(excel_path), ] if verbose: cmd_args.extend(['--log-level', 'DEBUG']) if log_file is not None: cmd_args.extend(['--log-file', str(log_file)]) cmd_args.append(subcmd) # Note: routine inclusion/exclusion is driven by config; project_name and ignore-estop1ok are configured in JSON # Run the unified CLI result = subprocess.run(cmd_args, cwd=routines_dir, capture_output=True, text=True) if verbose: _append_log(log_file, "Step 2: Routines Generator stdout", result.stdout) if result.stderr: _append_log(log_file, "Step 2: Routines Generator stderr", result.stderr) if result.returncode == 0: return True else: return False except Exception as e: if verbose: _append_log(log_file, "Step 2 Exception", str(e)) return False def run_io_tree_generator(paths: dict, project_name: str, safety_only: bool = False, verbose: bool = False, log_file: Path | None = None, boilerplate_dir: str = None) -> bool: """Run the IO Tree Configuration Generator. If safety_only is True, skip this step to avoid generating non-safety routines. """ if safety_only: return True io_tree_dir = paths['io_tree_generator'] enhanced_mcm_script = io_tree_dir / "enhanced_mcm_generator.py" # Use the file directly from PLC Data Generator since we're skipping Routines Generator desc_ip_file = paths['data_generator'] / "DESC_IP_MERGED.xlsx" if not enhanced_mcm_script.exists(): return False # Zones fully removed: do not attempt to load or pass zones zones_json = None try: # Build command arguments cmd_args = [ sys.executable, str(enhanced_mcm_script), str(desc_ip_file), project_name ] # Add boilerplate directory if specified if boilerplate_dir: cmd_args.append(boilerplate_dir) # Run the IO Tree Configuration Generator result = subprocess.run(cmd_args, cwd=io_tree_dir, capture_output=True, text=True) if verbose: _append_log(log_file, "Step 3: IO Tree Generator stdout", result.stdout) if result.stderr: _append_log(log_file, "Step 3: IO Tree Generator stderr", result.stderr) if result.returncode == 0: return True else: return False except Exception as e: if verbose: _append_log(log_file, "Step 3 Exception", str(e)) return False def run_l5x_to_acd_compiler(paths: dict, project_name: str, safety_only: bool = False, verbose: bool = False, log_file: Path | None = None) -> bool: """Prepare for L5X2ACD Compilation using dynamic compilation manager. If safety_only is True, skip this step since a full project L5X wasn't generated. """ if safety_only: return True # Find the generated complete project L5X file io_tree_dir = paths['io_tree_generator'] generated_projects_dir = io_tree_dir / "generated_projects" if not generated_projects_dir.exists(): return False # Look for L5X files that start with the project name l5x_files = list(generated_projects_dir.glob(f"{project_name}*.L5X")) if not l5x_files: # Minimal output: rely on caller to report FAIL return False if len(l5x_files) > 1 and verbose: print(f"Warning: Multiple L5X files found, using first: {l5x_files[0].name}") complete_l5x = l5x_files[0] # Inject SafetyTagMap from SafetyTagMapping.txt before compilation (if available) try: mapping_file = paths['routines_generator'] / 'SafetyTagMapping.txt' if mapping_file.exists(): # Always inject silently (minimal logging) _inject_safety_tag_map_into_l5x(complete_l5x, mapping_file, False) except Exception as e: if verbose: _append_log(log_file, "Step 4 Warning", f"Failed to inject SafetyTagMap: {e}") # Use the dynamic compilation manager l5x2acd_dir = paths['project_root'] / "L5X2ACD Compiler" try: # Import and use the compilation manager import sys sys.path.append(str(l5x2acd_dir)) from compilation_manager import CompilationManager # Create compilation manager manager = CompilationManager(l5x2acd_dir) # Determine project-specific options (silent) project_type = "UNKNOWN" options = {} if project_name: if "MCM01" in project_name.upper(): project_type = "MCM01" options['enable_safety_validation'] = True elif "MCM04" in project_name.upper(): project_type = "MCM04" options['enable_feeder_optimization'] = True # Setup compilation with smart replacement (no wiping) # Always run quietly and suppress tool output; caller prints step OK/FAIL _buf = io.StringIO() with contextlib.redirect_stdout(_buf), contextlib.redirect_stderr(_buf): result = manager.setup_compilation( source_l5x=complete_l5x, project_name=project_name or complete_l5x.stem, compilation_options=options, wipe_existing=False, # Use smart replacement instead of wiping replace_mode=True ) if verbose: _append_log(log_file, "Step 4: L5X2ACD Compiler output", _buf.getvalue()) return True except Exception as e: # Minimal output; let caller handle FAIL display return False def _inject_safety_tag_map_into_l5x(l5x_path: Path, mapping_file: Path, verbose: bool = False) -> None: """Inject or replace inside the existing Controller/SafetyInfo using text edits. - Preserves the original XML header exactly - Does not create additional SafetyInfo blocks - Formats SafetyTagMap on its own line between SafetyInfo open/close tags """ mapping_text = mapping_file.read_text(encoding='utf-8').strip() if not mapping_text: if verbose: print("SafetyTagMapping.txt is empty; skipping injection") return xml_text = l5x_path.read_text(encoding='utf-8') # Find Controller block ctrl_match = re.search(r"", xml_text) if not ctrl_match: if verbose: print("No found; skipping injection") return ctrl_start, ctrl_end = ctrl_match.span() ctrl_text = xml_text[ctrl_start:ctrl_end] # Locate first SafetyInfo (body or self-closing) m_body = re.search(r"]*)>([\s\S]*?)", ctrl_text) m_self = re.search(r"]*)/>", ctrl_text) if not m_body and not m_self: if verbose: print("No under ; skipping injection") return # Determine indentation based on the SafetyInfo line first_match = m_body if (m_body and (not m_self or m_body.start() < m_self.start())) else m_self safety_start = first_match.start() line_start = ctrl_text.rfind('\n', 0, safety_start) indent = ctrl_text[line_start+1:safety_start] if line_start != -1 else '' map_line = f"\n{indent} {mapping_text} \n" def dedup_safety_infos(text: str) -> str: seen = False def repl(match: re.Match) -> str: nonlocal seen if seen: return '' seen = True return match.group(0) pat = re.compile(r"(]*/>)|(\n?\s*]*>[\s\S]*?)") return pat.sub(repl, text) if m_body and (not m_self or m_body.start() < m_self.start()): # Replace or insert SafetyTagMap inside existing body attrs = m_body.group(1) inner = m_body.group(2) # Replace existing map if present if re.search(r"[\s\S]*?", inner): new_inner = re.sub(r"[\s\S]*?", map_line.strip('\n'), inner, count=1) # Remove any additional maps new_inner = re.sub(r"[\s\S]*?", '', new_inner) else: new_inner = map_line + inner new_block = f"{new_inner}" new_ctrl_text = ctrl_text[:m_body.start()] + new_block + ctrl_text[m_body.end():] else: # Convert self-closing to body with map attrs = m_self.group(1) new_block = f"{map_line}" new_ctrl_text = ctrl_text[:m_self.start()] + new_block + ctrl_text[m_self.end():] new_ctrl_text = dedup_safety_infos(new_ctrl_text) new_xml = xml_text[:ctrl_start] + new_ctrl_text + xml_text[ctrl_end:] l5x_path.write_text(new_xml, encoding='utf-8') if verbose: print("SafetyTagMap injection OK") def main() -> None: """Main entry point for complete workflow.""" parser = argparse.ArgumentParser(description="Complete PLC generation workflow from raw Excel to ACD") parser.add_argument('--excel-file', type=str, help='Raw Excel file to process') # Project selection parser.add_argument('--project', help='Project prefix (e.g., MTN6, SAT9, CNO8) - automatically selects config files') parser.add_argument('--project-name', help='Project name (e.g., MTN6_MCM02) - used for output naming and compatibility') parser.add_argument('--ignore-estop1ok', action='store_true', help='Ignore ESTOP1OK tags in safety routines generation') parser.add_argument('--safety-only', action='store_true', help='Generate only safety routines and safety checks') parser.add_argument('--verbose', action='store_true', help='Write detailed logs for each step to a file (no console spam)') parser.add_argument('--list-projects', action='store_true', help='List available projects and exit') args = parser.parse_args() # Get project paths paths = get_project_paths() # Handle --list-projects (allow without --excel-file) if args.list_projects: available_projects = get_available_projects(paths['project_root']) print("Available projects:") for project in available_projects: generator_config = paths['project_root'] / f"{project}_generator_config.json" zones_config = paths['project_root'] / f"{project}_zones.json" boilerplate_dir = paths['io_tree_generator'] / f"{project}_boilerplate" boilerplate_status = "✓" if boilerplate_dir.exists() else "✗" print(f" {project:<6} - Config: {generator_config.name}, Zones: {zones_config.name}, Boilerplate: {boilerplate_status}") if not available_projects: print(" No projects found. Expected files: PREFIX_generator_config.json and PREFIX_zones.json") return # Resolve project configuration generator_config_path = None zones_config_path = None project_name = args.project_name # Use provided project name if given if args.project: # Project selection mode - use project prefix to find config files try: # If no project name provided, derive it from Excel file name if not project_name: excel_name = args.excel_file.stem if 'MCM' in excel_name.upper(): # Try to extract MCM info from filename import re mcm_match = re.search(r'(MCM\d+)', excel_name.upper()) if mcm_match: project_name = f"{args.project.upper()}_{mcm_match.group(1)}" else: project_name = f"{args.project.upper()}_MCM01" # Default fallback else: project_name = f"{args.project.upper()}_MCM01" # Default fallback # Use project prefix to find config files, but use provided or derived project name generator_config_path, zones_config_path = resolve_project_config_files(f"{args.project.upper()}_MCM01", paths['project_root']) print(f"Using project: {args.project.upper()}") print(f" Generator config: {generator_config_path.name}") print(f" Zones config: {zones_config_path.name}") print(f" Project name: {project_name}") except (ValueError, FileNotFoundError) as e: print(f"Error: {e}") available_projects = get_available_projects(paths['project_root']) if available_projects: print(f"Available projects: {', '.join(available_projects)}") else: print("No projects found. Run with --list-projects to see details.") sys.exit(1) elif args.project_name: # Backward compatibility mode using --project-name only project_name = args.project_name # Try to auto-detect config files based on project name try: generator_config_path, zones_config_path = resolve_project_config_files(project_name, paths['project_root']) print(f"Auto-detected config files for {project_name}:") print(f" Generator config: {generator_config_path.name}") print(f" Zones config: {zones_config_path.name}") except (ValueError, FileNotFoundError): # Fall back to default config files print(f"Using default config files (project-specific configs not found for {project_name})") generator_config_path = None # Will use default in run_routines_generator else: # No project specified - require at least one print("Error: Either --project or --project-name must be specified") available_projects = get_available_projects(paths['project_root']) if available_projects: print(f"Available projects: {', '.join(available_projects)}") sys.exit(1) # Validate excel-file is provided for actual processing (not just listing) if not args.excel_file: print("Error: --excel-file is required for processing") parser.print_help() sys.exit(1) # Normalize the Excel file path to handle Windows-style paths in WSL excel_file_path = Path(args.excel_file.replace('\\', '/')) if not excel_file_path.is_absolute(): excel_file_path = paths['project_root'] / excel_file_path args.excel_file = excel_file_path # Setup enhanced logging from src.logging_config import setup_logging, get_logger ts = datetime.now().strftime('%Y%m%d_%H%M%S') log_dir = paths['project_root'] / 'logs' base = project_name or 'project' log_file = log_dir / f"workflow_{base}_{ts}.log" if args.verbose else None # Configure logging with location information setup_logging( level='DEBUG' if args.verbose else 'INFO', console_format='human', log_file=log_file, show_module=True, show_location=True, use_colors=True ) logger = get_logger(__name__) logger.info("PLC Generation Workflow started", excel_file=str(args.excel_file), project_name=args.project_name) print("PLC Generation Workflow") # Step 1: Process raw Excel data print("Step 1: Data processing ...", end=" ") logger.info("Starting data processing step") try: ok = run_plc_data_generator(args.excel_file, paths, verbose=args.verbose, log_file=log_file) print("OK" if ok else "FAIL") if not ok: logger.error("Data processing failed") if not args.verbose: print("(details suppressed; re-run with --verbose)") sys.exit(1) logger.info("Data processing completed successfully") except Exception as e: logger.exception("Data processing step failed with exception", error=str(e)) print("FAIL") sys.exit(1) # Step 2: Generate L5X programs (Routines Generator) print("Step 2: Routine generation ...", end=" ") logger.info("Starting routine generation step") try: ok = run_routines_generator(paths, project_name, args.ignore_estop1ok, args.safety_only, verbose=args.verbose, log_file=log_file, config_path=generator_config_path) print("OK" if ok else "FAIL") if not ok: logger.error("Routine generation failed") if not args.verbose: print("(details suppressed; re-run with --verbose)") sys.exit(1) logger.info("Routine generation completed successfully") except Exception as e: logger.exception("Routine generation step failed with exception", error=str(e)) print("FAIL") sys.exit(1) # Step 3: Generate complete project L5X (IO Tree Generator) if args.safety_only: print("Step 3: IO tree generation ... SKIPPED") else: print("Step 3: IO tree generation ...", end=" ") # Determine boilerplate directory based on project boilerplate_dir = None if args.project: boilerplate_dir = resolve_boilerplate_directory(args.project.upper(), paths['io_tree_generator']) ok = run_io_tree_generator(paths, project_name, args.safety_only, verbose=args.verbose, log_file=log_file, boilerplate_dir=boilerplate_dir) print("OK" if ok else "FAIL") if not ok: if not args.verbose: print("(details suppressed; re-run with --verbose)") sys.exit(1) # Step 4: Compile L5X to ACD if args.safety_only: print("Step 4: Prepare compilation ... SKIPPED") else: print("Step 4: Prepare compilation ...", end=" ") ok = run_l5x_to_acd_compiler(paths, project_name, args.safety_only, verbose=args.verbose, log_file=log_file) print("OK" if ok else "FAIL") if not ok: if not args.verbose: print("(details suppressed; re-run with --verbose)") sys.exit(1) print("Workflow complete") if args.verbose and log_file is not None: print(f"Logs: {log_file}") if not args.safety_only and project_name: print(f"L5X: IO Tree Configuration Generator/generated_projects/{project_name}.L5X") if __name__ == '__main__': main()