import os import hashlib import subprocess import json import re import time import socket import traceback import sys from pathlib import Path import pandas as pd from flask import Flask, render_template, request, url_for, redirect, flash, jsonify from flask_wtf.csrf import CSRFProtect import uuid app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key' app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'shared_uploads') app.config['CLONES_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'shared_clones') app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50 MB limit for uploads # Initialize CSRF protection csrf = CSRFProtect(app) # Enhanced logging setup def log_error(msg, exc_info=None): """Log error messages with stacktrace if provided""" err_msg = f"ERROR: {msg}" if exc_info: err_msg += f"\n{traceback.format_exc()}" print(err_msg, file=sys.stderr) def log_info(msg): """Log informational messages""" print(f"INFO: {msg}", file=sys.stdout) def log_debug(msg): """Log debug messages""" print(f"DEBUG: {msg}", file=sys.stdout) # Log startup information log_info(f"Flask app starting with UPLOAD_FOLDER: {app.config['UPLOAD_FOLDER']}") log_info(f"CLONES_FOLDER: {app.config['CLONES_FOLDER']}") # Ensure directories exist with appropriate permissions os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs(app.config['CLONES_FOLDER'], exist_ok=True) # Set permissions to allow all users to read/write try: os.chmod(app.config['UPLOAD_FOLDER'], 0o777) os.chmod(app.config['CLONES_FOLDER'], 0o777) except Exception as e: log_error(f"Could not set permissions on shared folders: {str(e)}") # Shared data storage - stores all comparisons SHARED_DATA = { 'comparisons': {}, # Dictionary of comparison_id -> comparison_data 'latest_comparison_id': None, 'last_update_time': None } def validate_repo_url(url): """ Validate a Git repository URL format. 
Args: url (str): Repository URL to validate Returns: bool: True if URL is valid, False otherwise str: Error message if invalid, None if valid """ # Check for empty URL if not url or not url.strip(): return False, "Repository URL is required" # Pattern for Git HTTP/HTTPS URLs (including ports) http_pattern = r'^https?:\/\/(?:[\w.-]+)(?::\d+)?\/[\w.-]+\/[\w.-]+(?:\.git)?$' # Pattern for Git SSH URLs ssh_pattern = r'^git@(?:[\w.-]+):[\w.-]+\/[\w.-]+(?:\.git)?$' if re.match(http_pattern, url) or re.match(ssh_pattern, url): return True, None else: return False, "Invalid Git repository URL format. Please use a valid HTTP/HTTPS or SSH URL." def validate_excel_file(file): """ Validate uploaded Excel file. Args: file: File object from request.files Returns: bool: True if file is valid, False otherwise str: Error message if invalid, None if valid """ # Check if file exists if not file or file.filename == '': return False, "No file selected" # Check file extension allowed_extensions = {'.xlsx', '.xls'} file_ext = os.path.splitext(file.filename)[1].lower() if file_ext not in allowed_extensions: return False, f"Invalid file type. Please upload an Excel file (.xlsx or .xls)" # Check file size (although this is also enforced by MAX_CONTENT_LENGTH) if len(file.read()) > app.config['MAX_CONTENT_LENGTH']: file.seek(0) # Reset file pointer after reading return False, f"File too large. Maximum size is {app.config['MAX_CONTENT_LENGTH'] // (1024 * 1024)} MB" file.seek(0) # Reset file pointer after reading return True, None def create_or_update_repo(repo_url): """ Clone or update a Git repository. 
Args: repo_url (str): URL of the Git repository Returns: str: Path to the cloned repository Raises: ValueError: If repository URL is invalid or operation fails """ # Validate repo URL is_valid, error_msg = validate_repo_url(repo_url) if not is_valid: raise ValueError(error_msg) try: # Generate a secure repo_id from the URL using SHA1 repo_id = hashlib.sha1(repo_url.encode()).hexdigest() repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id) # Set timeout for Git operations (30 seconds) timeout = 30 # Extract domain and port for connectivity check domain = "github.com" # Default fallback port = 443 # Default HTTPS port if "://" in repo_url: domain_part = repo_url.split("://")[1].split("/")[0] if ":" in domain_part: domain, port_str = domain_part.split(":", 1) port = int(port_str) else: domain = domain_part # Set default port based on protocol port = 80 if repo_url.startswith("http://") else 443 elif "@" in repo_url and ":" in repo_url: domain = repo_url.split("@")[1].split(":")[0] port = 22 # Default SSH port if os.path.exists(repo_path): # Check if it's actually a Git repository if not os.path.exists(os.path.join(repo_path, '.git')): raise ValueError(f"Directory exists but is not a Git repository: {repo_path}") # Repository already exists, perform a pull try: # Check for internet connectivity first socket.create_connection((domain, port), timeout=5) # Try to pull from main branch try: subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', 'main'], timeout=timeout) except subprocess.CalledProcessError: # Try master branch if main fails try: subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', 'master'], timeout=timeout) except subprocess.CalledProcessError as e: raise ValueError(f"Failed to pull from repository: {str(e)}") except subprocess.TimeoutExpired: raise ValueError(f"Git pull operation timed out after {timeout} seconds. 
Repository might be too large or network is slow.") except socket.error: raise ValueError(f"Network error: Cannot connect to {domain}:{port}. Please check your internet connection.") except PermissionError: raise ValueError(f"Permission denied: Cannot access the repository directory: {repo_path}") else: # Clone the repository try: # Check for internet connectivity first socket.create_connection((domain, port), timeout=5) # Attempt to clone the repository try: subprocess.check_call(['git', 'clone', repo_url, repo_path], timeout=timeout) except subprocess.CalledProcessError as e: # Provide more detailed error messages for common issues if "Authentication failed" in str(e): raise ValueError("Authentication failed. The repository might be private or the credentials are invalid.") elif "Repository not found" in str(e): raise ValueError(f"Repository not found: {repo_url}. Please check the URL and try again.") else: raise ValueError(f"Failed to clone repository: {str(e)}") except subprocess.TimeoutExpired: raise ValueError(f"Git clone operation timed out after {timeout} seconds. Repository might be too large or network is slow.") except socket.error: raise ValueError(f"Network error: Cannot connect to {domain}:{port}. Please check your internet connection.") except PermissionError: raise ValueError(f"Permission denied: Cannot create the repository directory: {repo_path}") # Verify that the repository contains JSON files json_files = list(Path(repo_path).glob('**/*.json')) if not json_files: raise ValueError("The repository doesn't contain any JSON files. Please check the repository and try again.") return repo_path except subprocess.CalledProcessError as e: error_msg = f"Git operation failed: {str(e)}" raise ValueError(error_msg) except Exception as e: error_msg = f"Repository operation failed: {str(e)}" raise ValueError(error_msg) def extract_folder_name(file_path): """ Extract the control panel name from a JSON file path. 
Example: "/path/Detailed-Views/MCM01 Fluid Inbound Merges 1-4/view.json" -> "MCM01 Fluid Inbound Merges" Args: file_path (str): Path to the JSON file Returns: str: Extracted control panel name, or empty string if not found """ try: # Convert to Path object for easier path manipulation path = Path(file_path) # Check if it's in a Detailed-Views subfolder parts = path.parts detailed_views_index = -1 for i, part in enumerate(parts): if part == "Detailed-Views": detailed_views_index = i break if detailed_views_index >= 0 and detailed_views_index < len(parts) - 1: # Get the folder name right after "Detailed-Views" folder_name = parts[detailed_views_index + 1] # Clean up the name - remove any numbering suffix like "1-4" # This matches the format in the example: "MCM01 Fluid Inbound Merges 1-4" -> "MCM01 Fluid Inbound Merges" clean_name = re.sub(r'\s+\d+-\d+$', '', folder_name) return clean_name return "" except Exception: return "" def should_exclude_name(name): """ Check if a name should be excluded based on the presence of certain keywords. Args: name (str): The name to check Returns: bool: True if the name should be excluded, False otherwise """ exclude_terms = ['button', 'camera', 'line', 'end', 'image', 'label', 'embeddedview', 'root','flexcontainer','buton'] name_lower = name.lower() for term in exclude_terms: if term.lower() in name_lower: return True return False def load_scada_names(repo_path): """ Find JSON files in the Detailed-Views folder and extract component names. Names can be found in 'meta.name' fields at both the root level and in nested children. Excludes names containing specific terms. 
Args: repo_path (str): Path to the repository Returns: list: List of dictionaries containing SCADA names and control panels Raises: ValueError: If operation fails """ try: names_with_panels = [] repo_dir = Path(repo_path) # Find JSON files only in Detailed-Views folder json_files = list(repo_dir.glob('**/Detailed-Views/**/*.json')) if not json_files: print(f"Warning: No JSON files found in Detailed-Views folder at {repo_path}") for json_file in json_files: try: with open(json_file, 'r') as f: data = json.load(f) # Extract control panel name from file path control_panel = extract_folder_name(str(json_file)) # Extract names recursively from the JSON structure, now with control panel info extract_names_recursive(data, names_with_panels, control_panel, visited=None) except json.JSONDecodeError: # Skip invalid JSON files continue except Exception as e: # Skip files with other errors continue return names_with_panels except Exception as e: error_msg = f"Failed to load SCADA names: {str(e)}" raise ValueError(error_msg) def extract_names_recursive(obj, names_list, control_panel, visited=None): """ Recursively extract all 'meta.name' values from a nested JSON object. Excludes names containing terms defined in should_exclude_name function. 
Args: obj: The JSON object or list to process names_list: List to append found names to control_panel: The control panel name extracted from file path visited: Set of object ids already visited (to prevent infinite recursion) """ if visited is None: visited = set() # Skip already visited objects or non-container types if not isinstance(obj, (dict, list)) or id(obj) in visited: return # Mark this object as visited visited.add(id(obj)) if isinstance(obj, dict): # Check if this object has a meta.name field if 'meta' in obj and isinstance(obj['meta'], dict) and 'name' in obj['meta']: name = obj['meta']['name'] if name and isinstance(name, str) and not should_exclude_name(name): names_list.append({ "name": name, "control_panel": control_panel }) # Check for children array and process only this key specifically if 'children' in obj and isinstance(obj['children'], list): for child in obj['children']: extract_names_recursive(child, names_list, control_panel, visited) # Only process a few key dictionary values that might contain component definitions keys_to_process = ['root', 'props', 'custom'] for key in keys_to_process: if key in obj: extract_names_recursive(obj[key], names_list, control_panel, visited) elif isinstance(obj, list): # Process only the first 1000 items to prevent excessive recursion for item in obj[:1000]: extract_names_recursive(item, names_list, control_panel, visited) def load_excel_names(file_path): """ Extract names from an Excel file. 
Args: file_path (str): Path to the Excel file Returns: list: List of dictionaries containing names and control panels from the Excel file Raises: ValueError: If file doesn't exist, isn't a valid Excel file, or doesn't contain a "Name" column """ try: # Check if file exists if not os.path.exists(file_path): raise ValueError(f"Excel file not found: {file_path}") # Check file size before attempting to read file_size_mb = os.path.getsize(file_path) / (1024 * 1024) if file_size_mb > 50: # 50 MB limit raise ValueError(f"Excel file too large ({file_size_mb:.1f} MB). Maximum size is 50 MB.") # Try to read Excel file with a timeout try: df = pd.read_excel(file_path) except pd.errors.EmptyDataError: raise ValueError(f"Excel file is empty: {file_path}") except pd.errors.ParserError: raise ValueError(f"Invalid Excel file format or corrupted file: {file_path}") except Exception as e: raise ValueError(f"Failed to read Excel file: {str(e)}") # Check if any data exists if df.empty: raise ValueError(f"Excel file contains no data: {file_path}") # Find name column - check for 'Name' column (case-insensitive) name_col = None for col in df.columns: if isinstance(col, str) and col.lower() == 'name': name_col = col break if not name_col: raise ValueError(f"Excel file missing required 'Name' column") # Find control panel column (if it exists) - check for any column containing "control" or "panel" (case-insensitive) control_panel_col = None for col in df.columns: if isinstance(col, str) and ('control' in col.lower() or 'panel' in col.lower()): control_panel_col = col break # Extract names and control panels, ignoring NaN values result = [] for _, row in df.iterrows(): name = row.get(name_col) if pd.isna(name): continue # Get Control Panel value, default to empty string if missing control_panel = "" if control_panel_col and control_panel_col in df.columns: control_panel = row.get(control_panel_col, "") # Handle NaN values if pd.isna(control_panel): control_panel = "" # Store as dictionary 
with name and control_panel keys result.append({ "name": str(name).strip(), "control_panel": str(control_panel).strip() }) # Check if we got any names if not result: raise ValueError(f"No valid names found in Excel file: {file_path}") return result except pd.errors.EmptyDataError: raise ValueError(f"Excel file is empty: {file_path}") except pd.errors.ParserError: raise ValueError(f"Invalid Excel file format: {file_path}") except Exception as e: error_msg = f"Failed to load names from Excel file: {str(e)}" raise ValueError(error_msg) def normalize_names(names_with_panels): """ Normalize a list of name dictionaries for consistent comparison. Normalization includes: - Stripping whitespace - Converting to uppercase - Removing duplicates Args: names_with_panels (list): List of dictionaries containing 'name' and 'control_panel' Returns: tuple: (normalized_list, name_mapping) where: - normalized_list is the list of normalized name dictionaries - name_mapping is a dict mapping normalized names to original info Raises: ValueError: If input is not a valid list """ try: if not isinstance(names_with_panels, list): raise ValueError("Input must be a list of name dictionaries") normalized = [] name_mapping = {} # Map normalized names to original names and control panels for item in names_with_panels: if not isinstance(item, dict) or 'name' not in item: continue name = item['name'] control_panel = item.get('control_panel', '') if name is None: continue # Convert to string if not already if not isinstance(name, str): name = str(name) # Preserve the original name exactly as it appears in the source original_name = name.strip() # Apply normalization steps normalized_name = original_name.upper() normalized.append({ 'name': normalized_name, 'control_panel': control_panel }) # Store the mapping (use normalized name as key) name_mapping[normalized_name] = { 'original_name': original_name, 'control_panel': control_panel } # Remove duplicates while preserving order normalized_unique = [] 
seen = set() for item in normalized: name = item['name'] if name not in seen and name: # Skip empty strings seen.add(name) normalized_unique.append(item) return normalized_unique, name_mapping except Exception as e: error_msg = f"Failed to normalize names: {str(e)}" raise ValueError(error_msg) def compare_name_lists(list1, list2): """ Compare two lists of name dictionaries and identify differences. Args: list1 (list): First list of name dictionaries list2 (list): Second list of name dictionaries Returns: dict: Dictionary containing: - 'only_in_list1': Items in list1 but not in list2 - 'only_in_list2': Items in list2 but not in list1 - 'common': Items present in both lists """ try: if not isinstance(list1, list) or not isinstance(list2, list): raise ValueError("Both inputs must be lists") # Extract just the names for set operations names1 = {item['name'] for item in list1} names2 = {item['name'] for item in list2} # Find names unique to each list and common names only_in_list1_names = names1 - names2 only_in_list2_names = names2 - names1 common_names = names1 & names2 # Build result lists with full item info only_in_list1 = [item for item in list1 if item['name'] in only_in_list1_names] only_in_list2 = [item for item in list2 if item['name'] in only_in_list2_names] # For common items, we need to merge control panel info from both lists common_items = [] # Create lookup dictionaries for faster access list1_dict = {item['name']: item for item in list1} list2_dict = {item['name']: item for item in list2} for name in common_names: item1 = list1_dict[name] item2 = list2_dict[name] # Use control panel from list1 if available, otherwise from list2 control_panel = item1.get('control_panel') or item2.get('control_panel', '') common_items.append({ 'name': name, 'control_panel': control_panel }) # Sort results for consistent output only_in_list1.sort(key=lambda x: x['name']) only_in_list2.sort(key=lambda x: x['name']) common_items.sort(key=lambda x: x['name']) # Return 
comparison results return { 'only_in_list1': only_in_list1, 'only_in_list2': only_in_list2, 'common': common_items } except Exception as e: error_msg = f"Failed to compare name lists: {str(e)}" raise ValueError(error_msg) def force_update_repo(repo_id): """ Force update a repository regardless of which route is being accessed. Args: repo_id (str): ID of the repository to update Returns: bool: True if update was successful, False otherwise """ try: # Find repository path repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id) if not os.path.exists(repo_path) or not os.path.exists(os.path.join(repo_path, '.git')): return False # Try to pull from main branch try: subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', 'main'], timeout=30) except subprocess.CalledProcessError: # Try master branch if main fails try: subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', 'master'], timeout=30) except subprocess.CalledProcessError: return False return True except Exception: return False # Routes @app.route('/') def index(): """ Render the homepage with the upload form. Returns: str: Rendered HTML template """ log_info(f"Accessed index page - Request: {request.method} {request.path}") # Check if there are any previous results to show has_previous_results = len(SHARED_DATA['comparisons']) > 0 last_update_time = SHARED_DATA['last_update_time'] comparisons = SHARED_DATA['comparisons'] return render_template('index.html', has_previous_results=has_previous_results, last_update_time=last_update_time, comparisons=comparisons) @app.route('/compare', methods=['POST']) def compare(): """ Handle form submission, process data, and display comparison results. 
Returns: str: Rendered HTML template with comparison results """ log_info(f"Compare route accessed - IP: {request.remote_addr}") log_debug(f"Request method: {request.method}") log_debug(f"Request content type: {request.content_type}") log_debug(f"Request headers: {dict(request.headers)}") try: # Log request details log_debug(f"Form data keys: {list(request.form.keys())}") log_debug(f"Files keys: {list(request.files.keys())}") if not request.form: log_error("No form data received") flash('No form data received. Please try again.', 'danger') return redirect(url_for('index')) # Check for CSRF token if 'csrf_token' not in request.form: log_error("CSRF token missing from form data") flash('CSRF token missing. Please refresh the page and try again.', 'danger') return redirect(url_for('index')) # Extract repo URL and validate repo_url = request.form.get('repo_url') if not repo_url: log_error("Repository URL is missing from form data") flash('Repository URL is required', 'danger') return redirect(url_for('index')) log_info(f"Processing comparison with repo URL: {repo_url}") # Handle file uploads if 'manifest_file' not in request.files or 'dwg_file' not in request.files: missing_files = [] if 'manifest_file' not in request.files: missing_files.append('manifest_file') if 'dwg_file' not in request.files: missing_files.append('dwg_file') log_error(f"Missing required files: {', '.join(missing_files)}") flash(f'Missing required files: {", ".join(missing_files)}', 'danger') return redirect(url_for('index')) manifest_file = request.files['manifest_file'] dwg_file = request.files['dwg_file'] log_debug(f"Manifest filename: {manifest_file.filename}") log_debug(f"DWG filename: {dwg_file.filename}") # Check for empty filenames if not manifest_file.filename or not dwg_file.filename: log_error(f"Empty filenames - Manifest: '{manifest_file.filename}', DWG: '{dwg_file.filename}'") if not manifest_file.filename: flash("No manifest file selected", 'danger') if not dwg_file.filename: 
flash("No DWG file selected", 'danger') return redirect(url_for('index')) # Validate uploaded files is_valid, error_msg = validate_excel_file(manifest_file) if not is_valid: log_error(f"Manifest file validation error: {error_msg}") flash(f"Manifest file error: {error_msg}", 'danger') return redirect(url_for('index')) is_valid, error_msg = validate_excel_file(dwg_file) if not is_valid: log_error(f"DWG file validation error: {error_msg}") flash(f"DWG file error: {error_msg}", 'danger') return redirect(url_for('index')) # Generate unique ID for this comparison comparison_id = str(uuid.uuid4()) log_info(f"Created comparison ID: {comparison_id}") # Create folder for this comparison comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id) os.makedirs(comparison_folder, exist_ok=True) log_debug(f"Created comparison folder: {comparison_folder}") # Save uploaded files with shared permissions manifest_path = os.path.join(comparison_folder, 'manifest.xlsx') dwg_path = os.path.join(comparison_folder, 'dwg.xlsx') try: manifest_file.save(manifest_path) log_debug(f"Saved manifest file to: {manifest_path}") except Exception as e: log_error(f"Failed to save manifest file: {str(e)}", exc_info=True) flash(f"Failed to save manifest file: {str(e)}", 'danger') return redirect(url_for('index')) try: dwg_file.save(dwg_path) log_debug(f"Saved DWG file to: {dwg_path}") except Exception as e: log_error(f"Failed to save DWG file: {str(e)}", exc_info=True) flash(f"Failed to save DWG file: {str(e)}", 'danger') return redirect(url_for('index')) # Set permissions on uploaded files try: os.chmod(manifest_path, 0o666) os.chmod(dwg_path, 0o666) except Exception as e: log_error(f"Could not set permissions on uploaded files: {str(e)}") # Clone or update repository try: repo_path = create_or_update_repo(repo_url) log_info(f"Repository path: {repo_path}") except ValueError as e: log_error(f"Repository error: {str(e)}") flash(str(e), 'danger') return redirect(url_for('index')) # Load 
data from all sources try: log_info("Loading SCADA names from repository") scada_names = load_scada_names(repo_path) log_debug(f"Loaded {len(scada_names)} SCADA names") log_info("Loading manifest names from Excel") manifest_names = load_excel_names(manifest_path) log_debug(f"Loaded {len(manifest_names)} manifest names") log_info("Loading DWG names from Excel") dwg_names = load_excel_names(dwg_path) log_debug(f"Loaded {len(dwg_names)} DWG names") except ValueError as e: log_error(f"Error loading data: {str(e)}") flash(str(e), 'danger') return redirect(url_for('index')) # Normalize names for consistent comparison and get name mappings normalized_scada, scada_mapping = normalize_names(scada_names) log_debug(f"Normalized SCADA names: {len(normalized_scada)}") normalized_manifest, manifest_mapping = normalize_names(manifest_names) log_debug(f"Normalized manifest names: {len(normalized_manifest)}") normalized_dwg, dwg_mapping = normalize_names(dwg_names) log_debug(f"Normalized DWG names: {len(normalized_dwg)}") # Generate repo_id for future updates repo_id = hashlib.sha1(repo_url.encode()).hexdigest() # Compare all combinations log_info("Comparing normalized name lists") scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest) scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg) manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg) # Prepare comparison data for the template comparison_data = { 'scada_vs_manifest': { 'only_in_scada': scada_vs_manifest['only_in_list1'], 'only_in_manifest': scada_vs_manifest['only_in_list2'], 'common': scada_vs_manifest['common'], 'scada_count': len(normalized_scada), 'manifest_count': len(normalized_manifest) }, 'scada_vs_dwg': { 'only_in_scada': scada_vs_dwg['only_in_list1'], 'only_in_dwg': scada_vs_dwg['only_in_list2'], 'common': scada_vs_dwg['common'], 'scada_count': len(normalized_scada), 'dwg_count': len(normalized_dwg) }, 'manifest_vs_dwg': { 'only_in_manifest': 
manifest_vs_dwg['only_in_list1'], 'only_in_dwg': manifest_vs_dwg['only_in_list2'], 'common': manifest_vs_dwg['common'], 'manifest_count': len(normalized_manifest), 'dwg_count': len(normalized_dwg) }, 'name_mappings': { 'scada': scada_mapping, 'manifest': manifest_mapping, 'dwg': dwg_mapping }, 'repo_id': repo_id, 'repository_url': repo_url, 'comparison_id': comparison_id, 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'name': f"Comparison {time.strftime('%Y-%m-%d %H:%M:%S')}" } # Update shared data for all users global SHARED_DATA SHARED_DATA['comparisons'][comparison_id] = comparison_data SHARED_DATA['latest_comparison_id'] = comparison_id SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S') log_info(f"Comparison completed successfully - ID: {comparison_id}") return render_template('results.html', data=comparison_data, comparisons=SHARED_DATA['comparisons']) except ValueError as e: log_error(f"ValueError during comparison: {str(e)}") flash(str(e), 'danger') return redirect(url_for('index')) except Exception as e: log_error(f"Unexpected error during comparison: {str(e)}", exc_info=True) flash(f"An unexpected error occurred: {str(e)}", 'danger') return redirect(url_for('index')) # Custom error handler for CSRFs @app.errorhandler(400) def handle_csrf_error(e): log_error(f"400 error: {str(e)}") log_debug(f"Form data at error: {dict(request.form) if request.form else 'No form data'}") log_debug(f"Request URL: {request.url}") log_debug(f"Request method: {request.method}") log_debug(f"Request headers: {dict(request.headers)}") flash("Your form submission failed. This could be due to an expired session. Please try again.", "danger") return redirect(url_for('index')) @app.errorhandler(413) def request_entity_too_large(e): log_error(f"413 error: File too large") flash("The uploaded file is too large. 
Maximum size is 50 MB.", "danger") return redirect(url_for('index')) @app.route('/latest') def latest_results(): """ Show the latest comparison results from any user. Always update the repository before displaying results. Returns: str: Rendered HTML template with the latest comparison results """ if not SHARED_DATA['latest_comparison_id'] or len(SHARED_DATA['comparisons']) == 0: flash('No comparison results available yet. Please upload files to generate results.', 'info') return redirect(url_for('index')) comparison_id = SHARED_DATA['latest_comparison_id'] data = SHARED_DATA['comparisons'][comparison_id] # Update repository and reload data if possible repo_id = data.get('repo_id') if repo_id: repository_updated = force_update_repo(repo_id) if repository_updated: # Get files path comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id) manifest_path = os.path.join(comparison_folder, 'manifest.xlsx') dwg_path = os.path.join(comparison_folder, 'dwg.xlsx') if os.path.exists(manifest_path) and os.path.exists(dwg_path): try: # Repository path repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id) # Reload data scada_names = load_scada_names(repo_path) manifest_names = load_excel_names(manifest_path) dwg_names = load_excel_names(dwg_path) # Normalize names normalized_scada, scada_mapping = normalize_names(scada_names) normalized_manifest, manifest_mapping = normalize_names(manifest_names) normalized_dwg, dwg_mapping = normalize_names(dwg_names) # Compare data scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest) scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg) manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg) # Update comparison data data = { 'scada_vs_manifest': { 'only_in_scada': scada_vs_manifest['only_in_list1'], 'only_in_manifest': scada_vs_manifest['only_in_list2'], 'common': scada_vs_manifest['common'], 'scada_count': len(normalized_scada), 'manifest_count': 
len(normalized_manifest) }, 'scada_vs_dwg': { 'only_in_scada': scada_vs_dwg['only_in_list1'], 'only_in_dwg': scada_vs_dwg['only_in_list2'], 'common': scada_vs_dwg['common'], 'scada_count': len(normalized_scada), 'dwg_count': len(normalized_dwg) }, 'manifest_vs_dwg': { 'only_in_manifest': manifest_vs_dwg['only_in_list1'], 'only_in_dwg': manifest_vs_dwg['only_in_list2'], 'common': manifest_vs_dwg['common'], 'manifest_count': len(normalized_manifest), 'dwg_count': len(normalized_dwg) }, 'name_mappings': { 'scada': scada_mapping, 'manifest': manifest_mapping, 'dwg': dwg_mapping }, 'repo_id': repo_id, 'repository_url': data['repository_url'], 'comparison_id': comparison_id, 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'name': data['name'] } # Update shared data SHARED_DATA['comparisons'][comparison_id] = data SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S') flash('Repository has been updated and data has been reloaded with the latest changes.', 'success') except Exception as e: flash(f'Repository updated but error reloading data: {str(e)}', 'warning') else: flash('Repository has been updated with the latest changes, but data files not found.', 'warning') return render_template('results.html', data=data, comparisons=SHARED_DATA['comparisons']) @app.route('/comparison/') def view_comparison(comparison_id): """ View a specific comparison by ID. Always update the repository before displaying results. Args: comparison_id: ID of the comparison to view Returns: str: Rendered HTML template with the comparison results """ if comparison_id not in SHARED_DATA['comparisons']: flash('Comparison not found. 
It may have been deleted.', 'warning') return redirect(url_for('index')) data = SHARED_DATA['comparisons'][comparison_id] # Update repository and reload data if possible repo_id = data.get('repo_id') if repo_id: repository_updated = force_update_repo(repo_id) if repository_updated: # Get files path comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id) manifest_path = os.path.join(comparison_folder, 'manifest.xlsx') dwg_path = os.path.join(comparison_folder, 'dwg.xlsx') if os.path.exists(manifest_path) and os.path.exists(dwg_path): try: # Repository path repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id) # Reload data scada_names = load_scada_names(repo_path) manifest_names = load_excel_names(manifest_path) dwg_names = load_excel_names(dwg_path) # Normalize names normalized_scada, scada_mapping = normalize_names(scada_names) normalized_manifest, manifest_mapping = normalize_names(manifest_names) normalized_dwg, dwg_mapping = normalize_names(dwg_names) # Compare data scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest) scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg) manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg) # Update comparison data data = { 'scada_vs_manifest': { 'only_in_scada': scada_vs_manifest['only_in_list1'], 'only_in_manifest': scada_vs_manifest['only_in_list2'], 'common': scada_vs_manifest['common'], 'scada_count': len(normalized_scada), 'manifest_count': len(normalized_manifest) }, 'scada_vs_dwg': { 'only_in_scada': scada_vs_dwg['only_in_list1'], 'only_in_dwg': scada_vs_dwg['only_in_list2'], 'common': scada_vs_dwg['common'], 'scada_count': len(normalized_scada), 'dwg_count': len(normalized_dwg) }, 'manifest_vs_dwg': { 'only_in_manifest': manifest_vs_dwg['only_in_list1'], 'only_in_dwg': manifest_vs_dwg['only_in_list2'], 'common': manifest_vs_dwg['common'], 'manifest_count': len(normalized_manifest), 'dwg_count': len(normalized_dwg) }, 
@app.route('/update_files', methods=['GET', 'POST'])
def update_files():
    """
    Handle re-upload of files for an existing comparison.

    Reads 'repo_id' and 'comparison_id' from the submitted form. If the
    comparison ID is missing or unknown, a new UUID-based comparison is
    created. Newly uploaded manifest/DWG files replace the stored ones;
    when a file is not uploaded, the previously stored copy is reused.

    Returns:
        str: Rendered HTML template with updated comparison results, or a
        redirect to the index page on GET requests and on any error
    """
    # If GET request, redirect to index
    if request.method == 'GET':
        return redirect(url_for('index'))

    try:
        repo_id = request.form.get('repo_id')
        comparison_id = request.form.get('comparison_id')

        if not repo_id:
            flash('Repository ID is required for updates', 'danger')
            return redirect(url_for('index'))

        # repo_id comes straight from the form and is joined into a path:
        # accept only a single path component so values such as '../..'
        # cannot escape CLONES_FOLDER.
        repo_id_is_plain = os.path.basename(repo_id) == repo_id and repo_id not in ('.', '..')
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)
        if not repo_id_is_plain or not os.path.exists(repo_path):
            flash('Repository not found. Please start a new comparison.', 'danger')
            return redirect(url_for('index'))

        # Create a new comparison ID if not provided (or not known)
        if not comparison_id or comparison_id not in SHARED_DATA['comparisons']:
            comparison_id = str(uuid.uuid4())

        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        os.makedirs(comparison_folder, exist_ok=True)

        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

        # (form field, destination, label used in validation errors,
        #  message when neither an upload nor a stored copy exists)
        upload_specs = (
            ('manifest_file', manifest_path, 'Manifest',
             'Manifest file not found. Please upload a manifest file.'),
            ('dwg_file', dwg_path, 'DWG',
             'DWG file not found. Please upload a DWG file.'),
        )
        for field, target_path, label, missing_msg in upload_specs:
            uploaded = request.files.get(field)
            if uploaded is not None and uploaded.filename != '':
                # Validate uploaded file before overwriting the stored copy
                is_valid, error_msg = validate_excel_file(uploaded)
                if not is_valid:
                    flash(f"{label} file error: {error_msg}", 'danger')
                    return redirect(url_for('index'))
                uploaded.save(target_path)
            elif not os.path.exists(target_path):
                flash(missing_msg, 'danger')
                return redirect(url_for('index'))

        # Carry over metadata from the existing comparison, if there is one
        previous = SHARED_DATA['comparisons'].get(comparison_id, {})
        repository_url = previous.get('repository_url', 'Unknown Repository')
        now = time.strftime('%Y-%m-%d %H:%M:%S')

        # Load data from all sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)

        # Compare all combinations
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        # Prepare comparison data for the template
        comparison_data = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest),
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg),
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg),
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping,
            },
            'repo_id': repo_id,
            'repository_url': repository_url,
            'comparison_id': comparison_id,
            'timestamp': now,
            'name': previous.get('name', f"Updated Comparison {now}"),
        }

        # Update shared data
        SHARED_DATA['comparisons'][comparison_id] = comparison_data
        SHARED_DATA['latest_comparison_id'] = comparison_id
        SHARED_DATA['last_update_time'] = now

        flash('Comparison updated successfully', 'success')
        return render_template('results.html', data=comparison_data, comparisons=SHARED_DATA['comparisons'])

    except ValueError as e:
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
@app.route('/rename_comparison/<comparison_id>', methods=['POST'])
def rename_comparison(comparison_id):
    """
    Rename a comparison.

    The new name is read from the 'name' form field. Surrounding
    whitespace is stripped, and an empty or whitespace-only value is
    rejected.

    Args:
        comparison_id: ID of the comparison to rename (taken from the URL)

    Returns:
        JSON response with a 'success' flag and, on failure, a 'message'
    """
    comparisons = SHARED_DATA['comparisons']
    if comparison_id not in comparisons:
        return jsonify({'success': False, 'message': 'Comparison not found'})

    # A missing field yields None; normalise to '' so strip() is safe.
    name = (request.form.get('name') or '').strip()
    if not name:
        return jsonify({'success': False, 'message': 'Name is required'})

    comparisons[comparison_id]['name'] = name
    return jsonify({'success': True})
@app.route('/delete_comparison/<comparison_id>', methods=['POST'])
def delete_comparison(comparison_id):
    """
    Delete a comparison.

    Removes the comparison from the shared in-memory store, deletes the
    files in its upload folder (best effort) and, if it was the latest
    comparison, points 'latest_comparison_id' at the last remaining entry
    (or None when no comparisons are left).

    Args:
        comparison_id: ID of the comparison to delete (taken from the URL)

    Returns:
        Redirect to homepage
    """
    comparisons = SHARED_DATA['comparisons']
    if comparison_id not in comparisons:
        flash('Comparison not found', 'warning')
        return redirect(url_for('index'))

    # Delete comparison data
    del comparisons[comparison_id]

    # Delete comparison folder; a failure here must not block the deletion
    # of the in-memory entry, so it is only logged.
    comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
    if os.path.exists(comparison_folder):
        try:
            for entry in os.listdir(comparison_folder):
                os.remove(os.path.join(comparison_folder, entry))
            os.rmdir(comparison_folder)
        except OSError as e:
            log_error(f"Could not delete comparison folder: {str(e)}")

    # Update latest comparison ID if needed
    if SHARED_DATA['latest_comparison_id'] == comparison_id:
        SHARED_DATA['latest_comparison_id'] = list(comparisons)[-1] if comparisons else None

    flash('Comparison deleted successfully', 'success')
    return redirect(url_for('index'))
@app.route('/refresh_repository', methods=['POST'])
def refresh_repository():
    """
    Handle repository refresh requests and reload data with the latest changes.

    Reads 'repo_id' and 'comparison_id' from the submitted form, runs
    `git pull origin main` (falling back to `master`) in the repository
    clone, then recomputes the comparison from the repository and the
    stored Excel files. The comparison is recomputed even if the pull
    fails; the flashed message tells the two outcomes apart.

    Returns:
        str: Rendered HTML template with updated comparison results, or a
        redirect to the index page on any validation failure or error
    """
    try:
        repo_id = request.form.get('repo_id')
        comparison_id = request.form.get('comparison_id')

        if not repo_id or not comparison_id:
            flash('Repository ID and comparison ID are required', 'danger')
            return redirect(url_for('index'))

        if comparison_id not in SHARED_DATA['comparisons']:
            flash('Comparison not found', 'danger')
            return redirect(url_for('index'))

        # Get the existing comparison data
        comparison_data = SHARED_DATA['comparisons'][comparison_id]

        # Get files path from existing comparison
        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

        if not (os.path.exists(manifest_path) and os.path.exists(dwg_path)):
            flash('Required files not found', 'danger')
            return redirect(url_for('index'))

        # repo_id comes straight from the form and ends up as the working
        # directory of a git command: accept only a single path component
        # so it cannot point outside CLONES_FOLDER.
        repo_id_is_plain = os.path.basename(repo_id) == repo_id and repo_id not in ('.', '..')
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)
        if (not repo_id_is_plain
                or not os.path.exists(repo_path)
                or not os.path.exists(os.path.join(repo_path, '.git'))):
            flash('Repository not found', 'danger')
            return redirect(url_for('index'))

        # Update the repository: try 'main' first, then 'master'.
        repository_updated = False
        try:
            for branch in ('main', 'master'):
                try:
                    subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', branch], timeout=30)
                except subprocess.CalledProcessError:
                    continue
                repository_updated = True
                break
            else:
                # Neither branch could be pulled
                flash('Failed to update repository', 'warning')
        except Exception as e:
            # e.g. a timeout or git not being available
            flash(f'Error updating repository: {str(e)}', 'warning')
            repository_updated = False

        # Reload data from all sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)

        # Compare all combinations
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        now = time.strftime('%Y-%m-%d %H:%M:%S')

        # Prepare comparison data for the template
        updated_comparison_data = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest),
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg),
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg),
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping,
            },
            'repo_id': repo_id,
            'repository_url': comparison_data['repository_url'],
            'comparison_id': comparison_id,
            'timestamp': now,
            'name': comparison_data['name'],
        }

        # Update shared data
        SHARED_DATA['comparisons'][comparison_id] = updated_comparison_data
        SHARED_DATA['last_update_time'] = now

        if repository_updated:
            flash('Repository updated and data reloaded successfully', 'success')
        else:
            flash('Data reloaded with existing repository content', 'info')

        return render_template('results.html', data=updated_comparison_data,
                               comparisons=SHARED_DATA['comparisons'])

    except ValueError as e:
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
if __name__ == '__main__':
    # The server binds to all interfaces, so the Werkzeug interactive
    # debugger (which allows running arbitrary code from the browser) must
    # not be on by default. Debug mode is opt-in: set FLASK_DEBUG to
    # 1/true/yes/on for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)