work-tracing/check_db.py
ilia-gurielidze-autstand 9e6d0a6911 first commit
2025-05-05 12:12:46 +04:00

82 lines
2.8 KiB
Python

"""
PostgreSQL Connection Test Script
Verifies connection to PostgreSQL database and lists existing tables.
"""
import os
import sys
import traceback
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
# Print diagnostic information
print("Python version:", sys.version)
print("Current working directory:", os.getcwd())
# Load environment variables from both potential sources
print("Loading environment variables...")
load_dotenv() # From .env
load_dotenv("config.env") # From config.env
# Print all loaded environment variables (excluding sensitive info)
for key, value in os.environ.items():
if key.startswith("DATABASE"):
masked_value = value
if "@" in value:
parts = value.split("@")
masked_value = "****@" + parts[1]
print(f"{key}: {masked_value}")
# Get database URI
db_uri = os.environ.get('DATABASE_URI', os.environ.get('DATABASE_URL'))
if not db_uri:
print("ERROR: DATABASE_URI or DATABASE_URL environment variable not set!")
print("Available environment variables:", [key for key in os.environ.keys() if not key.startswith("_")])
exit(1)
print(f"Attempting to connect to database: {db_uri.split('@')[1] if '@' in db_uri else 'unknown'}")
try:
# Create engine with echo for verbose output
print("Creating SQLAlchemy engine...")
engine = create_engine(db_uri, echo=True)
# Test connection
print("Attempting to connect...")
with engine.connect() as conn:
print("✓ Successfully connected to PostgreSQL database!")
# Get list of tables
print("Querying for tables...")
result = conn.execute(text("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"))
tables = [row[0] for row in result]
if tables:
print("\nExisting tables:")
for table in tables:
print(f" - {table}")
else:
print("\nNo tables found in the database.")
# Check if work_events table exists
if 'work_events' in tables:
# Count records
result = conn.execute(text("SELECT COUNT(*) FROM work_events"))
count = result.scalar()
print(f"\nThe work_events table contains {count} records.")
# Sample data
if count > 0:
result = conn.execute(text("SELECT * FROM work_events LIMIT 5"))
rows = result.fetchall()
print("\nSample data:")
for row in rows:
print(f" {row}")
else:
print("\nThe work_events table does not exist yet.")
except Exception as e:
print(f"ERROR: Failed to connect to the database: {e}")
print("\nDetailed error information:")
traceback.print_exc()
exit(1)