MonitorProgress/find_missing_units.py
ilia.gurielidze@autStand.com cf2d46edd4 first commit
2025-04-09 15:00:09 +04:00

243 lines
11 KiB
Python

import csv
import os
import json
import argparse
import re
from collections import defaultdict # Use defaultdict for easier dictionary building
def normalize_string(s):
    """Lowercase *s* and collapse each run of hyphens/whitespace to one underscore.

    Non-string inputs (including None) normalize to the empty string.
    """
    if isinstance(s, str):
        return re.sub(r'[-\s]+', '_', s.lower())
    return ""
def get_control_panel_units(csv_filepath):
    """Read Control Panel / Alias / equipment details from a CSV manifest.

    Parameters
    ----------
    csv_filepath : str
        Path to a CSV file containing at least the columns
        'Control Panel', 'Alias', 'Equipment Type' and 'Type of Conveyor'.

    Returns
    -------
    tuple
        ``(panel_details, total_unique_aliases)`` where ``panel_details``
        maps each control-panel name to a list of dicts with keys
        'alias', 'equipment_type' and 'conveyor_type'.  Returns
        ``(None, 0)`` on any read/parse error so callers can distinguish
        "failed" from "empty".
    """
    panel_details_list = defaultdict(list)
    # An alias may appear under several panels; the set counts it once.
    unique_aliases = set()
    try:
        # newline='' is required by the csv module so quoted fields that
        # contain newlines are parsed correctly; utf-8-sig strips a BOM.
        with open(csv_filepath, mode='r', newline='', encoding='utf-8-sig') as infile:
            reader = csv.reader(infile)
            header = next(reader)
            try:
                panel_index = header.index('Control Panel')
                alias_index = header.index('Alias')
                eq_type_index = header.index('Equipment Type')
                conv_type_index = header.index('Type of Conveyor')
            except ValueError as e:
                print(f"Error: Required column ('Control Panel', 'Alias', 'Equipment Type', or 'Type of Conveyor') not found in {csv_filepath}: {e}")
                return None, 0
            max_index = max(panel_index, alias_index, eq_type_index, conv_type_index)
            for row in reader:
                if len(row) <= max_index:
                    continue  # skip short / malformed rows
                panel = row[panel_index].strip()
                alias = row[alias_index].strip()
                if not (panel and alias):
                    continue  # both panel and alias are required
                panel_details_list[panel].append({
                    'alias': alias,
                    'equipment_type': row[eq_type_index].strip(),
                    'conveyor_type': row[conv_type_index].strip()
                })
                unique_aliases.add(alias)
    except FileNotFoundError:
        print(f"Error: CSV file not found at {csv_filepath}")
        return None, 0
    except StopIteration:
        # next(reader) raised on a completely empty file.
        print(f"Error reading CSV file {csv_filepath}: file is empty")
        return None, 0
    except Exception as e:
        print(f"Error reading CSV file {csv_filepath}: {e}")
        return None, 0
    total_csv_aliases = len(unique_aliases)
    if not panel_details_list:
        print(f"Warning: No valid Control Panel/Alias pairs found in {csv_filepath}")
    print(f"Loaded {total_csv_aliases} unique aliases (with details) across {len(panel_details_list)} control panels from {csv_filepath}")
    return panel_details_list, total_csv_aliases
def extract_names_from_json(data, name_set):
    """Recursively collect normalized 'name' strings from a JSON structure.

    Mutates *name_set* in place.  Dicts are walked key-by-key, skipping the
    'parent' key (avoids following back-references); lists are walked
    element-by-element; anything else is ignored.
    """
    if isinstance(data, list):
        for element in data:
            extract_names_from_json(element, name_set)
        return
    if not isinstance(data, dict):
        return
    candidate = data.get('name')
    if isinstance(candidate, str):
        normalized = normalize_string(candidate)
        if normalized:
            name_set.add(normalized)
    for key, value in data.items():
        if key == 'parent':
            continue
        extract_names_from_json(value, name_set)
def collect_element_names(views_directory):
    """Scan *views_directory* for view.json files and gather element names.

    Returns the set of unique normalized 'name' values found, or None when
    the directory does not exist.  Unreadable / invalid files are reported
    and skipped rather than aborting the scan.
    """
    if not os.path.isdir(views_directory):
        print(f"Error: Views directory not found at {views_directory}")
        return None
    print(f"Scanning directory: {views_directory} for element names...")
    collected = set()
    scanned = 0
    for dirpath, _dirnames, filenames in os.walk(views_directory):
        for fname in filenames:
            if fname.lower() != 'view.json':
                continue
            full_path = os.path.join(dirpath, fname)
            scanned += 1
            try:
                with open(full_path, 'r', encoding='utf-8') as handle:
                    extract_names_from_json(json.load(handle), collected)
            except json.JSONDecodeError:
                print(f"Warning: Could not decode JSON for file: {full_path}")
            except Exception as e:
                print(f"Error processing file {full_path}: {e}")
    print(f"Scanned {scanned} view.json files.")
    return collected
def calculate_progress(csv_filepath, views_directory):
    """Compare CSV aliases against view element names and build a summary.

    Returns a dict with an "overall" section (totals, percentage, sorted
    missing/found detail lists) and a "panels" section keyed by control
    panel, or None when either input could not be read.

    NOTE(review): "total_csv" counts unique aliases while "missing" counts
    per-panel rows, so an alias listed under several panels can skew the
    overall percentage -- confirm whether duplicate aliases are expected.
    """
    panel_details_map, total_csv_aliases = get_control_panel_units(csv_filepath)
    if panel_details_map is None:
        return None
    if not panel_details_map:
        print("No control panel/alias data loaded from CSV.")
        return {"overall": {"total_csv": 0, "missing": 0, "found_json": 0, "percentage": 0, "missing_list": [], "found_list": []}, "panels": {}}

    found_normalized_names = collect_element_names(views_directory)
    if found_normalized_names is None:
        return None
    total_found_json_names = len(found_normalized_names)
    print(f"Found {total_found_json_names} unique normalized element names across all views.")

    by_alias = lambda item: item['alias']
    panels_report = {}
    all_missing = []
    all_found = []
    for panel_name, entries in panel_details_map.items():
        missing_here = []
        found_here = []
        for entry in entries:
            if normalize_string(entry['alias']) in found_normalized_names:
                found_here.append(entry)
            else:
                missing_here.append(entry)
        all_missing.extend(missing_here)
        all_found.extend(found_here)
        entry_count = len(entries)
        miss_count = len(missing_here)
        panels_report[panel_name] = {
            "total": entry_count,
            "missing": miss_count,
            "missing_list": sorted(missing_here, key=by_alias),
            "found_list": sorted(found_here, key=by_alias),
            "percentage": round((entry_count - miss_count) / entry_count * 100, 2) if entry_count > 0 else 0
        }

    total_missing = len(all_missing)
    overall_pct = round((total_csv_aliases - total_missing) / total_csv_aliases * 100, 2) if total_csv_aliases > 0 else 0
    return {
        "overall": {
            "total_csv": total_csv_aliases,
            "missing": total_missing,
            "found_json": total_found_json_names,
            "percentage": overall_pct,
            "missing_list": sorted(all_missing, key=by_alias),
            "found_list": sorted(all_found, key=by_alias)
        },
        "panels": panels_report
    }
def generate_report(progress_data, csv_filename):
    """Return the Markdown report, as a list of lines, for missing aliases.

    *progress_data* is the dict produced by calculate_progress;
    *csv_filename* appears only in the headline text.  Panels with nothing
    missing are omitted from the per-panel sections.
    """
    overall = progress_data["overall"]
    lines = ["# Report: Missing Aliases by Control Panel", "\n---"]
    if overall["total_csv"] == 0:
        lines.append("\nNo aliases found in the CSV file.")
    elif overall["missing"] == 0:
        lines.append("\nAll aliases from the CSV were found associated with their Control Panels in view element names (case/hyphen/underscore insensitive).")
    else:
        lines.append(f"\nFound **{overall['missing']}** alias(es) from `{csv_filename}` NOT found in any view element name:")
        for panel_name in sorted(progress_data["panels"]):
            panel_info = progress_data["panels"][panel_name]
            if panel_info["missing"] == 0:
                continue
            lines.append(f"\n## {panel_name}")
            lines.append(f"Missing {panel_info['missing']} of {panel_info['total']} ({100 - panel_info['percentage']:.2f}% missing)")
            for detail in panel_info["missing_list"]:
                lines.append(f"- {detail['alias']} (Eq: {detail['equipment_type']}, Conv: {detail['conveyor_type']})")
    lines.append("\n---")
    lines.append("\nScan complete.")
    return lines
# Command-line entry point: compare a CSV manifest against view.json element
# names and emit a Markdown report (to a file with -o, otherwise to stdout).
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Group missing aliases by Control Panel based on CSV and view element names.')
    parser.add_argument('csv_file', help='Path to the CSV file (e.g., "Manifest.csv")')
    parser.add_argument('views_dir', help='Path to the directory containing detailed view folders (e.g., "Detailed-Views")')
    parser.add_argument('-o', '--output', help='Optional path to save the report as a Markdown file (e.g., "report.md")')
    args = parser.parse_args()

    progress_results = calculate_progress(args.csv_file, args.views_dir)
    if progress_results is None:
        print("\nFailed to calculate progress. Check errors above.")
    else:
        report_lines = generate_report(progress_results, os.path.basename(args.csv_file))
        if not args.output:
            print("\n".join(report_lines))
        else:
            try:
                with open(args.output, 'w', encoding='utf-8') as outfile:
                    # Same bytes as writing each line followed by '\n'.
                    outfile.write("\n".join(report_lines) + "\n")
                print(f"\nReport successfully saved to: {args.output}")
            except Exception as e:
                print(f"\nError writing report file {args.output}: {e}")
                print("\n--- CONSOLE REPORT FALLBACK ---")
                for line in report_lines:
                    print(line)