MonitorProgress/utils.py
2025-04-10 04:08:55 +04:00

174 lines
8.4 KiB
Python

import os
import re
import glob # Import glob for finding CSV files
import config
# Need pypdf for text extraction
from pypdf import PdfReader
def discover_projects():
    """Discover projects by listing subdirectories of ``config.PROJECTS_ROOT_DIR``.

    Every immediate subdirectory of the root is treated as a project; nothing
    inside the subdirectory is inspected.

    Returns:
        A sorted list of subdirectory names. An empty list is returned (after
        printing a warning) when the root path does not exist or is not a
        directory.
    """
    root_dir = config.PROJECTS_ROOT_DIR
    # isdir (rather than exists) also rejects a root path that points at a
    # regular file, which would otherwise make os.listdir raise.
    if not os.path.isdir(root_dir):
        print(f"Warning: Projects root directory not found: {config.PROJECTS_ROOT_DIR}")
        return []
    # Simple check: any directory counts as a project. os.listdir order is
    # arbitrary, so sort to give callers a stable result.
    projects = sorted(
        entry
        for entry in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, entry))
    )
    print(f"Discovered projects: {projects}")
    return projects
def get_project_base_path(project_name):
    """Return the directory of ``project_name`` under ``config.PROJECTS_ROOT_DIR``.

    The result is the configured root joined with the project name; it is
    absolute only if the configured root itself is absolute.
    """
    projects_root = config.PROJECTS_ROOT_DIR
    project_dir = os.path.join(projects_root, project_name)
    return project_dir
def get_repo_path(project_name):
    """Return the path of the repository directory for ``project_name``.

    The repository is assumed to live in a subdirectory named ``repo``
    directly inside the project's base directory.
    """
    project_dir = get_project_base_path(project_name)
    repo_dir = os.path.join(project_dir, "repo")
    return repo_dir
def find_csv_path(project_name):
    """Find a CSV file directly inside the project's base directory.

    Only the base directory itself is searched (no recursion). When several
    ``*.csv`` entries exist, the alphabetically first path is used and a
    warning is printed.

    Returns:
        The path of the selected CSV file, or ``None`` (after printing an
        error) when the directory contains no ``*.csv`` entry.
    """
    project_base = get_project_base_path(project_name)
    # Escape the directory part so glob metacharacters in the project path
    # ('[', ']', '*', '?') are matched literally instead of as a pattern.
    pattern = os.path.join(glob.escape(project_base), '*.csv')
    # glob returns matches in arbitrary order; sort so "the first one" is
    # the same file on every run and every filesystem.
    csv_files = sorted(glob.glob(pattern))
    if not csv_files:
        print(f"Error: No CSV file found in project directory: {project_base}")
        return None
    if len(csv_files) > 1:
        print(f"Warning: Multiple CSV files found in {project_base}. Using the first one: {csv_files[0]}")
    return csv_files[0]
def get_views_dir_path(project_name):
    """Return the path of the SCADA views directory inside the project's repo.

    The repo directory is scanned for a subdirectory whose name ends with
    ``_SCADA`` (case-insensitive, e.g. ``MTN6_SCADA``). If none is found, or
    the repo cannot be listed, the name ``<project_name>_SCADA`` is guessed
    instead. ``config.VIEWS_DIR_RELATIVE`` is then appended to that directory.

    Returns:
        The joined path. It is a best guess and is not checked for existence,
        so it may point at a location that does not exist.
    """
    repo_path = get_repo_path(project_name)
    scada_data_dir = None
    try:
        # os.listdir order is arbitrary; iterate in sorted order so that the
        # same directory is chosen every time when several candidates exist.
        for item in sorted(os.listdir(repo_path)):
            item_path = os.path.join(repo_path, item)
            if os.path.isdir(item_path) and item.upper().endswith('_SCADA'):
                scada_data_dir = item_path
                print(f"[{project_name}] Found SCADA data directory: {scada_data_dir}")
                break  # Use the first match
    except FileNotFoundError:
        # Fall through to the fallback guess below.
        print(f"Warning: Repo path not found for project '{project_name}' at '{repo_path}' when searching for SCADA dir.")
    except Exception as e:
        # Deliberately best-effort: any other failure also falls through.
        print(f"Warning: Error searching for SCADA dir in '{repo_path}': {e}")
    if not scada_data_dir:
        print(f"Warning: Could not automatically find a *_SCADA directory in {repo_path}. Using fallback path structure.")
        # Fallback: guess the folder name from the project name.
        scada_data_dir = os.path.join(repo_path, f"{project_name}_SCADA")
    # Append the common relative path from config.
    return os.path.join(scada_data_dir, config.VIEWS_DIR_RELATIVE)
def get_text_output_dir_path(project_name):
    """Return the directory that holds a project's extracted drawing text.

    The folder name comes from ``config.TEXT_OUTPUT_FOLDER_RELATIVE`` and is
    joined onto the project's base directory.
    """
    project_dir = get_project_base_path(project_name)
    output_folder = config.TEXT_OUTPUT_FOLDER_RELATIVE
    return os.path.join(project_dir, output_folder)
def get_pdf_dir_path(project_name):
    """Return the directory expected to hold a project's source PDFs.

    ASSUMPTION: PDFs live in a ``pdfs`` subdirectory of the project's base
    directory; change the folder name below if the real layout differs.
    """
    project_dir = get_project_base_path(project_name)
    pdf_dir = os.path.join(project_dir, 'pdfs')
    return pdf_dir
def normalize(text):
    """Return a canonical form of ``text`` for loose string comparison.

    The value is lowercased, hyphens are turned into underscores, and every
    whitespace character (newlines included) is dropped. Anything that is
    not a ``str`` yields the empty string.
    """
    if isinstance(text, str):
        unified = text.lower().replace('-', '_')
        return re.sub(r'\s+', '', unified)
    return ""
def extract_text_from_pdf(pdf_path, txt_path):
    """Extract the text of one PDF file and save it to a TXT file.

    Pages are processed one at a time; a page whose extraction raises is
    logged and skipped. The text of the remaining pages is joined with
    newlines and written to ``txt_path`` as UTF-8. If no text is extracted at
    all, an empty TXT file is still written so the PDF is not re-attempted.

    Args:
        pdf_path: Path of the PDF file to read.
        txt_path: Path of the TXT file to write (overwritten if present).

    Returns:
        True when the TXT file was written (including the empty-file case),
        False when the PDF could not be opened or decrypted, the TXT file
        could not be written, or an unexpected error occurred.
    """
    # Local import: traceback is only needed on the error paths here.
    import traceback

    base_filename = os.path.basename(pdf_path)
    print(f" [Extractor] Attempting to process: {base_filename}")
    success = False  # Track overall success
    try:
        # --- Step 1: Open and Decrypt (if necessary) ---
        try:
            print(f" [Extractor] Opening PDF: {base_filename}")
            reader = PdfReader(pdf_path)
            print(f" [Extractor] PDF opened successfully: {base_filename}")
        except Exception as open_err:
            print(f" [Extractor] CRITICAL ERROR opening PDF {base_filename}: {open_err}")
            # Log traceback for detailed debugging
            traceback.print_exc()
            return False  # Cannot proceed
        if reader.is_encrypted:
            print(f" [Extractor] PDF is encrypted: {base_filename}. Attempting decryption...")
            try:
                # Try decrypting with empty password - adjust if needed
                decrypt_result = reader.decrypt('')
            except Exception as decrypt_err:
                print(f" [Extractor] WARNING: Could not decrypt PDF {base_filename}: {decrypt_err}. Skipping.")
                return False  # Treat decryption failure as critical for this file
            # decrypt() reports a rejected password through a falsy return
            # value rather than an exception, so the result must be checked
            # before claiming success.
            if not decrypt_result:
                print(f" [Extractor] WARNING: Could not decrypt PDF {base_filename}: empty password was rejected. Skipping.")
                return False
            print(f" [Extractor] Decryption successful (or not needed) for {base_filename}")

        # --- Step 2: Extract Text Page by Page ---
        print(f" [Extractor] Starting page-by-page text extraction for: {base_filename} ({len(reader.pages)} pages)")
        page_texts = []
        for i, page in enumerate(reader.pages):
            try:
                page_text = page.extract_text()
                if page_text:
                    page_texts.append(page_text)
            except Exception as page_err:
                # Log page-specific errors but keep going with the other pages.
                print(f" [Extractor] WARNING: Error extracting text from page {i+1} in {base_filename}: {page_err}")
        extracted_text = "\n".join(page_texts)
        print(f" [Extractor] Finished text extraction for {base_filename}. Total chars extracted: {len(extracted_text)}")
        # Handle case where no text is extracted - write empty file to prevent re-attempts
        if not extracted_text:
            print(f" [Extractor] WARNING: No text extracted from {base_filename}. An empty TXT file will be created.")

        # --- Step 3: Write to TXT File ---
        print(f" [Extractor] Attempting to write TXT file: {os.path.basename(txt_path)}")
        try:
            with open(txt_path, 'w', encoding='utf-8') as txt_file:
                txt_file.write(extracted_text)
            print(f" [Extractor] Successfully wrote TXT file: {os.path.basename(txt_path)}")
            success = True  # Mark as successful
        except Exception as write_err:
            print(f" [Extractor] ERROR writing text file {os.path.basename(txt_path)}: {write_err}")
            success = False  # Failed to write
    except FileNotFoundError:
        # Normally handled by the open_err branch above; kept as a safety net.
        print(f" [Extractor] ERROR: PDF file not found at {pdf_path}.")
        success = False
    except Exception as e:
        # Catch-all for unexpected errors during the process
        print(f" [Extractor] UNEXPECTED CRITICAL ERROR processing PDF {base_filename}: {e}")
        traceback.print_exc()
        success = False
    print(f" [Extractor] Finished processing {base_filename}. Result: {'Success' if success else 'Failure'}")
    return success