PLC_Generation/streamlined_generation.py
2025-08-05 14:38:54 +04:00

649 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Streamlined PLC Generation Workflow
===================================
This script demonstrates the complete streamlined generation workflow that:
1. Uses DESC_IP data extraction for safety devices
2. Generates MainProgram and SafetyProgram with proper routines
3. Creates IO Tree configuration with modules
4. Integrates everything into a complete L5X file with proper CDATA wrapping
5. Compiles to ACD format
Usage:
python streamlined_generation.py --excel-file DESC_IP_MERGED.xlsx --project-name MTN6_MCM01_UL1_UL3
"""
import os
import sys
import subprocess
import xml.etree.ElementTree as ET
import re
from pathlib import Path
import argparse
import pandas as pd # Added for safety_gen.loader.rst
# Force unbuffered output
sys.stdout = sys.__stdout__
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(line_buffering=True)
# Import zones configuration
from zones_config import ZONES_CONFIGS, DEFAULT_ZONES
# Set UTF-8 encoding for Windows
if sys.platform == 'win32':
os.environ['PYTHONIOENCODING'] = 'utf-8'
def run_plc_data_generator(excel_file: str) -> bool:
"""Run the PLC Data Generator to create DESC_IP_MERGED.xlsx."""
print("=" * 60)
print("PHASE 0: PLC Data Generation")
print("=" * 60)
# Save current directory
orig_dir = os.getcwd()
try:
# Change to PLC Data Generator directory
os.chdir("PLC Data Generator")
# The Excel file path should be relative to the root, so we need to go up one level
if excel_file.startswith("PLC Data Generator/"):
# Remove the "PLC Data Generator/" prefix since we're already in that directory
excel_file_arg = excel_file[len("PLC Data Generator/"):]
else:
# Add ../ to go up to the root directory
excel_file_arg = f"../{excel_file}"
# Run the main.py script
cmd = [sys.executable, "main.py", excel_file_arg]
print(f"Processing Excel file: {excel_file_arg}")
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
print("PLC Data Generator failed:")
if result.stdout:
print(result.stdout)
if result.stderr:
print(f"Error: {result.stderr}")
return False
# Success - print output
if result.stdout:
print(result.stdout)
# Check if DESC_IP_MERGED.xlsx was created
output_file = Path("DESC_IP_MERGED.xlsx")
if output_file.exists():
# Copy it to the root directory for other phases
import shutil
dest = Path(orig_dir) / "DESC_IP_MERGED.xlsx"
shutil.copy2(output_file, dest)
print(f"Created DESC_IP_MERGED.xlsx in root directory")
return True
else:
print("ERROR: DESC_IP_MERGED.xlsx was not created")
return False
finally:
os.chdir(orig_dir)
def run_routines_generator(excel_file: str, use_desc_ip: bool = True, zones_dict=None) -> dict:
"""Run the Routines Generator to create program files.
Args:
excel_file: Path to Excel file
use_desc_ip: Whether to use DESC_IP extraction mode
zones_dict: Dictionary-based zones data. If None, uses default.
"""
print("\n" + "=" * 60)
print("PHASE 1: Routines Generation")
print("=" * 60)
# If zones_dict is not provided, use default
if zones_dict is None:
zones_dict = DEFAULT_ZONES
print("Using default zones configuration (MCM04)")
# Change to Routines Generator directory
orig_dir = os.getcwd()
routines_dir = Path("Routines Generator")
os.chdir(routines_dir)
try:
# Always use dictionary-based zones instead of Excel
print("Using dictionary-based zones data...")
print(f"Number of zones: {len(zones_dict)}")
for zone in zones_dict:
print(f" - {zone['name']}: ", end="")
if zone['start'] and zone['stop']:
print(f"{zone['start']} to {zone['stop']}", end="")
else:
print("(No range)", end="")
if zone['interlock']:
print(f" [Interlock: {zone['interlock']}]")
else:
print()
# Add current directory to Python path to fix import issues
sys.path.insert(0, os.getcwd())
# Also add src directory to handle imports within the modules
src_path = os.path.join(os.getcwd(), 'src')
if src_path not in sys.path:
sys.path.insert(0, src_path)
# Copy DESC_IP_MERGED.xlsx from parent directory if it exists there
parent_desc_ip = Path(orig_dir) / "DESC_IP_MERGED.xlsx"
local_desc_ip = Path("DESC_IP_MERGED.xlsx")
if parent_desc_ip.exists():
import shutil
shutil.copy2(parent_desc_ip, local_desc_ip)
print(f"Copied DESC_IP_MERGED.xlsx to Routines Generator directory")
from src.generators.safety_program import LimitedSafetyProgramGenerator
from src.generators.main_program import LimitedMainProgramGenerator
# Create generators with zones_dict
safety_gen = LimitedSafetyProgramGenerator("DESC_IP_MERGED.xlsx", use_desc_ip_extraction=use_desc_ip, zones_dict=zones_dict)
main_gen = LimitedMainProgramGenerator("DESC_IP_MERGED.xlsx", use_desc_ip_extraction=use_desc_ip, zones_dict=zones_dict)
# Generate files
safety_gen.write('SafetyProgram_Limited.L5X')
print("[SUCCESS] Generated SafetyProgram_Limited.L5X")
main_gen.write('MainProgram_Limited.L5X')
print("[SUCCESS] Generated MainProgram_Limited.L5X")
sys.stdout.flush()
# Generate CSV and tag mapping (these need to be updated too)
# For now, we'll skip these as they would need zones_dict support as well
print("Note: CSV and tag mapping generation skipped when using dictionary zones")
sys.stdout.flush()
# Generate safety tag mapping even with dictionary zones
print("Generating safety tag mapping...")
sys.stdout.flush()
from src.writers.mapping_writer import create_safety_tag_mapping
# Collect safety tags from RST data
safety_tags = set()
safety_tags.add("MCM_S_PB") # Static MCM tag
# Get RST data from the data loader
rst_df = safety_gen.loader.rst
for _, row in rst_df.iterrows():
if pd.notna(row['DESCA']) and (any(k in row['DESCA'] for k in ('S1_PB', 'S2_PB')) or row['DESCA'].endswith('SPB')):
safety_tags.add(row['DESCA'])
# We don't need beacon mappings in the safety tag map
beacon_sft_mappings = set()
# Create the safety tag mapping file
mapping_file = 'SafetyTagMapping_Limited.txt'
create_safety_tag_mapping(safety_tags, set(), beacon_sft_mappings, mapping_file)
print(f"[SUCCESS] Generated {mapping_file}")
# Return paths to generated files
return {
'safety_program': routines_dir / 'SafetyProgram_Limited.L5X',
'main_program': routines_dir / 'MainProgram_Limited.L5X',
'tags_csv': routines_dir / 'MTN6_MCM01_Controller_Tags_Limited.CSV',
'tag_mapping': routines_dir / 'SafetyTagMapping_Limited.txt'
}
finally:
os.chdir(orig_dir)
def run_io_tree_generator(excel_file: str, project_name: str) -> str:
"""Run the IO Tree Configuration Generator to create module configuration."""
print("\n" + "=" * 60)
print("PHASE 2: IO Tree Configuration")
print("=" * 60)
# Change to IO Tree Configuration Generator directory
orig_dir = os.getcwd()
io_tree_dir = Path("IO Tree Configuration Generator")
os.chdir(io_tree_dir)
try:
# Check if we already have a project file
#existing_file = Path(f"generated_projects/{project_name}.L5X")
#if existing_file.exists():
#print(f"[SUCCESS] Found existing IO tree configuration: {existing_file}")
#print(" Skipping generation and using existing file")
#return Path(os.getcwd()) / existing_file # Return absolute path
# If not, try to generate one
print("No existing IO tree found, attempting to generate...")
# Use DESC_IP_MERGED.xlsx which has the DESC_IP sheet
desc_ip_file = Path(orig_dir) / "DESC_IP_MERGED.xlsx"
if not desc_ip_file.exists():
print(f"[ERROR] DESC_IP_MERGED.xlsx not found at {desc_ip_file}")
print(" Cannot generate IO tree without DESC_IP data")
return None
# Run the enhanced MCM generator
# enhanced_mcm_generator expects positional args: <excel_file> <project_name> [--split]
cmd = [sys.executable, "enhanced_mcm_generator.py", str(desc_ip_file), project_name]
# Suppress error output to avoid confusion
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
print("[ERROR] IO Tree generation failed")
print(" This is expected if the required Excel file is not available")
return None
try:
print(result.stdout)
except UnicodeEncodeError:
safe_output = result.stdout.encode('ascii', 'ignore').decode('ascii')
print(safe_output)
return Path(os.getcwd()) / f"generated_projects/{project_name}.L5X" # Return absolute path
finally:
os.chdir(orig_dir)
def integrate_programs_into_controller(controller_file, routines_files, project_name=None):
"""Integrate MainProgram and SafetyProgram into the controller L5X."""
print("\n============================================================")
print("PHASE 3: Program Integration")
print("============================================================")
# Parse controller file
tree = ET.parse(controller_file)
root = tree.getroot()
controller_elem = root.find(".//Controller")
# Load SafetyProgram
safety_tree = ET.parse(routines_files['safety_program'])
safety_root = safety_tree.getroot()
# Find or create Programs section
programs_elem = controller_elem.find("Programs")
if programs_elem is None:
programs_elem = ET.SubElement(controller_elem, "Programs")
# Add SafetyProgram
safety_program_new = safety_root.find(".//Program[@Name='SafetyProgram']")
if safety_program_new is not None:
# Check if SafetyProgram already exists
existing_safety = programs_elem.find("Program[@Name='SafetyProgram']")
if existing_safety is not None:
programs_elem.remove(existing_safety)
programs_elem.append(safety_program_new)
print("[SUCCESS] Integrated SafetyProgram with routines")
# Load MainProgram
main_tree = ET.parse(routines_files['main_program'])
main_root = main_tree.getroot()
# Update MainProgram routines
main_program_new = main_root.find(".//Program[@Name='MainProgram']")
if main_program_new is not None:
# Find existing MainProgram
existing_main = programs_elem.find("Program[@Name='MainProgram']")
if existing_main is not None:
# Replace the routines section
existing_routines = existing_main.find("Routines")
new_routines = main_program_new.find("Routines")
if existing_routines is not None and new_routines is not None:
existing_main.remove(existing_routines)
existing_main.append(new_routines)
print("[SUCCESS] Updated MainProgram with safety routines")
# Extract and merge controller tags from MainProgram L5X
main_controller = main_root.find(".//Controller[@Use='Context']")
if main_controller is not None:
main_tags = main_controller.find("Tags")
if main_tags is not None and len(main_tags) > 0:
# Find or create Tags section in controller
controller_tags = controller_elem.find("Tags")
if controller_tags is None:
# Create a new Tags element
controller_tags = ET.Element("Tags")
# Find the right position to insert Tags
# It should come after AddOnInstructionDefinitions but before Programs
insert_index = None
for i, child in enumerate(controller_elem):
if child.tag == "AddOnInstructionDefinitions":
insert_index = i + 1
break
elif child.tag == "Programs":
insert_index = i
break
if insert_index is not None:
controller_elem.insert(insert_index, controller_tags)
else:
# If neither found, append before the end
controller_elem.append(controller_tags)
print("[SUCCESS] Created Tags section in controller")
else:
# Tags exists - check if it's empty and should be populated
if len(controller_tags) == 0:
print("[SUCCESS] Found empty Tags section - will populate with tags")
# Add all tags from MainProgram to controller
tag_count = 0
existing_tag_names = {tag.get('Name') for tag in controller_tags.findall('Tag')}
for tag in main_tags:
tag_name = tag.get('Name')
if tag_name not in existing_tag_names:
# Clone the tag to avoid moving it from the source
new_tag = ET.fromstring(ET.tostring(tag, encoding='unicode'))
controller_tags.append(new_tag)
tag_count += 1
if tag_count > 0:
print(f"[SUCCESS] Integrated {tag_count} controller tags from MainProgram")
else:
print("[SUCCESS] No new tags to add (all tags already exist)")
# Add SafetyTask if not present
tasks_elem = controller_elem.find("Tasks")
if tasks_elem is None:
tasks_elem = ET.SubElement(controller_elem, "Tasks")
safety_task = tasks_elem.find("Task[@Name='SafetyTask']")
if safety_task is None:
safety_task = ET.SubElement(tasks_elem, "Task")
safety_task.set("Name", "SafetyTask")
safety_task.set("Type", "PERIODIC")
safety_task.set("Rate", "20")
safety_task.set("Priority", "10")
safety_task.set("Watchdog", "20")
safety_task.set("DisableUpdateOutputs", "false")
safety_task.set("InhibitTask", "false")
safety_task.set("Class", "Safety")
sched_programs = ET.SubElement(safety_task, "ScheduledPrograms")
sched_prog = ET.SubElement(sched_programs, "ScheduledProgram")
sched_prog.set("Name", "SafetyProgram")
print("[SUCCESS] Added SafetyTask")
# Extract safety tag mappings from SafetyTagMapping file
tag_mapping_file = routines_files['tag_mapping']
if tag_mapping_file.exists():
with open(tag_mapping_file, 'r') as f:
mappings = f.read().strip()
# Update SafetyInfo with tag mappings
safety_info = controller_elem.find("SafetyInfo")
if safety_info is not None:
# Check if SafetyTagMap already exists
existing_map = safety_info.find("SafetyTagMap")
if existing_map is not None:
existing_map.text = mappings
else:
safety_tag_map = ET.SubElement(safety_info, "SafetyTagMap")
safety_tag_map.text = mappings
print(f"[SUCCESS] Added safety tag mappings: {mappings}")
# Save integrated file
if project_name:
# Use the project name directly for the output file
output_dir = os.path.dirname(controller_file)
output_file = os.path.join(output_dir, f"{project_name}.L5X")
else:
# Fallback to appending _Complete if no project name provided
output_file = controller_file.replace('.L5X', '_Complete.L5X')
# Convert to string and apply CDATA formatting
xml_str = ET.tostring(root, encoding='unicode')
full_xml = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n' + xml_str
# Apply CDATA wrapping
full_xml = apply_cdata_wrapping(full_xml)
# Save the file
with open(output_file, 'w', encoding='utf-8') as f:
f.write(full_xml)
print(f"\n[SUCCESS] Created integrated L5X file: {output_file}")
return output_file
def apply_cdata_wrapping(xml_str: str) -> str:
"""Apply CDATA wrapping to L5K data and RLL Text content."""
# Pattern for L5K data blocks - match any content between tags
l5k_pattern = r'(<Data Format="L5K">)\s*([^<]+)\s*(</Data>)'
# Pattern for DataValueMember DATA
data_pattern = r'(<DataValueMember Name="DATA"[^>]*>)([^<]*)(</DataValueMember>)'
# Pattern for RLL Text content
text_pattern = r'(<Text>)(.*?)(</Text>)'
def wrap_l5k(match):
start, content, end = match.group(1), match.group(2), match.group(3)
# Only wrap if not already wrapped and has content
if content.strip() and '<![CDATA[' not in content:
return f"{start}\n<![CDATA[{content.strip()}]]>\n{end}"
return match.group(0)
def wrap_data(match):
start, content, end = match.group(1), match.group(2), match.group(3)
return f"{start}<![CDATA[{content}]]>{end}"
def wrap_text(match):
start, content, end = match.group(1), match.group(2), match.group(3)
if content.strip():
# Remove any existing newlines from content and trim whitespace
clean_content = content.strip()
return f"{start}\n<![CDATA[{clean_content}]]>\n{end}"
return match.group(0)
# Apply all CDATA wrappings
xml_str = re.sub(l5k_pattern, wrap_l5k, xml_str, flags=re.DOTALL)
xml_str = re.sub(data_pattern, wrap_data, xml_str, flags=re.DOTALL)
xml_str = re.sub(text_pattern, wrap_text, xml_str, flags=re.DOTALL)
# Format the XML with proper indentation
xml_str = format_xml_indentation(xml_str)
return xml_str
def format_xml_indentation(xml_str: str) -> str:
"""Apply Rockwell-style XML formatting."""
# Remove whitespace between tags
xml_str = re.sub(r'>\s+<', '><', xml_str)
# Add newlines at specific tag boundaries
replacements = [
('><Controller', '>\n<Controller'),
('><Modules', '>\n<Modules'),
('><Module', '>\n<Module'),
('><Programs', '>\n<Programs'),
('><Program', '>\n<Program'),
('><Tags', '>\n<Tags'),
('><Tag', '>\n<Tag'),
('><Routines', '>\n<Routines'),
('><Routine', '>\n<Routine'),
('><RLLContent', '>\n<RLLContent'),
('><Rung', '>\n<Rung'),
('><Text>', '>\n<Text>'),
('</Text></Rung>', '</Text>\n</Rung>'),
('</Rung><', '</Rung>\n<'),
('</RLLContent><', '</RLLContent>\n<'),
('</Routine><', '</Routine>\n<'),
('</Routines><', '</Routines>\n<'),
('</Program><', '</Program>\n<'),
('</Programs><', '</Programs>\n<'),
('><Tasks', '>\n<Tasks'),
('><Task', '>\n<Task'),
('</Task><', '</Task>\n<'),
('</Tasks><', '</Tasks>\n<'),
('><SafetyInfo', '>\n<SafetyInfo'),
('><SafetyTagMap>', '>\n<SafetyTagMap>'),
('</SafetyTagMap><', '</SafetyTagMap>\n<'),
('</Controller><', '</Controller>\n<'),
]
for old, new in replacements:
xml_str = xml_str.replace(old, new)
return xml_str
def compile_to_acd(l5x_file: str) -> str:
"""Compile the L5X file to ACD format."""
print("\n" + "=" * 60)
print("PHASE 4: L5X to ACD Compilation")
print("=" * 60)
# Change to L5X2ACD Compiler directory
orig_dir = os.getcwd()
compiler_dir = Path("L5X2ACD Compiler")
os.chdir(compiler_dir)
try:
# Convert l5x_file to absolute path
l5x_path = Path(orig_dir) / l5x_file
if not l5x_path.exists():
print(f"L5X file not found: {l5x_path}")
return None
# Run the compiler using the virtual environment's Python
# Check if we're in a virtual environment
if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
# We're in a virtual environment, use current Python executable
python_exe = sys.executable
else:
# Not in venv, try to find venv Python
venv_python = Path(orig_dir) / "venv" / "Scripts" / "python.exe"
if venv_python.exists():
python_exe = str(venv_python)
else:
print("[WARNING] Virtual environment not found, using system Python")
python_exe = sys.executable
cmd = [python_exe, "l5x_to_acd.py", str(l5x_path)]
print(f"Running compiler command: {' '.join(cmd)}")
print(f"L5X file path: {l5x_path}")
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
# Always print stdout for detailed compiler output
if result.stdout:
print("\nCompiler Output:")
try:
print(result.stdout)
except UnicodeEncodeError:
# Handle Unicode characters that can't be displayed in Windows console
safe_output = result.stdout.encode('ascii', 'ignore').decode('ascii')
print(safe_output)
# Print stderr if there's any error output
if result.stderr:
print("\nCompiler Errors:")
try:
print(result.stderr)
except UnicodeEncodeError:
# Handle Unicode characters that can't be displayed in Windows console
safe_output = result.stderr.encode('ascii', 'ignore').decode('ascii')
print(safe_output)
# Check return code and provide additional information
print(f"\nCompiler exit code: {result.returncode}")
if result.returncode != 0:
print(f"[ERROR] Compilation failed with exit code {result.returncode}")
if result.returncode == 1:
print(" This indicates recoverable failures - check output above for details")
elif result.returncode == 2:
print(" This indicates critical non-recoverable failures")
elif result.returncode == 3:
print(" This indicates a fatal error in the compiler")
return None
# Return path to ACD file
acd_file = str(l5x_file).replace('.L5X', '.ACD')
return acd_file
finally:
os.chdir(orig_dir)
def main():
"""Main workflow execution."""
parser = argparse.ArgumentParser(description='Streamlined PLC Generation Workflow')
parser.add_argument('--excel-file', default='DESC_IP_MERGED.xlsx', help='Input Excel file')
parser.add_argument('--project-name', default='MTN6_MCM01_UL1_UL3', help='Project name')
parser.add_argument('--desc-ip-mode', action='store_true', default=True, help='Use DESC_IP extraction mode')
parser.add_argument('--compile-acd', action='store_true', help='Compile to ACD format')
args = parser.parse_args()
print("STREAMLINED PLC GENERATION WORKFLOW")
print("===================================")
print(f"Excel File: {args.excel_file}")
print(f"Project Name: {args.project_name}")
print(f"DESC_IP Mode: {args.desc_ip_mode}")
print()
# Select the correct zones configuration based on project name
if "MCM01" in args.project_name:
zones_to_use = ZONES_CONFIGS.get("MCM01", DEFAULT_ZONES)
print("Selected MCM01 zones configuration.")
elif "MCM04" in args.project_name:
zones_to_use = ZONES_CONFIGS.get("MCM04", DEFAULT_ZONES)
print("Selected MCM04 zones configuration.")
elif "MCM05" in args.project_name:
zones_to_use = ZONES_CONFIGS.get("MCM05", DEFAULT_ZONES)
print("Selected MCM05 zones configuration.")
else:
zones_to_use = DEFAULT_ZONES
print("Project name does not match MCM01, MCM04, or MCM05, using default zones.")
# Phase 0: Run PLC Data Generator
if not run_plc_data_generator(args.excel_file):
print("\n[ERROR] Failed to generate DESC_IP_MERGED.xlsx")
return 1
# Phase 1: Generate routines
routines_files = run_routines_generator(args.excel_file, args.desc_ip_mode, zones_dict=zones_to_use)
if not routines_files:
print("Failed to generate routines")
return 1
# Phase 2: Generate IO tree configuration
controller_file = run_io_tree_generator(args.excel_file, args.project_name)
if not controller_file:
print("\n[ERROR] Failed to obtain IO tree configuration")
print(" Cannot continue without a base controller L5X file")
return 1
# Phase 3: Integrate programs
complete_file = integrate_programs_into_controller(str(controller_file), routines_files, args.project_name)
if not complete_file:
print("Failed to integrate programs")
return 1
# Phase 4: Compile to ACD (optional)
if args.compile_acd:
acd_file = compile_to_acd(complete_file)
if acd_file:
print(f"\n[SUCCESS] Successfully compiled to ACD: {acd_file}")
else:
print("\n[ERROR] Failed to compile to ACD")
return 1
print("\n" + "=" * 60)
print("WORKFLOW COMPLETED SUCCESSFULLY")
print("=" * 60)
print(f"Final L5X file: {complete_file}")
return 0
if __name__ == "__main__":
sys.exit(main())