PLC_Generation/Routines Generator/complete_workflow.py
2025-08-18 13:20:34 +04:00

632 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Complete PLC Generation Workflow
Runs the entire pipeline from raw Excel to compiled ACD:
1. PLC Data Generator (raw Excel → DESC_IP_MERGED.xlsx)
2. Routines Generator (DESC_IP_MERGED.xlsx → L5X files)
3. IO Tree Configuration Generator (DESC_IP_MERGED.xlsx → complete project L5X)
4. L5X2ACD Compiler (complete project L5X → ACD compilation setup)
"""
from __future__ import annotations
import sys
import argparse
import subprocess
from pathlib import Path
import io
import contextlib
import xml.etree.ElementTree as ET
import re
from datetime import datetime
def get_project_paths():
    """Get standardized paths for all project components.

    The project root is the directory one level above the folder that
    contains this script.

    Returns:
        dict: Maps 'project_root', 'data_generator', 'routines_generator',
        'l5x2acd_compiler' and 'io_tree_generator' to ``Path`` objects, all
        derived from the absolute project root.
    """
    # Resolve *before* walking up. For a relative ``__file__`` such as
    # "complete_workflow.py", ``.parent.parent`` collapses to "." and the
    # root would silently become the current working directory.
    project_root = Path(__file__).resolve().parent.parent
    return {
        'project_root': project_root,
        'data_generator': project_root / "PLC Data Generator",
        'routines_generator': project_root / "Routines Generator",
        'l5x2acd_compiler': project_root / "L5X2ACD Compiler",
        'io_tree_generator': project_root / "IO Tree Configuration Generator",
    }
def resolve_project_config_files(project_name: str, project_root: Path) -> tuple[Path, Path]:
    """Locate the generator and zones config files that belong to a project.

    The project prefix is the run of letters/digits that precedes the first
    underscore of the upper-cased project name (e.g. 'MTN6' for 'MTN6_MCM02').

    Args:
        project_name: Project name (e.g., 'MTN6_MCM02', 'SAT9_MCM01', 'CNO8_MCM01')
        project_root: Root directory of the project
    Returns:
        tuple: (generator_config_path, zones_config_path)
    Raises:
        ValueError: If the project name does not start with 'PREFIX_'
        FileNotFoundError: If either required config file doesn't exist
    """
    prefix_match = re.match(r'^([A-Z0-9]+)_', project_name.upper())
    if prefix_match is None:
        raise ValueError(f"Invalid project name format: {project_name}. Expected format: PREFIX_MCMXX")
    prefix = prefix_match.group(1)

    generator_config = project_root / f"{prefix}_generator_config.json"
    zones_config = project_root / f"{prefix}_zones.json"

    # Checked in this order so a missing generator config is reported first.
    for label, candidate in (("Generator", generator_config), ("Zones", zones_config)):
        if not candidate.exists():
            raise FileNotFoundError(f"{label} config not found: {candidate}")
    return generator_config, zones_config
def get_available_projects(project_root: Path) -> list[str]:
    """List the project prefixes that have a complete pair of config files.

    A prefix counts as available when both ``PREFIX_generator_config.json``
    and ``PREFIX_zones.json`` exist directly under ``project_root``; the
    prefix itself must consist of upper-case letters and digits only.

    Args:
        project_root: Root directory of the project
    Returns:
        Sorted list of available project prefixes (e.g., ['CNO8', 'MTN6', 'SAT9'])
    """
    name_pattern = re.compile(r'^([A-Z0-9]+)_generator_config\.json$')
    name_matches = (
        name_pattern.match(config_file.name)
        for config_file in project_root.glob("*_generator_config.json")
    )
    # Keep only prefixes whose zones file is present as well.
    prefixes = {
        found.group(1)
        for found in name_matches
        if found and (project_root / f"{found.group(1)}_zones.json").exists()
    }
    return sorted(prefixes)
def resolve_boilerplate_directory(project_prefix: str, io_tree_dir: Path) -> str:
    """Pick the boilerplate directory name to use for a project prefix.

    Args:
        project_prefix: Project prefix (e.g., 'MTN6', 'SAT9', 'CNO8')
        io_tree_dir: IO Tree Configuration Generator directory
    Returns:
        '<prefix>_boilerplate' when that entry exists inside ``io_tree_dir``;
        otherwise the default name 'boilerplate' (a warning is printed).
    """
    project_specific = f"{project_prefix}_boilerplate"
    if (io_tree_dir / project_specific).exists():
        return project_specific

    # No project-specific boilerplate: warn and fall back to the default.
    print(f"Warning: Project-specific boilerplate directory not found: {project_specific}")
    print(" Using default 'boilerplate' directory")
    return "boilerplate"
def _append_log(log_path: Path | None, header: str, body: str) -> None:
if not log_path:
return
try:
log_path.parent.mkdir(parents=True, exist_ok=True)
with open(log_path, 'a', encoding='utf-8') as f:
f.write(f"\n=== {header} ===\n")
if body:
f.write(body)
if not body.endswith("\n"):
f.write("\n")
except Exception:
pass
def run_plc_data_generator(raw_excel_file: Path, paths: dict, verbose: bool = False, log_file: Path | None = None) -> bool:
    """Run the PLC Data Generator to create DESC_IP_MERGED.xlsx.

    Executes ``main.py`` from ``paths['data_generator']`` with the raw Excel
    file as its only argument, then copies the resulting
    ``DESC_IP_MERGED.xlsx`` into ``paths['routines_generator']``.

    Args:
        raw_excel_file: Raw Excel workbook to process.
        paths: Project path mapping; must contain 'data_generator' and
            'routines_generator'.
        verbose: When True, subprocess output and failures are appended to
            ``log_file``.
        log_file: Destination for verbose logs, or None.
    Returns:
        True when the merged workbook was produced and copied, else False.
    """
    import shutil

    data_gen_dir = paths['data_generator']
    data_gen_script = data_gen_dir / "main.py"
    if not data_gen_script.exists():
        print(f"ERROR: PLC Data Generator not found at {data_gen_script}")
        return False
    if not raw_excel_file.exists():
        print(f"ERROR: Raw Excel file not found at {raw_excel_file}")
        return False

    try:
        # Run the PLC Data Generator with the Excel file path as argument
        result = subprocess.run(
            [sys.executable, str(data_gen_script), str(raw_excel_file.resolve())],
            cwd=data_gen_dir,
            capture_output=True,
            text=True,
        )
        stdout = result.stdout or ""
        stderr = result.stderr or ""

        # Even if there's a permission error at the end, the core processing
        # may have completed; look for evidence that output was produced.
        # NOTE(review): ``source.exists()`` is also true for a workbook left
        # over from an earlier run - confirm whether stale output is acceptable.
        source = data_gen_dir / "DESC_IP_MERGED.xlsx"
        produced_output = (
            "Processing complete!" in stdout
            or "New Excel file created:" in stdout
            or source.exists()
        )
        # An uncaught OSError is reported on stderr, so inspect both streams.
        permission_error = any(
            "[Errno 1] Operation not permitted" in stream for stream in (stdout, stderr)
        )

        if result.returncode != 0 and not (produced_output and permission_error):
            if verbose:
                _append_log(log_file, "Step 1 Error", "Data processing failed")
                _append_log(log_file, "Step 1: PLC Data Generator stdout", stdout)
                _append_log(log_file, "Step 1: PLC Data Generator stderr", stderr)
            return False

        if verbose:
            if result.returncode != 0:
                _append_log(log_file, "Step 1 Warning", "Permission error at end of processing, core processing completed")
            _append_log(log_file, "Step 1: PLC Data Generator stdout", stdout)
            _append_log(log_file, "Step 1: PLC Data Generator stderr", stderr)

        if not source.exists():
            # Previously a silent failure; record why the step is reported as FAIL.
            if verbose:
                _append_log(log_file, "Step 1 Error", f"Expected output not found: {source}")
            return False

        # Copy DESC_IP_MERGED.xlsx from data generator output (it already has safety sheets)
        shutil.copy2(source, paths['routines_generator'] / "DESC_IP_MERGED.xlsx")
        return True
    except Exception as e:
        if verbose:
            _append_log(log_file, "Step 1 Exception", str(e))
        return False
def run_routines_generator(paths: dict, project_name: str = None, ignore_estop1ok: bool = False, safety_only: bool = False, verbose: bool = False, log_file: Path | None = None, config_path: Path = None) -> bool:
    """Invoke the Routines Generator CLI (``python -m src.unified_cli``).

    The CLI runs inside ``paths['routines_generator']`` against the
    ``DESC_IP_MERGED.xlsx`` found there, using the ``safety`` subcommand
    when ``safety_only`` is True and ``all`` otherwise.

    Args:
        paths: Project path mapping; needs 'routines_generator' and, when no
            ``config_path`` is given, 'project_root'.
        project_name: Not used here; project settings come from the config JSON.
        ignore_estop1ok: Not used here; configured in the config JSON.
        safety_only: Select the 'safety' subcommand instead of 'all'.
        verbose: Request DEBUG logging from the CLI and append its output to
            ``log_file``.
        log_file: Log file handed to the CLI and used for captured output.
        config_path: Path to project-specific generator config file. If None,
            ``generator_config.json`` in the project root is used.
    Returns:
        True when the CLI exits with status 0, False otherwise (including
        any exception raised while launching it).
    """
    routines_dir = paths['routines_generator']
    try:
        effective_config = (
            config_path if config_path is not None
            else paths['project_root'] / 'generator_config.json'
        )
        workbook = routines_dir / 'DESC_IP_MERGED.xlsx'

        # Global flags must precede the subcommand on the command line.
        command = [
            sys.executable,
            '-m', 'src.unified_cli',
            '--config', str(effective_config),
            '--excel-file', str(workbook),
        ]
        if verbose:
            command += ['--log-level', 'DEBUG']
        if log_file is not None:
            command += ['--log-file', str(log_file)]
        command.append('safety' if safety_only else 'all')

        completed = subprocess.run(command, cwd=routines_dir, capture_output=True, text=True)
        if verbose:
            _append_log(log_file, "Step 2: Routines Generator stdout", completed.stdout)
            if completed.stderr:
                _append_log(log_file, "Step 2: Routines Generator stderr", completed.stderr)
        return completed.returncode == 0
    except Exception as e:
        if verbose:
            _append_log(log_file, "Step 2 Exception", str(e))
        return False
def run_io_tree_generator(paths: dict, project_name: str, safety_only: bool = False, verbose: bool = False, log_file: Path | None = None, boilerplate_dir: str = None) -> bool:
    """Run the IO Tree Configuration Generator script.

    Launches ``enhanced_mcm_generator.py`` inside ``paths['io_tree_generator']``
    with the merged workbook from ``paths['data_generator']``, the project
    name and, when given, the boilerplate directory name.

    Args:
        paths: Project path mapping; needs 'io_tree_generator' and 'data_generator'.
        project_name: Project name passed to the generator script.
        safety_only: When True the step is skipped and reported as success.
        verbose: Append the script's captured output to ``log_file``.
        log_file: Destination for verbose logs, or None.
        boilerplate_dir: Optional boilerplate directory name, appended as a
            third positional argument.
    Returns:
        True when skipped or when the script exits with status 0; False when
        the script is missing, exits non-zero, or cannot be launched.
    """
    if safety_only:
        return True

    io_tree_dir = paths['io_tree_generator']
    generator_script = io_tree_dir / "enhanced_mcm_generator.py"
    # The workbook is read straight from the PLC Data Generator output folder.
    merged_workbook = paths['data_generator'] / "DESC_IP_MERGED.xlsx"
    if not generator_script.exists():
        return False

    try:
        command = [sys.executable, str(generator_script), str(merged_workbook), project_name]
        if boilerplate_dir:
            command.append(boilerplate_dir)

        completed = subprocess.run(command, cwd=io_tree_dir, capture_output=True, text=True)
        if verbose:
            _append_log(log_file, "Step 3: IO Tree Generator stdout", completed.stdout)
            if completed.stderr:
                _append_log(log_file, "Step 3: IO Tree Generator stderr", completed.stderr)
        return completed.returncode == 0
    except Exception as e:
        if verbose:
            _append_log(log_file, "Step 3 Exception", str(e))
        return False
def run_l5x_to_acd_compiler(paths: dict, project_name: str, safety_only: bool = False, verbose: bool = False, log_file: Path | None = None) -> bool:
    """Prepare for L5X2ACD Compilation using dynamic compilation manager.

    Picks the generated project L5X from
    ``paths['io_tree_generator']/generated_projects``, injects the
    SafetyTagMap from ``SafetyTagMapping.txt`` when that file exists in
    ``paths['routines_generator']``, and hands the L5X to
    ``CompilationManager.setup_compilation``.

    Args:
        paths: Project path mapping; needs 'io_tree_generator',
            'routines_generator' and 'project_root'.
        project_name: Project name; selects ``<project_name>*.L5X``.
        safety_only: When True the step is skipped (no full project L5X was
            generated) and reported as success.
        verbose: Append warnings, captured tool output and errors to ``log_file``.
        log_file: Destination for verbose logs, or None.
    Returns:
        True when skipped or when compilation setup ran without raising;
        False when no L5X is found or setup raises.
    """
    if safety_only:
        return True

    # Find the generated complete project L5X file
    generated_projects_dir = paths['io_tree_generator'] / "generated_projects"
    if not generated_projects_dir.exists():
        return False

    # glob() order is arbitrary, so sort for a reproducible choice and prefer
    # the file named exactly after the project over look-alikes/leftovers.
    l5x_files = sorted(generated_projects_dir.glob(f"{project_name}*.L5X"))
    if not l5x_files:
        # Minimal output: rely on caller to report FAIL
        return False
    complete_l5x = next((p for p in l5x_files if p.stem == project_name), l5x_files[0])
    if len(l5x_files) > 1 and verbose:
        print(f"Warning: Multiple L5X files found, using: {complete_l5x.name}")

    # Inject SafetyTagMap from SafetyTagMapping.txt before compilation (if available)
    try:
        mapping_file = paths['routines_generator'] / 'SafetyTagMapping.txt'
        if mapping_file.exists():
            # Always inject silently (minimal logging)
            _inject_safety_tag_map_into_l5x(complete_l5x, mapping_file, False)
    except Exception as e:
        if verbose:
            _append_log(log_file, "Step 4 Warning", f"Failed to inject SafetyTagMap: {e}")

    l5x2acd_dir = paths['project_root'] / "L5X2ACD Compiler"
    # Tool output is captured rather than shown; the caller prints step OK/FAIL.
    captured = io.StringIO()
    try:
        # Make the compiler package importable without piling up duplicates.
        if str(l5x2acd_dir) not in sys.path:
            sys.path.append(str(l5x2acd_dir))
        from compilation_manager import CompilationManager

        manager = CompilationManager(l5x2acd_dir)

        # Determine project-specific options (silent)
        options = {}
        if project_name:
            upper_name = project_name.upper()
            if "MCM01" in upper_name:
                options['enable_safety_validation'] = True
            elif "MCM04" in upper_name:
                options['enable_feeder_optimization'] = True

        # Setup compilation with wipe and dynamic generation.
        # NOTE(review): the return value of setup_compilation is not inspected
        # (its shape is not visible here) - success means "did not raise".
        with contextlib.redirect_stdout(captured), contextlib.redirect_stderr(captured):
            manager.setup_compilation(
                source_l5x=complete_l5x,
                project_name=project_name or complete_l5x.stem,
                compilation_options=options,
                wipe_existing=True,
            )
        if verbose:
            _append_log(log_file, "Step 4: L5X2ACD Compiler output", captured.getvalue())
        return True
    except Exception as e:
        # Console stays minimal, but keep the cause (and any output captured
        # so far) in the verbose log instead of discarding it.
        if verbose:
            _append_log(log_file, "Step 4: L5X2ACD Compiler output", captured.getvalue())
            _append_log(log_file, "Step 4 Exception", str(e))
        return False
def _inject_safety_tag_map_into_l5x(l5x_path: Path, mapping_file: Path, verbose: bool = False) -> None:
"""Inject or replace <SafetyTagMap> inside the existing Controller/SafetyInfo using text edits.
- Preserves the original XML header exactly
- Does not create additional SafetyInfo blocks
- Formats SafetyTagMap on its own line between SafetyInfo open/close tags
"""
mapping_text = mapping_file.read_text(encoding='utf-8').strip()
if not mapping_text:
if verbose:
print("SafetyTagMapping.txt is empty; skipping injection")
return
xml_text = l5x_path.read_text(encoding='utf-8')
# Find Controller block
ctrl_match = re.search(r"<Controller\b[\s\S]*?</Controller>", xml_text)
if not ctrl_match:
if verbose:
print("No <Controller> found; skipping injection")
return
ctrl_start, ctrl_end = ctrl_match.span()
ctrl_text = xml_text[ctrl_start:ctrl_end]
# Locate first SafetyInfo (body or self-closing)
m_body = re.search(r"<SafetyInfo\b([^>]*)>([\s\S]*?)</SafetyInfo>", ctrl_text)
m_self = re.search(r"<SafetyInfo\b([^>]*)/>", ctrl_text)
if not m_body and not m_self:
if verbose:
print("No <SafetyInfo> under <Controller>; skipping injection")
return
# Determine indentation based on the SafetyInfo line
first_match = m_body if (m_body and (not m_self or m_body.start() < m_self.start())) else m_self
safety_start = first_match.start()
line_start = ctrl_text.rfind('\n', 0, safety_start)
indent = ctrl_text[line_start+1:safety_start] if line_start != -1 else ''
map_line = f"\n{indent} <SafetyTagMap> {mapping_text} </SafetyTagMap>\n"
def dedup_safety_infos(text: str) -> str:
seen = False
def repl(match: re.Match) -> str:
nonlocal seen
if seen:
return ''
seen = True
return match.group(0)
pat = re.compile(r"(<SafetyInfo\b[^>]*/>)|(\n?\s*<SafetyInfo\b[^>]*>[\s\S]*?</SafetyInfo>)")
return pat.sub(repl, text)
if m_body and (not m_self or m_body.start() < m_self.start()):
# Replace or insert SafetyTagMap inside existing body
attrs = m_body.group(1)
inner = m_body.group(2)
# Replace existing map if present
if re.search(r"<SafetyTagMap>[\s\S]*?</SafetyTagMap>", inner):
new_inner = re.sub(r"<SafetyTagMap>[\s\S]*?</SafetyTagMap>", map_line.strip('\n'), inner, count=1)
# Remove any additional maps
new_inner = re.sub(r"<SafetyTagMap>[\s\S]*?</SafetyTagMap>", '', new_inner)
else:
new_inner = map_line + inner
new_block = f"<SafetyInfo{attrs}>{new_inner}</SafetyInfo>"
new_ctrl_text = ctrl_text[:m_body.start()] + new_block + ctrl_text[m_body.end():]
else:
# Convert self-closing to body with map
attrs = m_self.group(1)
new_block = f"<SafetyInfo{attrs}>{map_line}</SafetyInfo>"
new_ctrl_text = ctrl_text[:m_self.start()] + new_block + ctrl_text[m_self.end():]
new_ctrl_text = dedup_safety_infos(new_ctrl_text)
new_xml = xml_text[:ctrl_start] + new_ctrl_text + xml_text[ctrl_end:]
l5x_path.write_text(new_xml, encoding='utf-8')
if verbose:
print("SafetyTagMap injection OK")
def main() -> None:
    """Main entry point for complete workflow.

    Parses the command line, resolves the project configuration, then runs
    the four steps in order: data processing, routine generation, IO tree
    generation and compilation preparation. With ``--safety-only`` the last
    two steps are skipped. Any failing step ends the process with exit
    status 1; ``--list-projects`` prints the available projects and returns.
    """
    parser = argparse.ArgumentParser(description="Complete PLC generation workflow from raw Excel to ACD")
    parser.add_argument('--excel-file', type=Path, help='Raw Excel file to process')
    # Project selection
    parser.add_argument('--project', help='Project prefix (e.g., MTN6, SAT9, CNO8) - automatically selects config files')
    parser.add_argument('--project-name', help='Project name (e.g., MTN6_MCM02) - used for output naming and compatibility')
    parser.add_argument('--ignore-estop1ok', action='store_true', help='Ignore ESTOP1OK tags in safety routines generation')
    parser.add_argument('--safety-only', action='store_true', help='Generate only safety routines and safety checks')
    parser.add_argument('--verbose', action='store_true', help='Write detailed logs for each step to a file (no console spam)')
    parser.add_argument('--list-projects', action='store_true', help='List available projects and exit')
    args = parser.parse_args()

    # Get project paths
    paths = get_project_paths()

    # Handle --list-projects (allow without --excel-file)
    if args.list_projects:
        available_projects = get_available_projects(paths['project_root'])
        print("Available projects:")
        for project in available_projects:
            generator_config = paths['project_root'] / f"{project}_generator_config.json"
            zones_config = paths['project_root'] / f"{project}_zones.json"
            boilerplate_dir = paths['io_tree_generator'] / f"{project}_boilerplate"
            # Both branches used to yield an empty string, so the column was blank.
            boilerplate_status = "found" if boilerplate_dir.exists() else "missing"
            print(f" {project:<6} - Config: {generator_config.name}, Zones: {zones_config.name}, Boilerplate: {boilerplate_status}")
        if not available_projects:
            print(" No projects found. Expected files: PREFIX_generator_config.json and PREFIX_zones.json")
        return

    # Resolve project configuration
    generator_config_path = None
    zones_config_path = None
    project_name = args.project_name  # Use provided project name if given
    if args.project:
        # Project selection mode - use project prefix to find config files
        project_prefix = args.project.upper()
        try:
            # If no project name provided, derive it from Excel file name
            if not project_name:
                # --excel-file is only validated further down, so it may still
                # be None here; fall back to the default name instead of
                # crashing with AttributeError.
                excel_name = args.excel_file.stem if args.excel_file else ""
                mcm_match = re.search(r'(MCM\d+)', excel_name.upper())
                mcm_suffix = mcm_match.group(1) if mcm_match else "MCM01"  # Default fallback
                project_name = f"{project_prefix}_{mcm_suffix}"
            # Use project prefix to find config files, but use provided or derived project name
            generator_config_path, zones_config_path = resolve_project_config_files(f"{project_prefix}_MCM01", paths['project_root'])
            print(f"Using project: {project_prefix}")
            print(f" Generator config: {generator_config_path.name}")
            print(f" Zones config: {zones_config_path.name}")
            print(f" Project name: {project_name}")
        except (ValueError, FileNotFoundError) as e:
            print(f"Error: {e}")
            available_projects = get_available_projects(paths['project_root'])
            if available_projects:
                print(f"Available projects: {', '.join(available_projects)}")
            else:
                print("No projects found. Run with --list-projects to see details.")
            sys.exit(1)
    elif args.project_name:
        # Backward compatibility mode using --project-name only.
        # Try to auto-detect config files based on project name.
        try:
            generator_config_path, zones_config_path = resolve_project_config_files(project_name, paths['project_root'])
            print(f"Auto-detected config files for {project_name}:")
            print(f" Generator config: {generator_config_path.name}")
            print(f" Zones config: {zones_config_path.name}")
        except (ValueError, FileNotFoundError):
            # Fall back to default config files
            print(f"Using default config files (project-specific configs not found for {project_name})")
            generator_config_path = None  # Will use default in run_routines_generator
    else:
        # No project specified - require at least one
        print("Error: Either --project or --project-name must be specified")
        available_projects = get_available_projects(paths['project_root'])
        if available_projects:
            print(f"Available projects: {', '.join(available_projects)}")
        sys.exit(1)

    # Validate excel-file is provided for actual processing (not just listing)
    if not args.excel_file:
        print("Error: --excel-file is required for processing")
        parser.print_help()
        sys.exit(1)

    # Setup enhanced logging
    from src.logging_config import setup_logging, get_logger
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_dir = paths['project_root'] / 'logs'
    base = project_name or 'project'
    log_file = log_dir / f"workflow_{base}_{ts}.log" if args.verbose else None
    # Configure logging with location information
    setup_logging(
        level='DEBUG' if args.verbose else 'INFO',
        console_format='human',
        log_file=log_file,
        show_module=True,
        show_location=True,
        use_colors=True
    )
    logger = get_logger(__name__)
    # Log the resolved name: args.project_name is None when the name was derived.
    logger.info("PLC Generation Workflow started", excel_file=str(args.excel_file), project_name=project_name)
    print("PLC Generation Workflow")

    def abort_step() -> None:
        """Point at --verbose when details were suppressed, then exit with status 1."""
        if not args.verbose:
            print("(details suppressed; re-run with --verbose)")
        sys.exit(1)

    # Progress lines end without a newline; flush so they are visible while
    # the (potentially slow) step is still running.

    # Step 1: Process raw Excel data
    print("Step 1: Data processing ...", end=" ", flush=True)
    logger.info("Starting data processing step")
    try:
        ok = run_plc_data_generator(args.excel_file, paths, verbose=args.verbose, log_file=log_file)
        print("OK" if ok else "FAIL")
        if not ok:
            logger.error("Data processing failed")
            abort_step()
        logger.info("Data processing completed successfully")
    except Exception as e:
        logger.exception("Data processing step failed with exception", error=str(e))
        print("FAIL")
        sys.exit(1)

    # Step 2: Generate L5X programs (Routines Generator)
    print("Step 2: Routine generation ...", end=" ", flush=True)
    logger.info("Starting routine generation step")
    try:
        ok = run_routines_generator(paths, project_name, args.ignore_estop1ok, args.safety_only, verbose=args.verbose, log_file=log_file, config_path=generator_config_path)
        print("OK" if ok else "FAIL")
        if not ok:
            logger.error("Routine generation failed")
            abort_step()
        logger.info("Routine generation completed successfully")
    except Exception as e:
        logger.exception("Routine generation step failed with exception", error=str(e))
        print("FAIL")
        sys.exit(1)

    # Step 3: Generate complete project L5X (IO Tree Generator)
    if args.safety_only:
        print("Step 3: IO tree generation ... SKIPPED")
    else:
        print("Step 3: IO tree generation ...", end=" ", flush=True)
        # Determine boilerplate directory based on project
        boilerplate_dir = None
        if args.project:
            boilerplate_dir = resolve_boilerplate_directory(args.project.upper(), paths['io_tree_generator'])
        ok = run_io_tree_generator(paths, project_name, args.safety_only, verbose=args.verbose, log_file=log_file, boilerplate_dir=boilerplate_dir)
        print("OK" if ok else "FAIL")
        if not ok:
            abort_step()

    # Step 4: Compile L5X to ACD
    if args.safety_only:
        print("Step 4: Prepare compilation ... SKIPPED")
    else:
        print("Step 4: Prepare compilation ...", end=" ", flush=True)
        ok = run_l5x_to_acd_compiler(paths, project_name, args.safety_only, verbose=args.verbose, log_file=log_file)
        print("OK" if ok else "FAIL")
        if not ok:
            abort_step()

    print("Workflow complete")
    if args.verbose and log_file is not None:
        print(f"Logs: {log_file}")
    if not args.safety_only and project_name:
        print(f"L5X: IO Tree Configuration Generator/generated_projects/{project_name}.L5X")
# Run the workflow only when executed as a script, not when imported.
if __name__ == '__main__':
    main()