""" Employee Workstation Activity Tracking - Flask API Server This Flask application provides a REST API for receiving and storing user activity events from client workstations. It exposes endpoints for reporting activity state changes and retrieving aggregated reports. """ import os import logging from logging.handlers import RotatingFileHandler from datetime import datetime, timedelta from dotenv import load_dotenv # Import load_dotenv from flask import Flask, request, jsonify, render_template from flask_sqlalchemy import SQLAlchemy from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import text, func, case, cast, Integer import sys # Load environment variables from config.env file load_dotenv() # Try .env first config_env_path = os.path.join(os.path.dirname(__file__), 'config.env') if os.path.exists(config_env_path): load_dotenv(config_env_path) print(f"Loaded environment variables from {config_env_path}") else: print(f"Warning: config.env file not found at {config_env_path}") # Print all DATABASE* environment variables for debugging (masked) for key, value in os.environ.items(): if key.startswith("DATABASE"): masked_value = value if "@" in value: parts = value.split("@") masked_value = "****@" + parts[1] print(f"{key}: {masked_value}") # Initialize Flask app app = Flask(__name__, instance_relative_config=True) # Load configuration # In production, use environment variables or .env file app.config.from_mapping( SECRET_KEY=os.environ.get('SECRET_KEY', 'dev'), SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URI', os.environ.get('DATABASE_URL')), SQLALCHEMY_TRACK_MODIFICATIONS=False ) # Ensure DATABASE_URI is set if not app.config['SQLALCHEMY_DATABASE_URI']: raise ValueError("DATABASE_URI or DATABASE_URL environment variable must be set for production mode") # Ensure the instance folder exists try: os.makedirs(app.instance_path) except OSError: pass # Configure logging log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') log_handler = RotatingFileHandler( os.path.join(app.instance_path, 'server.log'), maxBytes=1024 * 1024 * 5, # 5 MB backupCount=5 ) log_handler.setFormatter(log_formatter) log_handler.setLevel(logging.INFO) if not app.debug: # Avoid duplicate logs in debug mode if Werkzeug logger is also active app.logger.addHandler(log_handler) app.logger.setLevel(logging.INFO) app.logger.info('Flask application starting up...') # Log startup # Initialize database db = SQLAlchemy() db.init_app(app) # Define database models class WorkEvent(db.Model): """ Represents a user activity event with state transitions (working/stopped). """ __tablename__ = 'work_events' id = db.Column(db.Integer, primary_key=True) user = db.Column(db.String(100), nullable=False, index=True) state = db.Column(db.String(10), nullable=False) # 'working' or 'stopped' ts = db.Column(db.DateTime, nullable=False, server_default=db.func.current_timestamp(), index=True) def __repr__(self): return f"" def to_dict(self): """Convert model to dictionary for API responses""" return { 'id': self.id, 'user': self.user, 'state': self.state, 'ts': self.ts.isoformat() if self.ts else None } # Print model metadata for debugging app.logger.info(f"WorkEvent __tablename__: {WorkEvent.__tablename__}") app.logger.info(f"WorkEvent columns: {[column.name for column in WorkEvent.__table__.columns]}") try: # Check if there are any attribute mappers or event listeners app.logger.info(f"WorkEvent attribute names: {dir(WorkEvent)}") except Exception as e: app.logger.error(f"Error inspecting WorkEvent class: {str(e)}") # API Routes @app.route('/api/report', methods=['POST']) def report_event(): """ Endpoint for clients to report activity state changes. Expected JSON payload: { "user": "username", "state": "working|stopped", "ts": "2023-07-08T12:30:45Z" (optional, ISO 8601) } """ data = request.get_json() app.logger.info(f"Received report request: {data}") # Log request if not data or 'user' not in data or 'state' not in data: app.logger.warning("Invalid report request payload.") return jsonify({ 'success': False, 'message': 'Missing required fields: user, state' }), 400 # Validate state value if data['state'] not in ['working', 'stopped']: return jsonify({ 'success': False, 'message': 'Invalid state value. Must be "working" or "stopped"' }), 400 # Parse timestamp if provided, otherwise use current time timestamp = None if 'ts' in data and data['ts']: try: timestamp = datetime.fromisoformat(data['ts'].replace('Z', '+00:00')) except ValueError: return jsonify({ 'success': False, 'message': 'Invalid timestamp format. Use ISO 8601 (YYYY-MM-DDTHH:MM:SSZ)' }), 400 # Create and store the event user = data['user'] state = data['state'] ts_str = data.get('ts') # Optional timestamp event_ts = datetime.utcnow() if ts_str: try: # Attempt to parse ISO 8601 format event_ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) except ValueError: app.logger.warning(f"Invalid timestamp format received: {ts_str}. Using current UTC time.") # Optionally return an error here if strict timestamp validation is needed # return jsonify({"success": False, "message": "Invalid timestamp format"}), 400 new_event = WorkEvent(user=user, state=state, ts=event_ts) try: app.logger.info(f"Attempting to add event to database: User={user}, State={state}, TS={event_ts}") db.session.add(new_event) db.session.commit() app.logger.info(f"Successfully recorded event: User={user}, State={state}") # Already added, maybe refine slightly return jsonify({"success": True}), 201 except SQLAlchemyError as e: db.session.rollback() app.logger.error(f"Database error while recording event: {e}") return jsonify({"success": False, "message": "Database error"}), 500 except Exception as e: app.logger.error(f"Unexpected error processing report request: {e}") # Refined outer error return jsonify({"success": False, "message": "Internal server error"}), 500 # --- Helper Functions for Duration Calculation --- def calculate_duration_sql(time_period): """ Generates the core SQL query to calculate working durations. Uses LEAD() window function to pair 'working' with the next event. Calculates duration in hours using PostgreSQL functions. """ # PostgreSQL date functions (already compatible with the database) period_grouping = { 'daily': "DATE_TRUNC('day', start_time)", 'weekly': "DATE_TRUNC('week', start_time)", # PostgreSQL week starts Monday 'monthly': "DATE_TRUNC('month', start_time)" }.get(time_period, "DATE_TRUNC('day', start_time)") # Default to daily if invalid # Calculate duration using EXTRACT(EPOCH FROM interval) / 3600 for hours duration_calculation = "EXTRACT(EPOCH FROM (next_event_time - start_time)) / 3600.0" # Use public schema explicitly, ensure proper aggregation by user and period sql_query = f""" WITH EventPairs AS ( SELECT "user", ts AS start_time, state, LEAD(ts) OVER (PARTITION BY "user" ORDER BY ts) AS next_event_time, LEAD(state) OVER (PARTITION BY "user" ORDER BY ts) AS next_event_state FROM public.work_events ORDER BY "user", ts ), CalculatedDurations AS ( SELECT "user", {period_grouping} AS period_start, SUM( CASE WHEN state = 'working' AND next_event_time IS NOT NULL THEN {duration_calculation} ELSE 0 -- Ignore intervals starting with 'stopped' or without a following event END ) AS total_hours, MIN(CASE WHEN state = 'working' THEN start_time END) AS first_login_time FROM EventPairs WHERE state = 'working' -- Only consider intervals that start with 'working' GROUP BY "user", period_start ) -- Final aggregation to ensure one row per user per period SELECT "user", period_start, SUM(total_hours) AS total_hours, MIN(first_login_time) AS first_login_time FROM CalculatedDurations GROUP BY "user", period_start ORDER BY "user", period_start DESC; """ # Add debug logging to see the SQL query app.logger.info(f"Generated SQL query: {sql_query}") return sql_query def filter_sql_by_user(base_sql, user): """Applies a user filter to the SQL query safely.""" # Find the position of GROUP BY to insert WHERE clause correctly group_by_pos = base_sql.find("GROUP BY") if group_by_pos != -1: # Make the user filter case-insensitive using LOWER() where_clause = "WHERE state = 'working' AND LOWER(\"user\") = LOWER(:user)\n " # Replace the generic WHERE clause with the user-specific one filtered_sql = base_sql[:base_sql.find("WHERE")] + where_clause + base_sql[group_by_pos:] return filtered_sql else: # Should not happen with the current query structure, but handle defensively return base_sql # Return original if GROUP BY not found def fetch_duration_report(time_period, user_filter=None): """Fetches duration report data from the database.""" app.logger.debug(f"Fetching duration report. Period: {time_period}, User: {user_filter}") sql_query = calculate_duration_sql(time_period) params = {} if user_filter: # Note: filter_sql_by_user modifies the query string directly sql_query = filter_sql_by_user(sql_query, user_filter) params['user'] = user_filter app.logger.debug(f"Applying user filter: {user_filter}") # Add debug to show database connection info db_uri = app.config['SQLALCHEMY_DATABASE_URI'] masked_uri = db_uri if '@' in db_uri: parts = db_uri.split('@') masked_uri = "****@" + parts[1] app.logger.info(f"Executing query using database: {masked_uri}") try: # Add a simple count query to verify data in the database count_query = "SELECT COUNT(*) FROM work_events" count_result = db.session.execute(text(count_query)).scalar() app.logger.info(f"Total records in work_events table: {count_result}") # Add another query to check distinct users users_query = "SELECT DISTINCT \"user\" FROM work_events" users_result = db.session.execute(text(users_query)).fetchall() user_list = [row[0] for row in users_result] app.logger.info(f"Distinct users in work_events table: {user_list}") results = db.session.execute(text(sql_query), params).mappings().all() app.logger.debug(f"Database query executed. Found {len(results)} rows.") return results except Exception as e: app.logger.error(f"Error executing duration report query: {e}") # Re-raise the exception to be handled by the endpoint's error handler raise def fetch_user_activity(username, start_date, end_date): """Fetches detailed user activity logs for a specific date range.""" app.logger.debug(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}") # SQL query to match working and stopped pairs and calculate durations sql_query = """ WITH EventPairs AS ( SELECT w1."user", DATE(w1.ts) AS work_date, w1.ts AS start_time, w2.ts AS end_time, EXTRACT(EPOCH FROM (w2.ts - w1.ts))/3600 AS session_duration_hours FROM work_events w1 JOIN work_events w2 ON w1."user" = w2."user" AND w1.state = 'working' AND w2.state = 'stopped' AND w2.ts > w1.ts AND NOT EXISTS ( SELECT 1 FROM work_events w3 WHERE w3."user" = w1."user" AND w3.ts > w1.ts AND w3.ts < w2.ts ) WHERE w1."user" = :username AND DATE(w1.ts) BETWEEN :start_date AND :end_date ORDER BY w1.ts ) SELECT * FROM EventPairs """ try: params = { 'username': username, 'start_date': start_date, 'end_date': end_date } results = db.session.execute(text(sql_query), params).mappings().all() app.logger.debug(f"User activity query executed. Found {len(results)} rows.") return results except Exception as e: app.logger.error(f"Error executing user activity query: {e}") raise def format_report_data(results, time_period): """Formats the raw database results into a list of dictionaries for the API.""" app.logger.debug(f"Formatting report data for period: {time_period}. Input rows: {len(results)}") period_key_map = { 'daily': 'day', 'weekly': 'week_start', 'monthly': 'month_start' } period_key = period_key_map.get(time_period, 'period_start') # Default if unexpected period # First convert raw rows to dictionaries raw_data = [] for row in results: # Ensure period_start is converted to string if it's a date/datetime object period_value = row['period_start'] if hasattr(period_value, 'isoformat'): period_value = period_value.isoformat() # Format first_login_time first_login_time = row['first_login_time'] if hasattr(first_login_time, 'isoformat'): first_login_time = first_login_time.isoformat() # Ensure duration_hours is a float, not a string or Decimal duration_hours = row['total_hours'] if duration_hours is None: duration_hours = 0.0 else: # Convert to float explicitly to ensure it's JSON serializable as a number duration_hours = float(duration_hours) raw_data.append({ 'user': row['user'], period_key: period_value, 'duration_hours': duration_hours, 'first_login_time': first_login_time }) # Additional preprocessing to consolidate any duplicate user entries user_period_map = {} for entry in raw_data: user = entry['user'] period = entry[period_key] key = f"{user}_{period}" if key in user_period_map: # Aggregate duration for existing user+period user_period_map[key]['duration_hours'] += entry['duration_hours'] # Use the earliest first_login_time existing_time = user_period_map[key]['first_login_time'] new_time = entry['first_login_time'] if existing_time and new_time: if new_time < existing_time: user_period_map[key]['first_login_time'] = new_time else: # New user+period combination user_period_map[key] = entry # Convert consolidated map back to list formatted_data = list(user_period_map.values()) app.logger.debug(f"Formatted report data created. Output rows: {len(formatted_data)}") return formatted_data def format_user_activity(results): """Formats the raw user activity results into a list of dictionaries.""" formatted_data = [] for row in results: start_time = row['start_time'] end_time = row['end_time'] # Format timestamps for display if hasattr(start_time, 'isoformat'): start_time = start_time.isoformat() if hasattr(end_time, 'isoformat'): end_time = end_time.isoformat() # Format duration as float duration = float(row['session_duration_hours']) if row['session_duration_hours'] is not None else 0.0 formatted_data.append({ 'date': row['work_date'].isoformat() if hasattr(row['work_date'], 'isoformat') else str(row['work_date']), 'start_time': start_time, 'end_time': end_time, 'duration_hours': round(duration, 2) }) return formatted_data # --- Reporting Endpoints --- # Reporting endpoints (basic implementation) @app.route('/api/reports/daily', methods=['GET']) def get_daily_report(): app.logger.info("Daily report API requested.") try: app.logger.info("Fetching daily report data...") # Get date parameter or use today as default selected_date = request.args.get('date') if selected_date: app.logger.info(f"Using selected date: {selected_date}") else: selected_date = datetime.now().strftime('%Y-%m-%d') app.logger.info(f"No date provided, using today: {selected_date}") user_filter = request.args.get('user') # Get regular daily report results results = fetch_duration_report('daily', user_filter) # Filter to only include entries for the selected date filtered_results = [] for row in results: row_date = row['period_start'] if hasattr(row_date, 'isoformat'): row_date = row_date.isoformat() # Check if the row's date matches the selected date if row_date and row_date.startswith(selected_date): filtered_results.append(row) # Add debug logging for usernames in raw results app.logger.info(f"Raw results usernames for date {selected_date}: {[r['user'] for r in filtered_results]}") report = format_report_data(filtered_results, 'daily') # Add debug logging for usernames in formatted data app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}") app.logger.info(f"Successfully generated daily report for date: {selected_date}, user: {request.args.get('user', 'All')}. Found {len(filtered_results)} records.") return jsonify({"success": True, "data": report}) except Exception as e: app.logger.error(f"Error generating daily report: {e}") return jsonify({"success": False, "message": "Error generating report"}), 500 @app.route('/api/reports/weekly', methods=['GET']) def get_weekly_report(): app.logger.info("Weekly report API requested.") try: app.logger.info("Fetching weekly report data...") # Check if a specific day within the week was requested day_filter = request.args.get('day') user_filter = request.args.get('user') if day_filter: app.logger.info(f"Filtering weekly report for specific day: {day_filter}") # Use daily query with the specific date results = fetch_duration_report('daily', user_filter) # Filter results to only include the requested day filtered_results = [] for row in results: row_date = row['period_start'] if hasattr(row_date, 'isoformat'): row_date = row_date.isoformat() # Check if the row's date matches the requested day if row_date and row_date.startswith(day_filter): filtered_results.append(row) results = filtered_results else: # Get current week dates for filtering now = datetime.now() # Get Monday of current week current_week_start = now - timedelta(days=now.weekday()) current_week_start = current_week_start.replace(hour=0, minute=0, second=0, microsecond=0) # Regular weekly report (whole week) results = fetch_duration_report('weekly', user_filter) # Filter to just include current week and aggregate by user filtered_results = [] user_aggregated = {} for row in results: row_date = row['period_start'] # Convert to datetime if it's a string if isinstance(row_date, str): try: row_date = datetime.fromisoformat(row_date.replace('Z', '+00:00')) except ValueError: continue # Check if it's from current week if row_date and row_date.date() == current_week_start.date(): username = row['user'] if username in user_aggregated: # Add duration hours user_aggregated[username]['total_hours'] += row['total_hours'] # Keep earliest first_login_time if row['first_login_time'] and user_aggregated[username]['first_login_time']: row_login = row['first_login_time'] if isinstance(row_login, str): try: row_login = datetime.fromisoformat(row_login.replace('Z', '+00:00')) except ValueError: row_login = None existing_login = user_aggregated[username]['first_login_time'] if isinstance(existing_login, str): try: existing_login = datetime.fromisoformat(existing_login.replace('Z', '+00:00')) except ValueError: existing_login = None if row_login and existing_login and row_login < existing_login: user_aggregated[username]['first_login_time'] = row['first_login_time'] else: # First entry for this user user_aggregated[username] = { 'user': username, 'period_start': row_date, 'total_hours': row['total_hours'], 'first_login_time': row['first_login_time'] } # Convert aggregated dict back to list filtered_results = list(user_aggregated.values()) results = filtered_results if filtered_results else results # Add debug logging for usernames in raw results app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}") report = format_report_data(results, 'weekly') # Add debug logging for usernames in formatted data app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}") app.logger.info(f"Successfully generated weekly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.") return jsonify({"success": True, "data": report}) except Exception as e: app.logger.error(f"Error generating weekly report: {e}") return jsonify({"success": False, "message": "Error generating report"}), 500 @app.route('/api/reports/monthly', methods=['GET']) def get_monthly_report(): app.logger.info("Monthly report API requested.") try: app.logger.info("Fetching monthly report data...") # Get current month for filtering now = datetime.now() current_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) # Get regular monthly report results = fetch_duration_report('monthly', request.args.get('user')) # Filter to only include current month and aggregate by user filtered_results = [] user_aggregated = {} for row in results: row_date = row['period_start'] # Convert to datetime if it's a string if isinstance(row_date, str): try: row_date = datetime.fromisoformat(row_date.replace('Z', '+00:00')) except ValueError: continue # Check if it's from current month if row_date and row_date.year == current_month_start.year and row_date.month == current_month_start.month: username = row['user'] if username in user_aggregated: # Add duration hours user_aggregated[username]['total_hours'] += row['total_hours'] else: # First entry for this user user_aggregated[username] = { 'user': username, 'period_start': row_date, 'total_hours': row['total_hours'], 'first_login_time': row['first_login_time'] } # Convert aggregated dict back to list filtered_results = list(user_aggregated.values()) results = filtered_results if filtered_results else results # Add debug logging for usernames in raw results app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}") report = format_report_data(results, 'monthly') # Add debug logging for usernames in formatted data app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}") app.logger.info(f"Successfully generated monthly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.") return jsonify({"success": True, "data": report}) except Exception as e: app.logger.error(f"Error generating monthly report: {e}") return jsonify({"success": False, "message": "Error generating report"}), 500 @app.route('/api/user-activity/', methods=['GET']) def get_user_activity(username): """Gets detailed activity logs for a specific user.""" app.logger.info(f"User activity logs requested for: {username}") # Get date range from query parameters, default to current day if not provided start_date = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d')) end_date = request.args.get('end_date', start_date) try: app.logger.info(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}") results = fetch_user_activity(username, start_date, end_date) activity_logs = format_user_activity(results) app.logger.info(f"Successfully retrieved {len(activity_logs)} activity records for user: {username}") return jsonify({ "success": True, "data": { "username": username, "start_date": start_date, "end_date": end_date, "activities": activity_logs } }) except Exception as e: app.logger.error(f"Error retrieving user activity logs: {e}") return jsonify({"success": False, "message": "Error retrieving activity logs"}), 500 # Error handlers @app.errorhandler(400) def bad_request(error): return jsonify({ 'success': False, 'message': 'Bad request' }), 400 @app.errorhandler(404) def not_found_error(error): app.logger.warning(f"404 Not Found error triggered for URL: {request.url}") return jsonify({"success": False, "message": "Resource not found"}), 404 @app.errorhandler(405) def method_not_allowed(error): return jsonify({ 'success': False, 'message': 'Method not allowed' }), 405 @app.errorhandler(500) def internal_error(error): # Note: The specific error causing the 500 might have already been logged app.logger.error(f"Global 500 Internal Server error handler triggered: {error}") return jsonify({"success": False, "message": "Internal server error"}), 500 # Simple dashboard (optional) @app.route('/') def dashboard(): app.logger.info("Dashboard page requested.") # Add direct query to verify data try: # Direct query to list all distinct users direct_query = "SELECT DISTINCT \"user\" FROM work_events" direct_results = db.session.execute(text(direct_query)).fetchall() user_list = [row[0] for row in direct_results] app.logger.info(f"DIRECT QUERY - Distinct users in database: {user_list}") # Direct query to count records count_query = "SELECT COUNT(*) FROM work_events" count_result = db.session.execute(text(count_query)).scalar() app.logger.info(f"DIRECT QUERY - Total records in work_events: {count_result}") # Get first few records to inspect sample_query = "SELECT id, \"user\", state, ts FROM work_events LIMIT 5" sample_results = db.session.execute(text(sample_query)).fetchall() app.logger.info(f"DIRECT QUERY - Sample records: {sample_results}") # Check the current schema name schema_query = "SELECT current_schema()" schema_result = db.session.execute(text(schema_query)).scalar() app.logger.info(f"DIRECT QUERY - Current schema: {schema_result}") # List all schemas in the database schemas_query = "SELECT schema_name FROM information_schema.schemata" schemas_results = db.session.execute(text(schemas_query)).fetchall() schema_list = [row[0] for row in schemas_results] app.logger.info(f"DIRECT QUERY - Available schemas: {schema_list}") # List all tables in the public schema tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'" tables_results = db.session.execute(text(tables_query)).fetchall() table_list = [row[0] for row in tables_results] app.logger.info(f"DIRECT QUERY - Tables in public schema: {table_list}") except Exception as e: app.logger.error(f"Error during direct database query debugging: {str(e)}") return render_template('dashboard.html') # Create the database tables def init_db(): with app.app_context(): # Log the full database URI (with sensitive info removed) db_uri = app.config['SQLALCHEMY_DATABASE_URI'] if 'postgresql' in db_uri: # Mask password if using PostgreSQL masked_uri = db_uri.replace(db_uri.split('@')[0], 'postgresql://****:****') app.logger.info(f"Using database URI: {masked_uri}") # Initialize PostgreSQL-specific components app.logger.info("Detected PostgreSQL database") # Check for extensions (can add more as needed) try: db.session.execute(text("SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto'")) app.logger.info("PostgreSQL database version check completed") except Exception as e: app.logger.warning(f"PostgreSQL extension check failed: {e}") else: app.logger.info(f"Using database URI: {db_uri}") # For SQLite, ensure parent directory exists and is writable if db_uri and db_uri.startswith('sqlite:///'): db_file = db_uri.replace('sqlite:///', '') db_dir = os.path.dirname(db_file) app.logger.info(f"SQLite database file path: {db_file}") app.logger.info(f"SQLite database directory exists: {os.path.exists(db_dir)}") if os.path.exists(db_dir): app.logger.info(f"SQLite database directory writable: {os.access(db_dir, os.W_OK)}") # Create the tables db.create_all() app.logger.info("Database initialized") if __name__ == '__main__': print("Starting application...") # Create database tables if they don't exist print(f"Instance path: {app.instance_path}") instance_exists = os.path.exists(app.instance_path) print(f"Instance path exists: {instance_exists}") # Make sure the instance directory exists if not instance_exists: try: os.makedirs(app.instance_path) print(f"Created instance directory: {app.instance_path}") except Exception as e: print(f"Error creating instance directory: {e}") # Check instance directory permissions has_write_access = os.access(app.instance_path, os.W_OK) print(f"Instance path write access: {has_write_access}") # Print database configuration db_uri = app.config['SQLALCHEMY_DATABASE_URI'] print(f"Database URI: {db_uri}") # For SQLite, print more details about the database file if db_uri.startswith('sqlite:///'): db_file = db_uri.replace('sqlite:///', '') print(f"Database file path: {db_file}") db_dir = os.path.dirname(db_file) print(f"Database directory: {db_dir}") print(f"Database directory exists: {os.path.exists(db_dir)}") print(f"Database directory writable: {os.access(db_dir, os.W_OK)}") print(f"Database file exists: {os.path.exists(db_file)}") if os.path.exists(db_file): print(f"Database file writable: {os.access(db_file, os.W_OK)}") # Use try/except to catch and log any initialization errors try: print("Initializing database...") init_db() # Ensure database and tables are created on first run print("Database initialization successful") except Exception as e: print(f"Error during database initialization: {e}") import traceback traceback.print_exc() # Check if database file was created (for SQLite) if db_uri.startswith('sqlite:///'): db_file = db_uri.replace('sqlite:///', '') print(f"After init: Database file exists: {os.path.exists(db_file)}") # Run the Flask application host = os.environ.get('HOST', '0.0.0.0') port = int(os.environ.get('PORT', 5000)) debug = os.environ.get('DEBUG', 'False').lower() == 'true' print(f"Starting Flask application on {host}:{port} (debug={debug})") app.run(host=host, port=port, debug=debug)