""" API endpoints for retrieving activity reports. This module provides endpoints for retrieving daily, weekly, and monthly reports, as well as detailed user activity logs. """ from datetime import datetime, timedelta from flask import Blueprint, request, jsonify, current_app from sqlalchemy import text from sqlalchemy.exc import SQLAlchemyError from app import db from app.utils.queries import calculate_duration_sql, filter_sql_by_user from app.utils.formatting import format_report_data, format_user_activity # Create a blueprint for report-related API endpoints reports_bp = Blueprint('reports', __name__, url_prefix='/api') def fetch_duration_report(time_period, user_filter=None): """ Fetches duration report data from the database. Args: time_period (str): Time period to group by ('daily', 'weekly', or 'monthly') user_filter (str, optional): Username to filter results by Returns: list: List of report data rows """ current_app.logger.debug(f"Fetching duration report. Period: {time_period}, User: {user_filter}") sql_query = calculate_duration_sql(time_period) params = {} if user_filter: # Note: filter_sql_by_user modifies the query string directly sql_query = filter_sql_by_user(sql_query, user_filter) params['user'] = user_filter current_app.logger.debug(f"Applying user filter: {user_filter}") # Add debug to show database connection info db_uri = current_app.config['SQLALCHEMY_DATABASE_URI'] masked_uri = db_uri if '@' in db_uri: parts = db_uri.split('@') masked_uri = "****@" + parts[1] current_app.logger.info(f"Executing query using database: {masked_uri}") try: # Add a simple count query to verify data in the database count_query = "SELECT COUNT(*) FROM work_events" count_result = db.session.execute(text(count_query)).scalar() current_app.logger.info(f"Total records in work_events table: {count_result}") # Add another query to check distinct users users_query = "SELECT DISTINCT \"user\" FROM work_events" users_result = db.session.execute(text(users_query)).fetchall() user_list = [row[0] for row in users_result] current_app.logger.info(f"Distinct users in work_events table: {user_list}") results = db.session.execute(text(sql_query), params).mappings().all() current_app.logger.debug(f"Database query executed. Found {len(results)} rows.") return results except Exception as e: current_app.logger.error(f"Error executing duration report query: {e}") # Re-raise the exception to be handled by the endpoint's error handler raise def fetch_user_activity(username, start_date, end_date): """ Fetches detailed user activity logs for a specific date range. Args: username (str): Username to fetch activity for start_date (str): Start date in YYYY-MM-DD format end_date (str): End date in YYYY-MM-DD format Returns: list: List of user activity rows """ current_app.logger.debug(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}") # SQL query to match working and stopped pairs and calculate durations sql_query = """ WITH EventPairs AS ( SELECT w1."user", DATE(w1.ts) AS work_date, w1.ts AS start_time, w2.ts AS end_time, EXTRACT(EPOCH FROM (w2.ts - w1.ts))/3600 AS session_duration_hours FROM work_events w1 JOIN work_events w2 ON w1."user" = w2."user" AND w1.state = 'working' AND w2.state = 'stopped' AND w2.ts > w1.ts AND NOT EXISTS ( SELECT 1 FROM work_events w3 WHERE w3."user" = w1."user" AND w3.ts > w1.ts AND w3.ts < w2.ts ) WHERE w1."user" = :username AND DATE(w1.ts) BETWEEN :start_date AND :end_date ORDER BY w1.ts ) SELECT * FROM EventPairs """ try: params = { 'username': username, 'start_date': start_date, 'end_date': end_date } results = db.session.execute(text(sql_query), params).mappings().all() current_app.logger.debug(f"User activity query executed. Found {len(results)} rows.") return results except Exception as e: current_app.logger.error(f"Error executing user activity query: {e}") raise @reports_bp.route('/reports/daily', methods=['GET']) def get_daily_report(): """ Endpoint for retrieving daily report data. Query Parameters: user (str, optional): Filter results by username date (str, optional): Specific date in YYYY-MM-DD format """ current_app.logger.info("Daily report API requested.") try: current_app.logger.info("Fetching daily report data...") # Get date parameter or use today as default selected_date = request.args.get('date') if selected_date: current_app.logger.info(f"Using selected date: {selected_date}") else: selected_date = datetime.now().strftime('%Y-%m-%d') current_app.logger.info(f"No date provided, using today: {selected_date}") user_filter = request.args.get('user') # Get regular daily report results results = fetch_duration_report('daily', user_filter) # Filter to only include entries for the selected date filtered_results = [] for row in results: row_date = row['period_start'] if hasattr(row_date, 'isoformat'): row_date = row_date.isoformat() # Check if the row's date matches the selected date if row_date and row_date.startswith(selected_date): filtered_results.append(row) # Add debug logging for usernames in raw results current_app.logger.info(f"Raw results usernames for date {selected_date}: {[r['user'] for r in filtered_results]}") report = format_report_data(filtered_results, 'daily') # Add debug logging for usernames in formatted data current_app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}") current_app.logger.info(f"Successfully generated daily report for date: {selected_date}, user: {request.args.get('user', 'All')}. Found {len(filtered_results)} records.") return jsonify({"success": True, "data": report}) except Exception as e: current_app.logger.error(f"Error generating daily report: {e}") return jsonify({"success": False, "message": "Error generating report"}), 500 @reports_bp.route('/reports/weekly', methods=['GET']) def get_weekly_report(): """ Endpoint for retrieving weekly report data. Query Parameters: user (str, optional): Filter results by username day (str, optional): Specific day in YYYY-MM-DD format """ current_app.logger.info("Weekly report API requested.") try: current_app.logger.info("Fetching weekly report data...") # Check if a specific day within the week was requested day_filter = request.args.get('day') user_filter = request.args.get('user') if day_filter: current_app.logger.info(f"Filtering weekly report for specific day: {day_filter}") # Use daily query with the specific date results = fetch_duration_report('daily', user_filter) # Filter results to only include the requested day filtered_results = [] for row in results: row_date = row['period_start'] if hasattr(row_date, 'isoformat'): row_date = row_date.isoformat() # Check if the row's date matches the requested day if row_date and row_date.startswith(day_filter): filtered_results.append(row) results = filtered_results else: # Get current week dates for filtering now = datetime.now() # Get Monday of current week current_week_start = now - timedelta(days=now.weekday()) current_week_start = current_week_start.replace(hour=0, minute=0, second=0, microsecond=0) # Regular weekly report (whole week) results = fetch_duration_report('weekly', user_filter) # Filter to just include current week and aggregate by user filtered_results = [] user_aggregated = {} for row in results: row_date = row['period_start'] # Convert to datetime if it's a string if isinstance(row_date, str): try: row_date = datetime.fromisoformat(row_date.replace('Z', '+00:00')) except ValueError: continue # Check if it's from current week if row_date and row_date.date() == current_week_start.date(): username = row['user'] if username in user_aggregated: # Add duration hours user_aggregated[username]['total_hours'] += row['total_hours'] # Keep earliest first_login_time if row['first_login_time'] and user_aggregated[username]['first_login_time']: row_login = row['first_login_time'] if isinstance(row_login, str): try: row_login = datetime.fromisoformat(row_login.replace('Z', '+00:00')) except ValueError: row_login = None existing_login = user_aggregated[username]['first_login_time'] if isinstance(existing_login, str): try: existing_login = datetime.fromisoformat(existing_login.replace('Z', '+00:00')) except ValueError: existing_login = None if row_login and existing_login and row_login < existing_login: user_aggregated[username]['first_login_time'] = row['first_login_time'] else: # First entry for this user user_aggregated[username] = { 'user': username, 'period_start': row_date, 'total_hours': row['total_hours'], 'first_login_time': row['first_login_time'] } # Convert aggregated dict back to list filtered_results = list(user_aggregated.values()) results = filtered_results if filtered_results else results # Add debug logging for usernames in raw results current_app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}") report = format_report_data(results, 'weekly') # Add debug logging for usernames in formatted data current_app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}") current_app.logger.info(f"Successfully generated weekly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.") return jsonify({"success": True, "data": report}) except Exception as e: current_app.logger.error(f"Error generating weekly report: {e}") return jsonify({"success": False, "message": "Error generating report"}), 500 @reports_bp.route('/reports/monthly', methods=['GET']) def get_monthly_report(): """ Endpoint for retrieving monthly report data. Query Parameters: user (str, optional): Filter results by username """ current_app.logger.info("Monthly report API requested.") try: current_app.logger.info("Fetching monthly report data...") # Get current month for filtering now = datetime.now() current_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) # Get regular monthly report results = fetch_duration_report('monthly', request.args.get('user')) # Filter to only include current month and aggregate by user filtered_results = [] user_aggregated = {} for row in results: row_date = row['period_start'] # Convert to datetime if it's a string if isinstance(row_date, str): try: row_date = datetime.fromisoformat(row_date.replace('Z', '+00:00')) except ValueError: continue # Check if it's from current month if row_date and row_date.year == current_month_start.year and row_date.month == current_month_start.month: username = row['user'] if username in user_aggregated: # Add duration hours user_aggregated[username]['total_hours'] += row['total_hours'] else: # First entry for this user user_aggregated[username] = { 'user': username, 'period_start': row_date, 'total_hours': row['total_hours'], 'first_login_time': row['first_login_time'] } # Convert aggregated dict back to list filtered_results = list(user_aggregated.values()) results = filtered_results if filtered_results else results # Add debug logging for usernames in raw results current_app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}") report = format_report_data(results, 'monthly') # Add debug logging for usernames in formatted data current_app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}") current_app.logger.info(f"Successfully generated monthly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.") return jsonify({"success": True, "data": report}) except Exception as e: current_app.logger.error(f"Error generating monthly report: {e}") return jsonify({"success": False, "message": "Error generating report"}), 500 @reports_bp.route('/user-activity/', methods=['GET']) def get_user_activity(username): """ Gets detailed activity logs for a specific user. Path Parameter: username: Username to fetch activity for Query Parameters: start_date (str, optional): Start date in YYYY-MM-DD format end_date (str, optional): End date in YYYY-MM-DD format """ current_app.logger.info(f"User activity logs requested for: {username}") # Get date range from query parameters, default to current day if not provided start_date = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d')) end_date = request.args.get('end_date', start_date) try: current_app.logger.info(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}") results = fetch_user_activity(username, start_date, end_date) activity_logs = format_user_activity(results) current_app.logger.info(f"Successfully retrieved {len(activity_logs)} activity records for user: {username}") return jsonify({ "success": True, "data": { "username": username, "start_date": start_date, "end_date": end_date, "activities": activity_logs } }) except Exception as e: current_app.logger.error(f"Error retrieving user activity logs: {e}") return jsonify({"success": False, "message": "Error retrieving activity logs"}), 500