first commit
This commit is contained in:
commit
38c5eb5fa4
51
README.md
Normal file
51
README.md
Normal file
@ -0,0 +1,51 @@
|
||||
# TagProps Transformer
|
||||
|
||||
This tool transforms tag properties in JSON files by replacing the first value in the `tagProps` array with a path based on the element's name, an MCM number, and a device type.
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
python tag_transformer.py <input_file.json> <mcm_number> [device_type]
|
||||
```
|
||||
|
||||
### Example:
|
||||
|
||||
```bash
|
||||
python tag_transformer.py sample_input.json 07 Conveyor
|
||||
```
|
||||
|
||||
This will:
|
||||
1. Read the input JSON file (`sample_input.json`)
|
||||
2. For each element in the JSON, replace only the first value in the `tagProps` array with `System/MCM07/Conveyor/[ElementName]`
|
||||
3. Output the transformed JSON to the console
|
||||
|
||||
### Auto Device Type Detection
|
||||
|
||||
If you don't specify a device type, the script will automatically determine the device type based on the element name:
|
||||
|
||||
- If the name contains `EPC` followed by "End" (e.g., `EPCEnd`, `EPC_End`, `EPC1_End`, `EPC2_End_1`), the device type will be `EPC_End`
|
||||
- If the name contains `EPC` followed by a number (e.g., `EPC1`, `EPC2`, `EPC_1`), the device type will be `EPC`
|
||||
- If the name contains `EPC` followed by "Line" (e.g., `EPCLine`, `EPC_Line`), the device type will be `EPC_Line`
|
||||
- For all other cases, the device type will be `status`
|
||||
|
||||
Example:
|
||||
```bash
|
||||
python tag_transformer.py sample_input.json 07
|
||||
```
|
||||
|
||||
## Output Format
|
||||
|
||||
The output will be a valid JSON array with the same structure as the input, but with the first value in each `tagProps` array replaced with the generated path.
|
||||
|
||||
## Saving Output to a File
|
||||
|
||||
To save the output to a file, you can redirect the console output:
|
||||
|
||||
```bash
|
||||
python tag_transformer.py sample_input.json 07 Conveyor > output.json
|
||||
```
|
||||
|
||||
## Requirements
|
||||
|
||||
- Python 3.6+
|
||||
- No additional dependencies required
|
||||
BIN
__pycache__/create_tag_structure.cpython-311.pyc
Normal file
BIN
__pycache__/create_tag_structure.cpython-311.pyc
Normal file
Binary file not shown.
BIN
__pycache__/create_tag_structure.cpython-313.pyc
Normal file
BIN
__pycache__/create_tag_structure.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/tag_transformer.cpython-311.pyc
Normal file
BIN
__pycache__/tag_transformer.cpython-311.pyc
Normal file
Binary file not shown.
BIN
__pycache__/tag_transformer.cpython-313.pyc
Normal file
BIN
__pycache__/tag_transformer.cpython-313.pyc
Normal file
Binary file not shown.
229
check_tags.py
Normal file
229
check_tags.py
Normal file
@ -0,0 +1,229 @@
|
||||
# Filename: check_tags.py
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
from typing import List, Dict, Any, Set, Optional
|
||||
|
||||
# --- Constants ---
# Filenames searched for / referenced in help text.
VIEW_JSON_FILENAME = 'view.json'
TAGS_JSON_FILENAME = 'tags.json' # Used in help text
# JSON keys shared by the view/tags structures.
TAGS = 'tags'
ROOT = 'root'
CHILDREN = 'children'
META = 'meta'
NAME = 'name'
TAG_TYPE = 'tagType'
# tagType values of interest.
FOLDER = 'Folder'
UDT_INSTANCE = 'UdtInstance'
# Elements with this name are excluded from the expected-element set.
IMAGE_NAME = 'Image'
SOURCE = 'source' # Key for missing element dict
|
||||
|
||||
# --- File System Functions ---
|
||||
def find_view_files(base_dir: str) -> Optional[List[str]]:
    """Recursively collect every view.json path under *base_dir*.

    Returns the list of matching paths (possibly empty), or None when
    *base_dir* is empty, missing, or not a directory.
    """
    # Guard clause: an unusable base directory is reported and signals error.
    if not base_dir or not os.path.isdir(base_dir):
        print(f"Error: Base directory '{base_dir}' not found, is invalid, or is not a directory.")
        return None # Indicate error

    print(f"Scanning for '{VIEW_JSON_FILENAME}' in '{base_dir}'...")
    # os.walk visits every subdirectory; keep only those holding a view.json.
    view_files = [
        os.path.join(dirpath, VIEW_JSON_FILENAME)
        for dirpath, _, filenames in os.walk(base_dir)
        if VIEW_JSON_FILENAME in filenames
    ]
    print(f"Found {len(view_files)} '{VIEW_JSON_FILENAME}' files.")
    return view_files
|
||||
|
||||
# --- Data Extraction Functions ---
|
||||
def extract_expected_elements(view_files: List[str]) -> Dict[str, str]:
    """
    Parses view.json files and extracts names of elements expected to have tags.

    Filters out elements named 'Image'. Handles duplicates by keeping the first
    encountered. Progress and per-file problems are reported on stdout.

    Args:
        view_files: Paths of view.json files to scan.

    Returns:
        A dictionary mapping element names to their source view.json file path.
    """
    expected_elements_map: Dict[str, str] = {} # {element_name: source_file_path}
    print(f"\nExtracting expected element names from {len(view_files)} files...")
    # Counters for the summary printed at the end.
    processed_count = 0          # files that contributed at least one new element
    skipped_structure_count = 0  # files whose JSON shape was not root.children[...]
    error_count = 0              # files that failed to read or parse

    for file_path in view_files:
        try:
            with open(file_path, 'r') as f:
                data = json.load(f)

            # Validate basic structure needed to find elements
            root_node = data.get(ROOT)
            if not isinstance(root_node, dict):
                print(f"Warning: Skipping '{file_path}': Missing or invalid '{ROOT}' node.")
                skipped_structure_count += 1
                continue

            elements = root_node.get(CHILDREN)
            if not isinstance(elements, list):
                print(f"Warning: Skipping '{file_path}': '{ROOT}.{CHILDREN}' is missing or not a list.")
                skipped_structure_count += 1
                continue

            found_in_file_count = 0
            for element in elements:
                # Ensure element is a dictionary before trying to access keys
                if not isinstance(element, dict):
                    continue

                element_meta = element.get(META, {})
                element_name = element_meta.get(NAME)

                # Element should have a name and not be an "Image" to be considered
                if element_name and element_name != IMAGE_NAME:
                    if element_name in expected_elements_map:
                        # Log duplicate but keep the first one encountered
                        print(f"Warning: Duplicate element name '{element_name}' found. Original source: '{expected_elements_map[element_name]}', New source: '{file_path}'. Keeping first occurrence.")
                    else:
                        expected_elements_map[element_name] = file_path
                        found_in_file_count += 1

            if found_in_file_count > 0:
                processed_count += 1
            # If found_in_file_count is 0, it means the file was processed but contained
            # only skipped elements (like Image) or elements without names. We don't
            # count this explicitly as skipped_structure or error.

        except json.JSONDecodeError:
            print(f"Warning: Skipping '{file_path}': Invalid JSON.")
            error_count += 1
        except IOError as e:
            print(f"Warning: Skipping '{file_path}': IO Error reading file - {e}")
            error_count += 1
        except Exception as e:
            print(f"Warning: Skipping '{file_path}': Unexpected error processing file - {e}")
            error_count += 1

    total_skipped_or_error = skipped_structure_count + error_count
    print(f"Extraction complete. Found {len(expected_elements_map)} unique expected elements.")
    print(f" Processed {processed_count} files containing valid elements.")
    if total_skipped_or_error > 0:
        print(f" Skipped {skipped_structure_count} files due to structural issues.")
        print(f" Encountered errors in {error_count} files.")
    return expected_elements_map
|
||||
|
||||
def _find_udt_instances_recursive(items: List[Dict[str, Any]], found_names: Set[str]):
    """Walk a list of tag/folder dicts, adding every UDT instance name to *found_names*.

    Folders are descended into recursively; non-dict entries and non-list
    inputs are ignored. Mutates *found_names* in place and returns None.
    """
    if not isinstance(items, list):
        # A folder's 'tags' value was not a list; nothing to traverse here.
        return

    for entry in (e for e in items if isinstance(e, dict)):
        kind = entry.get(TAG_TYPE)
        if kind == UDT_INSTANCE:
            name = entry.get(NAME)
            if name:
                found_names.add(name)
        elif kind == FOLDER:
            # Recurse into the folder's nested tag list.
            _find_udt_instances_recursive(entry.get(TAGS, []), found_names)
|
||||
|
||||
|
||||
def extract_existing_tag_names(tags_file_path: str) -> Optional[Set[str]]:
    """
    Load the tags JSON file and return the set of all UDT instance names.

    Nested structures (MCM folders, device folders) are traversed recursively.
    Returns None when the file is missing, unreadable, invalid JSON, or lacks
    the expected top-level {'tags': [...]} shape.
    """
    collected: Set[str] = set()
    print(f"\nLoading existing UDT instance names from '{tags_file_path}'...")
    try:
        with open(tags_file_path, 'r') as f:
            data = json.load(f)

        # The file must be an object whose 'tags' key holds a list of folders.
        if not isinstance(data, dict) or TAGS not in data or not isinstance(data[TAGS], list):
            print(f"Error: '{tags_file_path}' does not contain a top-level '{TAGS}' key with a list value.")
            return None # Indicate error

        # Walk every MCM/device folder, gathering UDT instance names.
        _find_udt_instances_recursive(data[TAGS], collected)

        print(f"Found {len(collected)} unique UDT instance names in '{os.path.basename(tags_file_path)}'.")
        return collected

    except FileNotFoundError:
        print(f"Error: Tags file '{tags_file_path}' not found.")
        return None
    except json.JSONDecodeError:
        print(f"Error: Tags file '{tags_file_path}' is not valid JSON.")
        return None
    except IOError as e:
        print(f"Error: IO Error reading tags file '{tags_file_path}' - {e}")
        return None
    except Exception as e:
        print(f"Error: Unexpected error reading tags file '{tags_file_path}' - {e}")
        return None # Indicate error
|
||||
|
||||
|
||||
def main():
    """
    Command-line entry point.

    Workflow: find every view.json under the given base directory, collect the
    element names expected to have tags, load the UDT instance names present in
    the given tags file, then report any expected element without a matching
    UDT instance. Exits 0 on success (or nothing to do), 1 on error or when
    instances are missing.
    """
    parser = argparse.ArgumentParser(
        description=f"Compares element names from '{VIEW_JSON_FILENAME}' files against UDT Instances in a '{TAGS_JSON_FILENAME}' file.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("tags_file", help=f"Path to the generated {TAGS_JSON_FILENAME} file.")
    parser.add_argument("base_directory", help=f"Path to the base directory containing project subfolders (which contain '{VIEW_JSON_FILENAME}' files). Example: './Detailed-Views'")
    args = parser.parse_args()

    # 1. Find all view.json files
    view_files = find_view_files(args.base_directory)
    if view_files is None: # Error handled in function
        sys.exit(1)
    if not view_files:
        print("No view.json files found in the specified directory. Exiting.")
        sys.exit(0) # Not necessarily an error if the directory was empty

    # 2. Extract expected elements from view.json files
    expected_elements = extract_expected_elements(view_files)
    if not expected_elements:
        print("No elements suitable for tag generation were found in any view.json files. Exiting.")
        sys.exit(0) # Not an error if views only contained Images etc.

    # 3. Extract existing tag names from tags.json
    existing_tag_names = extract_existing_tag_names(args.tags_file)
    if existing_tag_names is None:
        # Error messages printed within the function
        print("Failed to load existing tags. Exiting.")
        sys.exit(1)
    # No need to check if empty, comparison will handle it.

    # 4. Compare and report missing elements
    print("\nComparing expected elements against existing UDT instances...")
    missing_elements = []
    for name, source_file in expected_elements.items():
        if name not in existing_tag_names:
            # Use SOURCE constant for the dictionary key
            missing_elements.append({NAME: name, SOURCE: source_file})

    print("-" * 40)
    if not missing_elements:
        print("Success! All expected elements were found as UDT Instances in the tags file.")
        exit_code = 0
    else:
        print(f"Check Failed: Found {len(missing_elements)} missing UDT Instances:")
        for missing in missing_elements:
            # Use constants for accessing dictionary keys
            print(f" - Name: '{missing[NAME]}', Expected Source: '{missing[SOURCE]}'" )
        print("\nPlease ensure these elements were processed correctly or excluded intentionally.")
        exit_code = 1 # Indicate failure
    print("-" * 40)
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|
||||
127
compare_json.py
Normal file
127
compare_json.py
Normal file
@ -0,0 +1,127 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import sys
|
||||
import argparse
|
||||
import os
|
||||
from typing import Set, List, Dict, Any, Optional
|
||||
|
||||
# --- Constants (mirroring those used in tag structure) ---
# JSON keys.
TAGS = 'tags'
NAME = 'name'
TAG_TYPE = 'tagType'
# tagType values of interest.
FOLDER = 'Folder'
UDT_INSTANCE = 'UdtInstance'
|
||||
|
||||
|
||||
def load_json_file(file_path: str) -> Optional[Dict[str, Any]]:
    """Read *file_path* as JSON and return the top-level object.

    Returns None (after printing to stderr) when the file is missing,
    unreadable, not valid JSON, or its root is not a JSON object.
    """
    if not os.path.exists(file_path):
        print(f"Error: File not found: '{file_path}'", file=sys.stderr)
        return None
    try:
        with open(file_path, 'r') as handle:
            loaded = json.load(handle)
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in file: '{file_path}'", file=sys.stderr)
        return None
    except IOError as exc:
        print(f"Error reading file '{file_path}': {exc}", file=sys.stderr)
        return None
    except Exception as exc:
        print(f"An unexpected error occurred while loading '{file_path}': {exc}", file=sys.stderr)
        return None

    # Only a JSON object (dict) is a valid top level for a tags file.
    if isinstance(loaded, dict):
        return loaded
    print(f"Error: Expected JSON root to be an object/dictionary in '{file_path}'", file=sys.stderr)
    return None
|
||||
|
||||
def _find_udt_instances_recursive(items: List[Dict[str, Any]], found_names: Set[str]):
    """Accumulate UDT instance names from a (possibly nested) tag/folder list.

    Mirrors the traversal in check_tags.py: UDT instances contribute their
    name, folders are searched recursively, everything else is skipped.
    Mutates *found_names* in place.
    """
    if not isinstance(items, list):
        return  # Malformed 'tags' value; nothing to traverse.

    for node in items:
        if not isinstance(node, dict):
            continue  # Skip malformed items
        node_type = node.get(TAG_TYPE)
        node_name = node.get(NAME)
        if node_type == UDT_INSTANCE and node_name:
            found_names.add(node_name)
        elif node_type == FOLDER:
            # Descend into the folder's nested tag list.
            _find_udt_instances_recursive(node.get(TAGS, []), found_names)
|
||||
|
||||
def extract_udt_names(file_path: str) -> Optional[Set[str]]:
    """Load a tags JSON file and return the set of all UDT instance names.

    Returns None if the file cannot be loaded or lacks a top-level 'tags'
    list; errors are reported by load_json_file or here.
    """
    print(f"Extracting UDT names from: {file_path}")
    data = load_json_file(file_path)
    if data is None:
        return None

    # Expecting a top-level object like {"tags": [...list of MCM folders...]}
    tag_list = data.get(TAGS)
    if not isinstance(tag_list, list):
        print(f"Error: '{file_path}' does not contain a top-level '{TAGS}' key with a list value.", file=sys.stderr)
        return None

    names: Set[str] = set()
    # Recursive search starting from the list of MCM folders/tags.
    _find_udt_instances_recursive(tag_list, names)

    print(f" Found {len(names)} UDT instance names.")
    return names
|
||||
|
||||
|
||||
def main():
    """
    Command-line entry point.

    Extracts the UDT instance name sets from two tags JSON files and compares
    them, listing names unique to either file. Exits 0 when the sets match,
    1 when they differ or either file cannot be processed.
    """
    parser = argparse.ArgumentParser(
        description="Compares the set of UDT Instance names found in two tags.json files.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("file1", help="Path to the first JSON tags file (e.g., baseline).")
    parser.add_argument("file2", help="Path to the second JSON tags file (e.g., new result).")

    args = parser.parse_args()

    print(f"Comparing UDT Instance names in:\n File 1: {args.file1}\n File 2: {args.file2}\n")

    # Extract UDT names from both files
    names1 = extract_udt_names(args.file1)
    if names1 is None:
        sys.exit(1)

    names2 = extract_udt_names(args.file2)
    if names2 is None:
        sys.exit(1)

    # Compare the sets of names
    print("\n--- Comparison Results ---")
    if names1 == names2:
        print(f"Success: Both files contain the exact same {len(names1)} UDT instance names.")
        sys.exit(0)
    else:
        print("Difference Found: The set of UDT instance names differs between the files.")
        # Sort the asymmetric differences for stable, readable output.
        missing_in_file2 = sorted(list(names1 - names2))
        added_in_file2 = sorted(list(names2 - names1))

        if missing_in_file2:
            print(f"\nNames in '{os.path.basename(args.file1)}' but NOT in '{os.path.basename(args.file2)}' ({len(missing_in_file2)}):")
            for name in missing_in_file2:
                print(f" - {name}")

        if added_in_file2:
            print(f"\nNames in '{os.path.basename(args.file2)}' but NOT in '{os.path.basename(args.file1)}' ({len(added_in_file2)}):")
            for name in added_in_file2:
                print(f" - {name}")

        sys.exit(1) # Exit with a non-zero code to indicate difference


if __name__ == "__main__":
    main()
|
||||
117
compare_views.py
Normal file
117
compare_views.py
Normal file
@ -0,0 +1,117 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import sys
|
||||
import argparse
|
||||
import os
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
# Constants
# Name of the per-view definition file looked up in both directory trees.
VIEW_JSON_FILENAME = 'view.json'
|
||||
|
||||
def load_json_file(file_path: str) -> Optional[Dict[str, Any]]:
    """Read *file_path* as JSON, returning the top-level dict or None.

    A missing file returns None silently (the baseline tree may simply not
    contain a counterpart); parse/IO errors and a non-object root are
    reported to stderr and also yield None.
    """
    if not os.path.exists(file_path):
        # Absence is expected when the baseline lacks this view; stay quiet.
        return None
    try:
        with open(file_path, 'r') as handle:
            payload = json.load(handle)
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in file: '{file_path}'", file=sys.stderr)
        return None # Treat as error
    except IOError as exc:
        print(f"Error reading file '{file_path}': {exc}", file=sys.stderr)
        return None # Treat as error
    except Exception as exc:
        print(f"An unexpected error occurred while loading '{file_path}': {exc}", file=sys.stderr)
        return None # Treat as error

    if not isinstance(payload, dict):
        print(f"Error: Expected JSON root to be an object/dictionary in '{file_path}'", file=sys.stderr)
        return None # Treat as error
    return payload
|
||||
|
||||
def compare_directories(dir1: str, dir2: str) -> int:
    """
    Compares view.json files found in subdirectories of dir1 with corresponding
    files in dir2.

    dir1 is treated as the modified tree and drives the walk; each view.json
    found there is compared (as parsed JSON, so formatting differences are
    ignored) against the same relative path under dir2.

    Returns:
        0 when every pair is logically identical and nothing failed to load;
        1 when any difference, load error, or missing baseline was seen.
    """
    print(f"Comparing '{VIEW_JSON_FILENAME}' files:")
    print(f" Directory 1 (Modified): {dir1}")
    print(f" Directory 2 (Baseline): {dir2}\n")

    # Summary counters, reported at the end.
    differences_found = 0
    files_compared = 0
    errors_loading = 0
    baseline_missing = 0

    for root, _, files in os.walk(dir1):
        if VIEW_JSON_FILENAME in files:
            relative_path = os.path.relpath(root, dir1)
            file1_path = os.path.join(root, VIEW_JSON_FILENAME)
            file2_path = os.path.join(dir2, relative_path, VIEW_JSON_FILENAME)

            print(f"Comparing: {os.path.join(relative_path, VIEW_JSON_FILENAME)}")
            files_compared += 1

            data1 = load_json_file(file1_path)
            data2 = load_json_file(file2_path)

            if data1 is None:
                print(f" - Error loading modified file: {file1_path}")
                errors_loading += 1
                continue # Cannot compare if modified file fails

            if data2 is None:
                # Check if the baseline file *should* exist
                if os.path.exists(os.path.dirname(file2_path)):
                    print(f" - Baseline file missing or errored: {file2_path}")
                    baseline_missing += 1
                    # Decide if this counts as a difference? Yes, likely.
                    differences_found += 1
                else:
                    print(f" - Baseline directory structure missing for: {os.path.join(relative_path, VIEW_JSON_FILENAME)}. Cannot compare.")
                    baseline_missing += 1 # Count as missing baseline structure
                continue # Cannot compare if baseline is missing/errored

            # Perform the comparison
            if data1 == data2:
                print(" - Identical")
            else:
                print(" - DIFFERENCE FOUND")
                differences_found += 1

    print("\n--- Comparison Summary ---")
    print(f" View files compared: {files_compared}")
    print(f" Files with differences: {differences_found}")
    print(f" Errors loading modified files: {errors_loading}")
    print(f" Baseline files missing/errored: {baseline_missing}")

    if differences_found == 0 and errors_loading == 0 and baseline_missing == 0:
        print("\nSuccess: All corresponding view.json files are logically identical.")
        return 0 # Exit code 0 for success
    else:
        print("\nCheck Complete: Differences or errors were encountered.")
        return 1 # Exit code 1 for differences/errors
|
||||
|
||||
def main():
    """
    Command-line entry point.

    Validates both directory arguments, then compares every view.json under
    dir1 against its counterpart in dir2. Exits with the code returned by
    compare_directories (0 identical, 1 differences/errors).
    """
    parser = argparse.ArgumentParser(
        description=f"Compares {VIEW_JSON_FILENAME} files between two directory structures.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("dir1", help="Path to the first directory (e.g., the modified one).")
    parser.add_argument("dir2", help="Path to the second directory (e.g., the baseline/original one).")

    args = parser.parse_args()

    # Validate directories exist
    if not os.path.isdir(args.dir1):
        print(f"Error: Directory 1 not found: '{args.dir1}'", file=sys.stderr)
        sys.exit(1)
    if not os.path.isdir(args.dir2):
        print(f"Error: Directory 2 not found: '{args.dir2}'", file=sys.stderr)
        sys.exit(1)

    exit_code = compare_directories(args.dir1, args.dir2)
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|
||||
200
create_tag_structure.py
Normal file
200
create_tag_structure.py
Normal file
@ -0,0 +1,200 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import sys
|
||||
import re
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
# --- Constants for dictionary keys and common values ---
# JSON dictionary keys.
META = 'meta'
NAME = 'name'
PROPS = 'props'
PATH = 'path'
TAGS = 'tags'
TAG_TYPE = 'tagType'
TYPE_ID = 'typeId'
VALUE = 'value'

# tagType / typeId values.
FOLDER = 'Folder'
UDT_INSTANCE = 'UdtInstance'
ATOMIC_TAG = 'AtomicTag'
UDT_TYPE_ID = 'TagProp' # Specific type ID for these UDTs

# Element names / member tag names.
IMAGE_NAME = 'Image'
ALARMST_TAG = 'ALARMST'
STATE_TAG = 'STATE'
DCN_TAG = 'DCN'

# Device-type handling.
MCM_PREFIX = 'MCM'
STATUS_DEVICE_TYPE = 'Status'
MCM_DEVICE_TYPE_INDICATOR = "" # Indicates the element is the MCM itself
|
||||
|
||||
# --- Helper function to create a standard UDT instance ---
|
||||
def _create_udt_instance(element_name: str) -> Dict[str, Any]:
    """Build the UDT-instance tag dict (with its standard atomic member tags)."""
    # Every UDT instance carries the same three member tags; ALARMST also
    # gets an explicit initial value of 0.
    atomic_tags = [
        {NAME: ALARMST_TAG, VALUE: 0, TAG_TYPE: ATOMIC_TAG},
        {NAME: STATE_TAG, TAG_TYPE: ATOMIC_TAG},
        {NAME: DCN_TAG, TAG_TYPE: ATOMIC_TAG},
    ]
    return {
        NAME: element_name,
        TYPE_ID: UDT_TYPE_ID,
        TAG_TYPE: UDT_INSTANCE,
        TAGS: atomic_tags,
    }
|
||||
|
||||
# --- Device Type Determination ---
|
||||
def determine_device_type(element: Dict[str, Any]) -> str:
    """
    Determine the device type for an element.

    Elements named 'MCM' or 'MCM<digits>' represent the MCM itself and map to
    MCM_DEVICE_TYPE_INDICATOR (""). Otherwise the last segment of the
    element's props.path is used; when no usable path exists the fallback is
    STATUS_DEVICE_TYPE.

    Args:
        element: The element dictionary.

    Returns:
        The determined device type string.
    """
    name = element.get(META, {}).get(NAME)

    # MCM / MCMxx elements live at the MCM folder level, not in a subfolder.
    if name and (name == MCM_PREFIX or re.match(rf'^{MCM_PREFIX}\d+$', name)):
        return MCM_DEVICE_TYPE_INDICATOR

    path_value = element.get(PROPS, {}).get(PATH)
    if isinstance(path_value, str) and path_value:
        # Normalize Windows separators, then take the final path segment.
        last_segment = path_value.replace('\\', '/').split('/')[-1]
        if last_segment:
            return last_segment

    # Missing/empty path (or empty final segment): treat as a status device.
    return STATUS_DEVICE_TYPE
|
||||
|
||||
# --- Main Tag Structure Creation Logic ---
|
||||
def create_tag_structure(
    elements: List[Dict[str, Any]],
    mcm_number: str,
    device_type_override: Optional[str] = None
) -> Dict[str, Any]:
    """
    Create a hierarchical tag structure with MCM as the top folder,
    device types as subfolders, and elements as UDT instances with atomic tags.

    Elements named 'Image' or lacking a name are skipped. Device folders are
    created lazily, in the order their first element is encountered.

    Args:
        elements: List of element dictionaries from the JSON.
        mcm_number: The MCM number (will be zero-padded to 2 digits).
        device_type_override: Optional override for device type. If None,
                              it's determined from the element.

    Returns:
        A dictionary representing the structured tags for the MCM.
    """
    mcm_formatted = f"{MCM_PREFIX}{mcm_number.zfill(2)}"

    # Initialize the root structure for this MCM
    tag_structure = {
        NAME: mcm_formatted,
        TAG_TYPE: FOLDER,
        TAGS: []
    }

    # Keep track of device folders already created to avoid duplicates
    device_folders: Dict[str, Dict[str, Any]] = {}

    for element in elements:
        element_meta = element.get(META, {})
        element_name = element_meta.get(NAME)

        # Skip elements named "Image" or elements without a name
        if not element_name or element_name == IMAGE_NAME:
            continue

        # Determine the device type for this element
        actual_device_type = device_type_override
        if actual_device_type is None: # Check for None explicitly
            actual_device_type = determine_device_type(element)

        # Create the UDT instance tag for the element
        element_tag = _create_udt_instance(element_name)

        # --- Add the element tag to the structure ---

        # Case 1: Element is the MCM itself (no device subfolder)
        if actual_device_type == MCM_DEVICE_TYPE_INDICATOR:
            tag_structure[TAGS].append(element_tag)

        # Case 2: Element belongs in a device type subfolder
        else:
            # Create the device type folder if it doesn't exist yet
            if actual_device_type not in device_folders:
                device_folder = {
                    NAME: actual_device_type,
                    TAG_TYPE: FOLDER,
                    TAGS: []
                }
                device_folders[actual_device_type] = device_folder
                # Add the new folder to the main MCM structure
                tag_structure[TAGS].append(device_folder)

            # Add the element tag to its corresponding device folder
            device_folders[actual_device_type][TAGS].append(element_tag)

    return tag_structure
|
||||
|
||||
# --- Main Execution Block ---
|
||||
def main():
    """
    CLI entry point: read a view JSON file and print the generated tag structure.

    Usage: python create_tag_structure.py <input_file.json> <mcm_number> [device_type_override]

    Exits with status 1 on bad arguments, unreadable/invalid input, or an
    unexpected JSON shape; otherwise prints the structure as JSON on stdout.
    """
    # BUG FIX: this module never imports os at the top level, so the usage
    # messages below raised NameError on os.path.basename instead of printing.
    # A function-scope import keeps the module's import block untouched.
    import os

    # Check if filename and MCM number are provided
    if len(sys.argv) < 3:
        print(f"Usage: python {os.path.basename(__file__)} <input_file.json> <mcm_number> [device_type_override]")
        print(f"Example: python {os.path.basename(__file__)} input.json 07 Conveyor")
        print("If device_type_override is not provided, it will be determined automatically.")
        sys.exit(1)

    input_file = sys.argv[1]
    mcm_number = sys.argv[2]

    # Device type override is the optional third positional argument.
    device_type_override = sys.argv[3] if len(sys.argv) >= 4 else None

    try:
        # Read the input JSON file. Two shapes are accepted: a bare list of
        # elements, or {'root': {'children': [...]}} as found in view.json.
        with open(input_file, 'r') as f:
            raw_data = json.load(f)

        if isinstance(raw_data, dict) and 'root' in raw_data and 'children' in raw_data['root']:
            elements_data = raw_data['root']['children']
            if not isinstance(elements_data, list):
                print(f"Error: 'root.children' in '{input_file}' is not a list.")
                sys.exit(1)
        elif isinstance(raw_data, list):
            elements_data = raw_data # Assume list is the elements directly
        else:
            print(f"Error: Unexpected JSON structure in '{input_file}'. Expected a list of elements or {{'root': {{'children': [...]}}}}.")
            sys.exit(1)

        # Create the tag structure
        result = create_tag_structure(elements_data, mcm_number, device_type_override)

        # Output the result as JSON to stdout
        print(json.dumps(result, indent=2))

    except FileNotFoundError:
        print(f"Error: File '{input_file}' not found.")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: File '{input_file}' is not valid JSON.")
        sys.exit(1)
    except Exception as e:
        # SystemExit is a BaseException, so the sys.exit(1) calls above are
        # not swallowed by this handler.
        print(f"An unexpected error occurred: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
|
||||
68440
output/tags.json
Normal file
68440
output/tags.json
Normal file
File diff suppressed because it is too large
Load Diff
54008
outputs/tags.json
Normal file
54008
outputs/tags.json
Normal file
File diff suppressed because it is too large
Load Diff
49336
outputs/tags_baseline.json
Normal file
49336
outputs/tags_baseline.json
Normal file
File diff suppressed because it is too large
Load Diff
348
process_tags.py
Normal file
348
process_tags.py
Normal file
@ -0,0 +1,348 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import argparse
|
||||
from typing import Optional, Tuple, Dict, Any, List
|
||||
|
||||
# Assuming these modules are in the same directory or Python path
|
||||
try:
|
||||
from tag_transformer import generate_tag_paths
|
||||
# Import the refactored create_tag_structure and its constants
|
||||
from create_tag_structure import create_tag_structure, TAGS as TAGS_KEY, NAME as NAME_KEY, TAG_TYPE as TAG_TYPE_KEY, FOLDER as FOLDER_TYPE, MCM_PREFIX
|
||||
except ImportError as e:
|
||||
print(f"Error: Failed to import required modules. Make sure 'tag_transformer.py' and 'create_tag_structure.py' are accessible. Details: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# --- Constants ---
# Filenames and default output location.
VIEW_JSON_FILENAME = 'view.json'
TAGS_JSON_FILENAME = 'tags.json'
DEFAULT_OUTPUT_DIR = './outputs'
# Define keys specific to the view.json structure being processed
ROOT_KEY = 'root'
CHILDREN_KEY = 'children'
||||
|
||||
# --- Utility Functions ---
|
||||
|
||||
def extract_mcm_number_from_name(dir_name: str) -> Optional[str]:
    """
    Pull the numeric MCM identifier out of a directory name like 'MCM07 Bypass'.

    The search is case-insensitive and keyed on the shared MCM_PREFIX constant
    imported from create_tag_structure.

    Args:
        dir_name: The directory name to inspect.

    Returns:
        The zero-padded MCM number string (e.g., "07"), or None when the
        name contains no '<MCM_PREFIX><digits>' sequence.
    """
    found = re.search(rf'{MCM_PREFIX}(\d+)', dir_name, re.IGNORECASE)
    # Zero-pad so single-digit MCM numbers sort and format consistently.
    return found.group(1).zfill(2) if found else None
||||
|
||||
def _save_json(data: Dict[str, Any], file_path: str, indent: int = 2) -> bool:
|
||||
"""Saves dictionary data to a JSON file. Returns True on success, False on error."""
|
||||
try:
|
||||
with open(file_path, 'w') as f:
|
||||
json.dump(data, f, indent=indent)
|
||||
return True
|
||||
except IOError as e:
|
||||
print(f"Error: Could not write to file '{file_path}': {e}")
|
||||
return False
|
||||
except TypeError as e:
|
||||
print(f"Error: Data for '{file_path}' is not JSON serializable: {e}")
|
||||
return False
|
||||
|
||||
def merge_mcm_structures(existing_mcm_struct: Dict[str, Any], new_mcm_struct: Dict[str, Any]) -> None:
    """
    Merge device folders and elements from new_mcm_struct into existing_mcm_struct.

    Both structures are assumed to describe the same MCM (i.e. share the same
    NAME_KEY value such as 'MCM07'). existing_mcm_struct is modified in place;
    nothing is returned. Key names come from the constants imported from
    create_tag_structure (TAGS_KEY, NAME_KEY, TAG_TYPE_KEY, FOLDER_TYPE).

    Merge rules, per top-level tag in new_mcm_struct[TAGS_KEY]:
      - name absent from the existing structure -> the new tag is appended as-is;
      - name present and both entries are folders -> the new folder's tags are
        appended to the existing folder's tag list;
      - name present but the types conflict -> the existing entry wins and a
        warning is printed.
    """
    # Bail out early when the new structure carries no usable tag list.
    if not new_mcm_struct or TAGS_KEY not in new_mcm_struct or not isinstance(new_mcm_struct[TAGS_KEY], list):
        mcm_name = existing_mcm_struct.get(NAME_KEY, 'Unknown MCM')
        print(f" - Warning: New structure for {mcm_name} has no tags to merge. Skipping merge operation for this input.")
        return # Nothing valid to merge from

    # Ensure the existing structure has a tags list to merge into.
    if TAGS_KEY not in existing_mcm_struct or not isinstance(existing_mcm_struct.get(TAGS_KEY), list):
        mcm_name = existing_mcm_struct.get(NAME_KEY, 'Unknown MCM')
        print(f" - Warning: Initializing tags list for existing structure '{mcm_name}'.")
        existing_mcm_struct[TAGS_KEY] = []

    # Index existing top-level entries (device folders or direct UDTs such as
    # the MCM tag itself) by name so each merge lookup is O(1).
    existing_tags_lookup: Dict[str, Dict[str, Any]] = {}
    for tag in existing_mcm_struct[TAGS_KEY]:
        if isinstance(tag, dict) and NAME_KEY in tag:
            existing_tags_lookup[tag[NAME_KEY]] = tag
        # else: skip malformed tags in existing structure

    # Iterate through tags (device folders or direct UDTs) from the new structure.
    for new_tag in new_mcm_struct[TAGS_KEY]:
        if not isinstance(new_tag, dict):
            continue # Skip malformed new tags

        new_tag_name = new_tag.get(NAME_KEY)
        if not new_tag_name:
            print(f" - Warning: Skipping tag in new structure due to missing name: {new_tag}")
            continue # Skip tags without names

        # Check if an item with the same name already exists in the current MCM structure.
        if new_tag_name in existing_tags_lookup:
            existing_tag = existing_tags_lookup[new_tag_name]
            new_tag_type = new_tag.get(TAG_TYPE_KEY)
            existing_tag_type = existing_tag.get(TAG_TYPE_KEY)

            # Merge only if both are Folders and the new folder has a tag list.
            if (existing_tag_type == FOLDER_TYPE and
                new_tag_type == FOLDER_TYPE and
                TAGS_KEY in new_tag and
                isinstance(new_tag[TAGS_KEY], list)):

                # Ensure the existing folder has a 'tags' list to extend.
                if TAGS_KEY not in existing_tag or not isinstance(existing_tag.get(TAGS_KEY), list):
                    existing_tag[TAGS_KEY] = []

                # Append the new folder's elements (UDT instances) to the
                # existing folder's tag list.
                if new_tag[TAGS_KEY]: # Only extend if there's something to add
                    existing_tag[TAGS_KEY].extend(new_tag[TAGS_KEY])
            elif existing_tag_type == FOLDER_TYPE and new_tag_type == FOLDER_TYPE:
                # Both are folders, but the new one has no tags or an invalid tag list.
                pass # No action needed
            else:
                # Names match, but types conflict (e.g., Folder vs UDT) or are not folders.
                # Currently, we prioritize the existing item and log a warning.
                print(f" - Warning: Conflict for name '{new_tag_name}'. Existing type '{existing_tag_type}' differs from new type '{new_tag_type}' or not mergeable folders. Keeping existing.")
        else:
            # New device folder or direct UDT tag, add it to the existing MCM structure.
            existing_mcm_struct[TAGS_KEY].append(new_tag)
            # Add to lookup in case multiple view.json files contribute to the
            # *same* new device folder within one run.
            existing_tags_lookup[new_tag_name] = new_tag
||||
|
||||
|
||||
# --- Core Processing Functions ---
|
||||
|
||||
def process_view_json(view_file_path: str, mcm_number: str, device_type_override: Optional[str] = None) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
    """
    Load a view.json file, validate its root/children layout, run the tag
    generation pipeline, and hand back both results.

    Args:
        view_file_path: Path to the input view.json file.
        mcm_number: The MCM number for tag generation.
        device_type_override: Optional device type override.

    Returns:
        A (modified_view_data, mcm_tag_structure) tuple on success, or None
        on any read/parse/processing error (the reason is printed).
    """
    print(f" - Processing: {os.path.basename(view_file_path)}")
    try:
        # Parse the candidate view file.
        with open(view_file_path, 'r') as handle:
            view_data = json.load(handle)

        # The file must expose a root node holding a list of child elements.
        root_node = view_data.get(ROOT_KEY)
        structure_ok = isinstance(root_node, dict) and isinstance(root_node.get(CHILDREN_KEY), list)
        if not structure_ok:
            print(f" - Warning: Skipping '{os.path.basename(view_file_path)}'. Invalid or missing '{ROOT_KEY}.{CHILDREN_KEY}' structure.")
            return None

        elements_list = root_node[CHILDREN_KEY]

        # generate_tag_paths mutates elements_list (and therefore view_data) in place.
        generate_tag_paths(elements_list, mcm_number, device_type_override)

        # create_tag_structure builds a brand-new structure dictionary from
        # the (already updated) element list.
        mcm_tag_structure = create_tag_structure(elements_list, mcm_number, device_type_override)

        # view_data already carries the in-place modifications made above.
        return view_data, mcm_tag_structure

    except FileNotFoundError:
        print(f" - Error: File not found: '{view_file_path}'. Skipping.")
        return None
    except json.JSONDecodeError:
        print(f" - Error: Invalid JSON in file: '{view_file_path}'. Skipping.")
        return None
    except IOError as e:
        print(f" - Error: Cannot read file '{view_file_path}': {e}. Skipping.")
        return None
    except Exception as e:
        # Errors raised inside the imported generation helpers land here too.
        print(f" - Error: Unexpected error processing file '{view_file_path}': {str(e)}. Skipping.")
        return None
||||
|
||||
|
||||
def process_mcm_directories(base_dir: str, output_dir: str, device_type_override: Optional[str] = None) -> None:
    """
    Scan subdirectories in base_dir, identify MCM directories, process their
    view.json files, accumulate and merge tag structures per MCM, update
    the view.json files in place, and save a final consolidated tags.json.

    Args:
        base_dir: Directory containing 'MCM<number> ...' subfolders.
        output_dir: Directory that receives the consolidated tags.json.
        device_type_override: Optional device type applied to all elements
            in all processed views (bypasses automatic detection).
    """
    print(f"\nScanning base directory: '{base_dir}'")
    mcm_dir_count = 0            # MCM directories whose view.json was actually processed
    processed_view_count = 0     # views whose generated structure was merged successfully
    skipped_view_count = 0       # views skipped (read/parse error or invalid structure)
    saved_view_count = 0         # view.json files successfully rewritten in place
    error_saving_view_count = 0  # view.json files that could not be rewritten
    skipped_dir_count = 0        # directories without an MCM number or view.json
    # Accumulator for merged tag structures, keyed by MCM name (e.g., "MCM07")
    merged_mcm_structures: Dict[str, Dict[str, Any]] = {}

    try:
        sub_items = os.listdir(base_dir)
    except OSError as e:
        print(f"Error: Cannot list directory '{base_dir}': {e}")
        sys.exit(1)

    for item_name in sub_items:
        item_path = os.path.join(base_dir, item_name)

        if os.path.isdir(item_path):
            print(f"\nProcessing directory: '{item_name}'")
            mcm_number = extract_mcm_number_from_name(item_name)

            if mcm_number:
                print(f" - Identified as MCM directory (Number: {mcm_number})")
                view_json_path = os.path.join(item_path, VIEW_JSON_FILENAME)

                if os.path.exists(view_json_path):
                    # An MCM directory with a view.json we will attempt to process.
                    mcm_dir_count += 1
                    results = process_view_json(view_json_path, mcm_number, device_type_override)

                    if results:
                        modified_view_data, single_mcm_structure = results

                        # --- 1. Overwrite the original view.json file ---
                        print(f" - Attempting to save updated '{VIEW_JSON_FILENAME}' to '{item_path}'")
                        if _save_json(modified_view_data, view_json_path):
                            print(f" - Successfully saved updated '{os.path.basename(view_json_path)}'")
                            saved_view_count += 1
                        else:
                            # Detailed error already printed in _save_json.
                            print(f" - Failed to save updated '{os.path.basename(view_json_path)}'. Check permissions or disk space.")
                            error_saving_view_count += 1
                            # A failed view save does not stop the structure merge below.

                        # --- 2. Accumulate and merge the generated tag structure ---
                        if single_mcm_structure and NAME_KEY in single_mcm_structure:
                            mcm_key = single_mcm_structure[NAME_KEY]  # e.g., "MCM07"
                            print(f" - Processing generated structure for {mcm_key}")
                            if mcm_key in merged_mcm_structures:
                                print(f" - Merging with existing structure for {mcm_key}...")
                                merge_mcm_structures(merged_mcm_structures[mcm_key], single_mcm_structure)
                            else:
                                print(f" - Adding as new structure for {mcm_key}.")
                                merged_mcm_structures[mcm_key] = single_mcm_structure
                            processed_view_count += 1  # structure generated and merged
                        else:
                            print(f" - Warning: Generated structure from '{os.path.basename(view_json_path)}' is invalid or missing name. Skipping structure merge.")
                            skipped_view_count += 1
                    else:
                        # process_view_json already reported the reason.
                        skipped_view_count += 1
                        print(f" - Skipping processing for directory '{item_name}' due to issues with '{VIEW_JSON_FILENAME}'.")
                else:
                    print(f" - Warning: '{VIEW_JSON_FILENAME}' not found in directory '{item_name}'. Skipping.")
                    skipped_dir_count += 1
            else:
                print(f" - Skipping directory '{item_name}' (Does not match expected '{MCM_PREFIX}[number]' pattern).")
                skipped_dir_count += 1
        # else: ignore files directly under base_dir

    # --- Save the final merged structure ---
    if merged_mcm_structures:
        print(f"\nConsolidating {len(merged_mcm_structures)} final MCM structures...")
        # The desired output format is {"tags": [mcm_structure_1, mcm_structure_2, ...]}
        final_output_object = {TAGS_KEY: list(merged_mcm_structures.values())}

        final_output_path = os.path.join(output_dir, TAGS_JSON_FILENAME)
        print(f"Attempting to save final merged tag structure to: '{final_output_path}'")
        if _save_json(final_output_object, final_output_path):
            print(f"Successfully saved merged tag structure to: '{final_output_path}'")
        else:
            # Detailed error already printed in _save_json.
            print(f"Failed to save final '{TAGS_JSON_FILENAME}'. Processing completed with errors.")
    else:
        print(f"\nNo valid MCM structures were generated or merged. No '{TAGS_JSON_FILENAME}' file created.")

    # --- Final Summary ---
    # NOTE: the counts below come from dedicated counters; the previous
    # version summed overlapping counters, which over-counted saved files
    # and could double-count a directory that both failed a save and
    # produced a mergeable structure.
    print(f"\n--- Processing Summary ---")
    print(f" Directories Scanned: {len(sub_items)}")
    print(f" MCM Directories Processed: {mcm_dir_count}")
    print(f" Skipped Directories (No MCM# or No view.json): {skipped_dir_count}")
    print("-" * 25)
    print(f" View Files Processed (Structure Generated): {processed_view_count}")
    print(f" View Files Skipped (Read/Structure Error): {skipped_view_count}")
    print(f" View Files Updated & Saved In-Place: {saved_view_count}")
    print(f" Errors Saving Updated View Files: {error_saving_view_count}")
    print("-" * 25)
    if merged_mcm_structures:
        print(f" Final '{TAGS_JSON_FILENAME}' generated in '{output_dir}' with data for {len(merged_mcm_structures)} MCMs.")
    else:
        print(f" No final '{TAGS_JSON_FILENAME}' generated.")
    print("--- End Summary ---")
||||
|
||||
|
||||
# --- Main Execution ---
|
||||
def main():
    """Command-line entry point: parse arguments, validate paths, run processing."""
    cli = argparse.ArgumentParser(
        description=(
            f"Processes '{VIEW_JSON_FILENAME}' files found in MCM-named subdirectories of a base directory. "
            f"Updates the '{VIEW_JSON_FILENAME}' files in-place and generates a consolidated '{TAGS_JSON_FILENAME}' file."
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    cli.add_argument(
        "base_directory",
        help=f"Path to the base directory containing MCM subfolders (e.g., 'MCM07 Bypass', 'MCM08 Control')."
    )
    cli.add_argument(
        "-d", "--device-type",
        dest="device_type_override",
        default=None,
        help="Optional. If provided, overrides automatic device type detection for ALL elements in ALL processed views."
    )
    cli.add_argument(
        "-o", "--output-dir",
        default=DEFAULT_OUTPUT_DIR,
        help=f"Directory where the final '{TAGS_JSON_FILENAME}' will be saved."
    )
    opts = cli.parse_args()

    # --- Validate Arguments ---
    # The base directory must already exist and be a directory.
    if not os.path.isdir(opts.base_directory):
        print(f"Error: Base directory not found or is not a directory: '{opts.base_directory}'")
        sys.exit(1)

    # The output directory is created on demand; an existing non-directory
    # path at that location is fatal.
    output_dir = opts.output_dir
    if not os.path.exists(output_dir):
        print(f"Output directory '{output_dir}' does not exist. Attempting to create it...")
        try:
            os.makedirs(output_dir)
            print(f"Successfully created output directory: '{output_dir}'")
        except OSError as e:
            print(f"Error: Could not create output directory '{output_dir}': {e}")
            sys.exit(1)
    elif not os.path.isdir(output_dir):
        print(f"Error: Specified output path '{output_dir}' exists but is not a directory.")
        sys.exit(1)

    # --- Run Processing ---
    process_mcm_directories(opts.base_directory, output_dir, opts.device_type_override)
||||
|
||||
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|
||||
129
tag_transformer.py
Normal file
129
tag_transformer.py
Normal file
@ -0,0 +1,129 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import sys
|
||||
import re
|
||||
from typing import List, Dict, Any
|
||||
|
||||
def determine_device_type(element: Dict[str, Any]) -> str:
    """
    Derive a device type for an element from its props.path.

    Args:
        element: The element dictionary.

    Returns:
        The final path segment of props.path (e.g. "ControlCabinet");
        "" for elements named MCM or MCM<digits>; "Status" when no usable
        path is available.
    """
    # Special case: the MCM element itself carries no device-type segment.
    meta_name = element.get('meta', {}).get('name')
    if meta_name == "MCM" or (meta_name and re.match(r'^MCM\d+$', meta_name)):
        return ""

    # Otherwise the type is the last component of props.path.
    raw_path = element.get('props', {}).get('path')
    if isinstance(raw_path, str) and raw_path:
        # Normalise both separator styles, then take the final component.
        segments = raw_path.replace('\\', '/').split('/')
        last_segment = segments[-1] if segments else ''
        if last_segment:
            return last_segment

    # Path missing, empty, or ending in a separator: use the generic type.
    return "Status"
||||
|
||||
def generate_tag_paths(
    elements: List[Dict[str, Any]],
    mcm_number: str,
    device_type: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Rewrite the first tagProps entry of each element to a generated tag path.

    The path has the form 'System/MCM<nn>/<device_type>/<element_name>';
    the device-type segment is omitted for MCM elements (empty device type).
    The input list is modified in place.

    Args:
        elements: List of element dictionaries (e.g., from root['children']).
        mcm_number: The MCM number (zero-padded to 2 digits).
        device_type: Optional override for the device type. When falsy, the
            type is derived per element via determine_device_type().

    Returns:
        The same list of elements, modified in place.
    """
    mcm_formatted = f"MCM{mcm_number.zfill(2)}"

    for element in elements:
        # Image elements carry no tag bindings; leave them untouched.
        if 'meta' in element and element['meta'].get('name') == "Image":
            continue

        # Only elements with the full meta/props/params/tagProps shape are rewritten.
        if not ('meta' in element and 'name' in element['meta'] and
                'props' in element and 'params' in element['props'] and
                'tagProps' in element['props']['params']):
            continue

        element_name = element['meta']['name']

        # Use the explicit override when given; otherwise derive from the element.
        actual_device_type = device_type or determine_device_type(element)

        # An empty device type (the MCM case) skips the device-type path segment.
        if actual_device_type == "":
            base_path = f"System/{mcm_formatted}/{element_name}"
        else:
            base_path = f"System/{mcm_formatted}/{actual_device_type}/{element_name}"

        # Replace only the first tagProps value, if any exist.
        tag_props = element['props']['params']['tagProps']
        if tag_props:
            tag_props[0] = base_path

    return elements
||||
|
||||
def main():
    """CLI entry point: read a JSON file, rewrite its tag paths, print the result."""
    # Require at least the input file and MCM number.
    if len(sys.argv) < 3:
        print("Usage: python tag_transformer.py <input_file.json> <mcm_number> [device_type]")
        print("Example: python tag_transformer.py input.json 07 Conveyor")
        print("If device_type is not provided, it will be determined automatically from element names.")
        sys.exit(1)

    input_file, mcm_number = sys.argv[1], sys.argv[2]
    # The device type override is optional; None triggers auto-detection.
    device_type = sys.argv[3] if len(sys.argv) >= 4 else None

    try:
        # Read and parse the input JSON file.
        with open(input_file, 'r') as source:
            payload = json.load(source)

        # Transform in place, then emit the result on stdout.
        transformed = generate_tag_paths(payload, mcm_number, device_type)
        print(json.dumps(transformed, indent=2))

    except FileNotFoundError:
        print(f"Error: File '{input_file}' not found.")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: File '{input_file}' is not valid JSON.")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {str(e)}")
        sys.exit(1)
||||
|
||||
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|
||||
68420
tags.json/tags.json
Normal file
68420
tags.json/tags.json
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user