2025-05-16 10:18:55 +04:00

1406 lines
58 KiB
Python

import os
import hashlib
import subprocess
import json
import re
import time
import socket
import traceback
import sys
from pathlib import Path
import pandas as pd
from flask import Flask, render_template, request, url_for, redirect, flash, jsonify
from flask_wtf.csrf import CSRFProtect
import uuid
# Application setup: Flask instance, signing key, shared folders, upload limit.
app = Flask(__name__)
# The key that signs sessions and CSRF tokens is taken from the SECRET_KEY
# environment variable. The literal fallback is only a placeholder kept so
# existing setups keep starting; override it in any real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or 'your-secret-key'
# Both shared folders live next to this source file.
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app.config['UPLOAD_FOLDER'] = os.path.join(_BASE_DIR, 'shared_uploads')
app.config['CLONES_FOLDER'] = os.path.join(_BASE_DIR, 'shared_clones')
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50 MB limit for uploads
# Initialize CSRF protection
csrf = CSRFProtect(app)
# Enhanced logging setup
def log_error(msg, exc_info=None):
    """Write an error line to stderr, optionally followed by the active traceback.

    Args:
        msg: Text to report; it is prefixed with "ERROR: ".
        exc_info: When truthy, the output of traceback.format_exc() is
            appended on a new line after the message.
    """
    pieces = [f"ERROR: {msg}"]
    if exc_info:
        pieces.append(traceback.format_exc())
    print("\n".join(pieces), file=sys.stderr)
def log_info(msg):
    """Write one informational line, prefixed with "INFO: ", to stdout."""
    sys.stdout.write(f"INFO: {msg}\n")
def log_debug(msg):
    """Write one debug line, prefixed with "DEBUG: ", to stdout."""
    sys.stdout.write(f"DEBUG: {msg}\n")
# Report the configured shared folders at startup.
log_info(f"Flask app starting with UPLOAD_FOLDER: {app.config['UPLOAD_FOLDER']}")
log_info(f"CLONES_FOLDER: {app.config['CLONES_FOLDER']}")

# Create both shared folders (no error if they already exist).
_SHARED_FOLDERS = (app.config['UPLOAD_FOLDER'], app.config['CLONES_FOLDER'])
for _shared_folder in _SHARED_FOLDERS:
    os.makedirs(_shared_folder, exist_ok=True)

# Open the folders up to every user. This is best-effort: a failure is
# logged and startup continues; the first failure stops further chmod calls.
try:
    for _shared_folder in _SHARED_FOLDERS:
        os.chmod(_shared_folder, 0o777)
except Exception as e:
    log_error(f"Could not set permissions on shared folders: {str(e)}")

# In-process store shared by all requests.
SHARED_DATA = dict(
    comparisons={},             # comparison_id -> comparison_data
    latest_comparison_id=None,  # id of the most recently created comparison
    last_update_time=None,      # formatted time of the last change
)
# Accepted repository URL shapes, compiled once at import time.
# HTTP(S): scheme, host, optional port, then exactly "<owner>/<repo>[.git]".
_HTTP_REPO_URL_RE = re.compile(
    r'https?:\/\/(?:[\w.-]+)(?::\d+)?\/[\w.-]+\/[\w.-]+(?:\.git)?'
)
# SSH (scp-like): "git@<host>:<owner>/<repo>[.git]".
_SSH_REPO_URL_RE = re.compile(r'git@(?:[\w.-]+):[\w.-]+\/[\w.-]+(?:\.git)?')


def validate_repo_url(url):
    """
    Validate a Git repository URL format.

    The whole string must match one of the two accepted shapes. Matching
    uses fullmatch, so a value with a trailing newline (which "^...$" with
    re.match would accept) is rejected.

    Args:
        url (str): Repository URL to validate
    Returns:
        tuple: (is_valid, error_message) where error_message is None when
            the URL is valid and a user-facing string otherwise
    """
    # Missing, blank, or non-string input is reported as "required".
    if not isinstance(url, str) or not url.strip():
        return False, "Repository URL is required"
    if _HTTP_REPO_URL_RE.fullmatch(url) or _SSH_REPO_URL_RE.fullmatch(url):
        return True, None
    return False, "Invalid Git repository URL format. Please use a valid HTTP/HTTPS or SSH URL."
def validate_excel_file(file, max_size=None):
    """
    Validate uploaded Excel file.

    The size is measured by seeking to the end of the stream rather than
    reading the whole upload into memory; the stream is rewound afterwards.

    Args:
        file: File object from request.files (needs .filename, .seek, .tell)
        max_size (int, optional): Size limit in bytes. Defaults to the
            application's MAX_CONTENT_LENGTH setting.
    Returns:
        tuple: (is_valid, error_message) where error_message is None when
            the file is valid and a user-facing string otherwise
    """
    # Check if file exists (a missing or empty filename means nothing was chosen)
    if not file or not file.filename:
        return False, "No file selected"
    # Check file extension
    allowed_extensions = {'.xlsx', '.xls'}
    file_ext = os.path.splitext(file.filename)[1].lower()
    if file_ext not in allowed_extensions:
        return False, "Invalid file type. Please upload an Excel file (.xlsx or .xls)"
    # Check file size (although this is also enforced by MAX_CONTENT_LENGTH)
    if max_size is None:
        max_size = app.config['MAX_CONTENT_LENGTH']
    file.seek(0, os.SEEK_END)
    size = file.tell()
    file.seek(0)  # Reset file pointer so the caller can still save the upload
    if size > max_size:
        return False, f"File too large. Maximum size is {max_size // (1024 * 1024)} MB"
    return True, None
def _repo_host_and_port(repo_url):
    """Return the (host, port) a repository URL points at, for a reachability probe."""
    if "://" in repo_url:
        netloc = repo_url.split("://")[1].split("/")[0]
        host, has_port, port_text = netloc.partition(":")
        if has_port:
            return host, int(port_text)
        # No explicit port: fall back to the protocol default.
        return host, (80 if repo_url.startswith("http://") else 443)
    if "@" in repo_url and ":" in repo_url:
        # scp-like SSH form: git@host:owner/repo
        return repo_url.split("@")[1].split(":")[0], 22
    return "github.com", 443


def _probe_host(host, port, timeout=5):
    """Open and immediately close a TCP connection to check the host is reachable."""
    with socket.create_connection((host, port), timeout=timeout):
        pass


def _run_git(args, timeout):
    """Run a git command, capturing stderr so a failure can be diagnosed."""
    subprocess.run(['git', *args], check=True, timeout=timeout,
                   stderr=subprocess.PIPE, text=True)


def _git_failure_detail(error):
    """Return git's stderr for a failed command, or the exception text if empty."""
    detail = (getattr(error, 'stderr', None) or '').strip()
    return detail or str(error)


def _pull_default_branch(repo_path, timeout):
    """Pull 'main' from origin, falling back to 'master' if that fails."""
    try:
        _run_git(['-C', repo_path, 'pull', 'origin', 'main'], timeout)
    except subprocess.CalledProcessError:
        try:
            _run_git(['-C', repo_path, 'pull', 'origin', 'master'], timeout)
        except subprocess.CalledProcessError as e:
            raise ValueError(f"Failed to pull from repository: {_git_failure_detail(e)}") from e


def _clone_repo(repo_url, repo_path, timeout):
    """Clone repo_url into repo_path, removing any partial clone on failure."""
    import shutil

    try:
        # "--" keeps the URL from ever being parsed as a git option.
        _run_git(['clone', '--', repo_url, repo_path], timeout)
    except subprocess.CalledProcessError as e:
        shutil.rmtree(repo_path, ignore_errors=True)
        detail = _git_failure_detail(e)
        lowered = detail.lower()
        if "authentication failed" in lowered:
            raise ValueError("Authentication failed. The repository might be private or the credentials are invalid.") from e
        if "repository" in lowered and "not found" in lowered:
            raise ValueError(f"Repository not found: {repo_url}. Please check the URL and try again.") from e
        raise ValueError(f"Failed to clone repository: {detail}") from e
    except subprocess.TimeoutExpired:
        # A killed clone leaves a half-written directory that would block
        # every later attempt; the path did not exist before this call.
        shutil.rmtree(repo_path, ignore_errors=True)
        raise


def create_or_update_repo(repo_url):
    """
    Clone or update a Git repository.

    The clone lives in CLONES_FOLDER under the SHA1 hex digest of the URL.
    An existing clone is pulled (origin/main, then origin/master); otherwise
    the repository is cloned. The result must contain at least one JSON file.

    Args:
        repo_url (str): URL of the Git repository
    Returns:
        str: Path to the cloned repository
    Raises:
        ValueError: If repository URL is invalid or operation fails
    """
    # Validate repo URL
    is_valid, error_msg = validate_repo_url(repo_url)
    if not is_valid:
        raise ValueError(error_msg)
    try:
        # The digest is only a stable directory name, not a security measure.
        repo_id = hashlib.sha1(repo_url.encode()).hexdigest()
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)
        # Timeout for each Git operation (seconds)
        timeout = 30
        domain, port = _repo_host_and_port(repo_url)

        already_cloned = os.path.exists(repo_path)
        if already_cloned and not os.path.exists(os.path.join(repo_path, '.git')):
            raise ValueError(f"Directory exists but is not a Git repository: {repo_path}")

        # Check connectivity first so an unreachable host fails fast. Only
        # the probe is reported as a network error; OS errors from git
        # itself are handled separately below.
        try:
            _probe_host(domain, port)
        except OSError as e:
            raise ValueError(f"Network error: Cannot connect to {domain}:{port}. Please check your internet connection.") from e

        operation = 'pull' if already_cloned else 'clone'
        try:
            if already_cloned:
                _pull_default_branch(repo_path, timeout)
            else:
                _clone_repo(repo_url, repo_path, timeout)
        except subprocess.TimeoutExpired as e:
            raise ValueError(f"Git {operation} operation timed out after {timeout} seconds. Repository might be too large or network is slow.") from e
        except PermissionError as e:
            verb = 'access' if already_cloned else 'create'
            raise ValueError(f"Permission denied: Cannot {verb} the repository directory: {repo_path}") from e

        # Verify that the repository contains JSON files
        if next(Path(repo_path).glob('**/*.json'), None) is None:
            raise ValueError("The repository doesn't contain any JSON files. Please check the repository and try again.")
        return repo_path
    except subprocess.CalledProcessError as e:
        raise ValueError(f"Git operation failed: {str(e)}") from e
    except Exception as e:
        # Every failure, including the ValueErrors raised above, is reported
        # with this prefix so callers only need to catch ValueError.
        raise ValueError(f"Repository operation failed: {str(e)}") from e
def extract_folder_name(file_path):
    """
    Derive the control panel name from the path of a JSON view file.

    The name is the path component that directly follows the first
    "Detailed-Views" component, with a trailing " <digits>-<digits>" range
    removed.
    Example: "/path/Detailed-Views/MCM01 Fluid Inbound Merges 1-4/view.json" -> "MCM01 Fluid Inbound Merges"

    Args:
        file_path (str): Path to the JSON file
    Returns:
        str: Extracted control panel name, or empty string if not found
            or if the path cannot be processed
    """
    marker = "Detailed-Views"
    try:
        segments = Path(file_path).parts
        if marker not in segments:
            return ""
        following = segments.index(marker) + 1
        # The marker may be the last component, leaving nothing after it.
        if following >= len(segments):
            return ""
        # Drop a numbering suffix such as " 1-4" from the end of the name.
        return re.sub(r'\s+\d+-\d+$', '', segments[following])
    except Exception:
        return ""
def should_exclude_name(name):
    """
    Tell whether a name contains any of the excluded keywords.

    Matching is a case-insensitive substring test, so a keyword also
    matches inside a longer word.

    Args:
        name (str): The name to check
    Returns:
        bool: True if the name should be excluded, False otherwise
    """
    keywords = ('button', 'camera', 'line', 'end', 'image', 'label',
                'embeddedview', 'root', 'flexcontainer', 'buton')
    lowered = name.lower()
    return any(keyword.lower() in lowered for keyword in keywords)
def load_scada_names(repo_path):
    """
    Find JSON files in the Detailed-Views folder and extract component names.

    Names come from 'meta.name' fields at the root level and in nested
    children; names containing excluded terms are dropped. A file that
    cannot be read or parsed is logged and skipped, so one bad file does
    not abort the whole scan.

    Args:
        repo_path (str): Path to the repository
    Returns:
        list: List of dictionaries containing SCADA names and control panels
    Raises:
        ValueError: If operation fails
    """
    try:
        names_with_panels = []
        # Sorted so the result order does not depend on directory listing order.
        json_files = sorted(Path(repo_path).glob('**/Detailed-Views/**/*.json'))
        if not json_files:
            print(f"Warning: No JSON files found in Detailed-Views folder at {repo_path}")
        for json_file in json_files:
            try:
                # Decode explicitly instead of relying on the platform default;
                # "utf-8-sig" also accepts files that start with a BOM.
                with open(json_file, 'r', encoding='utf-8-sig') as f:
                    data = json.load(f)
                # Extract control panel name from file path
                control_panel = extract_folder_name(str(json_file))
                extract_names_recursive(data, names_with_panels, control_panel, visited=None)
            except Exception as e:
                # Best-effort scan: report the file and carry on with the rest.
                log_error(f"Skipping JSON file {json_file}: {e}")
                continue
        return names_with_panels
    except Exception as e:
        raise ValueError(f"Failed to load SCADA names: {str(e)}") from e
def extract_names_recursive(obj, names_list, control_panel, visited=None):
    """
    Walk a nested JSON structure and collect every usable 'meta.name' value.

    For a dict, the walk descends into the 'children' list and into the
    'root', 'props' and 'custom' keys only. For a list, only the first 1000
    items are examined. Names rejected by should_exclude_name are skipped.

    Args:
        obj: The JSON object or list to process
        names_list: List that found entries are appended to; each entry is
            {"name": ..., "control_panel": ...}
        control_panel: The control panel name extracted from file path
        visited: Set of object ids already visited (to prevent infinite recursion)
    """
    seen = set() if visited is None else visited

    # Scalars carry no names; containers are processed at most once.
    if not isinstance(obj, (dict, list)):
        return
    marker = id(obj)
    if marker in seen:
        return
    seen.add(marker)

    if isinstance(obj, list):
        for element in obj[:1000]:
            extract_names_recursive(element, names_list, control_panel, seen)
        return

    # From here on obj is a dict.
    meta = obj.get('meta')
    if isinstance(meta, dict) and 'name' in meta:
        candidate = meta['name']
        if candidate and isinstance(candidate, str) and not should_exclude_name(candidate):
            names_list.append({"name": candidate, "control_panel": control_panel})

    children = obj.get('children')
    if isinstance(children, list):
        for child in children:
            extract_names_recursive(child, names_list, control_panel, seen)

    for key in ('root', 'props', 'custom'):
        if key in obj:
            extract_names_recursive(obj[key], names_list, control_panel, seen)
def load_excel_names(file_path):
    """
    Extract names from an Excel file.

    The sheet must have a column whose header is "Name" (case-insensitive,
    surrounding whitespace ignored). The first column whose header contains
    "control" or "panel" supplies the control panel, if there is one. Rows
    with a missing or blank name are skipped.

    Args:
        file_path (str): Path to the Excel file
    Returns:
        list: List of dictionaries containing names and control panels from the Excel file
    Raises:
        ValueError: If file doesn't exist, isn't a valid Excel file, or doesn't contain a "Name" column
    """
    try:
        # Check if file exists
        if not os.path.exists(file_path):
            raise ValueError(f"Excel file not found: {file_path}")
        # Check file size before attempting to read
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if file_size_mb > 50:  # 50 MB limit
            raise ValueError(f"Excel file too large ({file_size_mb:.1f} MB). Maximum size is 50 MB.")
        # Read the first sheet; any reader failure becomes a ValueError.
        try:
            df = pd.read_excel(file_path)
        except pd.errors.EmptyDataError:
            raise ValueError(f"Excel file is empty: {file_path}")
        except pd.errors.ParserError:
            raise ValueError(f"Invalid Excel file format or corrupted file: {file_path}")
        except Exception as e:
            raise ValueError(f"Failed to read Excel file: {str(e)}")
        # Check if any data exists
        if df.empty:
            raise ValueError(f"Excel file contains no data: {file_path}")
        # Find name column (case-insensitive, tolerant of padded headers)
        name_col = None
        for col in df.columns:
            if isinstance(col, str) and col.strip().lower() == 'name':
                name_col = col
                break
        if name_col is None:
            raise ValueError("Excel file missing required 'Name' column")
        # Find control panel column (if it exists)
        control_panel_col = None
        for col in df.columns:
            if isinstance(col, str) and ('control' in col.lower() or 'panel' in col.lower()):
                control_panel_col = col
                break
        # Iterate the columns directly: row-wise iteration with iterrows
        # upcasts each row to one dtype, which can turn an integer name
        # such as 101 into "101.0".
        if control_panel_col is not None:
            panel_values = df[control_panel_col]
        else:
            panel_values = [""] * len(df)
        result = []
        for name, control_panel in zip(df[name_col], panel_values):
            if pd.isna(name):
                continue
            clean_name = str(name).strip()
            if not clean_name:
                continue
            # Missing control panel cells become an empty string
            if pd.isna(control_panel):
                control_panel = ""
            result.append({
                "name": clean_name,
                "control_panel": str(control_panel).strip()
            })
        # Check if we got any names
        if not result:
            raise ValueError(f"No valid names found in Excel file: {file_path}")
        return result
    except pd.errors.EmptyDataError:
        raise ValueError(f"Excel file is empty: {file_path}")
    except pd.errors.ParserError:
        raise ValueError(f"Invalid Excel file format: {file_path}")
    except Exception as e:
        # Every failure, including the ValueErrors raised above, is reported
        # with this prefix.
        raise ValueError(f"Failed to load names from Excel file: {str(e)}") from e
def normalize_names(names_with_panels):
    """
    Normalize a list of name dictionaries for consistent comparison.

    Normalization includes:
    - Stripping whitespace
    - Converting to uppercase
    - Removing duplicates (the first occurrence wins) and empty names

    The returned list and mapping always describe the same occurrence of a
    name, so both report the same control panel for it.

    Args:
        names_with_panels (list): List of dictionaries containing 'name' and 'control_panel'
    Returns:
        tuple: (normalized_list, name_mapping) where:
            - normalized_list is the list of normalized name dictionaries
            - name_mapping is a dict mapping normalized names to original info
    Raises:
        ValueError: If input is not a valid list
    """
    try:
        if not isinstance(names_with_panels, list):
            raise ValueError("Input must be a list of name dictionaries")
        normalized_unique = []
        name_mapping = {}  # normalized name -> original name and control panel
        for item in names_with_panels:
            # Entries that are not dicts, or carry no name, are ignored.
            if not isinstance(item, dict) or 'name' not in item:
                continue
            name = item['name']
            if name is None:
                continue
            control_panel = item.get('control_panel', '')
            # Preserve the original name as it appears in the source (trimmed)
            original_name = str(name).strip()
            normalized_name = original_name.upper()
            # Skip empty names and anything already recorded.
            if not normalized_name or normalized_name in name_mapping:
                continue
            normalized_unique.append({
                'name': normalized_name,
                'control_panel': control_panel
            })
            name_mapping[normalized_name] = {
                'original_name': original_name,
                'control_panel': control_panel
            }
        return normalized_unique, name_mapping
    except Exception as e:
        raise ValueError(f"Failed to normalize names: {str(e)}") from e
def compare_name_lists(list1, list2):
    """
    Split two lists of name dictionaries into exclusive and shared entries.

    Entries are matched on their 'name' value. A shared entry takes its
    control panel from list1 when that value is truthy, otherwise from list2.
    All three result lists are sorted by name.

    Args:
        list1 (list): First list of name dictionaries
        list2 (list): Second list of name dictionaries
    Returns:
        dict: Dictionary containing:
            - 'only_in_list1': Items in list1 but not in list2
            - 'only_in_list2': Items in list2 but not in list1
            - 'common': Items present in both lists
    Raises:
        ValueError: If either input is not a list or the comparison fails
    """
    def name_of(entry):
        return entry['name']

    try:
        if not (isinstance(list1, list) and isinstance(list2, list)):
            raise ValueError("Both inputs must be lists")

        # Lookup by name; for repeated names the last entry is kept.
        first_by_name = {name_of(entry): entry for entry in list1}
        second_by_name = {name_of(entry): entry for entry in list2}

        exclusive_first = sorted(
            (entry for entry in list1 if name_of(entry) not in second_by_name),
            key=name_of,
        )
        exclusive_second = sorted(
            (entry for entry in list2 if name_of(entry) not in first_by_name),
            key=name_of,
        )

        shared = []
        for name in sorted(first_by_name.keys() & second_by_name.keys()):
            panel = (first_by_name[name].get('control_panel')
                     or second_by_name[name].get('control_panel', ''))
            shared.append({'name': name, 'control_panel': panel})

        return {
            'only_in_list1': exclusive_first,
            'only_in_list2': exclusive_second,
            'common': shared
        }
    except Exception as e:
        error_msg = f"Failed to compare name lists: {str(e)}"
        raise ValueError(error_msg)
def force_update_repo(repo_id):
    """
    Force update a repository regardless of which route is being accessed.

    Pulls origin/main and, if that fails, origin/master, each with a
    30-second timeout. Failures are logged instead of being dropped
    silently; the function itself never raises.

    Args:
        repo_id (str): ID of the repository to update
    Returns:
        bool: True if update was successful, False otherwise
    """
    try:
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)
        # A clone is only usable if its .git directory is present.
        if not os.path.exists(os.path.join(repo_path, '.git')):
            return False
        last_error = None
        for branch in ('main', 'master'):
            try:
                subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', branch],
                                      timeout=30)
            except subprocess.CalledProcessError as e:
                last_error = e
                continue
            return True
        log_error(f"Could not pull repository {repo_id} from main or master: {last_error}")
        return False
    except Exception as e:
        # Includes timeouts and a missing git executable.
        log_error(f"Could not update repository {repo_id}: {e}")
        return False
# Routes
@app.route('/')
def index():
    """
    Render the homepage with the upload form.

    The template receives the stored comparisons, whether any exist, and
    the time of the last update.

    Returns:
        str: Rendered HTML template
    """
    log_info(f"Accessed index page - Request: {request.method} {request.path}")
    stored = SHARED_DATA['comparisons']
    context = {
        # Tells the template whether there are previous results to show
        'has_previous_results': len(stored) > 0,
        'last_update_time': SHARED_DATA['last_update_time'],
        'comparisons': stored,
    }
    return render_template('index.html', **context)
# Header names whose values carry credentials or tokens and must never be logged.
_SENSITIVE_HEADER_NAMES = frozenset({
    'authorization', 'cookie', 'proxy-authorization', 'x-csrftoken', 'x-csrf-token',
})


def _loggable_headers(headers):
    """Return request headers as a dict with credential-bearing values masked."""
    return {
        key: ('[redacted]' if key.lower() in _SENSITIVE_HEADER_NAMES else value)
        for key, value in headers.items()
    }


def _comparison_section(first_label, second_label, first_names, second_names):
    """Compare two normalized name lists and label the result for the template."""
    diff = compare_name_lists(first_names, second_names)
    return {
        f'only_in_{first_label}': diff['only_in_list1'],
        f'only_in_{second_label}': diff['only_in_list2'],
        'common': diff['common'],
        f'{first_label}_count': len(first_names),
        f'{second_label}_count': len(second_names),
    }


@app.route('/compare', methods=['POST'])
def compare():
    """
    Handle form submission, process data, and display comparison results.

    Steps: validate the form and both uploads, save the uploads under a new
    comparison id, clone or update the repository, load and normalize the
    three name sources, compare them pairwise, store the result in
    SHARED_DATA and render it. Any failure flashes a message and redirects
    to the index page; the uploads of a failed comparison are deleted.

    Returns:
        str: Rendered HTML template with comparison results
    """
    import shutil  # only needed to discard the uploads of a failed comparison

    log_info(f"Compare route accessed - IP: {request.remote_addr}")
    log_debug(f"Request method: {request.method}")
    log_debug(f"Request content type: {request.content_type}")
    # Cookies and tokens are masked so session secrets do not reach the log.
    log_debug(f"Request headers: {_loggable_headers(request.headers)}")

    comparison_folder = None
    keep_uploads = False
    try:
        # Log request details
        log_debug(f"Form data keys: {list(request.form.keys())}")
        log_debug(f"Files keys: {list(request.files.keys())}")
        if not request.form:
            log_error("No form data received")
            flash('No form data received. Please try again.', 'danger')
            return redirect(url_for('index'))
        # Check for CSRF token
        if 'csrf_token' not in request.form:
            log_error("CSRF token missing from form data")
            flash('CSRF token missing. Please refresh the page and try again.', 'danger')
            return redirect(url_for('index'))
        # Extract repo URL and validate
        repo_url = request.form.get('repo_url')
        if not repo_url:
            log_error("Repository URL is missing from form data")
            flash('Repository URL is required', 'danger')
            return redirect(url_for('index'))
        log_info(f"Processing comparison with repo URL: {repo_url}")

        # Both upload fields must be present
        missing_files = [field for field in ('manifest_file', 'dwg_file')
                         if field not in request.files]
        if missing_files:
            log_error(f"Missing required files: {', '.join(missing_files)}")
            flash(f'Missing required files: {", ".join(missing_files)}', 'danger')
            return redirect(url_for('index'))
        manifest_file = request.files['manifest_file']
        dwg_file = request.files['dwg_file']
        log_debug(f"Manifest filename: {manifest_file.filename}")
        log_debug(f"DWG filename: {dwg_file.filename}")
        # Check for empty filenames
        if not manifest_file.filename or not dwg_file.filename:
            log_error(f"Empty filenames - Manifest: '{manifest_file.filename}', DWG: '{dwg_file.filename}'")
            if not manifest_file.filename:
                flash("No manifest file selected", 'danger')
            if not dwg_file.filename:
                flash("No DWG file selected", 'danger')
            return redirect(url_for('index'))
        # Validate uploaded files
        is_valid, error_msg = validate_excel_file(manifest_file)
        if not is_valid:
            log_error(f"Manifest file validation error: {error_msg}")
            flash(f"Manifest file error: {error_msg}", 'danger')
            return redirect(url_for('index'))
        is_valid, error_msg = validate_excel_file(dwg_file)
        if not is_valid:
            log_error(f"DWG file validation error: {error_msg}")
            flash(f"DWG file error: {error_msg}", 'danger')
            return redirect(url_for('index'))

        # Generate unique ID for this comparison
        comparison_id = str(uuid.uuid4())
        log_info(f"Created comparison ID: {comparison_id}")
        # Create folder for this comparison
        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        os.makedirs(comparison_folder, exist_ok=True)
        log_debug(f"Created comparison folder: {comparison_folder}")
        # Save uploaded files under fixed names inside that folder
        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')
        try:
            manifest_file.save(manifest_path)
            log_debug(f"Saved manifest file to: {manifest_path}")
        except Exception as e:
            log_error(f"Failed to save manifest file: {str(e)}", exc_info=True)
            flash(f"Failed to save manifest file: {str(e)}", 'danger')
            return redirect(url_for('index'))
        try:
            dwg_file.save(dwg_path)
            log_debug(f"Saved DWG file to: {dwg_path}")
        except Exception as e:
            log_error(f"Failed to save DWG file: {str(e)}", exc_info=True)
            flash(f"Failed to save DWG file: {str(e)}", 'danger')
            return redirect(url_for('index'))
        # Set permissions on uploaded files (best-effort)
        try:
            os.chmod(manifest_path, 0o666)
            os.chmod(dwg_path, 0o666)
        except Exception as e:
            log_error(f"Could not set permissions on uploaded files: {str(e)}")

        # Clone or update repository
        try:
            repo_path = create_or_update_repo(repo_url)
            log_info(f"Repository path: {repo_path}")
        except ValueError as e:
            log_error(f"Repository error: {str(e)}")
            flash(str(e), 'danger')
            return redirect(url_for('index'))
        # Load data from all sources
        try:
            log_info("Loading SCADA names from repository")
            scada_names = load_scada_names(repo_path)
            log_debug(f"Loaded {len(scada_names)} SCADA names")
            log_info("Loading manifest names from Excel")
            manifest_names = load_excel_names(manifest_path)
            log_debug(f"Loaded {len(manifest_names)} manifest names")
            log_info("Loading DWG names from Excel")
            dwg_names = load_excel_names(dwg_path)
            log_debug(f"Loaded {len(dwg_names)} DWG names")
        except ValueError as e:
            log_error(f"Error loading data: {str(e)}")
            flash(str(e), 'danger')
            return redirect(url_for('index'))

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        log_debug(f"Normalized SCADA names: {len(normalized_scada)}")
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        log_debug(f"Normalized manifest names: {len(normalized_manifest)}")
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)
        log_debug(f"Normalized DWG names: {len(normalized_dwg)}")
        # Same id that create_or_update_repo uses as the clone directory name
        repo_id = hashlib.sha1(repo_url.encode()).hexdigest()

        # Compare all combinations
        log_info("Comparing normalized name lists")
        # One timestamp so the stored fields cannot straddle a second boundary.
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        comparison_data = {
            'scada_vs_manifest': _comparison_section(
                'scada', 'manifest', normalized_scada, normalized_manifest),
            'scada_vs_dwg': _comparison_section(
                'scada', 'dwg', normalized_scada, normalized_dwg),
            'manifest_vs_dwg': _comparison_section(
                'manifest', 'dwg', normalized_manifest, normalized_dwg),
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            'repository_url': repo_url,
            'comparison_id': comparison_id,
            'timestamp': timestamp,
            'name': f"Comparison {timestamp}"
        }

        # From here on the comparison is stored, so its uploads must stay.
        keep_uploads = True
        SHARED_DATA['comparisons'][comparison_id] = comparison_data
        SHARED_DATA['latest_comparison_id'] = comparison_id
        SHARED_DATA['last_update_time'] = timestamp
        log_info(f"Comparison completed successfully - ID: {comparison_id}")
        return render_template('results.html', data=comparison_data, comparisons=SHARED_DATA['comparisons'])
    except ValueError as e:
        log_error(f"ValueError during comparison: {str(e)}")
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        log_error(f"Unexpected error during comparison: {str(e)}", exc_info=True)
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
    finally:
        # Uploads of a comparison that was never stored are unreachable;
        # remove them instead of letting them pile up on disk.
        if comparison_folder is not None and not keep_uploads:
            shutil.rmtree(comparison_folder, ignore_errors=True)
# Handler for every HTTP 400 response, which includes failed CSRF checks.
@app.errorhandler(400)
def handle_csrf_error(e):
    """
    Log a 400 error, flash a retry message and redirect to the index page.

    Secrets are masked before logging: the csrf_token form field and any
    header carrying credentials or tokens.

    Args:
        e: The error passed in by Flask for the 400 response
    Returns:
        A redirect response to the index page
    """
    masked_headers = ('authorization', 'cookie', 'proxy-authorization',
                      'x-csrftoken', 'x-csrf-token')
    log_error(f"400 error: {str(e)}")
    if request.form:
        form_snapshot = {
            key: ('[redacted]' if key == 'csrf_token' else request.form.getlist(key))
            for key in request.form
        }
    else:
        form_snapshot = 'No form data'
    header_snapshot = {
        key: ('[redacted]' if key.lower() in masked_headers else value)
        for key, value in request.headers.items()
    }
    log_debug(f"Form data at error: {form_snapshot}")
    log_debug(f"Request URL: {request.url}")
    log_debug(f"Request method: {request.method}")
    log_debug(f"Request headers: {header_snapshot}")
    flash("Your form submission failed. This could be due to an expired session. Please try again.", "danger")
    return redirect(url_for('index'))
@app.errorhandler(413)
def request_entity_too_large(e):
    """
    Tell the user the upload exceeded the size limit and return to the index page.

    The limit shown is derived from MAX_CONTENT_LENGTH so the message
    cannot drift from the configured value.

    Args:
        e: The error passed in by Flask for the 413 response
    Returns:
        A redirect response to the index page
    """
    limit_mb = app.config['MAX_CONTENT_LENGTH'] // (1024 * 1024)
    log_error("413 error: File too large")
    flash(f"The uploaded file is too large. Maximum size is {limit_mb} MB.", "danger")
    return redirect(url_for('index'))
def _refresh_latest_comparison(comparison_id, data):
    """Pull the comparison's repository and rebuild its results; return the data to display."""
    repo_id = data.get('repo_id')
    # Without a repo id, or when the pull fails, the stored data is shown as-is.
    if not repo_id or not force_update_repo(repo_id):
        return data

    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
    manifest_path = os.path.join(upload_dir, 'manifest.xlsx')
    dwg_path = os.path.join(upload_dir, 'dwg.xlsx')
    if not (os.path.exists(manifest_path) and os.path.exists(dwg_path)):
        flash('Repository has been updated with the latest changes, but data files not found.', 'warning')
        return data

    def section(first_label, second_label, first_names, second_names):
        # Label one pairwise diff with the key names the template reads.
        diff = compare_name_lists(first_names, second_names)
        return {
            f'only_in_{first_label}': diff['only_in_list1'],
            f'only_in_{second_label}': diff['only_in_list2'],
            'common': diff['common'],
            f'{first_label}_count': len(first_names),
            f'{second_label}_count': len(second_names),
        }

    current = data
    try:
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)
        # Reload the three name sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)
        # Normalize names
        scada, scada_mapping = normalize_names(scada_names)
        manifest, manifest_mapping = normalize_names(manifest_names)
        dwg, dwg_mapping = normalize_names(dwg_names)
        # Rebuild the record, keeping the stored URL and display name
        current = {
            'scada_vs_manifest': section('scada', 'manifest', scada, manifest),
            'scada_vs_dwg': section('scada', 'dwg', scada, dwg),
            'manifest_vs_dwg': section('manifest', 'dwg', manifest, dwg),
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            'repository_url': data['repository_url'],
            'comparison_id': comparison_id,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'name': data['name']
        }
        # Update shared data
        SHARED_DATA['comparisons'][comparison_id] = current
        SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S')
        flash('Repository has been updated and data has been reloaded with the latest changes.', 'success')
    except Exception as e:
        flash(f'Repository updated but error reloading data: {str(e)}', 'warning')
    return current


@app.route('/latest')
def latest_results():
    """
    Show the latest comparison results from any user.

    The repository is pulled first; if that succeeds and the uploaded files
    are still on disk, the comparison is recomputed and stored before it is
    displayed. Otherwise the stored results are displayed unchanged.

    Returns:
        str: Rendered HTML template with the latest comparison results
    """
    if not SHARED_DATA['latest_comparison_id'] or not SHARED_DATA['comparisons']:
        flash('No comparison results available yet. Please upload files to generate results.', 'info')
        return redirect(url_for('index'))
    comparison_id = SHARED_DATA['latest_comparison_id']
    data = _refresh_latest_comparison(comparison_id, SHARED_DATA['comparisons'][comparison_id])
    return render_template('results.html', data=data, comparisons=SHARED_DATA['comparisons'])
@app.route('/comparison/<comparison_id>')
def view_comparison(comparison_id):
    """
    Show one stored comparison, refreshing it from its repository first.

    The repository linked to the comparison is updated through
    ``force_update_repo``. When that call returns a truthy value and both
    uploaded spreadsheets are still on disk, the comparison is recomputed
    and the stored entry is replaced. In every other case the previously
    stored results are rendered unchanged.

    Args:
        comparison_id: ID of the comparison to view

    Returns:
        Rendered ``results.html`` page, or a redirect to the index page when
        the comparison ID is unknown
    """
    if comparison_id not in SHARED_DATA['comparisons']:
        flash('Comparison not found. It may have been deleted.', 'warning')
        return redirect(url_for('index'))

    # ``shown`` is what ends up on the page; it is only swapped for fresh
    # results once a reload has completed without raising.
    shown = SHARED_DATA['comparisons'][comparison_id]
    repo_id = shown.get('repo_id')

    if repo_id and force_update_repo(repo_id):
        upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        manifest_path = os.path.join(upload_dir, 'manifest.xlsx')
        dwg_path = os.path.join(upload_dir, 'dwg.xlsx')
        spreadsheets_present = os.path.exists(manifest_path) and os.path.exists(dwg_path)

        if not spreadsheets_present:
            flash('Repository has been updated with the latest changes, but data files not found.', 'warning')
        else:
            try:
                checkout_dir = os.path.join(app.config['CLONES_FOLDER'], repo_id)

                # Load the three name sources
                scada_raw = load_scada_names(checkout_dir)
                manifest_raw = load_excel_names(manifest_path)
                dwg_raw = load_excel_names(dwg_path)

                # Normalise each source and keep its name mapping
                scada, scada_map = normalize_names(scada_raw)
                manifest, manifest_map = normalize_names(manifest_raw)
                dwg, dwg_map = normalize_names(dwg_raw)

                # Pairwise comparisons
                diff_scada_manifest = compare_name_lists(scada, manifest)
                diff_scada_dwg = compare_name_lists(scada, dwg)
                diff_manifest_dwg = compare_name_lists(manifest, dwg)

                scada_manifest_section = {
                    'only_in_scada': diff_scada_manifest['only_in_list1'],
                    'only_in_manifest': diff_scada_manifest['only_in_list2'],
                    'common': diff_scada_manifest['common'],
                    'scada_count': len(scada),
                    'manifest_count': len(manifest),
                }
                scada_dwg_section = {
                    'only_in_scada': diff_scada_dwg['only_in_list1'],
                    'only_in_dwg': diff_scada_dwg['only_in_list2'],
                    'common': diff_scada_dwg['common'],
                    'scada_count': len(scada),
                    'dwg_count': len(dwg),
                }
                manifest_dwg_section = {
                    'only_in_manifest': diff_manifest_dwg['only_in_list1'],
                    'only_in_dwg': diff_manifest_dwg['only_in_list2'],
                    'common': diff_manifest_dwg['common'],
                    'manifest_count': len(manifest),
                    'dwg_count': len(dwg),
                }
                mappings_section = {
                    'scada': scada_map,
                    'manifest': manifest_map,
                    'dwg': dwg_map,
                }

                # Identity fields (URL and display name) carry over from the
                # entry that was stored before the reload.
                refreshed = {
                    'scada_vs_manifest': scada_manifest_section,
                    'scada_vs_dwg': scada_dwg_section,
                    'manifest_vs_dwg': manifest_dwg_section,
                    'name_mappings': mappings_section,
                    'repo_id': repo_id,
                    'repository_url': shown['repository_url'],
                    'comparison_id': comparison_id,
                    'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
                    'name': shown['name'],
                }

                shown = refreshed
                SHARED_DATA['comparisons'][comparison_id] = shown
                SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S')
                flash('Repository has been updated and data has been reloaded with the latest changes.', 'success')
            except Exception as e:
                # Best effort: fall back to the previously stored results.
                flash(f'Repository updated but error reloading data: {str(e)}', 'warning')

    return render_template('results.html', data=shown, comparisons=SHARED_DATA['comparisons'])
@app.route('/update_files', methods=['GET', 'POST'])
def update_files():
    """
    Handle re-upload of files for an existing comparison.

    Reads ``repo_id`` and ``comparison_id`` from the submitted form. When the
    comparison ID is missing or unknown, a new one is generated. Newly
    uploaded ``manifest_file`` / ``dwg_file`` spreadsheets replace the stored
    ones; a spreadsheet that was not re-uploaded must already exist on disk.
    The comparison is then recomputed, stored in ``SHARED_DATA`` and rendered.

    GET requests are redirected to the index page. Every failure path flashes
    a message and redirects to the index page as well.

    Returns:
        str: Rendered HTML template with updated comparison results, or a
        redirect response to the index page
    """
    # If GET request, redirect to index
    if request.method == 'GET':
        return redirect(url_for('index'))

    try:
        # Extract repo_id and comparison_id
        repo_id = request.form.get('repo_id')
        comparison_id = request.form.get('comparison_id')

        if not repo_id:
            flash('Repository ID is required for updates', 'danger')
            return redirect(url_for('index'))

        # repo_id comes straight from the form, so make sure the path it
        # produces really lives inside the clones folder ('..' segments or an
        # absolute path would otherwise escape it).
        clones_root = os.path.normcase(os.path.realpath(app.config['CLONES_FOLDER']))
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)
        resolved_repo = os.path.normcase(os.path.realpath(repo_path))
        inside_clones = resolved_repo.startswith(os.path.join(clones_root, ''))

        # Check if repository exists
        if not inside_clones or not os.path.exists(repo_path):
            flash('Repository not found. Please start a new comparison.', 'danger')
            return redirect(url_for('index'))

        # Create a new comparison ID if not provided or not known
        if not comparison_id or comparison_id not in SHARED_DATA['comparisons']:
            comparison_id = str(uuid.uuid4())

        # comparison_id is now either a stored key or a fresh UUID
        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        os.makedirs(comparison_folder, exist_ok=True)

        # Handle file uploads
        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

        # Check if new manifest file was uploaded
        if 'manifest_file' in request.files and request.files['manifest_file'].filename != '':
            manifest_file = request.files['manifest_file']
            # Validate uploaded file
            # NOTE(review): if validate_excel_file reads the upload stream,
            # confirm it rewinds it before save() is called.
            is_valid, error_msg = validate_excel_file(manifest_file)
            if not is_valid:
                flash(f"Manifest file error: {error_msg}", 'danger')
                return redirect(url_for('index'))
            manifest_file.save(manifest_path)
        elif not os.path.exists(manifest_path):
            flash('Manifest file not found. Please upload a manifest file.', 'danger')
            return redirect(url_for('index'))

        # Check if new DWG file was uploaded
        if 'dwg_file' in request.files and request.files['dwg_file'].filename != '':
            dwg_file = request.files['dwg_file']
            # Validate uploaded file
            is_valid, error_msg = validate_excel_file(dwg_file)
            if not is_valid:
                flash(f"DWG file error: {error_msg}", 'danger')
                return redirect(url_for('index'))
            dwg_file.save(dwg_path)
        elif not os.path.exists(dwg_path):
            flash('DWG file not found. Please upload a DWG file.', 'danger')
            return redirect(url_for('index'))

        # Reuse identity fields from the existing comparison, if there is one
        previous = SHARED_DATA['comparisons'].get(comparison_id, {})
        repository_url = previous.get('repository_url', 'Unknown Repository')

        # Load data from all sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)

        # Compare all combinations
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        # Prepare comparison data for the template
        comparison_data = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest)
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg)
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg)
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            'repository_url': repository_url,
            'comparison_id': comparison_id,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'name': previous.get('name', f"Updated Comparison {time.strftime('%Y-%m-%d %H:%M:%S')}")
        }

        # Update shared data
        SHARED_DATA['comparisons'][comparison_id] = comparison_data
        SHARED_DATA['latest_comparison_id'] = comparison_id
        SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S')

        flash('Comparison updated successfully', 'success')
        return render_template('results.html', data=comparison_data, comparisons=SHARED_DATA['comparisons'])
    except ValueError as e:
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        # Keep the traceback in the server log; the user only sees the summary.
        log_error(f"Unexpected error while updating files: {str(e)}", exc_info=True)
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
@app.route('/rename_comparison/<comparison_id>', methods=['POST'])
def rename_comparison(comparison_id):
    """
    Change the display name of a stored comparison.

    The new name is read from the ``name`` form field.

    Args:
        comparison_id: ID of the comparison to rename

    Returns:
        JSON response: ``{'success': True}`` on success, otherwise
        ``{'success': False, 'message': ...}`` describing the problem
    """
    stored = SHARED_DATA['comparisons']

    # Unknown ID is reported before the form is inspected.
    if comparison_id not in stored:
        failure = {'success': False, 'message': 'Comparison not found'}
        return jsonify(failure)

    new_name = request.form.get('name')
    if new_name:
        stored[comparison_id]['name'] = new_name
        return jsonify({'success': True})

    failure = {'success': False, 'message': 'Name is required'}
    return jsonify(failure)
@app.route('/delete_comparison/<comparison_id>', methods=['POST'])
def delete_comparison(comparison_id):
    """
    Delete a comparison and its uploaded files.

    Removes the entry from ``SHARED_DATA['comparisons']``, deletes the
    comparison's upload folder (best effort: a failure is logged, not
    raised) and, when the deleted entry was the latest comparison, points
    ``latest_comparison_id`` at the last remaining entry or ``None``.

    Args:
        comparison_id: ID of the comparison to delete

    Returns:
        Redirect to homepage
    """
    # Local import: shutil is only needed by this route.
    import shutil

    comparisons = SHARED_DATA['comparisons']
    if comparison_id not in comparisons:
        flash('Comparison not found', 'warning')
        return redirect(url_for('index'))

    # Delete comparison data
    del comparisons[comparison_id]

    # Delete comparison folder. rmtree also copes with nested directories,
    # which a flat os.remove() loop would choke on and leave half-deleted.
    comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
    if os.path.exists(comparison_folder):
        try:
            shutil.rmtree(comparison_folder)
        except OSError as e:
            # Leftover files must not block removal of the comparison itself.
            log_error(f"Could not delete comparison folder: {str(e)}")

    # Update latest comparison ID if needed
    if SHARED_DATA['latest_comparison_id'] == comparison_id:
        # Dicts keep insertion order, so the last key is the newest entry left.
        remaining_ids = list(comparisons)
        SHARED_DATA['latest_comparison_id'] = remaining_ids[-1] if remaining_ids else None

    flash('Comparison deleted successfully', 'success')
    return redirect(url_for('index'))
@app.route('/refresh_repository', methods=['POST'])
def refresh_repository():
    """
    Handle repository refresh requests and reload data with the latest changes.

    Reads ``repo_id`` and ``comparison_id`` from the submitted form, pulls the
    repository (``origin main`` first, then ``origin master``), recomputes the
    comparison from the repository and the stored spreadsheets, and replaces
    the stored entry. A failed pull does not abort the request: the data is
    reloaded from whatever the local clone currently contains.

    Every validation failure flashes a message and redirects to the index
    page.

    Returns:
        str: Rendered HTML template with updated comparison results, or a
        redirect response to the index page
    """
    try:
        # Extract repo_id and comparison_id
        repo_id = request.form.get('repo_id')
        comparison_id = request.form.get('comparison_id')

        if not repo_id or not comparison_id:
            flash('Repository ID and comparison ID are required', 'danger')
            return redirect(url_for('index'))

        if comparison_id not in SHARED_DATA['comparisons']:
            flash('Comparison not found', 'danger')
            return redirect(url_for('index'))

        # Get the existing comparison data
        comparison_data = SHARED_DATA['comparisons'][comparison_id]

        # Get files path from existing comparison
        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

        # Check if files exist
        if not os.path.exists(manifest_path) or not os.path.exists(dwg_path):
            flash('Required files not found', 'danger')
            return redirect(url_for('index'))

        # repo_id comes straight from the form and ends up as the working
        # directory for git, so make sure the resulting path really lives
        # inside the clones folder ('..' segments or an absolute path would
        # otherwise escape it).
        clones_root = os.path.normcase(os.path.realpath(app.config['CLONES_FOLDER']))
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)
        resolved_repo = os.path.normcase(os.path.realpath(repo_path))
        inside_clones = resolved_repo.startswith(os.path.join(clones_root, ''))

        # Find repository path
        is_git_clone = os.path.exists(repo_path) and os.path.exists(os.path.join(repo_path, '.git'))
        if not inside_clones or not is_git_clone:
            flash('Repository not found', 'danger')
            return redirect(url_for('index'))

        # Update the repository: try main first, fall back to master
        repository_updated = False
        try:
            for branch in ('main', 'master'):
                try:
                    subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', branch],
                                          timeout=30)
                except subprocess.CalledProcessError:
                    continue
                repository_updated = True
                break
            else:
                # Neither branch could be pulled
                flash('Failed to update repository', 'warning')
        except Exception as e:
            # e.g. a timeout or git not being available; carry on with the
            # content that is already checked out.
            flash(f'Error updating repository: {str(e)}', 'warning')
            repository_updated = False

        # Reload data from all sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)

        # Compare all combinations
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        # Prepare comparison data for the template
        updated_comparison_data = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest)
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg)
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg)
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            'repository_url': comparison_data['repository_url'],
            'comparison_id': comparison_id,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'name': comparison_data['name']
        }

        # Update shared data
        SHARED_DATA['comparisons'][comparison_id] = updated_comparison_data
        SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S')

        if repository_updated:
            flash('Repository updated and data reloaded successfully', 'success')
        else:
            flash('Data reloaded with existing repository content', 'info')

        return render_template('results.html', data=updated_comparison_data, comparisons=SHARED_DATA['comparisons'])
    except ValueError as e:
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        # Keep the traceback in the server log; the user only sees the summary.
        log_error(f"Unexpected error while refreshing repository: {str(e)}", exc_info=True)
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
if __name__ == '__main__':
    # The server listens on every interface, so the interactive debugger that
    # debug=True enables (it allows running arbitrary Python from the browser)
    # must not be on by default. Opt in explicitly with FLASK_DEBUG=1 for
    # local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)