ilia-gurielidze-autstand 9e6d0a6911 first commit
2025-05-05 12:12:46 +04:00

835 lines
34 KiB
Python

"""
Employee Workstation Activity Tracking - Flask API Server
This Flask application provides a REST API for receiving and storing user activity
events from client workstations. It exposes endpoints for reporting activity state
changes and retrieving aggregated reports.
"""
import os
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta
from dotenv import load_dotenv # Import load_dotenv
from flask import Flask, request, jsonify, render_template
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text, func, case, cast, Integer
import sys
# Load environment variables: first from a default .env, then from config.env
# located next to this file (when present).
load_dotenv()  # Try .env first
config_env_path = os.path.join(os.path.dirname(__file__), 'config.env')
if os.path.exists(config_env_path):
    load_dotenv(config_env_path)
    print(f"Loaded environment variables from {config_env_path}")
else:
    print(f"Warning: config.env file not found at {config_env_path}")

# Print all DATABASE* environment variables for debugging (masked)
for key, value in os.environ.items():
    if key.startswith("DATABASE"):
        masked_value = value
        if "@" in value:
            # Keep only what follows the LAST '@'. Splitting on the first '@'
            # would print the tail of any password that itself contains '@'.
            masked_value = "****@" + value.rsplit("@", 1)[1]
        # NOTE(review): values without an '@' are printed verbatim — confirm no
        # DATABASE* variable carries a bare secret.
        print(f"{key}: {masked_value}")
# Initialize Flask app
app = Flask(__name__, instance_relative_config=True)

# Load configuration
# In production, use environment variables or .env file
app.config.from_mapping(
    SECRET_KEY=os.environ.get('SECRET_KEY', 'dev'),
    # DATABASE_URI takes precedence; DATABASE_URL is the fallback
    SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URI', os.environ.get('DATABASE_URL')),
    SQLALCHEMY_TRACK_MODIFICATIONS=False
)

# Ensure DATABASE_URI is set
if not app.config['SQLALCHEMY_DATABASE_URI']:
    raise ValueError("DATABASE_URI or DATABASE_URL environment variable must be set for production mode")

# Ensure the instance folder exists. Only "already exists" is tolerated; any
# other failure (e.g. permissions) now surfaces here instead of being silently
# swallowed and reappearing later when the log file is opened in this folder.
os.makedirs(app.instance_path, exist_ok=True)
# Configure logging: a size-rotated log file kept inside the instance folder
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_handler = RotatingFileHandler(
    filename=os.path.join(app.instance_path, 'server.log'),
    maxBytes=5 * 1024 * 1024,  # 5 MB
    backupCount=5,
)
log_handler.setLevel(logging.INFO)
log_handler.setFormatter(log_formatter)

# Avoid duplicate logs in debug mode if Werkzeug logger is also active
if not app.debug:
    app.logger.addHandler(log_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('Flask application starting up...')  # Log startup

# Initialize database (extension object first, then bind it to the app)
db = SQLAlchemy()
db.init_app(app)
# Define database models
class WorkEvent(db.Model):
    """
    One reported activity-state change ('working' or 'stopped') for a user.

    Rows live in the ``work_events`` table; ``ts`` falls back to the database's
    current timestamp when no value is supplied.
    """
    __tablename__ = 'work_events'

    id = db.Column(db.Integer, primary_key=True)
    user = db.Column(db.String(100), nullable=False, index=True)
    state = db.Column(db.String(10), nullable=False)  # 'working' or 'stopped'
    ts = db.Column(
        db.DateTime,
        nullable=False,
        server_default=db.func.current_timestamp(),
        index=True,
    )

    def __repr__(self):
        return "<WorkEvent(user='{}', state='{}', ts='{}')>".format(
            self.user, self.state, self.ts
        )

    def to_dict(self):
        """Return a plain-dict view of the event for API responses."""
        timestamp = None
        if self.ts:
            timestamp = self.ts.isoformat()
        return dict(id=self.id, user=self.user, state=self.state, ts=timestamp)
# Log model metadata at startup for debugging
app.logger.info(f"WorkEvent __tablename__: {WorkEvent.__tablename__}")
app.logger.info(
    f"WorkEvent columns: {[column.name for column in WorkEvent.__table__.columns]}"
)
try:
    # Dump every attribute name to spot unexpected mappers or event listeners
    app.logger.info(f"WorkEvent attribute names: {dir(WorkEvent)}")
except Exception as e:
    app.logger.error(f"Error inspecting WorkEvent class: {e}")
# API Routes
@app.route('/api/report', methods=['POST'])
def report_event():
    """
    Endpoint for clients to report activity state changes.

    Expected JSON payload:
    {
        "user": "username",
        "state": "working|stopped",
        "ts": "2023-07-08T12:30:45Z" (optional, ISO 8601)
    }

    Returns:
        201 and {"success": True} once the event is committed.
        400 when the payload is not a JSON object, lacks user/state, has an
        unknown state, or carries a timestamp that cannot be parsed.
        500 when storing the event fails.
    """
    data = request.get_json()
    app.logger.info(f"Received report request: {data}")  # Log request

    # A JSON array or scalar is valid JSON but not a valid report; reject it
    # before any key lookups so it yields a 400 rather than a TypeError.
    if not isinstance(data, dict) or 'user' not in data or 'state' not in data:
        app.logger.warning("Invalid report request payload.")
        return jsonify({
            'success': False,
            'message': 'Missing required fields: user, state'
        }), 400

    user = data['user']
    state = data['state']

    # Validate state value
    if state not in ('working', 'stopped'):
        return jsonify({
            'success': False,
            'message': 'Invalid state value. Must be "working" or "stopped"'
        }), 400

    # Parse the timestamp once; fall back to the current UTC time when absent.
    # NOTE(review): a value with an offset parses to a timezone-aware datetime
    # while the fallback is naive UTC — confirm how the DateTime column and
    # driver treat the mix.
    event_ts = datetime.utcnow()
    ts_str = data.get('ts')
    if ts_str:
        try:
            event_ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # AttributeError/TypeError cover non-string values such as numbers
            app.logger.warning(f"Invalid timestamp format received: {ts_str}.")
            return jsonify({
                'success': False,
                'message': 'Invalid timestamp format. Use ISO 8601 (YYYY-MM-DDTHH:MM:SSZ)'
            }), 400

    # Create and store the event
    new_event = WorkEvent(user=user, state=state, ts=event_ts)
    try:
        app.logger.info(f"Attempting to add event to database: User={user}, State={state}, TS={event_ts}")
        db.session.add(new_event)
        db.session.commit()
        app.logger.info(f"Successfully recorded event: User={user}, State={state}")
        return jsonify({"success": True}), 201
    except SQLAlchemyError as e:
        db.session.rollback()
        app.logger.error(f"Database error while recording event: {e}")
        return jsonify({"success": False, "message": "Database error"}), 500
    except Exception as e:
        # Roll back here too so a half-finished transaction is not left open
        db.session.rollback()
        app.logger.error(f"Unexpected error processing report request: {e}")
        return jsonify({"success": False, "message": "Internal server error"}), 500
# --- Helper Functions for Duration Calculation ---
def calculate_duration_sql(time_period):
    """
    Build the SQL text for the per-user, per-period working-hours report.

    Each event is paired with the same user's next event via LEAD(); only
    intervals that begin with a 'working' event contribute, measured in hours.
    Rows are bucketed by the period containing the interval's start time.

    Args:
        time_period: 'daily', 'weekly' or 'monthly'; any other value is
            treated as 'daily'.

    Returns:
        The SQL string. The result columns are "user", period_start,
        total_hours and first_login_time.
    """
    # DATE_TRUNC unit per supported period (PostgreSQL weeks start on Monday);
    # unknown periods fall back to daily buckets.
    truncation_unit = {
        'daily': 'day',
        'weekly': 'week',
        'monthly': 'month',
    }.get(time_period, 'day')
    bucket_expr = f"DATE_TRUNC('{truncation_unit}', start_time)"

    # Interval length in hours: epoch seconds of the difference divided by 3600
    hours_expr = "EXTRACT(EPOCH FROM (next_event_time - start_time)) / 3600.0"

    # The public schema is named explicitly; the final SELECT re-aggregates so
    # there is exactly one row per user and period.
    sql_query = f"""
WITH EventPairs AS (
SELECT
"user",
ts AS start_time,
state,
LEAD(ts) OVER (PARTITION BY "user" ORDER BY ts) AS next_event_time,
LEAD(state) OVER (PARTITION BY "user" ORDER BY ts) AS next_event_state
FROM public.work_events
ORDER BY "user", ts
),
CalculatedDurations AS (
SELECT
"user",
{bucket_expr} AS period_start,
SUM(
CASE
WHEN state = 'working' AND next_event_time IS NOT NULL THEN
{hours_expr}
ELSE 0 -- Ignore intervals starting with 'stopped' or without a following event
END
) AS total_hours,
MIN(CASE WHEN state = 'working' THEN start_time END) AS first_login_time
FROM EventPairs
WHERE state = 'working' -- Only consider intervals that start with 'working'
GROUP BY "user", period_start
)
-- Final aggregation to ensure one row per user per period
SELECT
"user",
period_start,
SUM(total_hours) AS total_hours,
MIN(first_login_time) AS first_login_time
FROM CalculatedDurations
GROUP BY "user", period_start
ORDER BY "user", period_start DESC;
"""
    # Log the generated statement to aid debugging
    app.logger.info(f"Generated SQL query: {sql_query}")
    return sql_query
def filter_sql_by_user(base_sql, user):
    """
    Restrict a duration query to a single user (case-insensitive match).

    The text between the first ``WHERE`` that precedes the first ``GROUP BY``
    and that ``GROUP BY`` is replaced with a clause that keeps the 'working'
    condition and adds a ``:user`` bind parameter. If no ``WHERE`` precedes the
    ``GROUP BY``, the clause is inserted directly in front of it.

    Args:
        base_sql: SQL text to filter.
        user: Unused here; the caller binds the value to ``:user`` when
            executing, so it is never interpolated into the SQL.

    Returns:
        The filtered SQL, or ``base_sql`` unchanged when it has no ``GROUP BY``.
    """
    group_by_pos = base_sql.find("GROUP BY")
    if group_by_pos == -1:
        # Should not happen with the current query structure; handle defensively
        return base_sql

    # LOWER() on both sides makes the user filter case-insensitive
    where_clause = "WHERE state = 'working' AND LOWER(\"user\") = LOWER(:user)\n "

    # Search only ahead of the GROUP BY. An unbounded find() could return -1
    # (slicing off the last character) or a WHERE that belongs to a later part
    # of the query (duplicating text).
    where_pos = base_sql.find("WHERE", 0, group_by_pos)
    if where_pos == -1:
        where_pos = group_by_pos

    return base_sql[:where_pos] + where_clause + base_sql[group_by_pos:]
def fetch_duration_report(time_period, user_filter=None):
    """
    Run the duration report query and return its rows.

    Args:
        time_period: Passed to calculate_duration_sql ('daily', 'weekly',
            'monthly').
        user_filter: Optional username; when truthy the query is narrowed with
            filter_sql_by_user and the value is bound as the ``user`` parameter.

    Returns:
        The query rows as mappings (``.mappings().all()``).

    Raises:
        Any exception from the database layer is logged and re-raised for the
        calling endpoint to handle.
    """
    app.logger.debug(f"Fetching duration report. Period: {time_period}, User: {user_filter}")
    sql_query = calculate_duration_sql(time_period)
    params = {}
    if user_filter:
        # Note: filter_sql_by_user rewrites the query string itself
        sql_query = filter_sql_by_user(sql_query, user_filter)
        params['user'] = user_filter
        app.logger.debug(f"Applying user filter: {user_filter}")

    # Log which database is being queried, with credentials masked. Only the
    # part after the LAST '@' is kept so a password containing '@' cannot leak.
    db_uri = app.config['SQLALCHEMY_DATABASE_URI']
    masked_uri = db_uri
    if '@' in db_uri:
        masked_uri = "****@" + db_uri.rsplit('@', 1)[1]
    app.logger.info(f"Executing query using database: {masked_uri}")

    try:
        # NOTE(review): the two diagnostic queries below scan the whole table on
        # every report request — consider removing them once debugging is done.
        count_query = "SELECT COUNT(*) FROM work_events"
        count_result = db.session.execute(text(count_query)).scalar()
        app.logger.info(f"Total records in work_events table: {count_result}")

        users_query = "SELECT DISTINCT \"user\" FROM work_events"
        users_result = db.session.execute(text(users_query)).fetchall()
        user_list = [row[0] for row in users_result]
        app.logger.info(f"Distinct users in work_events table: {user_list}")

        results = db.session.execute(text(sql_query), params).mappings().all()
        app.logger.debug(f"Database query executed. Found {len(results)} rows.")
        return results
    except Exception as e:
        app.logger.error(f"Error executing duration report query: {e}")
        # Re-raise the exception to be handled by the endpoint's error handler
        raise
def fetch_user_activity(username, start_date, end_date):
    """
    Return the individual work sessions of one user within a date range.

    A session is a 'working' event paired with the next 'stopped' event of the
    same user, provided no other event of that user lies between the two.

    Args:
        username: Exact value matched against the "user" column.
        start_date: First day (inclusive), compared with DATE(ts) of the
            'working' event.
        end_date: Last day (inclusive).

    Returns:
        The query rows as mappings with the columns "user", work_date,
        start_time, end_time and session_duration_hours.

    Raises:
        Any exception raised while executing the query is logged and re-raised.
    """
    app.logger.debug(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}")

    # Pair each 'working' event with its directly following 'stopped' event and
    # compute the session length in hours.
    sql_query = """
WITH EventPairs AS (
SELECT
w1."user",
DATE(w1.ts) AS work_date,
w1.ts AS start_time,
w2.ts AS end_time,
EXTRACT(EPOCH FROM (w2.ts - w1.ts))/3600 AS session_duration_hours
FROM
work_events w1
JOIN
work_events w2 ON w1."user" = w2."user"
AND w1.state = 'working'
AND w2.state = 'stopped'
AND w2.ts > w1.ts
AND NOT EXISTS (
SELECT 1 FROM work_events w3
WHERE w3."user" = w1."user"
AND w3.ts > w1.ts AND w3.ts < w2.ts
)
WHERE
w1."user" = :username
AND DATE(w1.ts) BETWEEN :start_date AND :end_date
ORDER BY
w1.ts
)
SELECT * FROM EventPairs
"""
    bind_values = dict(username=username, start_date=start_date, end_date=end_date)
    try:
        rows = db.session.execute(text(sql_query), bind_values).mappings().all()
        app.logger.debug(f"User activity query executed. Found {len(rows)} rows.")
        return rows
    except Exception as e:
        app.logger.error(f"Error executing user activity query: {e}")
        raise
def format_report_data(results, time_period):
    """
    Convert raw duration-report rows into JSON-ready dictionaries.

    Rows sharing the same user and period are merged: their hours are summed
    and the earliest known first_login_time is kept.

    Args:
        results: Iterable of row mappings with the keys 'user', 'period_start',
            'total_hours' and 'first_login_time'.
        time_period: 'daily', 'weekly' or 'monthly'. Selects the name of the
            period key in the output ('day', 'week_start', 'month_start');
            anything else yields 'period_start'.

    Returns:
        A list of dicts with the keys 'user', the period key, 'duration_hours'
        (float) and 'first_login_time', in order of first appearance.
    """
    app.logger.debug(f"Formatting report data for period: {time_period}. Input rows: {len(results)}")
    period_key = {
        'daily': 'day',
        'weekly': 'week_start',
        'monthly': 'month_start'
    }.get(time_period, 'period_start')  # Default if unexpected period

    # Keyed by (user, period). A tuple cannot collide the way a joined string
    # such as "a_b" + "_" + "c" versus "a" + "_" + "b_c" can.
    user_period_map = {}
    for row in results:
        # Dates/datetimes become ISO strings so they are JSON serializable
        period_value = row['period_start']
        if hasattr(period_value, 'isoformat'):
            period_value = period_value.isoformat()

        first_login_time = row['first_login_time']
        if hasattr(first_login_time, 'isoformat'):
            first_login_time = first_login_time.isoformat()

        # Convert explicitly to float (the driver may return Decimal); a
        # missing total counts as zero
        total_hours = row['total_hours']
        duration_hours = 0.0 if total_hours is None else float(total_hours)

        key = (row['user'], period_value)
        existing = user_period_map.get(key)
        if existing is None:
            # New user+period combination
            user_period_map[key] = {
                'user': row['user'],
                period_key: period_value,
                'duration_hours': duration_hours,
                'first_login_time': first_login_time
            }
            continue

        # Aggregate duration for an existing user+period
        existing['duration_hours'] += duration_hours
        # Keep the earliest login; also adopt the new value when none was
        # recorded yet, instead of leaving it empty
        current_login = existing['first_login_time']
        if first_login_time and (not current_login or first_login_time < current_login):
            existing['first_login_time'] = first_login_time

    formatted_data = list(user_period_map.values())
    app.logger.debug(f"Formatted report data created. Output rows: {len(formatted_data)}")
    return formatted_data
def format_user_activity(results):
    """
    Turn raw user-activity rows into JSON-ready session dictionaries.

    Args:
        results: Iterable of row mappings with the keys 'start_time',
            'end_time', 'session_duration_hours' and 'work_date'.

    Returns:
        A list of dicts with the keys 'date', 'start_time', 'end_time' and
        'duration_hours' (rounded to two decimals; 0.0 when the duration is
        missing).
    """
    def _iso_or_same(value):
        # Anything offering isoformat() is rendered with it; other values pass through
        return value.isoformat() if hasattr(value, 'isoformat') else value

    sessions = []
    for record in results:
        started = _iso_or_same(record['start_time'])
        ended = _iso_or_same(record['end_time'])

        raw_hours = record['session_duration_hours']
        hours = 0.0 if raw_hours is None else float(raw_hours)

        work_date = record['work_date']
        if hasattr(work_date, 'isoformat'):
            day_label = work_date.isoformat()
        else:
            day_label = str(work_date)

        sessions.append({
            'date': day_label,
            'start_time': started,
            'end_time': ended,
            'duration_hours': round(hours, 2)
        })
    return sessions
# --- Reporting Endpoints ---
# Reporting endpoints (basic implementation)
@app.route('/api/reports/daily', methods=['GET'])
def get_daily_report():
    """
    Return the daily working-hours report for one date.

    Query parameters:
        date: Day to report on; rows are kept when the ISO form of their
            period start begins with this value. Defaults to today.
        user: Optional username filter.

    Responds with {"success": True, "data": [...]} or, on any error, with a
    500 and {"success": False, "message": "Error generating report"}.
    """
    app.logger.info("Daily report API requested.")
    try:
        app.logger.info("Fetching daily report data...")

        # Resolve the day to report on (today when none was given)
        selected_date = request.args.get('date')
        if not selected_date:
            selected_date = datetime.now().strftime('%Y-%m-%d')
            app.logger.info(f"No date provided, using today: {selected_date}")
        else:
            app.logger.info(f"Using selected date: {selected_date}")

        user_filter = request.args.get('user')

        def _is_selected_day(row):
            # Compare on the ISO text of the period start
            day = row['period_start']
            if hasattr(day, 'isoformat'):
                day = day.isoformat()
            return day and day.startswith(selected_date)

        # The daily query covers every day; keep only the selected one
        filtered_results = [
            row for row in fetch_duration_report('daily', user_filter)
            if _is_selected_day(row)
        ]

        # Debug logging for usernames before and after formatting
        app.logger.info(f"Raw results usernames for date {selected_date}: {[r['user'] for r in filtered_results]}")
        report = format_report_data(filtered_results, 'daily')
        app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}")

        app.logger.info(f"Successfully generated daily report for date: {selected_date}, user: {request.args.get('user', 'All')}. Found {len(filtered_results)} records.")
        return jsonify({"success": True, "data": report})
    except Exception as e:
        app.logger.error(f"Error generating daily report: {e}")
        return jsonify({"success": False, "message": "Error generating report"}), 500
@app.route('/api/reports/weekly', methods=['GET'])
def get_weekly_report():
    """
    Return the working-hours report for the current week, or for one day of it.

    Query parameters:
        day: Optional date; when given, the daily query is used and only rows
            whose ISO period start begins with this value are returned.
        user: Optional username filter.

    Without ``day`` only rows whose week starts on the current week's Monday
    are reported, one entry per user. When that week has no rows the report is
    empty; it no longer falls back to returning every historical week.

    Responds with {"success": True, "data": [...]} or, on any error, with a
    500 and {"success": False, "message": "Error generating report"}.
    """
    def _as_datetime(value):
        # Strings are parsed as ISO 8601; unparsable or empty values give None
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError:
                return None
        return value or None

    app.logger.info("Weekly report API requested.")
    try:
        app.logger.info("Fetching weekly report data...")
        # Check if a specific day within the week was requested
        day_filter = request.args.get('day')
        user_filter = request.args.get('user')

        if day_filter:
            app.logger.info(f"Filtering weekly report for specific day: {day_filter}")
            # Use the daily query and keep only the requested day
            results = []
            for row in fetch_duration_report('daily', user_filter):
                row_date = row['period_start']
                if hasattr(row_date, 'isoformat'):
                    row_date = row_date.isoformat()
                if row_date and row_date.startswith(day_filter):
                    results.append(row)
        else:
            # Monday of the current week
            now = datetime.now()
            current_week_start = (now - timedelta(days=now.weekday())).date()

            # Keep only the current week, one aggregated entry per user
            user_aggregated = {}
            for row in fetch_duration_report('weekly', user_filter):
                row_date = row['period_start']
                if isinstance(row_date, str):
                    try:
                        row_date = datetime.fromisoformat(row_date.replace('Z', '+00:00'))
                    except ValueError:
                        continue
                if not row_date or row_date.date() != current_week_start:
                    continue

                username = row['user']
                hours = row['total_hours'] or 0  # a missing total counts as zero
                entry = user_aggregated.get(username)
                if entry is None:
                    # First entry for this user
                    user_aggregated[username] = {
                        'user': username,
                        'period_start': row_date,
                        'total_hours': hours,
                        'first_login_time': row['first_login_time']
                    }
                    continue

                entry['total_hours'] += hours
                # Keep the earliest login, adopting the row's value when none
                # has been recorded yet
                row_login = _as_datetime(row['first_login_time'])
                existing_login = _as_datetime(entry['first_login_time'])
                if row_login and (not existing_login or row_login < existing_login):
                    entry['first_login_time'] = row['first_login_time']

            results = list(user_aggregated.values())

        # Debug logging for usernames before and after formatting
        app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}")
        report = format_report_data(results, 'weekly')
        app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}")

        app.logger.info(f"Successfully generated weekly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.")
        return jsonify({"success": True, "data": report})
    except Exception as e:
        app.logger.error(f"Error generating weekly report: {e}")
        return jsonify({"success": False, "message": "Error generating report"}), 500
@app.route('/api/reports/monthly', methods=['GET'])
def get_monthly_report():
    """
    Return the working-hours report for the current month.

    Query parameters:
        user: Optional username filter.

    Only rows whose period start lies in the current year and month are
    reported, one entry per user. When the month has no rows the report is
    empty; it no longer falls back to returning every historical month.

    Responds with {"success": True, "data": [...]} or, on any error, with a
    500 and {"success": False, "message": "Error generating report"}.
    """
    def _as_datetime(value):
        # Strings are parsed as ISO 8601; unparsable or empty values give None
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError:
                return None
        return value or None

    app.logger.info("Monthly report API requested.")
    try:
        app.logger.info("Fetching monthly report data...")
        # Current month, used for filtering
        now = datetime.now()
        current_month = (now.year, now.month)

        # Keep only the current month, one aggregated entry per user
        user_aggregated = {}
        for row in fetch_duration_report('monthly', request.args.get('user')):
            row_date = row['period_start']
            if isinstance(row_date, str):
                try:
                    row_date = datetime.fromisoformat(row_date.replace('Z', '+00:00'))
                except ValueError:
                    continue
            if not row_date or (row_date.year, row_date.month) != current_month:
                continue

            username = row['user']
            hours = row['total_hours'] or 0  # a missing total counts as zero
            entry = user_aggregated.get(username)
            if entry is None:
                # First entry for this user
                user_aggregated[username] = {
                    'user': username,
                    'period_start': row_date,
                    'total_hours': hours,
                    'first_login_time': row['first_login_time']
                }
                continue

            entry['total_hours'] += hours
            # Keep the earliest login, as the weekly report does
            row_login = _as_datetime(row['first_login_time'])
            existing_login = _as_datetime(entry['first_login_time'])
            if row_login and (not existing_login or row_login < existing_login):
                entry['first_login_time'] = row['first_login_time']

        results = list(user_aggregated.values())

        # Debug logging for usernames before and after formatting
        app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}")
        report = format_report_data(results, 'monthly')
        app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}")

        app.logger.info(f"Successfully generated monthly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.")
        return jsonify({"success": True, "data": report})
    except Exception as e:
        app.logger.error(f"Error generating monthly report: {e}")
        return jsonify({"success": False, "message": "Error generating report"}), 500
@app.route('/api/user-activity/<username>', methods=['GET'])
def get_user_activity(username):
    """
    Gets detailed activity logs for a specific user.

    Query parameters:
        start_date: First day (YYYY-MM-DD); defaults to today.
        end_date: Last day (YYYY-MM-DD); defaults to start_date.

    Returns:
        200 with the username, the date range and the list of sessions.
        400 when a date is not in YYYY-MM-DD form.
        500 when retrieving the sessions fails.
    """
    app.logger.info(f"User activity logs requested for: {username}")

    # Get date range from query parameters, default to current day if not provided
    start_date = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d'))
    end_date = request.args.get('end_date', start_date)

    # Reject malformed dates up front: otherwise they reach the database and
    # come back as an opaque 500 instead of a client error.
    for field_name, value in (('start_date', start_date), ('end_date', end_date)):
        try:
            datetime.strptime(value, '%Y-%m-%d')
        except ValueError:
            app.logger.warning(f"Invalid {field_name} received for user activity: {value}")
            return jsonify({
                "success": False,
                "message": f"Invalid {field_name}. Use YYYY-MM-DD"
            }), 400

    try:
        app.logger.info(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}")
        results = fetch_user_activity(username, start_date, end_date)
        activity_logs = format_user_activity(results)
        app.logger.info(f"Successfully retrieved {len(activity_logs)} activity records for user: {username}")
        return jsonify({
            "success": True,
            "data": {
                "username": username,
                "start_date": start_date,
                "end_date": end_date,
                "activities": activity_logs
            }
        })
    except Exception as e:
        app.logger.error(f"Error retrieving user activity logs: {e}")
        return jsonify({"success": False, "message": "Error retrieving activity logs"}), 500
# Error handlers
@app.errorhandler(400)
def bad_request(error):
    """Answer HTTP 400 errors with the API's JSON error envelope."""
    payload = {'success': False, 'message': 'Bad request'}
    return jsonify(payload), 400
@app.errorhandler(404)
def not_found_error(error):
    """Log the requested URL and answer HTTP 404 errors with a JSON envelope."""
    requested_url = request.url
    app.logger.warning(f"404 Not Found error triggered for URL: {requested_url}")
    payload = {"success": False, "message": "Resource not found"}
    return jsonify(payload), 404
@app.errorhandler(405)
def method_not_allowed(error):
    """Answer HTTP 405 errors with the API's JSON error envelope."""
    payload = {'success': False, 'message': 'Method not allowed'}
    return jsonify(payload), 405
@app.errorhandler(500)
def internal_error(error):
    """Log and answer HTTP 500 errors with the API's JSON error envelope."""
    # The specific failure behind the 500 may already have been logged elsewhere
    app.logger.error(f"Global 500 Internal Server error handler triggered: {error}")
    payload = {"success": False, "message": "Internal server error"}
    return jsonify(payload), 500
# Simple dashboard (optional)
@app.route('/')
def dashboard():
    """
    Render the dashboard page after logging a set of database diagnostics.

    The diagnostics are best-effort: the first failing query is logged and the
    remaining ones are skipped, but the page is rendered either way.
    """
    app.logger.info("Dashboard page requested.")

    def first_column(result):
        return [record[0] for record in result.fetchall()]

    def all_rows(result):
        return result.fetchall()

    def single_value(result):
        return result.scalar()

    # (log label, SQL statement, result reader) — executed in this order
    probes = (
        ("Distinct users in database", "SELECT DISTINCT \"user\" FROM work_events", first_column),
        ("Total records in work_events", "SELECT COUNT(*) FROM work_events", single_value),
        ("Sample records", "SELECT id, \"user\", state, ts FROM work_events LIMIT 5", all_rows),
        ("Current schema", "SELECT current_schema()", single_value),
        ("Available schemas", "SELECT schema_name FROM information_schema.schemata", first_column),
        ("Tables in public schema", "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'", first_column),
    )
    try:
        for label, statement, read_result in probes:
            outcome = read_result(db.session.execute(text(statement)))
            app.logger.info(f"DIRECT QUERY - {label}: {outcome}")
    except Exception as e:
        app.logger.error(f"Error during direct database query debugging: {str(e)}")
    return render_template('dashboard.html')
# Create the database tables
def init_db():
    """
    Log the (masked) database configuration and create all model tables.

    Runs inside an application context. For PostgreSQL URIs a probe query
    against pg_extension is executed; its failure is logged as a warning and
    does not stop initialization. For SQLite URIs the database file location
    and its directory's state are logged.
    """
    with app.app_context():
        db_uri = app.config['SQLALCHEMY_DATABASE_URI']
        if 'postgresql' in db_uri:
            # Mask the credentials. Everything up to the LAST '@' is hidden so a
            # password that itself contains '@' cannot leak into the log.
            if '@' in db_uri:
                masked_uri = 'postgresql://****:****@' + db_uri.rsplit('@', 1)[1]
            else:
                masked_uri = 'postgresql://****:****'
            app.logger.info(f"Using database URI: {masked_uri}")
            # Initialize PostgreSQL-specific components
            app.logger.info("Detected PostgreSQL database")
            # Check for extensions (can add more as needed)
            try:
                db.session.execute(text("SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto'"))
                app.logger.info("PostgreSQL database version check completed")
            except Exception as e:
                # Reset the session so it is not left in a failed transaction
                db.session.rollback()
                app.logger.warning(f"PostgreSQL extension check failed: {e}")
        else:
            # Other server databases may embed credentials as well; SQLite URIs
            # are plain file paths and are logged unchanged.
            loggable_uri = db_uri
            if '@' in db_uri and not db_uri.startswith('sqlite'):
                loggable_uri = "****@" + db_uri.rsplit('@', 1)[1]
            app.logger.info(f"Using database URI: {loggable_uri}")
            # For SQLite, report whether the parent directory exists and is writable
            if db_uri and db_uri.startswith('sqlite:///'):
                db_file = db_uri.replace('sqlite:///', '')
                db_dir = os.path.dirname(db_file)
                app.logger.info(f"SQLite database file path: {db_file}")
                app.logger.info(f"SQLite database directory exists: {os.path.exists(db_dir)}")
                if os.path.exists(db_dir):
                    app.logger.info(f"SQLite database directory writable: {os.access(db_dir, os.W_OK)}")
        # Create the tables
        db.create_all()
        app.logger.info("Database initialized")
if __name__ == '__main__':
    # Script entry point: print startup diagnostics, initialize the database,
    # then start the Flask development server.
    print("Starting application...")

    # Make sure the instance directory exists
    print(f"Instance path: {app.instance_path}")
    instance_exists = os.path.exists(app.instance_path)
    print(f"Instance path exists: {instance_exists}")
    if not instance_exists:
        try:
            os.makedirs(app.instance_path)
            print(f"Created instance directory: {app.instance_path}")
        except Exception as e:
            print(f"Error creating instance directory: {e}")

    # Check instance directory permissions
    has_write_access = os.access(app.instance_path, os.W_OK)
    print(f"Instance path write access: {has_write_access}")

    # Print database configuration with credentials masked: everything up to
    # the last '@' is hidden so user names and passwords never reach stdout.
    db_uri = app.config['SQLALCHEMY_DATABASE_URI']
    printable_uri = db_uri
    if '@' in db_uri:
        printable_uri = "****@" + db_uri.rsplit('@', 1)[1]
    print(f"Database URI: {printable_uri}")

    # For SQLite, print more details about the database file
    if db_uri.startswith('sqlite:///'):
        db_file = db_uri.replace('sqlite:///', '')
        print(f"Database file path: {db_file}")
        db_dir = os.path.dirname(db_file)
        print(f"Database directory: {db_dir}")
        print(f"Database directory exists: {os.path.exists(db_dir)}")
        print(f"Database directory writable: {os.access(db_dir, os.W_OK)}")
        print(f"Database file exists: {os.path.exists(db_file)}")
        if os.path.exists(db_file):
            print(f"Database file writable: {os.access(db_file, os.W_OK)}")

    # Initialization errors are reported but do not prevent the server from starting
    try:
        print("Initializing database...")
        init_db()  # Ensure database and tables are created on first run
        print("Database initialization successful")
    except Exception as e:
        print(f"Error during database initialization: {e}")
        import traceback
        traceback.print_exc()

    # Check if database file was created (for SQLite)
    if db_uri.startswith('sqlite:///'):
        db_file = db_uri.replace('sqlite:///', '')
        print(f"After init: Database file exists: {os.path.exists(db_file)}")

    # Run the Flask application
    host = os.environ.get('HOST', '0.0.0.0')
    port = int(os.environ.get('PORT', 5000))
    debug = os.environ.get('DEBUG', 'False').lower() == 'true'
    print(f"Starting Flask application on {host}:{port} (debug={debug})")
    app.run(host=host, port=port, debug=debug)