1406 lines
58 KiB
Python
1406 lines
58 KiB
Python
import os
|
|
import hashlib
|
|
import subprocess
|
|
import json
|
|
import re
|
|
import time
|
|
import socket
|
|
import traceback
|
|
import sys
|
|
from pathlib import Path
|
|
import pandas as pd
|
|
from flask import Flask, render_template, request, url_for, redirect, flash, jsonify
|
|
from flask_wtf.csrf import CSRFProtect
|
|
import uuid
|
|
|
|
# Application setup. Folder paths are anchored to this file's directory so
# they do not depend on the working directory the process is started from.
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))

app = Flask(__name__)
# SECURITY: the literal below is a placeholder and must not be relied on in
# production. Set the SECRET_KEY environment variable to override it; the old
# value is kept as the fallback so existing deployments keep starting.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or 'your-secret-key'
app.config['UPLOAD_FOLDER'] = os.path.join(_BASE_DIR, 'shared_uploads')
app.config['CLONES_FOLDER'] = os.path.join(_BASE_DIR, 'shared_clones')
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50 MB limit for uploads

# Initialize CSRF protection
csrf = CSRFProtect(app)
|
|
|
|
# Enhanced logging setup
|
|
def log_error(msg, exc_info=None):
    """Write an ``ERROR:``-prefixed message to stderr.

    Args:
        msg: Text to report.
        exc_info: When truthy, the output of ``traceback.format_exc()`` is
            appended on a new line after the message.
    """
    pieces = [f"ERROR: {msg}"]
    if exc_info:
        pieces.append(traceback.format_exc())
    print("\n".join(pieces), file=sys.stderr)
|
|
|
|
def log_info(msg):
    """Write an ``INFO:``-prefixed message to stdout."""
    print("INFO:", msg, file=sys.stdout)
|
|
|
|
def log_debug(msg):
    """Write a ``DEBUG:``-prefixed message to stdout."""
    print("DEBUG:", msg, file=sys.stdout)
|
|
|
|
# Log startup information
|
|
_upload_dir = app.config['UPLOAD_FOLDER']
_clones_dir = app.config['CLONES_FOLDER']

log_info(f"Flask app starting with UPLOAD_FOLDER: {_upload_dir}")
log_info(f"CLONES_FOLDER: {_clones_dir}")

# Make sure both shared folders exist before any request is served.
for _shared_dir in (_upload_dir, _clones_dir):
    os.makedirs(_shared_dir, exist_ok=True)

# Open the folders to every user for reading and writing. This is best
# effort: a failure is logged and startup continues.
try:
    for _shared_dir in (_upload_dir, _clones_dir):
        os.chmod(_shared_dir, 0o777)
except Exception as e:
    log_error(f"Could not set permissions on shared folders: {str(e)}")
|
|
|
|
# Shared data storage - stores all comparisons
|
|
SHARED_DATA = dict(
    comparisons={},             # comparison_id -> comparison_data
    latest_comparison_id=None,  # id of the most recently stored comparison
    last_update_time=None,      # timestamp string of the most recent store
)
|
|
|
|
def validate_repo_url(url):
    """
    Validate a Git repository URL format.

    Accepted shapes are ``http(s)://host[:port]/owner/repo[.git]`` and
    ``git@host:owner/repo[.git]``. The whole string must match, so a URL
    with a trailing newline or other surrounding characters is rejected.

    Args:
        url (str): Repository URL to validate

    Returns:
        tuple: ``(True, None)`` when the URL is valid, otherwise
        ``(False, error_message)``.
    """
    # Check for empty URL
    if not url or not url.strip():
        return False, "Repository URL is required"

    # Pattern for Git HTTP/HTTPS URLs (including ports)
    http_pattern = r'https?://[\w.-]+(?::\d+)?/[\w.-]+/[\w.-]+(?:\.git)?'

    # Pattern for Git SSH URLs
    ssh_pattern = r'git@[\w.-]+:[\w.-]+/[\w.-]+(?:\.git)?'

    # fullmatch rather than match + '$': '$' also matches just before a
    # trailing newline, which would let "…/repo\n" through to git.
    if re.fullmatch(http_pattern, url) or re.fullmatch(ssh_pattern, url):
        return True, None

    return False, "Invalid Git repository URL format. Please use a valid HTTP/HTTPS or SSH URL."
|
|
|
|
def validate_excel_file(file):
    """
    Validate uploaded Excel file.

    Checks that a file was supplied, that its extension is ``.xlsx`` or
    ``.xls``, and that it does not exceed ``MAX_CONTENT_LENGTH``. The stream
    position is reset to the start before returning from the size check.

    Args:
        file: File object from request.files

    Returns:
        tuple: ``(True, None)`` when the file is valid, otherwise
        ``(False, error_message)``.
    """
    # A missing upload can surface as None, an empty name, or a None name.
    if not file or not file.filename:
        return False, "No file selected"

    # Check file extension
    extension = os.path.splitext(file.filename)[1].lower()
    if extension not in {'.xlsx', '.xls'}:
        return False, "Invalid file type. Please upload an Excel file (.xlsx or .xls)"

    # Check file size (although this is also enforced by MAX_CONTENT_LENGTH).
    # Measure by seeking to the end instead of reading the whole upload into
    # memory, then rewind so the caller can still save the file.
    max_bytes = app.config['MAX_CONTENT_LENGTH']
    file.seek(0, os.SEEK_END)
    size = file.tell()
    file.seek(0)

    if size > max_bytes:
        return False, f"File too large. Maximum size is {max_bytes // (1024 * 1024)} MB"

    return True, None
|
|
|
|
def _repo_endpoint(repo_url):
    """Return the ``(host, port)`` that must be reachable to use *repo_url*."""
    if "://" in repo_url:
        netloc = repo_url.split("://")[1].split("/")[0]
        host, has_port, port_text = netloc.partition(":")
        if has_port:
            return host, int(port_text)
        # No explicit port: fall back to the protocol default.
        return host, (80 if repo_url.startswith("http://") else 443)
    if "@" in repo_url and ":" in repo_url:
        return repo_url.split("@")[1].split(":")[0], 22  # Default SSH port
    return "github.com", 443  # Default fallback


def _ensure_reachable(host, port):
    """Raise ValueError unless a TCP connection to *host*:*port* succeeds."""
    try:
        # The context manager closes the probe socket again immediately.
        with socket.create_connection((host, port), timeout=5):
            pass
    except OSError as err:
        raise ValueError(
            f"Network error: Cannot connect to {host}:{port}. Please check your internet connection."
        ) from err


def _run_git(args, timeout):
    """Run ``git`` with *args*, capturing stderr so failures can be explained."""
    # GIT_TERMINAL_PROMPT=0 makes git fail instead of waiting for credentials
    # on a terminal that a web worker does not have.
    env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
    subprocess.run(['git', *args], check=True, timeout=timeout, env=env,
                   stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                   universal_newlines=True)


def _git_error_text(error):
    """Return the CalledProcessError summary followed by git's stderr, if any."""
    stderr = (error.stderr or '').strip()
    return f"{error} {stderr}".strip()


def _pull_default_branch(repo_path, timeout):
    """Pull ``origin main``, falling back to ``origin master`` if that fails."""
    last_error = None
    for branch in ('main', 'master'):
        try:
            _run_git(['-C', repo_path, 'pull', 'origin', branch], timeout)
            return
        except subprocess.CalledProcessError as err:
            last_error = err
    raise ValueError(f"Failed to pull from repository: {_git_error_text(last_error)}") from last_error


def create_or_update_repo(repo_url):
    """
    Clone or update a Git repository.

    The clone lives in ``CLONES_FOLDER`` under the SHA1 hex digest of the URL.
    An existing clone is updated with ``git pull`` (``main``, then
    ``master``); otherwise the repository is cloned. Each git command is
    limited to 30 seconds, and the result must contain at least one
    ``.json`` file.

    Args:
        repo_url (str): URL of the Git repository

    Returns:
        str: Path to the cloned repository

    Raises:
        ValueError: If repository URL is invalid or operation fails
    """
    # Validate repo URL
    is_valid, error_msg = validate_repo_url(repo_url)
    if not is_valid:
        raise ValueError(error_msg)

    try:
        # Stable folder name derived from the URL.
        repo_id = hashlib.sha1(repo_url.encode()).hexdigest()
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)

        # Timeout for Git operations (seconds)
        timeout = 30

        domain, port = _repo_endpoint(repo_url)

        if os.path.exists(repo_path):
            # Check if it's actually a Git repository
            if not os.path.exists(os.path.join(repo_path, '.git')):
                raise ValueError(f"Directory exists but is not a Git repository: {repo_path}")

            # The connectivity probe has its own handler so that unrelated
            # OSErrors from running git are not reported as network errors.
            _ensure_reachable(domain, port)
            try:
                _pull_default_branch(repo_path, timeout)
            except subprocess.TimeoutExpired as err:
                raise ValueError(
                    f"Git pull operation timed out after {timeout} seconds. Repository might be too large or network is slow."
                ) from err
            except PermissionError as err:
                raise ValueError(
                    f"Permission denied: Cannot access the repository directory: {repo_path}"
                ) from err
        else:
            _ensure_reachable(domain, port)
            try:
                # '--' ends option parsing before the user-supplied URL.
                _run_git(['clone', '--', repo_url, repo_path], timeout)
            except subprocess.CalledProcessError as err:
                # git reports the cause on stderr, not in the exception text.
                detail = _git_error_text(err)
                if "Authentication failed" in detail:
                    raise ValueError(
                        "Authentication failed. The repository might be private or the credentials are invalid."
                    ) from err
                if "Repository not found" in detail:
                    raise ValueError(
                        f"Repository not found: {repo_url}. Please check the URL and try again."
                    ) from err
                raise ValueError(f"Failed to clone repository: {detail}") from err
            except subprocess.TimeoutExpired as err:
                raise ValueError(
                    f"Git clone operation timed out after {timeout} seconds. Repository might be too large or network is slow."
                ) from err
            except PermissionError as err:
                raise ValueError(
                    f"Permission denied: Cannot create the repository directory: {repo_path}"
                ) from err

        # Verify that the repository contains JSON files
        if next(Path(repo_path).rglob('*.json'), None) is None:
            raise ValueError("The repository doesn't contain any JSON files. Please check the repository and try again.")

        return repo_path
    except subprocess.CalledProcessError as e:
        raise ValueError(f"Git operation failed: {str(e)}") from e
    except Exception as e:
        # NOTE: this also re-wraps the ValueErrors raised above, so callers
        # see them prefixed with "Repository operation failed:". Kept as-is
        # because that text is what users currently see.
        raise ValueError(f"Repository operation failed: {str(e)}") from e
|
|
|
|
def extract_folder_name(file_path):
    """
    Extract the control panel name from a JSON file path.

    The name is the folder directly below the first ``Detailed-Views`` path
    component, with a trailing range suffix such as `` 1-4`` removed.
    Example: "/path/Detailed-Views/MCM01 Fluid Inbound Merges 1-4/view.json" -> "MCM01 Fluid Inbound Merges"

    Args:
        file_path (str): Path to the JSON file

    Returns:
        str: Extracted control panel name, or empty string if not found
    """
    try:
        parts = Path(file_path).parts
    except (TypeError, ValueError):
        # Not something that can be interpreted as a path.
        return ""

    try:
        marker = parts.index("Detailed-Views")
    except ValueError:
        return ""

    # The last component is the JSON file itself. A panel folder exists only
    # if something sits between "Detailed-Views" and that file; otherwise the
    # file name would be returned as the panel name.
    folder_index = marker + 1
    if folder_index >= len(parts) - 1:
        return ""

    # "MCM01 Fluid Inbound Merges 1-4" -> "MCM01 Fluid Inbound Merges"
    return re.sub(r'\s+\d+-\d+$', '', parts[folder_index])
|
|
|
|
def should_exclude_name(name):
    """
    Tell whether a component name contains one of the excluded keywords.

    Matching is a case-insensitive substring test, so a keyword anywhere
    inside the name excludes it.

    Args:
        name (str): The name to check

    Returns:
        bool: True if the name should be excluded, False otherwise
    """
    blocked_fragments = ('button', 'camera', 'line', 'end', 'image', 'label',
                         'embeddedview', 'root', 'flexcontainer', 'buton')
    lowered = name.lower()
    return any(fragment.lower() in lowered for fragment in blocked_fragments)
|
|
|
|
def load_scada_names(repo_path):
    """
    Find JSON files in the Detailed-Views folder and extract component names.

    Names can be found in 'meta.name' fields at both the root level and in nested children.
    Excludes names containing specific terms. Files that cannot be parsed are
    skipped so that one bad file does not abort the whole scan.

    Args:
        repo_path (str): Path to the repository

    Returns:
        list: List of dictionaries containing SCADA names and control panels

    Raises:
        ValueError: If operation fails
    """
    try:
        names_with_panels = []

        # Find JSON files only in Detailed-Views folder
        json_files = list(Path(repo_path).glob('**/Detailed-Views/**/*.json'))
        if not json_files:
            print(f"Warning: No JSON files found in Detailed-Views folder at {repo_path}")

        for json_file in json_files:
            try:
                # Decode explicitly as UTF-8 instead of the platform default,
                # which differs between operating systems.
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                # Extract control panel name from file path
                control_panel = extract_folder_name(str(json_file))

                # Collect names from the JSON structure, tagged with the panel
                extract_names_recursive(data, names_with_panels, control_panel, visited=None)
            except json.JSONDecodeError:
                # Skip invalid JSON files
                continue
            except Exception as e:
                # Still best effort, but leave a trace of what was skipped.
                log_error(f"Skipping {json_file}: {str(e)}")
                continue

        return names_with_panels
    except Exception as e:
        raise ValueError(f"Failed to load SCADA names: {str(e)}") from e
|
|
|
|
def extract_names_recursive(obj, names_list, control_panel, visited=None):
    """
    Extract all 'meta.name' values from a nested JSON object.

    Excludes names containing terms defined in should_exclude_name function.
    The structure is walked depth-first with an explicit stack, in the same
    order a recursive walk would use, so deeply nested input cannot hit the
    interpreter's recursion limit.

    For a dict, its own ``meta.name`` is recorded first, then every entry of
    its ``children`` list is visited, then the values under ``root``,
    ``props`` and ``custom``. Other keys are not followed.

    Args:
        obj: The JSON object or list to process
        names_list: List to append found names to (modified in place)
        control_panel: The control panel name extracted from file path
        visited: Set of object ids already visited (to prevent revisiting)
    """
    if visited is None:
        visited = set()

    stack = [obj]
    while stack:
        node = stack.pop()

        # Skip already visited objects or non-container types
        if not isinstance(node, (dict, list)) or id(node) in visited:
            continue
        visited.add(id(node))

        if isinstance(node, list):
            # NOTE(review): only the first 1000 items of a list reached this
            # way are examined; later items are ignored. Confirm this cap is
            # still wanted. Pushed reversed so items pop in original order.
            stack.extend(reversed(node[:1000]))
            continue

        # Check if this object has a meta.name field
        meta = node.get('meta')
        if isinstance(meta, dict) and 'name' in meta:
            name = meta['name']
            if name and isinstance(name, str) and not should_exclude_name(name):
                names_list.append({
                    "name": name,
                    "control_panel": control_panel
                })

        pending = []

        # The children list is expanded directly (no item cap applies here).
        children = node.get('children')
        if isinstance(children, list):
            pending.extend(children)

        # Only follow the few keys that might contain component definitions
        for key in ('root', 'props', 'custom'):
            if key in node:
                pending.append(node[key])

        stack.extend(reversed(pending))
|
|
|
|
def load_excel_names(file_path):
    """
    Extract names from an Excel file.

    The sheet must have a column whose header is ``Name`` (case-insensitive,
    surrounding whitespace ignored). The first column whose header contains
    "control" or "panel" supplies the control panel, if there is one.
    Rows with an empty name are skipped.

    Args:
        file_path (str): Path to the Excel file

    Returns:
        list: List of dictionaries containing names and control panels from the Excel file

    Raises:
        ValueError: If file doesn't exist, isn't a valid Excel file, or doesn't contain a "Name" column
    """
    try:
        # Check if file exists
        if not os.path.exists(file_path):
            raise ValueError(f"Excel file not found: {file_path}")

        # Check file size before attempting to read
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if file_size_mb > 50:  # 50 MB limit
            raise ValueError(f"Excel file too large ({file_size_mb:.1f} MB). Maximum size is 50 MB.")

        # Read the workbook (no timeout is applied to this call).
        try:
            df = pd.read_excel(file_path)
        except pd.errors.EmptyDataError:
            raise ValueError(f"Excel file is empty: {file_path}")
        except pd.errors.ParserError:
            raise ValueError(f"Invalid Excel file format or corrupted file: {file_path}")
        except Exception as e:
            raise ValueError(f"Failed to read Excel file: {str(e)}")

        # Check if any data exists
        if df.empty:
            raise ValueError(f"Excel file contains no data: {file_path}")

        # Find name column - 'Name', case-insensitive, padding tolerated
        name_col = next(
            (col for col in df.columns
             if isinstance(col, str) and col.strip().lower() == 'name'),
            None,
        )
        if name_col is None:
            raise ValueError("Excel file missing required 'Name' column")

        # Find control panel column (if it exists): first header containing
        # "control" or "panel" (case-insensitive)
        control_panel_col = next(
            (col for col in df.columns
             if isinstance(col, str) and ('control' in col.lower() or 'panel' in col.lower())),
            None,
        )

        # Read column-wise rather than with iterrows(): iterrows builds one
        # Series per row and upcasts mixed numeric rows to float, which turns
        # an integer name such as 101 into "101.0".
        names = df[name_col].tolist()
        if control_panel_col is not None:
            panels = df[control_panel_col].tolist()
        else:
            panels = [""] * len(names)

        result = []
        for name, control_panel in zip(names, panels):
            if pd.isna(name):
                continue
            # Missing control panel cells become an empty string
            if pd.isna(control_panel):
                control_panel = ""
            result.append({
                "name": str(name).strip(),
                "control_panel": str(control_panel).strip()
            })

        # Check if we got any names
        if not result:
            raise ValueError(f"No valid names found in Excel file: {file_path}")

        return result
    except Exception as e:
        # NOTE: the ValueErrors raised above are re-wrapped here as well, so
        # callers see them prefixed with "Failed to load names from Excel
        # file:". Kept because that text is what users currently see.
        raise ValueError(f"Failed to load names from Excel file: {str(e)}") from e
|
|
|
|
def normalize_names(names_with_panels):
    """
    Normalize a list of name dictionaries for consistent comparison.

    Normalization includes:
    - Stripping whitespace
    - Converting to uppercase
    - Removing duplicates (the first occurrence wins)

    Entries that are not dictionaries, have no 'name' key, or whose name is
    None or empty after stripping are ignored. The returned list and mapping
    always describe the same entries: each normalized name appears once in
    both, taken from its first occurrence.

    Args:
        names_with_panels (list): List of dictionaries containing 'name' and 'control_panel'

    Returns:
        tuple: (normalized_list, name_mapping) where:
            - normalized_list is the list of normalized name dictionaries
            - name_mapping is a dict mapping normalized names to original info

    Raises:
        ValueError: If input is not a valid list
    """
    try:
        if not isinstance(names_with_panels, list):
            raise ValueError("Input must be a list of name dictionaries")

        normalized_unique = []
        name_mapping = {}  # normalized name -> original name and control panel

        for item in names_with_panels:
            if not isinstance(item, dict) or 'name' not in item:
                continue

            name = item['name']
            if name is None:
                continue

            # Preserve the original spelling (minus surrounding whitespace)
            original_name = (name if isinstance(name, str) else str(name)).strip()
            normalized_name = original_name.upper()

            # Skip empty names and anything already recorded, so the mapping
            # cannot drift from the de-duplicated list.
            if not normalized_name or normalized_name in name_mapping:
                continue

            control_panel = item.get('control_panel', '')
            normalized_unique.append({
                'name': normalized_name,
                'control_panel': control_panel
            })
            name_mapping[normalized_name] = {
                'original_name': original_name,
                'control_panel': control_panel
            }

        return normalized_unique, name_mapping
    except Exception as e:
        raise ValueError(f"Failed to normalize names: {str(e)}") from e
|
|
|
|
def compare_name_lists(list1, list2):
    """
    Split two lists of name dictionaries into unique and shared entries.

    Entries are matched on their 'name' value. Every result list is sorted
    by name.

    Args:
        list1 (list): First list of name dictionaries
        list2 (list): Second list of name dictionaries

    Returns:
        dict: Dictionary containing:
            - 'only_in_list1': Items in list1 but not in list2
            - 'only_in_list2': Items in list2 but not in list1
            - 'common': Items present in both lists, carrying list1's
              control panel when it is non-empty and list2's otherwise
    """
    try:
        if not (isinstance(list1, list) and isinstance(list2, list)):
            raise ValueError("Both inputs must be lists")

        def by_name(entry):
            return entry['name']

        # name -> entry lookups; a later duplicate replaces an earlier one
        first_by_name = {entry['name']: entry for entry in list1}
        second_by_name = {entry['name']: entry for entry in list2}

        left_only = sorted(
            (entry for entry in list1 if entry['name'] not in second_by_name),
            key=by_name,
        )
        right_only = sorted(
            (entry for entry in list2 if entry['name'] not in first_by_name),
            key=by_name,
        )

        shared = []
        for shared_name in first_by_name.keys() & second_by_name.keys():
            panel = first_by_name[shared_name].get('control_panel')
            if not panel:
                # list1 has nothing useful: fall back to list2's value
                panel = second_by_name[shared_name].get('control_panel', '')
            shared.append({'name': shared_name, 'control_panel': panel})
        shared.sort(key=by_name)

        return {
            'only_in_list1': left_only,
            'only_in_list2': right_only,
            'common': shared
        }
    except Exception as e:
        error_msg = f"Failed to compare name lists: {str(e)}"
        raise ValueError(error_msg)
|
|
|
|
def force_update_repo(repo_id):
    """
    Pull the latest changes into an existing clone.

    ``origin main`` is tried first and ``origin master`` second; each pull
    is limited to 30 seconds. Every failure, including a missing clone or an
    unexpected exception, is reported as False rather than raised.

    Args:
        repo_id (str): ID of the repository to update

    Returns:
        bool: True if update was successful, False otherwise
    """
    try:
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)

        is_git_checkout = (os.path.exists(repo_path)
                           and os.path.exists(os.path.join(repo_path, '.git')))
        if not is_git_checkout:
            return False

        for branch in ('main', 'master'):
            try:
                subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', branch],
                                      timeout=30)
            except subprocess.CalledProcessError:
                # This branch could not be pulled; try the next candidate.
                continue
            return True

        return False
    except Exception:
        return False
|
|
|
|
# Routes
|
|
@app.route('/')
def index():
    """
    Serve the homepage with the upload form.

    The template receives the stored comparisons, whether any exist, and the
    time of the last update.

    Returns:
        str: Rendered HTML template
    """
    log_info(f"Accessed index page - Request: {request.method} {request.path}")

    stored_comparisons = SHARED_DATA['comparisons']
    return render_template(
        'index.html',
        has_previous_results=len(stored_comparisons) > 0,
        last_update_time=SHARED_DATA['last_update_time'],
        comparisons=stored_comparisons,
    )
|
|
|
|
@app.route('/compare', methods=['POST'])
def compare():
    """
    Handle form submission, process data, and display comparison results.

    Steps: validate the form and the two Excel uploads, store the uploads in
    a per-comparison folder, clone or update the repository, load and
    normalize the three name sources, compare them pairwise, and store the
    result in SHARED_DATA. If the request fails after the upload folder was
    created, that folder is removed again.

    Returns:
        str: Rendered HTML template with comparison results on success;
        otherwise a redirect to the index page with a flashed error.
    """
    import shutil  # local import: only used to discard uploads of failed runs

    log_info(f"Compare route accessed - IP: {request.remote_addr}")
    log_debug(f"Request method: {request.method}")
    log_debug(f"Request content type: {request.content_type}")

    # Never write credentials to the log: mask cookie / auth header values.
    sensitive_headers = {'cookie', 'authorization', 'x-csrftoken', 'x-csrf-token'}
    safe_headers = {
        key: ('[redacted]' if key.lower() in sensitive_headers else value)
        for key, value in dict(request.headers).items()
    }
    log_debug(f"Request headers: {safe_headers}")

    comparison_folder = None  # set once the upload folder has been chosen
    stored = False            # True once the result is kept in SHARED_DATA

    try:
        # Log request details
        log_debug(f"Form data keys: {list(request.form.keys())}")
        log_debug(f"Files keys: {list(request.files.keys())}")

        if not request.form:
            log_error("No form data received")
            flash('No form data received. Please try again.', 'danger')
            return redirect(url_for('index'))

        # Check for CSRF token
        if 'csrf_token' not in request.form:
            log_error("CSRF token missing from form data")
            flash('CSRF token missing. Please refresh the page and try again.', 'danger')
            return redirect(url_for('index'))

        # Extract repo URL and validate
        repo_url = request.form.get('repo_url')
        if not repo_url:
            log_error("Repository URL is missing from form data")
            flash('Repository URL is required', 'danger')
            return redirect(url_for('index'))

        log_info(f"Processing comparison with repo URL: {repo_url}")

        # Handle file uploads
        missing_files = [field for field in ('manifest_file', 'dwg_file')
                         if field not in request.files]
        if missing_files:
            log_error(f"Missing required files: {', '.join(missing_files)}")
            flash(f'Missing required files: {", ".join(missing_files)}', 'danger')
            return redirect(url_for('index'))

        manifest_file = request.files['manifest_file']
        dwg_file = request.files['dwg_file']

        log_debug(f"Manifest filename: {manifest_file.filename}")
        log_debug(f"DWG filename: {dwg_file.filename}")

        # Check for empty filenames
        if not manifest_file.filename or not dwg_file.filename:
            log_error(f"Empty filenames - Manifest: '{manifest_file.filename}', DWG: '{dwg_file.filename}'")
            if not manifest_file.filename:
                flash("No manifest file selected", 'danger')
            if not dwg_file.filename:
                flash("No DWG file selected", 'danger')
            return redirect(url_for('index'))

        # Validate uploaded files
        for label, upload in (('Manifest', manifest_file), ('DWG', dwg_file)):
            is_valid, error_msg = validate_excel_file(upload)
            if not is_valid:
                log_error(f"{label} file validation error: {error_msg}")
                flash(f"{label} file error: {error_msg}", 'danger')
                return redirect(url_for('index'))

        # Generate unique ID for this comparison
        comparison_id = str(uuid.uuid4())
        log_info(f"Created comparison ID: {comparison_id}")

        # Create folder for this comparison
        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        os.makedirs(comparison_folder, exist_ok=True)
        log_debug(f"Created comparison folder: {comparison_folder}")

        # Save uploaded files under fixed names inside the comparison folder
        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

        for label, upload, target in (('manifest', manifest_file, manifest_path),
                                      ('DWG', dwg_file, dwg_path)):
            try:
                upload.save(target)
                log_debug(f"Saved {label} file to: {target}")
            except Exception as e:
                log_error(f"Failed to save {label} file: {str(e)}", exc_info=True)
                flash(f"Failed to save {label} file: {str(e)}", 'danger')
                return redirect(url_for('index'))

        # Set shared permissions on uploaded files (best effort)
        try:
            os.chmod(manifest_path, 0o666)
            os.chmod(dwg_path, 0o666)
        except Exception as e:
            log_error(f"Could not set permissions on uploaded files: {str(e)}")

        # Clone or update repository
        try:
            repo_path = create_or_update_repo(repo_url)
            log_info(f"Repository path: {repo_path}")
        except ValueError as e:
            log_error(f"Repository error: {str(e)}")
            flash(str(e), 'danger')
            return redirect(url_for('index'))

        # Load data from all sources
        try:
            log_info("Loading SCADA names from repository")
            scada_names = load_scada_names(repo_path)
            log_debug(f"Loaded {len(scada_names)} SCADA names")

            log_info("Loading manifest names from Excel")
            manifest_names = load_excel_names(manifest_path)
            log_debug(f"Loaded {len(manifest_names)} manifest names")

            log_info("Loading DWG names from Excel")
            dwg_names = load_excel_names(dwg_path)
            log_debug(f"Loaded {len(dwg_names)} DWG names")
        except ValueError as e:
            log_error(f"Error loading data: {str(e)}")
            flash(str(e), 'danger')
            return redirect(url_for('index'))

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        log_debug(f"Normalized SCADA names: {len(normalized_scada)}")

        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        log_debug(f"Normalized manifest names: {len(normalized_manifest)}")

        normalized_dwg, dwg_mapping = normalize_names(dwg_names)
        log_debug(f"Normalized DWG names: {len(normalized_dwg)}")

        # Generate repo_id for future updates
        repo_id = hashlib.sha1(repo_url.encode()).hexdigest()

        # Compare all combinations
        log_info("Comparing normalized name lists")
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        # One timestamp for the whole record so all three uses agree.
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')

        # Prepare comparison data for the template
        comparison_data = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest)
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg)
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg)
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            'repository_url': repo_url,
            'comparison_id': comparison_id,
            'timestamp': timestamp,
            'name': f"Comparison {timestamp}"
        }

        # Update shared data for all users. The dict is mutated in place, so
        # no `global` declaration is needed.
        SHARED_DATA['comparisons'][comparison_id] = comparison_data
        SHARED_DATA['latest_comparison_id'] = comparison_id
        SHARED_DATA['last_update_time'] = timestamp
        stored = True

        log_info(f"Comparison completed successfully - ID: {comparison_id}")
        return render_template('results.html', data=comparison_data, comparisons=SHARED_DATA['comparisons'])
    except ValueError as e:
        log_error(f"ValueError during comparison: {str(e)}")
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        log_error(f"Unexpected error during comparison: {str(e)}", exc_info=True)
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
    finally:
        # Uploads of a comparison that was never stored are unreachable
        # afterwards; remove them instead of letting them pile up on disk.
        if comparison_folder is not None and not stored:
            shutil.rmtree(comparison_folder, ignore_errors=True)
|
|
|
|
# Custom error handler for CSRFs
|
|
@app.errorhandler(400)
def handle_csrf_error(e):
    """
    Handle HTTP 400 errors by logging the request and redirecting to index.

    The handler is registered for status 400 in general, so it runs for any
    bad request, not only for CSRF failures. Token and credential values are
    masked before the request details are logged.

    Args:
        e: The error passed in by Flask.

    Returns:
        A redirect to the index page, with an error message flashed.
    """
    log_error(f"400 error: {str(e)}")

    # Log which fields arrived, but not the CSRF token itself.
    if request.form:
        form_snapshot = {
            key: ('[redacted]' if key == 'csrf_token' else value)
            for key, value in dict(request.form).items()
        }
    else:
        form_snapshot = 'No form data'
    log_debug(f"Form data at error: {form_snapshot}")
    log_debug(f"Request URL: {request.url}")
    log_debug(f"Request method: {request.method}")

    # Mask cookie / auth header values for the same reason.
    sensitive_headers = {'cookie', 'authorization', 'x-csrftoken', 'x-csrf-token'}
    safe_headers = {
        key: ('[redacted]' if key.lower() in sensitive_headers else value)
        for key, value in dict(request.headers).items()
    }
    log_debug(f"Request headers: {safe_headers}")

    flash("Your form submission failed. This could be due to an expired session. Please try again.", "danger")
    return redirect(url_for('index'))
|
|
|
|
@app.errorhandler(413)
def request_entity_too_large(e):
    """
    Handle HTTP 413 errors by flashing the size limit and redirecting to index.

    Args:
        e: The error passed in by Flask (not used).

    Returns:
        A redirect to the index page, with an error message flashed.
    """
    log_error("413 error: File too large")
    # Read the limit from the configuration so the message cannot go stale
    # if MAX_CONTENT_LENGTH is changed.
    limit_mb = app.config['MAX_CONTENT_LENGTH'] // (1024 * 1024)
    flash(f"The uploaded file is too large. Maximum size is {limit_mb} MB.", "danger")
    return redirect(url_for('index'))
|
|
|
|
def _reload_comparison_from_repo(comparison_id, data):
    """
    Pull the latest repository state and recompute a stored comparison.

    Shared by the result-viewing routes so that both refresh data the same
    way. The comparison is recomputed only when the stored entry has a
    repo_id, force_update_repo() returns a truthy value for it, and both
    uploaded Excel files exist in the comparison's upload folder. On success
    the recomputed entry replaces the stored one in SHARED_DATA.

    Args:
        comparison_id: ID of the comparison being viewed
        data: Comparison data currently stored for that ID

    Returns:
        dict: The recomputed comparison data on success, otherwise the
        unchanged ``data`` that was passed in
    """
    repo_id = data.get('repo_id')
    # Nothing to refresh without a repository, or when the update did not succeed
    if not repo_id or not force_update_repo(repo_id):
        return data

    comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
    manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
    dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

    if not (os.path.exists(manifest_path) and os.path.exists(dwg_path)):
        flash('Repository has been updated with the latest changes, but data files not found.', 'warning')
        return data

    try:
        repo_path = os.path.join(app.config['CLONES_FOLDER'], repo_id)

        # Reload data from all three sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)

        # Normalize names so the lists can be compared consistently
        normalized_scada, scada_mapping = normalize_names(scada_names)
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)

        # Compare all combinations
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        refreshed = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest)
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg)
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg)
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            # URL and display name are carried over from the stored entry
            'repository_url': data['repository_url'],
            'comparison_id': comparison_id,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'name': data['name']
        }

        # Publish the refreshed entry for all users
        SHARED_DATA['comparisons'][comparison_id] = refreshed
        SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S')

        flash('Repository has been updated and data has been reloaded with the latest changes.', 'success')
        return refreshed
    except Exception as e:
        # Best effort: keep showing the previously stored results, but leave a
        # trace in the log so the failure can be diagnosed.
        log_error(f"Error reloading comparison {comparison_id}: {str(e)}", exc_info=True)
        flash(f'Repository updated but error reloading data: {str(e)}', 'warning')
        return data


@app.route('/latest')
def latest_results():
    """
    Show the latest comparison results from any user.

    Attempts to update the repository and reload the data before displaying
    results; if that is not possible the stored results are shown.

    Returns:
        str: Rendered HTML template with the latest comparison results, or a
        redirect to the index page when no comparison is available
    """
    comparison_id = SHARED_DATA['latest_comparison_id']
    # .get() also covers a latest ID that no longer has a stored comparison
    data = SHARED_DATA['comparisons'].get(comparison_id) if comparison_id else None
    if data is None:
        flash('No comparison results available yet. Please upload files to generate results.', 'info')
        return redirect(url_for('index'))

    data = _reload_comparison_from_repo(comparison_id, data)
    return render_template('results.html', data=data, comparisons=SHARED_DATA['comparisons'])


@app.route('/comparison/<comparison_id>')
def view_comparison(comparison_id):
    """
    View a specific comparison by ID.

    Attempts to update the repository and reload the data before displaying
    results; if that is not possible the stored results are shown.

    Args:
        comparison_id: ID of the comparison to view

    Returns:
        str: Rendered HTML template with the comparison results, or a
        redirect to the index page when the comparison does not exist
    """
    data = SHARED_DATA['comparisons'].get(comparison_id)
    if data is None:
        flash('Comparison not found. It may have been deleted.', 'warning')
        return redirect(url_for('index'))

    data = _reload_comparison_from_repo(comparison_id, data)
    return render_template('results.html', data=data, comparisons=SHARED_DATA['comparisons'])
|
|
|
|
@app.route('/update_files', methods=['GET', 'POST'])
def update_files():
    """
    Handle re-upload of files for an existing comparison.

    GET requests are redirected to the index page. For POST requests the
    submitted repo_id must name an existing directory inside CLONES_FOLDER.
    Newly uploaded manifest/DWG files replace the stored ones (a file that is
    not re-uploaded must already exist on disk), after which the comparison is
    recomputed and stored as the latest comparison. When comparison_id is
    missing or unknown, a new comparison is created.

    Returns:
        str: Rendered HTML template with updated comparison results, or a
        redirect to the index page on GET requests and on errors
    """
    if request.method == 'GET':
        return redirect(url_for('index'))

    try:
        repo_id = request.form.get('repo_id')
        comparison_id = request.form.get('comparison_id')

        if not repo_id:
            flash('Repository ID is required for updates', 'danger')
            return redirect(url_for('index'))

        # repo_id comes from the form, so normalise the joined path and make
        # sure it still lies inside CLONES_FOLDER ('..' segments or an
        # absolute repo_id would otherwise escape it).
        clones_root = os.path.abspath(app.config['CLONES_FOLDER'])
        repo_path = os.path.abspath(os.path.join(clones_root, repo_id))
        if not repo_path.startswith(clones_root + os.sep) or not os.path.exists(repo_path):
            flash('Repository not found. Please start a new comparison.', 'danger')
            return redirect(url_for('index'))

        # Reuse the stored comparison when the ID is known, otherwise start a new one
        previous = SHARED_DATA['comparisons'].get(comparison_id) if comparison_id else None
        if previous is None:
            comparison_id = str(uuid.uuid4())
            previous = {}

        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        os.makedirs(comparison_folder, exist_ok=True)

        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

        # (form field, destination, validation error prefix, message when the file is missing)
        uploads = (
            ('manifest_file', manifest_path, 'Manifest file error',
             'Manifest file not found. Please upload a manifest file.'),
            ('dwg_file', dwg_path, 'DWG file error',
             'DWG file not found. Please upload a DWG file.'),
        )
        for field, destination, error_prefix, missing_message in uploads:
            uploaded = request.files.get(field)
            if uploaded is not None and uploaded.filename != '':
                # A new file was uploaded: validate it before replacing the stored one
                is_valid, error_msg = validate_excel_file(uploaded)
                if not is_valid:
                    flash(f"{error_prefix}: {error_msg}", 'danger')
                    return redirect(url_for('index'))
                uploaded.save(destination)
            elif not os.path.exists(destination):
                # Nothing uploaded and nothing stored from an earlier run
                flash(missing_message, 'danger')
                return redirect(url_for('index'))

        # Carry over the repository URL and display name from the existing comparison
        repository_url = previous.get('repository_url', 'Unknown Repository')
        comparison_name = previous.get('name', f"Updated Comparison {time.strftime('%Y-%m-%d %H:%M:%S')}")

        # Load data from all sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)

        # Compare all combinations
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        # Prepare comparison data for the template
        comparison_data = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest)
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg)
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg)
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            'repository_url': repository_url,
            'comparison_id': comparison_id,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'name': comparison_name
        }

        # Update shared data
        SHARED_DATA['comparisons'][comparison_id] = comparison_data
        SHARED_DATA['latest_comparison_id'] = comparison_id
        SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S')

        flash('Comparison updated successfully', 'success')
        return render_template('results.html', data=comparison_data, comparisons=SHARED_DATA['comparisons'])

    except ValueError as e:
        log_error(f"ValueError during file update: {str(e)}")
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        log_error(f"Unexpected error during file update: {str(e)}", exc_info=True)
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
|
|
|
|
@app.route('/rename_comparison/<comparison_id>', methods=['POST'])
def rename_comparison(comparison_id):
    """
    Rename a comparison.

    The new name is read from the 'name' form field. Surrounding whitespace
    is removed, and a name that is empty after stripping is rejected.

    Args:
        comparison_id: ID of the comparison to rename

    Returns:
        JSON response with success status (and a message on failure)
    """
    comparison = SHARED_DATA['comparisons'].get(comparison_id)
    if comparison is None:
        return jsonify({'success': False, 'message': 'Comparison not found'})

    # Strip first so a whitespace-only submission counts as "no name"
    name = (request.form.get('name') or '').strip()
    if not name:
        return jsonify({'success': False, 'message': 'Name is required'})

    comparison['name'] = name
    return jsonify({'success': True})
|
|
|
|
@app.route('/delete_comparison/<comparison_id>', methods=['POST'])
def delete_comparison(comparison_id):
    """
    Delete a comparison.

    Removes the comparison from SHARED_DATA, deletes its upload folder
    (best effort: a failure is logged but does not abort the deletion) and,
    if it was the latest comparison, re-points 'latest_comparison_id' at the
    last remaining comparison or None.

    Args:
        comparison_id: ID of the comparison to delete

    Returns:
        Redirect to homepage
    """
    import shutil  # local import: only needed here

    if comparison_id not in SHARED_DATA['comparisons']:
        flash('Comparison not found', 'warning')
        return redirect(url_for('index'))

    # Delete comparison data
    del SHARED_DATA['comparisons'][comparison_id]

    # Delete comparison folder; rmtree also copes with nested directories
    comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
    if os.path.exists(comparison_folder):
        try:
            shutil.rmtree(comparison_folder)
        except OSError as e:
            log_error(f"Warning: Could not delete comparison folder: {str(e)}")

    # Update latest comparison ID if needed
    if SHARED_DATA['latest_comparison_id'] == comparison_id:
        remaining_ids = list(SHARED_DATA['comparisons'])
        SHARED_DATA['latest_comparison_id'] = remaining_ids[-1] if remaining_ids else None

    flash('Comparison deleted successfully', 'success')
    return redirect(url_for('index'))
|
|
|
|
@app.route('/refresh_repository', methods=['POST'])
def refresh_repository():
    """
    Handle repository refresh requests and reload data with the latest changes.

    Reads repo_id and comparison_id from the form, pulls the repository
    (trying the 'main' branch first, then 'master'), and recomputes the
    comparison from the repository plus the stored manifest/DWG files. The
    data is reloaded even when the pull fails, in which case the existing
    repository content is used.

    Returns:
        str: Rendered HTML template with updated comparison results, or a
        redirect to the index page when validation fails or an error occurs
    """
    try:
        repo_id = request.form.get('repo_id')
        comparison_id = request.form.get('comparison_id')

        if not repo_id or not comparison_id:
            flash('Repository ID and comparison ID are required', 'danger')
            return redirect(url_for('index'))

        if comparison_id not in SHARED_DATA['comparisons']:
            flash('Comparison not found', 'danger')
            return redirect(url_for('index'))

        # Get the existing comparison data
        comparison_data = SHARED_DATA['comparisons'][comparison_id]

        # Get files path from existing comparison
        comparison_folder = os.path.join(app.config['UPLOAD_FOLDER'], comparison_id)
        manifest_path = os.path.join(comparison_folder, 'manifest.xlsx')
        dwg_path = os.path.join(comparison_folder, 'dwg.xlsx')

        if not os.path.exists(manifest_path) or not os.path.exists(dwg_path):
            flash('Required files not found', 'danger')
            return redirect(url_for('index'))

        # repo_id comes from the form, so normalise the joined path and make
        # sure it still lies inside CLONES_FOLDER before running git in it.
        clones_root = os.path.abspath(app.config['CLONES_FOLDER'])
        repo_path = os.path.abspath(os.path.join(clones_root, repo_id))
        if (not repo_path.startswith(clones_root + os.sep)
                or not os.path.exists(repo_path)
                or not os.path.exists(os.path.join(repo_path, '.git'))):
            flash('Repository not found', 'danger')
            return redirect(url_for('index'))

        # Update the repository: try 'main' first, fall back to 'master'
        repository_updated = False
        try:
            for branch in ('main', 'master'):
                try:
                    subprocess.check_call(['git', '-C', repo_path, 'pull', 'origin', branch],
                                          timeout=30)
                except subprocess.CalledProcessError:
                    continue
                repository_updated = True
                break
            else:
                # Loop finished without a successful pull on either branch
                flash('Failed to update repository', 'warning')
        except Exception as e:
            # e.g. timeout or git not being available; continue with existing content
            log_error(f"Error updating repository {repo_id}: {str(e)}", exc_info=True)
            flash(f'Error updating repository: {str(e)}', 'warning')
            repository_updated = False

        # Reload data from all sources
        scada_names = load_scada_names(repo_path)
        manifest_names = load_excel_names(manifest_path)
        dwg_names = load_excel_names(dwg_path)

        # Normalize names for consistent comparison and get name mappings
        normalized_scada, scada_mapping = normalize_names(scada_names)
        normalized_manifest, manifest_mapping = normalize_names(manifest_names)
        normalized_dwg, dwg_mapping = normalize_names(dwg_names)

        # Compare all combinations
        scada_vs_manifest = compare_name_lists(normalized_scada, normalized_manifest)
        scada_vs_dwg = compare_name_lists(normalized_scada, normalized_dwg)
        manifest_vs_dwg = compare_name_lists(normalized_manifest, normalized_dwg)

        # Prepare comparison data for the template
        updated_comparison_data = {
            'scada_vs_manifest': {
                'only_in_scada': scada_vs_manifest['only_in_list1'],
                'only_in_manifest': scada_vs_manifest['only_in_list2'],
                'common': scada_vs_manifest['common'],
                'scada_count': len(normalized_scada),
                'manifest_count': len(normalized_manifest)
            },
            'scada_vs_dwg': {
                'only_in_scada': scada_vs_dwg['only_in_list1'],
                'only_in_dwg': scada_vs_dwg['only_in_list2'],
                'common': scada_vs_dwg['common'],
                'scada_count': len(normalized_scada),
                'dwg_count': len(normalized_dwg)
            },
            'manifest_vs_dwg': {
                'only_in_manifest': manifest_vs_dwg['only_in_list1'],
                'only_in_dwg': manifest_vs_dwg['only_in_list2'],
                'common': manifest_vs_dwg['common'],
                'manifest_count': len(normalized_manifest),
                'dwg_count': len(normalized_dwg)
            },
            'name_mappings': {
                'scada': scada_mapping,
                'manifest': manifest_mapping,
                'dwg': dwg_mapping
            },
            'repo_id': repo_id,
            'repository_url': comparison_data['repository_url'],
            'comparison_id': comparison_id,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'name': comparison_data['name']
        }

        # Update shared data
        SHARED_DATA['comparisons'][comparison_id] = updated_comparison_data
        SHARED_DATA['last_update_time'] = time.strftime('%Y-%m-%d %H:%M:%S')

        if repository_updated:
            flash('Repository updated and data reloaded successfully', 'success')
        else:
            flash('Data reloaded with existing repository content', 'info')

        return render_template('results.html', data=updated_comparison_data, comparisons=SHARED_DATA['comparisons'])

    except ValueError as e:
        log_error(f"ValueError during repository refresh: {str(e)}")
        flash(str(e), 'danger')
        return redirect(url_for('index'))
    except Exception as e:
        log_error(f"Unexpected error during repository refresh: {str(e)}", exc_info=True)
        flash(f"An unexpected error occurred: {str(e)}", 'danger')
        return redirect(url_for('index'))
|
|
|
|
if __name__ == '__main__':
    # The server listens on all interfaces, and the Werkzeug interactive
    # debugger lets anyone who can reach it execute arbitrary Python code.
    # Debug mode is therefore opt-in: set FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)