PLC_Generation/Routines Generator/complete_workflow.py

432 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Complete PLC Generation Workflow
Runs the entire pipeline from raw Excel to compiled ACD:
1. PLC Data Generator (raw Excel → DESC_IP_MERGED.xlsx)
2. Routines Generator (DESC_IP_MERGED.xlsx → L5X files)
3. L5X2ACD Compiler (L5X files → ACD files)
"""
from __future__ import annotations
import sys
import argparse
import subprocess
from pathlib import Path
import io
import contextlib
import xml.etree.ElementTree as ET
import re
def get_project_paths():
"""Get standardized paths for all project components."""
project_root = Path(__file__).parent.parent.resolve()
return {
'project_root': project_root,
'data_generator': project_root / "PLC Data Generator",
'routines_generator': project_root / "Routines Generator",
'l5x2acd_compiler': project_root / "L5X2ACD Compiler",
'io_tree_generator': project_root / "IO Tree Configuration Generator"
}
def run_plc_data_generator(raw_excel_file: Path, paths: dict, verbose: bool = False) -> bool:
"""Run the PLC Data Generator to create DESC_IP_MERGED.xlsx."""
data_gen_dir = paths['data_generator']
data_gen_script = data_gen_dir / "main.py"
if not data_gen_script.exists():
print(f"ERROR: PLC Data Generator not found at {data_gen_script}")
return False
if not raw_excel_file.exists():
print(f"ERROR: Raw Excel file not found at {raw_excel_file}")
return False
try:
# Run the PLC Data Generator with the Excel file path as argument
result = subprocess.run([
sys.executable,
str(data_gen_script),
str(raw_excel_file.resolve()) # Pass the Excel file path as argument
], cwd=data_gen_dir, capture_output=True, text=True)
# Check if core processing succeeded by looking for output files
# Even if there's a permission error at the end, the processing might have completed
source = data_gen_dir / "DESC_IP_MERGED.xlsx"
success_indicators = [
"Processing complete!" in result.stdout,
"New Excel file created:" in result.stdout,
source.exists()
]
# Consider it successful if the essential files were created, even with permission errors
if result.returncode == 0 or (any(success_indicators) and "[Errno 1] Operation not permitted" in result.stdout):
if verbose and result.returncode != 0:
print("Warning: Permission error at end of processing, core processing completed")
if verbose:
print(result.stdout)
# Copy DESC_IP_MERGED.xlsx from data generator output (it already has safety sheets)
dest = paths['routines_generator'] / "DESC_IP_MERGED.xlsx"
if source.exists():
import shutil
shutil.copy2(source, dest)
return True
else:
return False
else:
if verbose:
print("Error: Data processing failed")
print("STDOUT:", result.stdout)
print("STDERR:", result.stderr)
return False
except Exception as e:
if verbose:
print(f"Error: Exception in data processing: {e}")
return False
def run_routines_generator(paths: dict, project_name: str = None, ignore_estop1ok: bool = False, safety_only: bool = False, verbose: bool = False) -> bool:
"""Run the Routines Generator.
When safety_only is True, runs safety-only generation (inputs, outputs, resets,
estops, zones, estop_check, safety tag map). Otherwise runs the standard
generator with DPM and other routines.
"""
routines_dir = paths['routines_generator']
try:
# Build command arguments to use unified, config-driven CLI
config_path = paths['project_root'] / 'generator_config.json'
excel_path = routines_dir / 'DESC_IP_MERGED.xlsx'
subcmd = 'safety' if safety_only else 'all'
# Build args with global flags BEFORE the subcommand
cmd_args = [
sys.executable,
'-m', 'src.unified_cli',
'--config', str(config_path),
'--excel-file', str(excel_path),
]
if verbose:
cmd_args.extend(['--log-level', 'DEBUG'])
cmd_args.append(subcmd)
# Note: routine inclusion/exclusion is driven by config; project_name and ignore-estop1ok are configured in JSON
# Run the unified CLI
result = subprocess.run(cmd_args, cwd=routines_dir, capture_output=True, text=True)
if verbose:
print(result.stdout)
if result.stderr:
print("[generator stderr]", result.stderr)
if result.returncode == 0:
return True
else:
return False
except Exception as e:
if verbose:
print(f"Error: Exception in routine generation: {e}")
return False
def run_io_tree_generator(paths: dict, project_name: str, safety_only: bool = False, verbose: bool = False) -> bool:
"""Run the IO Tree Configuration Generator.
If safety_only is True, skip this step to avoid generating non-safety routines.
"""
if safety_only:
return True
io_tree_dir = paths['io_tree_generator']
enhanced_mcm_script = io_tree_dir / "enhanced_mcm_generator.py"
# Use the file directly from PLC Data Generator since we're skipping Routines Generator
desc_ip_file = paths['data_generator'] / "DESC_IP_MERGED.xlsx"
if not enhanced_mcm_script.exists():
return False
# Zones fully removed: do not attempt to load or pass zones
zones_json = None
try:
# Build command arguments
cmd_args = [
sys.executable,
str(enhanced_mcm_script),
str(desc_ip_file),
project_name
]
# Zones removed; no additional args
# Run the IO Tree Configuration Generator
result = subprocess.run(cmd_args, cwd=io_tree_dir, capture_output=True, text=True)
if verbose:
print(result.stdout)
if result.stderr:
print("[io-tree stderr]", result.stderr)
if result.returncode == 0:
return True
else:
return False
except Exception as e:
if verbose:
print(f"Error: Exception in IO tree generation: {e}")
return False
def run_l5x_to_acd_compiler(paths: dict, project_name: str, safety_only: bool = False, verbose: bool = False) -> bool:
"""Prepare for L5X2ACD Compilation using dynamic compilation manager.
If safety_only is True, skip this step since a full project L5X wasn't generated.
"""
if safety_only:
return True
# Find the generated complete project L5X file
io_tree_dir = paths['io_tree_generator']
generated_projects_dir = io_tree_dir / "generated_projects"
if not generated_projects_dir.exists():
return False
# Look for L5X files that start with the project name
l5x_files = list(generated_projects_dir.glob(f"{project_name}*.L5X"))
if not l5x_files:
if verbose:
available_files = list(generated_projects_dir.glob("*.L5X"))
if available_files:
print(f"Available L5X files: {[f.name for f in available_files]}")
return False
if len(l5x_files) > 1 and verbose:
print(f"Warning: Multiple L5X files found, using first: {l5x_files[0].name}")
complete_l5x = l5x_files[0]
if verbose:
print(f"Found generated L5X file: {complete_l5x.name}")
# Inject SafetyTagMap from SafetyTagMapping.txt before compilation (if available)
try:
mapping_file = paths['routines_generator'] / 'SafetyTagMapping.txt'
if mapping_file.exists():
if verbose:
print("Injecting SafetyTagMap from SafetyTagMapping.txt into L5X ...")
_inject_safety_tag_map_into_l5x(complete_l5x, mapping_file, verbose)
elif verbose:
print("SafetyTagMapping.txt not found; skipping SafetyTagMap injection")
except Exception as e:
if verbose:
print(f"Warning: Failed to inject SafetyTagMap: {e}")
# Use the dynamic compilation manager
l5x2acd_dir = paths['project_root'] / "L5X2ACD Compiler"
try:
# Import and use the compilation manager
import sys
sys.path.append(str(l5x2acd_dir))
from compilation_manager import CompilationManager
# Create compilation manager
manager = CompilationManager(l5x2acd_dir)
# Determine project-specific options
project_type = "UNKNOWN"
options = {}
if project_name:
if "MCM01" in project_name.upper():
project_type = "MCM01"
options['enable_safety_validation'] = True
elif "MCM04" in project_name.upper():
project_type = "MCM04"
options['enable_feeder_optimization'] = True
if verbose:
print(f"- Project type: {project_type}")
# Setup compilation with wipe and dynamic generation
if verbose:
result = manager.setup_compilation(
source_l5x=complete_l5x,
project_name=project_name or complete_l5x.stem,
compilation_options=options,
wipe_existing=True
)
else:
_buf = io.StringIO()
with contextlib.redirect_stdout(_buf), contextlib.redirect_stderr(_buf):
result = manager.setup_compilation(
source_l5x=complete_l5x,
project_name=project_name or complete_l5x.stem,
compilation_options=options,
wipe_existing=True
)
if verbose:
print("OK: Compilation setup completed")
l5x2acd_windows_path = str(l5x2acd_dir).replace('/mnt/c/', 'C:\\').replace('/', '\\')
l5x_windows_path = str(result['l5x_file']).replace('/mnt/c/', 'C:\\').replace('/', '\\')
print("To compile on Windows:")
print(f"- cd \"{l5x2acd_windows_path}\"")
print(f"- python l5x_to_acd.py \"{l5x_windows_path}\"")
return True
except Exception as e:
if verbose:
print(f"Error: Exception in compilation setup: {e}")
import traceback
traceback.print_exc()
return False
def _inject_safety_tag_map_into_l5x(l5x_path: Path, mapping_file: Path, verbose: bool = False) -> None:
"""Inject or replace <SafetyTagMap> inside the existing Controller/SafetyInfo using text edits.
- Preserves the original XML header exactly
- Does not create additional SafetyInfo blocks
- Formats SafetyTagMap on its own line between SafetyInfo open/close tags
"""
mapping_text = mapping_file.read_text(encoding='utf-8').strip()
if not mapping_text:
if verbose:
print("SafetyTagMapping.txt is empty; skipping injection")
return
xml_text = l5x_path.read_text(encoding='utf-8')
# Find Controller block
ctrl_match = re.search(r"<Controller\b[\s\S]*?</Controller>", xml_text)
if not ctrl_match:
if verbose:
print("No <Controller> found; skipping injection")
return
ctrl_start, ctrl_end = ctrl_match.span()
ctrl_text = xml_text[ctrl_start:ctrl_end]
# Locate first SafetyInfo (body or self-closing)
m_body = re.search(r"<SafetyInfo\b([^>]*)>([\s\S]*?)</SafetyInfo>", ctrl_text)
m_self = re.search(r"<SafetyInfo\b([^>]*)/>", ctrl_text)
if not m_body and not m_self:
if verbose:
print("No <SafetyInfo> under <Controller>; skipping injection")
return
# Determine indentation based on the SafetyInfo line
first_match = m_body if (m_body and (not m_self or m_body.start() < m_self.start())) else m_self
safety_start = first_match.start()
line_start = ctrl_text.rfind('\n', 0, safety_start)
indent = ctrl_text[line_start+1:safety_start] if line_start != -1 else ''
map_line = f"\n{indent} <SafetyTagMap> {mapping_text} </SafetyTagMap>\n"
def dedup_safety_infos(text: str) -> str:
seen = False
def repl(match: re.Match) -> str:
nonlocal seen
if seen:
return ''
seen = True
return match.group(0)
pat = re.compile(r"(<SafetyInfo\b[^>]*/>)|(\n?\s*<SafetyInfo\b[^>]*>[\s\S]*?</SafetyInfo>)")
return pat.sub(repl, text)
if m_body and (not m_self or m_body.start() < m_self.start()):
# Replace or insert SafetyTagMap inside existing body
attrs = m_body.group(1)
inner = m_body.group(2)
# Replace existing map if present
if re.search(r"<SafetyTagMap>[\s\S]*?</SafetyTagMap>", inner):
new_inner = re.sub(r"<SafetyTagMap>[\s\S]*?</SafetyTagMap>", map_line.strip('\n'), inner, count=1)
# Remove any additional maps
new_inner = re.sub(r"<SafetyTagMap>[\s\S]*?</SafetyTagMap>", '', new_inner)
else:
new_inner = map_line + inner
new_block = f"<SafetyInfo{attrs}>{new_inner}</SafetyInfo>"
new_ctrl_text = ctrl_text[:m_body.start()] + new_block + ctrl_text[m_body.end():]
else:
# Convert self-closing to body with map
attrs = m_self.group(1)
new_block = f"<SafetyInfo{attrs}>{map_line}</SafetyInfo>"
new_ctrl_text = ctrl_text[:m_self.start()] + new_block + ctrl_text[m_self.end():]
new_ctrl_text = dedup_safety_infos(new_ctrl_text)
new_xml = xml_text[:ctrl_start] + new_ctrl_text + xml_text[ctrl_end:]
l5x_path.write_text(new_xml, encoding='utf-8')
if verbose:
print("SafetyTagMap injection OK")
def main() -> None:
"""Main entry point for complete workflow."""
parser = argparse.ArgumentParser(description="Complete PLC generation workflow from raw Excel to ACD")
parser.add_argument('--excel-file', type=Path, required=True, help='Raw Excel file to process')
parser.add_argument('--project-name', help='Project name (for compatibility)')
parser.add_argument('--ignore-estop1ok', action='store_true', help='Ignore ESTOP1OK tags in safety routines generation')
parser.add_argument('--safety-only', action='store_true', help='Generate only safety routines and safety checks')
parser.add_argument('--verbose', action='store_true', help='Print detailed logs for each step')
args = parser.parse_args()
# Get project paths
paths = get_project_paths()
print("PLC Generation Workflow")
# Step 1: Process raw Excel data
print("Step 1: Data processing ...", end=" ")
ok = run_plc_data_generator(args.excel_file, paths, verbose=args.verbose)
print("OK" if ok else "FAIL")
if not ok:
if not args.verbose:
print("(details suppressed; re-run with --verbose)")
sys.exit(1)
# Step 2: Generate L5X programs (Routines Generator)
print("Step 2: Routine generation ...", end=" ")
ok = run_routines_generator(paths, args.project_name, args.ignore_estop1ok, args.safety_only, verbose=args.verbose)
print("OK" if ok else "FAIL")
if not ok:
if not args.verbose:
print("(details suppressed; re-run with --verbose)")
sys.exit(1)
# Step 3: Generate complete project L5X (IO Tree Generator)
if args.safety_only:
print("Step 3: IO tree generation ... SKIPPED")
else:
print("Step 3: IO tree generation ...", end=" ")
ok = run_io_tree_generator(paths, args.project_name, args.safety_only, verbose=args.verbose)
print("OK" if ok else "FAIL")
if not ok:
if not args.verbose:
print("(details suppressed; re-run with --verbose)")
sys.exit(1)
# Step 4: Compile L5X to ACD
if args.safety_only:
print("Step 4: Prepare compilation ... SKIPPED")
else:
print("Step 4: Prepare compilation ...", end=" ")
ok = run_l5x_to_acd_compiler(paths, args.project_name, args.safety_only, verbose=args.verbose)
print("OK" if ok else "FAIL")
if not ok:
if not args.verbose:
print("(details suppressed; re-run with --verbose)")
sys.exit(1)
print("Workflow complete")
if args.verbose and not args.safety_only and args.project_name:
print(f"L5X: IO Tree Configuration Generator/generated_projects/{args.project_name}.L5X")
if __name__ == '__main__':
main()