174 lines
8.4 KiB
Python
174 lines
8.4 KiB
Python
import os
|
|
import re
|
|
import glob # Import glob for finding CSV files
|
|
import config
|
|
# Need pypdf for text extraction
|
|
from pypdf import PdfReader
|
|
|
|
def discover_projects():
    """Discover projects by listing subdirectories of ``config.PROJECTS_ROOT_DIR``.

    Every immediate subdirectory of the root is treated as a project; no
    check is made for specific files or folders inside it.

    Returns:
        list[str]: Names (not paths) of the subdirectories, sorted so the
        result does not depend on the filesystem's listing order. An empty
        list is returned, after printing a warning, when the root does not
        exist or is not a directory.
    """
    root = config.PROJECTS_ROOT_DIR
    # isdir (rather than exists) also rejects a root that is a regular file,
    # which would otherwise make os.listdir raise NotADirectoryError.
    if not os.path.isdir(root):
        print(f"Warning: Projects root directory not found: {root}")
        return []

    # os.listdir returns entries in arbitrary order; sort for a stable result.
    projects = sorted(
        entry
        for entry in os.listdir(root)
        if os.path.isdir(os.path.join(root, entry))
    )
    print(f"Discovered projects: {projects}")
    return projects
|
|
|
|
def get_project_base_path(project_name):
    """Build the path of a project's directory under the projects root.

    The result is ``config.PROJECTS_ROOT_DIR`` joined with ``project_name``.
    The path is not checked for existence.
    """
    projects_root = config.PROJECTS_ROOT_DIR
    return os.path.join(projects_root, project_name)
|
|
|
|
def get_repo_path(project_name):
    """Build the path of the repository directory for ``project_name``.

    The repository is assumed to live in a subdirectory named 'repo'
    directly inside the project's base directory. The path is not checked
    for existence.
    """
    project_base = get_project_base_path(project_name)
    repo_dir = os.path.join(project_base, "repo")
    return repo_dir
|
|
|
|
def find_csv_path(project_name):
    """Find a CSV file directly inside the project's base directory.

    Only files matching ``*.csv`` at the top level of the directory are
    considered; subdirectories are not searched.

    Args:
        project_name: Name of the project whose base directory is searched.

    Returns:
        The path of the CSV file, or None (after printing an error) when no
        match exists. When several files match, a warning is printed and the
        alphabetically first path is returned.
    """
    project_base = get_project_base_path(project_name)
    # Escape the directory part so glob metacharacters in the path itself
    # ('[', '*', '?') are matched literally and only '*.csv' acts as a pattern.
    pattern = os.path.join(glob.escape(project_base), '*.csv')
    # glob.glob returns matches in arbitrary order; sort so that "the first
    # one" is the same file on every run and platform.
    csv_files = sorted(glob.glob(pattern))

    if not csv_files:
        print(f"Error: No CSV file found in project directory: {project_base}")
        return None

    if len(csv_files) > 1:
        print(f"Warning: Multiple CSV files found in {project_base}. Using the first one: {csv_files[0]}")
    return csv_files[0]
|
|
|
|
def get_views_dir_path(project_name):
    """Build the path of the SCADA views directory inside the project's repo.

    The repo directory is scanned for a subdirectory whose name ends with
    '_SCADA' (case-insensitive, e.g. 'MTN6_SCADA'). If none is found, or the
    repo cannot be listed, the name '<project_name>_SCADA' is used as a best
    guess. ``config.VIEWS_DIR_RELATIVE`` is then appended.

    Args:
        project_name: Name of the project whose repo is searched.

    Returns:
        The joined path. It is not checked for existence, so it may point to
        a location that does not exist when the fallback name is used.
    """
    repo_path = get_repo_path(project_name)
    scada_data_dir = None

    try:
        # os.listdir returns entries in arbitrary order; sort so the chosen
        # directory is deterministic when several '*_SCADA' directories exist.
        for entry in sorted(os.listdir(repo_path)):
            candidate = os.path.join(repo_path, entry)
            if entry.upper().endswith('_SCADA') and os.path.isdir(candidate):
                scada_data_dir = candidate
                print(f"[{project_name}] Found SCADA data directory: {scada_data_dir}")
                break  # Use the first match
    except FileNotFoundError:
        # Fall through to the fallback path below.
        print(f"Warning: Repo path not found for project '{project_name}' at '{repo_path}' when searching for SCADA dir.")
    except Exception as e:
        # Deliberately best-effort: any listing problem leads to the fallback.
        print(f"Warning: Error searching for SCADA dir in '{repo_path}': {e}")

    if not scada_data_dir:
        print(f"Warning: Could not automatically find a *_SCADA directory in {repo_path}. Using fallback path structure.")
        # Best guess: assume the folder is named after the project.
        scada_data_dir = os.path.join(repo_path, f"{project_name}_SCADA")

    # Append the common relative path from config.
    return os.path.join(scada_data_dir, config.VIEWS_DIR_RELATIVE)
|
|
|
|
def get_text_output_dir_path(project_name):
    """Build the path of the extracted drawing text output directory.

    The folder name comes from ``config.TEXT_OUTPUT_FOLDER_RELATIVE`` and is
    joined onto the project's base directory. The path is not checked for
    existence.
    """
    project_base = get_project_base_path(project_name)
    output_folder = config.TEXT_OUTPUT_FOLDER_RELATIVE
    return os.path.join(project_base, output_folder)
|
|
|
|
def get_pdf_dir_path(project_name):
    """Build the path of the source PDF directory for ``project_name``.

    ASSUMPTION: PDFs are kept in a 'pdfs' subdirectory of the project's base
    directory; change the folder name here if the real layout differs. The
    path is not checked for existence.
    """
    project_base = get_project_base_path(project_name)
    pdf_dir = os.path.join(project_base, 'pdfs')
    return pdf_dir
|
|
|
|
def normalize(text):
    """Return a canonical form of ``text`` for comparisons.

    The string is lowercased, hyphens are replaced by underscores, and every
    whitespace character (including newlines) is removed. Any non-string
    input yields an empty string.
    """
    if not isinstance(text, str):
        return ""

    # Lowercase and unify '-' with '_' first, then strip all whitespace.
    unified = text.lower().replace('-', '_')
    return re.sub(r'\s+', '', unified)
|
|
|
|
def extract_text_from_pdf(pdf_path, txt_path):
    """Extract the text of one PDF file and save it to a UTF-8 TXT file.

    Pages are processed one at a time. A page that yields no text is
    skipped, and a page whose extraction raises is skipped with a warning;
    the remaining page texts are joined with newlines. When no text at all
    is extracted, an empty TXT file is still written to prevent re-attempts.

    Args:
        pdf_path: Path of the PDF file to read.
        txt_path: Path of the text file to create or overwrite.

    Returns:
        bool: True when the TXT file was written (including an empty one).
        False when the PDF could not be opened or decrypted, when writing
        the TXT file failed, or on any other unexpected error. Errors are
        reported with print() and are not propagated to the caller.
    """
    import traceback  # Local import: only needed on the error paths.

    base_filename = os.path.basename(pdf_path)
    print(f" [Extractor] Attempting to process: {base_filename}")
    success = False  # Track overall success

    try:
        # --- Step 1: Open and decrypt (if necessary) ---
        try:
            print(f" [Extractor] Opening PDF: {base_filename}")
            reader = PdfReader(pdf_path)
            print(f" [Extractor] PDF opened successfully: {base_filename}")
        except Exception as open_err:
            print(f" [Extractor] CRITICAL ERROR opening PDF {base_filename}: {open_err}")
            traceback.print_exc()
            return False  # Cannot proceed

        if reader.is_encrypted:
            print(f" [Extractor] PDF is encrypted: {base_filename}. Attempting decryption...")
            try:
                # Try the empty password - adjust if a real password is needed.
                decrypt_result = reader.decrypt('')
            except Exception as decrypt_err:
                print(f" [Extractor] WARNING: Could not decrypt PDF {base_filename}: {decrypt_err}. Skipping.")
                return False  # Treat decryption failure as critical for this file
            # decrypt() signals a rejected password through a falsy return
            # value (NOT_DECRYPTED / 0) rather than by raising, so the result
            # must be checked before reporting success.
            if not decrypt_result:
                print(f" [Extractor] WARNING: Could not decrypt PDF {base_filename}: empty password was rejected. Skipping.")
                return False
            print(f" [Extractor] Decryption successful (or not needed) for {base_filename}")

        # --- Step 2: Extract text page by page ---
        print(f" [Extractor] Starting page-by-page text extraction for: {base_filename} ({len(reader.pages)} pages)")
        page_texts = []
        for page_number, page in enumerate(reader.pages, start=1):
            try:
                page_text = page.extract_text()
            except Exception as page_err:
                # A single bad page is logged but does not abort the file.
                print(f" [Extractor] WARNING: Error extracting text from page {page_number} in {base_filename}: {page_err}")
                continue
            if page_text:
                page_texts.append(page_text)

        extracted_text = "\n".join(page_texts)
        print(f" [Extractor] Finished text extraction for {base_filename}. Total chars extracted: {len(extracted_text)}")

        # No text at all: still write an empty file to prevent re-attempts.
        if not extracted_text:
            print(f" [Extractor] WARNING: No text extracted from {base_filename}. An empty TXT file will be created.")

        # --- Step 3: Write to TXT file ---
        txt_filename = os.path.basename(txt_path)
        print(f" [Extractor] Attempting to write TXT file: {txt_filename}")
        try:
            with open(txt_path, 'w', encoding='utf-8') as txt_file:
                txt_file.write(extracted_text)
        except Exception as write_err:
            print(f" [Extractor] ERROR writing text file {txt_filename}: {write_err}")
            success = False  # Failed to write
        else:
            print(f" [Extractor] Successfully wrote TXT file: {txt_filename}")
            success = True  # Mark as successful

    except Exception as e:
        # Catch-all for unexpected errors anywhere in the process, so one bad
        # PDF cannot crash the caller.
        print(f" [Extractor] UNEXPECTED CRITICAL ERROR processing PDF {base_filename}: {e}")
        traceback.print_exc()
        success = False

    print(f" [Extractor] Finished processing {base_filename}. Result: {'Success' if success else 'Failure'}")
    return success