Created Project

This commit is contained in:
gigi.mamaladze 2025-06-23 17:38:52 +04:00
commit a70479208a
15 changed files with 241441 additions and 0 deletions

87
README.md Normal file
View File

@ -0,0 +1,87 @@
# TagProps Transformer
This tool transforms tag properties in JSON files by replacing the first value in the `tagProps` array with a path based on the element's name, an MCM number, and a device type.
## MCM Folder Organization
The tool can also organize tags according to MCMs (Master Control Modules) in your Ignition project. This process will:
1. Process all MCM folders in your Detailed-Views directory
2. Update view.json files with proper tag paths
3. Generate a consolidated tags.json file with all MCM tag structures
### Usage for MCM Organization
```bash
python process_tags.py "C:\Program Files\Inductive Automation\Ignition\data\projects\YOUR_PROJECT\com.inductiveautomation.perspective\views\Detailed-Views" -o "outputs"
```
Replace `YOUR_PROJECT` with your actual Ignition project name.
### Example:
```bash
python process_tags.py "C:\Program Files\Inductive Automation\Ignition\data\projects\MTN6_SCADA\com.inductiveautomation.perspective\views\Detailed-Views" -o "outputs"
```
This will:
1. Scan all MCM folders in the Detailed-Views directory
2. Process each view.json file in the MCM folders
3. Update the view.json files with proper tag paths
4. Generate a consolidated tags.json file in the outputs directory
### Output Structure
The generated tags.json file will contain:
- All MCMs from your Detailed-Views folder
- Properly organized tag structures for each MCM
- Merged tags for MCMs that have multiple folders (e.g., MCM02 with both "Fluid Inbound Merges 5-7" and "Fluid Inbound Upper")
## Single File Transformation
For transforming individual JSON files:
```bash
python tag_transformer.py <input_file.json> <mcm_number> [device_type]
```
### Example:
```bash
python tag_transformer.py sample_input.json 07 Conveyor
```
This will:
1. Read the input JSON file (`sample_input.json`)
2. For each element in the JSON, replace only the first value in the `tagProps` array with `System/MCM07/Conveyor/[ElementName]`
3. Output the transformed JSON to the console
### Auto Device Type Detection
If you don't specify a device type, the script will automatically determine the device type based on the element name:
- If the name contains `EPC` followed by "End" (e.g., `EPCEnd`, `EPC_End`, `EPC1_End`, `EPC2_End_1`), the device type will be `EPC_End`
- If the name contains `EPC` followed by a number (e.g., `EPC1`, `EPC2`, `EPC_1`), the device type will be `EPC`
- If the name contains `EPC` followed by "Line" (e.g., `EPCLine`, `EPC_Line`), the device type will be `EPC_Line`
- For all other cases, the device type will be `status`
Example:
```bash
python tag_transformer.py sample_input.json 07
```
## Output Format
The output will be a valid JSON array with the same structure as the input, but with the first value in each `tagProps` array replaced with the generated path.
## Saving Output to a File
To save the output to a file, you can redirect the console output:
```bash
python tag_transformer.py sample_input.json 07 Conveyor > output.json
```
## Requirements
- Python 3.6+
- No additional dependencies required

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

229
check_tags.py Normal file
View File

@ -0,0 +1,229 @@
# Filename: check_tags.py
import json
import sys
import os
import argparse
from typing import List, Dict, Any, Set, Optional
# --- Constants ---
VIEW_JSON_FILENAME = 'view.json'
TAGS_JSON_FILENAME = 'tags.json' # Used in help text
TAGS = 'tags'
ROOT = 'root'
CHILDREN = 'children'
META = 'meta'
NAME = 'name'
TAG_TYPE = 'tagType'
FOLDER = 'Folder'
UDT_INSTANCE = 'UdtInstance'
IMAGE_NAME = 'Image'
SOURCE = 'source' # Key for missing element dict
# --- File System Functions ---
def find_view_files(base_dir: str) -> Optional[List[str]]:
"""
Recursively finds all view.json files within the base directory.
Returns None if the base directory is invalid.
"""
if not base_dir or not os.path.isdir(base_dir):
print(f"Error: Base directory '{base_dir}' not found, is invalid, or is not a directory.")
return None # Indicate error
view_files = []
print(f"Scanning for '{VIEW_JSON_FILENAME}' in '{base_dir}'...")
for root_dir, _, files in os.walk(base_dir):
if VIEW_JSON_FILENAME in files:
view_files.append(os.path.join(root_dir, VIEW_JSON_FILENAME))
print(f"Found {len(view_files)} '{VIEW_JSON_FILENAME}' files.")
return view_files
# --- Data Extraction Functions ---
def extract_expected_elements(view_files: List[str]) -> Dict[str, str]:
"""
Parses view.json files and extracts names of elements expected to have tags.
Filters out elements named 'Image'. Handles duplicates by keeping the first encountered.
Returns a dictionary mapping element names to their source view.json file path.
"""
expected_elements_map = {} # {element_name: source_file_path}
print(f"\nExtracting expected element names from {len(view_files)} files...")
processed_count = 0
skipped_structure_count = 0
error_count = 0
for file_path in view_files:
try:
with open(file_path, 'r') as f:
data = json.load(f)
# Validate basic structure needed to find elements
root_node = data.get(ROOT)
if not isinstance(root_node, dict):
print(f"Warning: Skipping '{file_path}': Missing or invalid '{ROOT}' node.")
skipped_structure_count += 1
continue
elements = root_node.get(CHILDREN)
if not isinstance(elements, list):
print(f"Warning: Skipping '{file_path}': '{ROOT}.{CHILDREN}' is missing or not a list.")
skipped_structure_count += 1
continue
found_in_file_count = 0
for element in elements:
# Ensure element is a dictionary before trying to access keys
if not isinstance(element, dict):
continue
element_meta = element.get(META, {})
element_name = element_meta.get(NAME)
# Element should have a name and not be an "Image" to be considered
if element_name and element_name != IMAGE_NAME:
if element_name in expected_elements_map:
# Log duplicate but keep the first one encountered
print(f"Warning: Duplicate element name '{element_name}' found. Original source: '{expected_elements_map[element_name]}', New source: '{file_path}'. Keeping first occurrence.")
else:
expected_elements_map[element_name] = file_path
found_in_file_count += 1
if found_in_file_count > 0:
processed_count += 1
# If found_in_file_count is 0, it means the file was processed but contained
# only skipped elements (like Image) or elements without names. We don't
# count this explicitly as skipped_structure or error.
except json.JSONDecodeError:
print(f"Warning: Skipping '{file_path}': Invalid JSON.")
error_count += 1
except IOError as e:
print(f"Warning: Skipping '{file_path}': IO Error reading file - {e}")
error_count += 1
except Exception as e:
print(f"Warning: Skipping '{file_path}': Unexpected error processing file - {e}")
error_count += 1
total_skipped_or_error = skipped_structure_count + error_count
print(f"Extraction complete. Found {len(expected_elements_map)} unique expected elements.")
print(f" Processed {processed_count} files containing valid elements.")
if total_skipped_or_error > 0:
print(f" Skipped {skipped_structure_count} files due to structural issues.")
print(f" Encountered errors in {error_count} files.")
return expected_elements_map
def _find_udt_instances_recursive(items: List[Dict[str, Any]], found_names: Set[str]):
"""Helper function to recursively find UDT instance names within a list of tags/folders."""
if not isinstance(items, list):
# This case might occur if a folder's 'tags' key is not a list. Log or handle as needed.
# print(f"Warning: Expected a list of tags/folders, but got {type(items)}. Skipping.")
return # Stop recursion for this branch
for item in items:
if not isinstance(item, dict):
continue # Skip malformed items
item_tag_type = item.get(TAG_TYPE)
item_name = item.get(NAME)
if item_tag_type == UDT_INSTANCE and item_name:
found_names.add(item_name)
elif item_tag_type == FOLDER:
# Recursively check the tags within this folder
folder_tags = item.get(TAGS, [])
_find_udt_instances_recursive(folder_tags, found_names)
def extract_existing_tag_names(tags_file_path: str) -> Optional[Set[str]]:
"""
Parses the tags JSON file and extracts names of all UDT instances.
Handles nested structures (MCM folders, device folders).
Returns a set of names, or None if the file cannot be processed.
"""
existing_names = set()
print(f"\nLoading existing UDT instance names from '{tags_file_path}'...")
try:
with open(tags_file_path, 'r') as f:
data = json.load(f)
# Expecting a top-level object like {"tags": [...list of MCM folders...]}
if not isinstance(data, dict) or TAGS not in data or not isinstance(data[TAGS], list):
print(f"Error: '{tags_file_path}' does not contain a top-level '{TAGS}' key with a list value.")
return None # Indicate error
top_level_tags = data[TAGS]
# Start the recursive search from the list of MCM folders
_find_udt_instances_recursive(top_level_tags, existing_names)
print(f"Found {len(existing_names)} unique UDT instance names in '{os.path.basename(tags_file_path)}'.")
return existing_names
except FileNotFoundError:
print(f"Error: Tags file '{tags_file_path}' not found.")
return None
except json.JSONDecodeError:
print(f"Error: Tags file '{tags_file_path}' is not valid JSON.")
return None
except IOError as e:
print(f"Error: IO Error reading tags file '{tags_file_path}' - {e}")
return None
except Exception as e:
print(f"Error: Unexpected error reading tags file '{tags_file_path}' - {e}")
return None # Indicate error
def main():
parser = argparse.ArgumentParser(
description=f"Compares element names from '{VIEW_JSON_FILENAME}' files against UDT Instances in a '{TAGS_JSON_FILENAME}' file.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("tags_file", help=f"Path to the generated {TAGS_JSON_FILENAME} file.")
parser.add_argument("base_directory", help=f"Path to the base directory containing project subfolders (which contain '{VIEW_JSON_FILENAME}' files). Example: './Detailed-Views'")
args = parser.parse_args()
# 1. Find all view.json files
view_files = find_view_files(args.base_directory)
if view_files is None: # Error handled in function
sys.exit(1)
if not view_files:
print("No view.json files found in the specified directory. Exiting.")
sys.exit(0) # Not necessarily an error if the directory was empty
# 2. Extract expected elements from view.json files
expected_elements = extract_expected_elements(view_files)
if not expected_elements:
print("No elements suitable for tag generation were found in any view.json files. Exiting.")
sys.exit(0) # Not an error if views only contained Images etc.
# 3. Extract existing tag names from tags.json
existing_tag_names = extract_existing_tag_names(args.tags_file)
if existing_tag_names is None:
# Error messages printed within the function
print("Failed to load existing tags. Exiting.")
sys.exit(1)
# No need to check if empty, comparison will handle it.
# 4. Compare and report missing elements
print("\nComparing expected elements against existing UDT instances...")
missing_elements = []
for name, source_file in expected_elements.items():
if name not in existing_tag_names:
# Use SOURCE constant for the dictionary key
missing_elements.append({NAME: name, SOURCE: source_file})
print("-" * 40)
if not missing_elements:
print("Success! All expected elements were found as UDT Instances in the tags file.")
exit_code = 0
else:
print(f"Check Failed: Found {len(missing_elements)} missing UDT Instances:")
for missing in missing_elements:
# Use constants for accessing dictionary keys
print(f" - Name: '{missing[NAME]}', Expected Source: '{missing[SOURCE]}'" )
print("\nPlease ensure these elements were processed correctly or excluded intentionally.")
exit_code = 1 # Indicate failure
print("-" * 40)
sys.exit(exit_code)
if __name__ == "__main__":
main()

127
compare_json.py Normal file
View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
import json
import sys
import argparse
import os
from typing import Set, List, Dict, Any, Optional
# --- Constants (mirroring those used in tag structure) ---
TAGS = 'tags'
NAME = 'name'
TAG_TYPE = 'tagType'
FOLDER = 'Folder'
UDT_INSTANCE = 'UdtInstance'
def load_json_file(file_path: str) -> Optional[Dict[str, Any]]:
"""Loads JSON data from a file path. Returns the loaded dict or None on error."""
if not os.path.exists(file_path):
print(f"Error: File not found: '{file_path}'", file=sys.stderr)
return None
try:
with open(file_path, 'r') as f:
data = json.load(f)
# Basic validation that it's a dictionary (expected top level)
if not isinstance(data, dict):
print(f"Error: Expected JSON root to be an object/dictionary in '{file_path}'", file=sys.stderr)
return None
return data
except json.JSONDecodeError:
print(f"Error: Invalid JSON format in file: '{file_path}'", file=sys.stderr)
return None
except IOError as e:
print(f"Error reading file '{file_path}': {e}", file=sys.stderr)
return None
except Exception as e:
print(f"An unexpected error occurred while loading '{file_path}': {e}", file=sys.stderr)
return None
def _find_udt_instances_recursive(items: List[Dict[str, Any]], found_names: Set[str]):
"""Helper function to recursively find UDT instance names within a list of tags/folders."""
# Copy logic from check_tags.py
if not isinstance(items, list):
return # Stop recursion for this branch
for item in items:
if not isinstance(item, dict):
continue # Skip malformed items
item_tag_type = item.get(TAG_TYPE)
item_name = item.get(NAME)
if item_tag_type == UDT_INSTANCE and item_name:
found_names.add(item_name)
elif item_tag_type == FOLDER:
# Recursively check the tags within this folder
folder_tags = item.get(TAGS, [])
_find_udt_instances_recursive(folder_tags, found_names)
def extract_udt_names(file_path: str) -> Optional[Set[str]]:
"""
Loads a tags JSON file and extracts the names of all UDT instances.
Returns a set of names, or None if the file cannot be processed.
"""
print(f"Extracting UDT names from: {file_path}")
data = load_json_file(file_path)
if data is None:
return None
# Expecting a top-level object like {"tags": [...list of MCM folders...]}
if TAGS not in data or not isinstance(data.get(TAGS), list):
print(f"Error: '{file_path}' does not contain a top-level '{TAGS}' key with a list value.", file=sys.stderr)
return None
udt_names = set()
top_level_tags = data[TAGS]
# Start the recursive search from the list of MCM folders/tags
_find_udt_instances_recursive(top_level_tags, udt_names)
print(f" Found {len(udt_names)} UDT instance names.")
return udt_names
def main():
parser = argparse.ArgumentParser(
description="Compares the set of UDT Instance names found in two tags.json files.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("file1", help="Path to the first JSON tags file (e.g., baseline).")
parser.add_argument("file2", help="Path to the second JSON tags file (e.g., new result).")
args = parser.parse_args()
print(f"Comparing UDT Instance names in:\n File 1: {args.file1}\n File 2: {args.file2}\n")
# Extract UDT names from both files
names1 = extract_udt_names(args.file1)
if names1 is None:
sys.exit(1)
names2 = extract_udt_names(args.file2)
if names2 is None:
sys.exit(1)
# Compare the sets of names
print("\n--- Comparison Results ---")
if names1 == names2:
print(f"Success: Both files contain the exact same {len(names1)} UDT instance names.")
sys.exit(0)
else:
print("Difference Found: The set of UDT instance names differs between the files.")
missing_in_file2 = sorted(list(names1 - names2))
added_in_file2 = sorted(list(names2 - names1))
if missing_in_file2:
print(f"\nNames in '{os.path.basename(args.file1)}' but NOT in '{os.path.basename(args.file2)}' ({len(missing_in_file2)}):")
for name in missing_in_file2:
print(f" - {name}")
if added_in_file2:
print(f"\nNames in '{os.path.basename(args.file2)}' but NOT in '{os.path.basename(args.file1)}' ({len(added_in_file2)}):")
for name in added_in_file2:
print(f" - {name}")
sys.exit(1) # Exit with a non-zero code to indicate difference
if __name__ == "__main__":
main()

117
compare_views.py Normal file
View File

@ -0,0 +1,117 @@
#!/usr/bin/env python3
import json
import sys
import argparse
import os
from typing import Dict, Any, Optional
# Constants
VIEW_JSON_FILENAME = 'view.json'
def load_json_file(file_path: str) -> Optional[Dict[str, Any]]:
"""Loads JSON data from a file path. Returns the loaded dict or None on error."""
if not os.path.exists(file_path):
# This might be expected if a corresponding file doesn't exist in the baseline
# print(f"Warning: File not found: '{file_path}'", file=sys.stderr)
return None
try:
with open(file_path, 'r') as f:
data = json.load(f)
if not isinstance(data, dict):
print(f"Error: Expected JSON root to be an object/dictionary in '{file_path}'", file=sys.stderr)
return None # Treat as error
return data
except json.JSONDecodeError:
print(f"Error: Invalid JSON format in file: '{file_path}'", file=sys.stderr)
return None # Treat as error
except IOError as e:
print(f"Error reading file '{file_path}': {e}", file=sys.stderr)
return None # Treat as error
except Exception as e:
print(f"An unexpected error occurred while loading '{file_path}': {e}", file=sys.stderr)
return None # Treat as error
def compare_directories(dir1: str, dir2: str):
"""Compares view.json files found in subdirectories of dir1 with corresponding files in dir2."""
print(f"Comparing '{VIEW_JSON_FILENAME}' files:")
print(f" Directory 1 (Modified): {dir1}")
print(f" Directory 2 (Baseline): {dir2}\n")
differences_found = 0
files_compared = 0
errors_loading = 0
baseline_missing = 0
for root, _, files in os.walk(dir1):
if VIEW_JSON_FILENAME in files:
relative_path = os.path.relpath(root, dir1)
file1_path = os.path.join(root, VIEW_JSON_FILENAME)
file2_path = os.path.join(dir2, relative_path, VIEW_JSON_FILENAME)
print(f"Comparing: {os.path.join(relative_path, VIEW_JSON_FILENAME)}")
files_compared += 1
data1 = load_json_file(file1_path)
data2 = load_json_file(file2_path)
if data1 is None:
print(f" - Error loading modified file: {file1_path}")
errors_loading += 1
continue # Cannot compare if modified file fails
if data2 is None:
# Check if the baseline file *should* exist
if os.path.exists(os.path.dirname(file2_path)):
print(f" - Baseline file missing or errored: {file2_path}")
baseline_missing += 1
# Decide if this counts as a difference? Yes, likely.
differences_found += 1
else:
print(f" - Baseline directory structure missing for: {os.path.join(relative_path, VIEW_JSON_FILENAME)}. Cannot compare.")
baseline_missing += 1 # Count as missing baseline structure
continue # Cannot compare if baseline is missing/errored
# Perform the comparison
if data1 == data2:
print(" - Identical")
else:
print(" - DIFFERENCE FOUND")
differences_found += 1
print("\n--- Comparison Summary ---")
print(f" View files compared: {files_compared}")
print(f" Files with differences: {differences_found}")
print(f" Errors loading modified files: {errors_loading}")
print(f" Baseline files missing/errored: {baseline_missing}")
if differences_found == 0 and errors_loading == 0 and baseline_missing == 0:
print("\nSuccess: All corresponding view.json files are logically identical.")
return 0 # Exit code 0 for success
else:
print("\nCheck Complete: Differences or errors were encountered.")
return 1 # Exit code 1 for differences/errors
def main():
parser = argparse.ArgumentParser(
description=f"Compares {VIEW_JSON_FILENAME} files between two directory structures.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("dir1", help="Path to the first directory (e.g., the modified one).")
parser.add_argument("dir2", help="Path to the second directory (e.g., the baseline/original one).")
args = parser.parse_args()
# Validate directories exist
if not os.path.isdir(args.dir1):
print(f"Error: Directory 1 not found: '{args.dir1}'", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(args.dir2):
print(f"Error: Directory 2 not found: '{args.dir2}'", file=sys.stderr)
sys.exit(1)
exit_code = compare_directories(args.dir1, args.dir2)
sys.exit(exit_code)
if __name__ == "__main__":
main()

200
create_tag_structure.py Normal file
View File

@ -0,0 +1,200 @@
#!/usr/bin/env python3
import json
import sys
import re
from typing import List, Dict, Any, Optional
# --- Constants for dictionary keys and common values ---
META = 'meta'
NAME = 'name'
PROPS = 'props'
PATH = 'path'
TAGS = 'tags'
TAG_TYPE = 'tagType'
TYPE_ID = 'typeId'
VALUE = 'value'
FOLDER = 'Folder'
UDT_INSTANCE = 'UdtInstance'
ATOMIC_TAG = 'AtomicTag'
UDT_TYPE_ID = 'TagProp' # Specific type ID for these UDTs
IMAGE_NAME = 'Image'
ALARMST_TAG = 'ALARMST'
STATE_TAG = 'STATE'
DCN_TAG = 'DCN'
MCM_PREFIX = 'MCM'
STATUS_DEVICE_TYPE = 'Status'
MCM_DEVICE_TYPE_INDICATOR = "" # Indicates the element is the MCM itself
# --- Helper function to create a standard UDT instance ---
def _create_udt_instance(element_name: str) -> Dict[str, Any]:
"""Creates the dictionary for a standard UDT instance tag."""
return {
NAME: element_name,
TYPE_ID: UDT_TYPE_ID,
TAG_TYPE: UDT_INSTANCE,
TAGS: [
{NAME: ALARMST_TAG, VALUE: 0, TAG_TYPE: ATOMIC_TAG},
{NAME: STATE_TAG, TAG_TYPE: ATOMIC_TAG},
{NAME: DCN_TAG, TAG_TYPE: ATOMIC_TAG},
]
}
# --- Device Type Determination ---
def determine_device_type(element: Dict[str, Any]) -> str:
"""
Determine the device type based on the element's name or props.path.
Args:
element: The element dictionary.
Returns:
The determined device type (e.g., "ControlCabinet"),
MCM_DEVICE_TYPE_INDICATOR ("") for elements named MCM/MCMxx,
or STATUS_DEVICE_TYPE if path is invalid or missing.
"""
element_name = element.get(META, {}).get(NAME)
# Special case: If element name is MCM or MCMxx, it doesn't belong in a subfolder.
if element_name and (element_name == MCM_PREFIX or re.match(rf'^{MCM_PREFIX}\d+$', element_name)):
return MCM_DEVICE_TYPE_INDICATOR # Return the indicator for MCM elements
# Determine type from props.path otherwise
element_path = element.get(PROPS, {}).get(PATH)
if isinstance(element_path, str) and element_path:
# Normalize path separators and split
parts = element_path.replace('\\', '/').split('/')
# The device type is typically the last part of the path
if parts and parts[-1]:
return parts[-1]
# Fallback if path is missing, empty, or parsing fails
return STATUS_DEVICE_TYPE
# --- Main Tag Structure Creation Logic ---
def create_tag_structure(
elements: List[Dict[str, Any]],
mcm_number: str,
device_type_override: Optional[str] = None
) -> Dict[str, Any]:
"""
Create a hierarchical tag structure with MCM as the top folder,
device types as subfolders, and elements as UDT instances with atomic tags.
Args:
elements: List of element dictionaries from the JSON.
mcm_number: The MCM number (will be zero-padded to 2 digits).
device_type_override: Optional override for device type. If None,
it's determined from the element.
Returns:
A dictionary representing the structured tags for the MCM.
"""
mcm_formatted = f"{MCM_PREFIX}{mcm_number.zfill(2)}"
# Initialize the root structure for this MCM
tag_structure = {
NAME: mcm_formatted,
TAG_TYPE: FOLDER,
TAGS: []
}
# Keep track of device folders already created to avoid duplicates
device_folders: Dict[str, Dict[str, Any]] = {}
for element in elements:
element_meta = element.get(META, {})
element_name = element_meta.get(NAME)
# Skip elements named "Image" or elements without a name
if not element_name or element_name == IMAGE_NAME:
continue
# Determine the device type for this element
actual_device_type = device_type_override
if actual_device_type is None: # Check for None explicitly
actual_device_type = determine_device_type(element)
# Create the UDT instance tag for the element
element_tag = _create_udt_instance(element_name)
# --- Add the element tag to the structure ---
# Case 1: Element is the MCM itself (no device subfolder)
if actual_device_type == MCM_DEVICE_TYPE_INDICATOR:
tag_structure[TAGS].append(element_tag)
# Case 2: Element belongs in a device type subfolder
else:
# Create the device type folder if it doesn't exist yet
if actual_device_type not in device_folders:
device_folder = {
NAME: actual_device_type,
TAG_TYPE: FOLDER,
TAGS: []
}
device_folders[actual_device_type] = device_folder
# Add the new folder to the main MCM structure
tag_structure[TAGS].append(device_folder)
# Add the element tag to its corresponding device folder
device_folders[actual_device_type][TAGS].append(element_tag)
return tag_structure
# --- Main Execution Block ---
def main():
# Check if filename and MCM number are provided
if len(sys.argv) < 3:
print(f"Usage: python {os.path.basename(__file__)} <input_file.json> <mcm_number> [device_type_override]")
print(f"Example: python {os.path.basename(__file__)} input.json 07 Conveyor")
print("If device_type_override is not provided, it will be determined automatically.")
sys.exit(1)
input_file = sys.argv[1]
mcm_number = sys.argv[2]
# Device type override is optional
device_type_override = None
if len(sys.argv) >= 4:
device_type_override = sys.argv[3]
try:
# Read the input JSON file
with open(input_file, 'r') as f:
# Assuming the input file contains the list of elements directly
# If it contains the {'root': {'children': [...]}} structure, load that first
raw_data = json.load(f)
# Adjust based on actual input structure if necessary:
if isinstance(raw_data, dict) and 'root' in raw_data and 'children' in raw_data['root']:
elements_data = raw_data['root']['children']
if not isinstance(elements_data, list):
print(f"Error: 'root.children' in '{input_file}' is not a list.")
sys.exit(1)
elif isinstance(raw_data, list):
elements_data = raw_data # Assume list is the elements directly
else:
print(f"Error: Unexpected JSON structure in '{input_file}'. Expected a list of elements or {{'root': {{'children': [...]}}}}.")
sys.exit(1)
# Create the tag structure
result = create_tag_structure(elements_data, mcm_number, device_type_override)
# Output the result as JSON to stdout
print(json.dumps(result, indent=2))
except FileNotFoundError:
print(f"Error: File '{input_file}' not found.")
sys.exit(1)
except json.JSONDecodeError:
print(f"Error: File '{input_file}' is not valid JSON.")
sys.exit(1)
except Exception as e:
print(f"An unexpected error occurred: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()

68440
output/tags.json Normal file

File diff suppressed because it is too large Load Diff

54008
outputs/tags.json Normal file

File diff suppressed because it is too large Load Diff

49336
outputs/tags_baseline.json Normal file

File diff suppressed because it is too large Load Diff

348
process_tags.py Normal file
View File

@ -0,0 +1,348 @@
#!/usr/bin/env python3
import json
import sys
import os
import re
import argparse
from typing import Optional, Tuple, Dict, Any, List
# Assuming these modules are in the same directory or Python path
try:
from tag_transformer import generate_tag_paths
# Import the refactored create_tag_structure and its constants
from create_tag_structure import create_tag_structure, TAGS as TAGS_KEY, NAME as NAME_KEY, TAG_TYPE as TAG_TYPE_KEY, FOLDER as FOLDER_TYPE, MCM_PREFIX
except ImportError as e:
print(f"Error: Failed to import required modules. Make sure 'tag_transformer.py' and 'create_tag_structure.py' are accessible. Details: {e}")
sys.exit(1)
# --- Constants ---
VIEW_JSON_FILENAME = 'view.json'
TAGS_JSON_FILENAME = 'tags.json'
DEFAULT_OUTPUT_DIR = './outputs'
# Define keys specific to the view.json structure being processed
ROOT_KEY = 'root'
CHILDREN_KEY = 'children'
# --- Utility Functions ---
def extract_mcm_number_from_name(dir_name: str) -> Optional[str]:
"""
Extracts the MCM number (digits) from a directory name like 'MCM07 Bypass'.
Uses the MCM_PREFIX constant from create_tag_structure.
Args:
dir_name: The directory name string.
Returns:
The extracted, zero-padded MCM number string (e.g., "07"), or None.
"""
# Use case-insensitive search based on the imported prefix
match = re.search(rf'{MCM_PREFIX}(\d+)', dir_name, re.IGNORECASE)
if match:
return match.group(1).zfill(2) # Return zero-padded number
return None
def _save_json(data: Dict[str, Any], file_path: str, indent: int = 2) -> bool:
"""Saves dictionary data to a JSON file. Returns True on success, False on error."""
try:
with open(file_path, 'w') as f:
json.dump(data, f, indent=indent)
return True
except IOError as e:
print(f"Error: Could not write to file '{file_path}': {e}")
return False
except TypeError as e:
print(f"Error: Data for '{file_path}' is not JSON serializable: {e}")
return False
def merge_mcm_structures(existing_mcm_struct: Dict[str, Any], new_mcm_struct: Dict[str, Any]):
"""
Merges device folders and elements from new_mcm_struct into existing_mcm_struct.
Assumes both structs represent the same MCM (e.g., same 'name' like 'MCM07').
Modifies existing_mcm_struct in place. Uses imported constants for keys.
"""
# Ensure the new structure has tags to merge
if not new_mcm_struct or TAGS_KEY not in new_mcm_struct or not isinstance(new_mcm_struct[TAGS_KEY], list):
mcm_name = existing_mcm_struct.get(NAME_KEY, 'Unknown MCM')
print(f" - Warning: New structure for {mcm_name} has no tags to merge. Skipping merge operation for this input.")
return # Nothing valid to merge from
# Ensure the existing structure has a tags list to merge into
if TAGS_KEY not in existing_mcm_struct or not isinstance(existing_mcm_struct.get(TAGS_KEY), list):
mcm_name = existing_mcm_struct.get(NAME_KEY, 'Unknown MCM')
print(f" - Warning: Initializing tags list for existing structure '{mcm_name}'.")
existing_mcm_struct[TAGS_KEY] = []
# Create a lookup for existing device folders/tags by name for efficient merging
# We only look at the direct children (device folders or direct UDTs like the MCM tag itself)
existing_tags_lookup: Dict[str, Dict[str, Any]] = {}
for tag in existing_mcm_struct[TAGS_KEY]:
if isinstance(tag, dict) and NAME_KEY in tag:
existing_tags_lookup[tag[NAME_KEY]] = tag
# else: skip malformed tags in existing structure
# Iterate through tags (device folders or direct UDTs) from the new structure
for new_tag in new_mcm_struct[TAGS_KEY]:
if not isinstance(new_tag, dict):
continue # Skip malformed new tags
new_tag_name = new_tag.get(NAME_KEY)
if not new_tag_name:
print(f" - Warning: Skipping tag in new structure due to missing name: {new_tag}")
continue # Skip tags without names
# Check if an item with the same name already exists in the current MCM structure
if new_tag_name in existing_tags_lookup:
existing_tag = existing_tags_lookup[new_tag_name]
new_tag_type = new_tag.get(TAG_TYPE_KEY)
existing_tag_type = existing_tag.get(TAG_TYPE_KEY)
# Merge only if both are Folders and the new folder has tags
if (existing_tag_type == FOLDER_TYPE and
new_tag_type == FOLDER_TYPE and
TAGS_KEY in new_tag and
isinstance(new_tag[TAGS_KEY], list)):
# Ensure the existing folder has a 'tags' list to extend
if TAGS_KEY not in existing_tag or not isinstance(existing_tag.get(TAGS_KEY), list):
# print(f" - Initializing tags list for existing folder '{new_tag_name}'") # Optional debug
existing_tag[TAGS_KEY] = []
# Append new elements (UDT instances) to the existing folder's tags list
if new_tag[TAGS_KEY]: # Only extend if there's something to add
# print(f" - Merging folder '{new_tag_name}'. Adding {len(new_tag[TAGS_KEY])} tags.") # Optional debug
existing_tag[TAGS_KEY].extend(new_tag[TAGS_KEY])
elif existing_tag_type == FOLDER_TYPE and new_tag_type == FOLDER_TYPE:
# Both are folders, but the new one has no tags or an invalid tag list
# print(f" - Attempted to merge folder '{new_tag_name}', but new tag list was empty or invalid.") # Optional debug
pass # No action needed
else:
# Names match, but types conflict (e.g., Folder vs UDT) or are not folders.
# Currently, we prioritize the existing item and log a warning.
print(f" - Warning: Conflict for name '{new_tag_name}'. Existing type '{existing_tag_type}' differs from new type '{new_tag_type}' or not mergeable folders. Keeping existing.")
else:
# New device folder or direct UDT tag, add it to the existing MCM structure
# print(f" - Adding new top-level tag/folder '{new_tag_name}' of type '{new_tag.get(TAG_TYPE_KEY)}'") # Optional debug
existing_mcm_struct[TAGS_KEY].append(new_tag)
# Add to lookup in case multiple view.json files contribute to the *same* new device folder within one run
existing_tags_lookup[new_tag_name] = new_tag
# --- Core Processing Functions ---
def process_view_json(view_file_path: str, mcm_number: str, device_type_override: Optional[str] = None) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
"""
Reads a view.json file, validates structure, runs tag generation logic,
and returns both the modified view data and the created tag structure.
Args:
view_file_path: Path to the input view.json file.
mcm_number: The MCM number for tag generation.
device_type_override: Optional device type override.
Returns:
Tuple of (modified_view_data, mcm_tag_structure), or None on error.
"""
print(f" - Processing: {os.path.basename(view_file_path)}")
try:
# Read the input JSON file
with open(view_file_path, 'r') as f:
view_data = json.load(f)
# Check for the expected 'root'.'children' structure
# Use imported constants ROOT_KEY, CHILDREN_KEY
root_node = view_data.get(ROOT_KEY)
if not isinstance(root_node, dict) or CHILDREN_KEY not in root_node or not isinstance(root_node[CHILDREN_KEY], list):
print(f" - Warning: Skipping '{os.path.basename(view_file_path)}'. Invalid or missing '{ROOT_KEY}.{CHILDREN_KEY}' structure.")
return None
# Get the list of elements to process
elements_list = root_node[CHILDREN_KEY]
# --- Run Tag Generation ---
# NOTE: generate_tag_paths modifies elements_list in place,
# which modifies view_data indirectly.
generate_tag_paths(elements_list, mcm_number, device_type_override)
# create_tag_structure uses the (potentially modified) elements_list
# and returns a *new* structure dictionary.
mcm_tag_structure = create_tag_structure(elements_list, mcm_number, device_type_override)
# The modified view data is the original dict which was changed in-place
modified_view_data = view_data
return modified_view_data, mcm_tag_structure
except FileNotFoundError:
print(f" - Error: File not found: '{view_file_path}'. Skipping.")
return None
except json.JSONDecodeError:
print(f" - Error: Invalid JSON in file: '{view_file_path}'. Skipping.")
return None
except IOError as e:
print(f" - Error: Cannot read file '{view_file_path}': {e}. Skipping.")
return None
except Exception as e:
# Catch potential errors within the imported generation functions too
print(f" - Error: Unexpected error processing file '{view_file_path}': {str(e)}. Skipping.")
return None
def process_mcm_directories(base_dir: str, output_dir: str, device_type_override: Optional[str] = None):
"""
Scans subdirectories in base_dir, identifies MCM directories, processes
their view.json files, accumulates and merges tag structures per MCM,
updates view.json files in place, and saves a final tags.json file.
"""
print(f"\nScanning base directory: '{base_dir}'")
processed_view_count = 0
skipped_view_count = 0
error_saving_view_count = 0
skipped_dir_count = 0
# Accumulator for merged tag structures, keyed by MCM name (e.g., "MCM07")
merged_mcm_structures: Dict[str, Dict[str, Any]] = {}
try:
sub_items = os.listdir(base_dir)
except OSError as e:
print(f"Error: Cannot list directory '{base_dir}': {e}")
sys.exit(1)
for item_name in sub_items:
item_path = os.path.join(base_dir, item_name)
if os.path.isdir(item_path):
print(f"\nProcessing directory: '{item_name}'")
mcm_number = extract_mcm_number_from_name(item_name)
if mcm_number:
print(f" - Identified as MCM directory (Number: {mcm_number})")
view_json_path = os.path.join(item_path, VIEW_JSON_FILENAME)
if os.path.exists(view_json_path):
# Process the view.json for this MCM directory
results = process_view_json(view_json_path, mcm_number, device_type_override)
if results:
modified_view_data, single_mcm_structure = results
# --- 1. Overwrite the original view.json file ---
print(f" - Attempting to save updated '{VIEW_JSON_FILENAME}' to '{item_path}'")
if _save_json(modified_view_data, view_json_path):
print(f" - Successfully saved updated '{os.path.basename(view_json_path)}'")
else:
# Error message printed in _save_json
print(f" - Failed to save updated '{os.path.basename(view_json_path)}'. Check permissions or disk space.")
error_saving_view_count += 1
# Continue processing structure even if view save failed? Yes for now.
# --- 2. Accumulate and merge the generated tag structure ---
if single_mcm_structure and NAME_KEY in single_mcm_structure:
mcm_key = single_mcm_structure[NAME_KEY] # e.g., "MCM07"
print(f" - Processing generated structure for {mcm_key}")
if mcm_key in merged_mcm_structures:
print(f" - Merging with existing structure for {mcm_key}...")
merge_mcm_structures(merged_mcm_structures[mcm_key], single_mcm_structure)
else:
print(f" - Adding as new structure for {mcm_key}.")
merged_mcm_structures[mcm_key] = single_mcm_structure
processed_view_count += 1 # Count successful processing of structure
else:
print(f" - Warning: Generated structure from '{os.path.basename(view_json_path)}' is invalid or missing name. Skipping structure merge.")
skipped_view_count += 1
else:
# Error message printed in process_view_json
skipped_view_count += 1
print(f" - Skipping processing for directory '{item_name}' due to issues with '{VIEW_JSON_FILENAME}'.")
else:
print(f" - Warning: '{VIEW_JSON_FILENAME}' not found in directory '{item_name}'. Skipping.")
skipped_dir_count += 1
else:
print(f" - Skipping directory '{item_name}' (Does not match expected '{MCM_PREFIX}[number]' pattern).")
skipped_dir_count += 1
# else: ignore files directly under base_dir
# --- Save the final merged structure ---
if merged_mcm_structures:
print(f"\nConsolidating {len(merged_mcm_structures)} final MCM structures...")
# The desired output format is {"tags": [mcm_structure_1, mcm_structure_2, ...]}
final_output_object = {TAGS_KEY: list(merged_mcm_structures.values())}
final_output_path = os.path.join(output_dir, TAGS_JSON_FILENAME)
print(f"Attempting to save final merged tag structure to: '{final_output_path}'")
if _save_json(final_output_object, final_output_path):
print(f"Successfully saved merged tag structure to: '{final_output_path}'")
else:
# Error message printed in _save_json
print(f"Failed to save final '{TAGS_JSON_FILENAME}'. Processing completed with errors.")
# Consider exiting with error? For now, just report.
else:
print(f"\nNo valid MCM structures were generated or merged. No '{TAGS_JSON_FILENAME}' file created.")
# --- Final Summary ---
print(f"\n--- Processing Summary ---")
print(f" Directories Scanned: {len(sub_items)}")
print(f" MCM Directories Processed: {processed_view_count + skipped_view_count + error_saving_view_count}")
print(f" Skipped Directories (No MCM# or No view.json): {skipped_dir_count}")
print("-" * 25)
print(f" View Files Processed (Structure Generated): {processed_view_count}")
print(f" View Files Skipped (Read/Structure Error): {skipped_view_count}")
print(f" View Files Updated & Saved In-Place: {processed_view_count + skipped_view_count}") # Saved if processed, even if structure was invalid
print(f" Errors Saving Updated View Files: {error_saving_view_count}")
print("-" * 25)
if merged_mcm_structures:
print(f" Final '{TAGS_JSON_FILENAME}' generated in '{output_dir}' with data for {len(merged_mcm_structures)} MCMs.")
else:
print(f" No final '{TAGS_JSON_FILENAME}' generated.")
print("--- End Summary ---")
# --- Main Execution ---
def main():
parser = argparse.ArgumentParser(
description=f"Processes '{VIEW_JSON_FILENAME}' files found in MCM-named subdirectories of a base directory. "
f"Updates the '{VIEW_JSON_FILENAME}' files in-place and generates a consolidated '{TAGS_JSON_FILENAME}' file.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"base_directory",
help=f"Path to the base directory containing MCM subfolders (e.g., 'MCM07 Bypass', 'MCM08 Control')."
)
parser.add_argument(
"-d", "--device-type",
dest="device_type_override",
default=None,
help="Optional. If provided, overrides automatic device type detection for ALL elements in ALL processed views."
)
parser.add_argument(
"-o", "--output-dir",
default=DEFAULT_OUTPUT_DIR,
help=f"Directory where the final '{TAGS_JSON_FILENAME}' will be saved."
)
args = parser.parse_args()
# --- Validate Arguments ---
# Check base directory
if not os.path.isdir(args.base_directory):
print(f"Error: Base directory not found or is not a directory: '{args.base_directory}'")
sys.exit(1)
# Check and create output directory
output_dir = args.output_dir
if not os.path.exists(output_dir):
print(f"Output directory '{output_dir}' does not exist. Attempting to create it...")
try:
os.makedirs(output_dir)
print(f"Successfully created output directory: '{output_dir}'")
except OSError as e:
print(f"Error: Could not create output directory '{output_dir}': {e}")
sys.exit(1)
elif not os.path.isdir(output_dir):
print(f"Error: Specified output path '{output_dir}' exists but is not a directory.")
sys.exit(1)
# --- Run Processing ---
process_mcm_directories(args.base_directory, output_dir, args.device_type_override)
if __name__ == "__main__":
main()

129
tag_transformer.py Normal file
View File

@ -0,0 +1,129 @@
#!/usr/bin/env python3
import json
import sys
import re
from typing import List, Dict, Any
def determine_device_type(element: Dict[str, Any]) -> str:
"""
Determine the device type based on the element's props.path.
Args:
element: The element dictionary.
Returns:
The determined device type (e.g., "ControlCabinet") or "Status" if path is invalid.
Returns "" for elements named MCM or MCMxx.
"""
# Keep special case for MCM elements based on name
element_name = element.get('meta', {}).get('name')
if element_name and (element_name == "MCM" or re.match(r'^MCM\d+$', element_name)):
return ""
# Determine type from props.path
element_path = element.get('props', {}).get('path')
if isinstance(element_path, str) and element_path:
# Extract the last part of the path
# Handles both / and \ separators implicitly with split
parts = element_path.replace('\\', '/').split('/')
if parts:
device_type = parts[-1]
# Optional: Sanitize if needed, but often the raw name is fine
if device_type: # Ensure it's not an empty string
return device_type
# Fallback if path is missing, empty, or parsing fails
return "Status"
def generate_tag_paths(
elements: List[Dict[str, Any]],
mcm_number: str,
device_type: str = None
) -> List[Dict[str, Any]]:
"""
Transform the tagProps in each element based on MCM number, device type and element name.
Modifies the input list in place.
Args:
elements: List of element dictionaries (e.g., from root['children']).
mcm_number: The MCM number (will be zero-padded to 2 digits)
device_type: Optional override for device type. If None, it's determined from element path.
Returns:
The same list of elements, modified in place.
"""
updated_elements = elements # Work directly on the passed list
mcm_formatted = f"MCM{mcm_number.zfill(2)}"
for element in updated_elements:
if 'meta' in element and element['meta'].get('name') == "Image":
continue
if 'meta' in element and 'name' in element['meta'] and \
'props' in element and 'params' in element['props'] and \
'tagProps' in element['props']['params']:
element_name = element['meta']['name']
# Determine device type: Use override if provided, else determine from element
actual_device_type = device_type
if not actual_device_type:
actual_device_type = determine_device_type(element)
# Generate the base path
if actual_device_type == "": # Handles the MCM case
base_path = f"System/{mcm_formatted}/{element_name}"
else:
base_path = f"System/{mcm_formatted}/{actual_device_type}/{element_name}"
# Update the first element if the list is not empty
tag_props = element['props']['params']['tagProps']
if tag_props and len(tag_props) > 0:
tag_props[0] = base_path
else:
# Optional: Decide if logging for missing structure is needed
pass
return updated_elements
def main():
# Check if filename and MCM number are provided as command line arguments
if len(sys.argv) < 3:
print("Usage: python tag_transformer.py <input_file.json> <mcm_number> [device_type]")
print("Example: python tag_transformer.py input.json 07 Conveyor")
print("If device_type is not provided, it will be determined automatically from element names.")
sys.exit(1)
input_file = sys.argv[1]
mcm_number = sys.argv[2]
# Device type is optional now
device_type = None
if len(sys.argv) >= 4:
device_type = sys.argv[3]
try:
# Read the input JSON file
with open(input_file, 'r') as f:
data = json.load(f)
# Transform the data
result = generate_tag_paths(data, mcm_number, device_type)
# Output the result as JSON to stdout
print(json.dumps(result, indent=2))
except FileNotFoundError:
print(f"Error: File '{input_file}' not found.")
sys.exit(1)
except json.JSONDecodeError:
print(f"Error: File '{input_file}' is not valid JSON.")
sys.exit(1)
except Exception as e:
print(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()

68420
tags.json/tags.json Normal file

File diff suppressed because it is too large Load Diff