work-tracing/app/api/reports.py
ilia-gurielidze-autstand 9e6d0a6911 first commit
2025-05-05 12:12:46 +04:00

384 lines
16 KiB
Python

"""
API endpoints for retrieving activity reports.
This module provides endpoints for retrieving daily, weekly, and monthly reports,
as well as detailed user activity logs.
"""
import logging
from datetime import datetime, timedelta

from flask import Blueprint, request, jsonify, current_app
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError

from app import db
from app.utils.queries import calculate_duration_sql, filter_sql_by_user
from app.utils.formatting import format_report_data, format_user_activity
# Blueprint that groups the report-related API endpoints; routes registered
# on it are mounted under the '/api' URL prefix.
reports_bp = Blueprint('reports', __name__, url_prefix='/api')
def _mask_db_uri(db_uri):
    """
    Hides the credentials part of a database URI so it can be logged.

    Args:
        db_uri (str): Database URI, e.g. scheme://user:password@host/db

    Returns:
        str: "****@" followed by the text after the last '@', or the value
            unchanged when it is empty or contains no '@'
    """
    if not db_uri or '@' not in db_uri:
        return db_uri
    # Split on the LAST '@': a password may itself contain '@', and cutting at
    # the first one would leave part of the password in the log line.
    return "****@" + db_uri.rpartition('@')[2]


def fetch_duration_report(time_period, user_filter=None):
    """
    Fetches duration report data from the database.

    Args:
        time_period (str): Time period to group by ('daily', 'weekly', or 'monthly')
        user_filter (str, optional): Username to filter results by

    Returns:
        list: Row mappings produced by the duration query

    Raises:
        SQLAlchemyError: Re-raised after logging when a query fails
    """
    logger = current_app.logger
    logger.debug(f"Fetching duration report. Period: {time_period}, User: {user_filter}")

    sql_query = calculate_duration_sql(time_period)
    params = {}
    if user_filter:
        # filter_sql_by_user returns the query text with the user restriction
        # applied; the value itself is passed separately under the 'user' key
        # (presumably bound as :user by that helper -- verify there).
        sql_query = filter_sql_by_user(sql_query, user_filter)
        params['user'] = user_filter
        logger.debug(f"Applying user filter: {user_filter}")

    # .get() so a missing config key degrades to logging "None" instead of
    # failing the whole report with a KeyError.
    masked_uri = _mask_db_uri(current_app.config.get('SQLALCHEMY_DATABASE_URI'))
    logger.info(f"Executing query using database: {masked_uri}")

    try:
        # Troubleshooting aids only: they cost two extra queries against
        # work_events and put every username in the log, so run them only
        # when debug logging is actually enabled.
        if logger.isEnabledFor(logging.DEBUG):
            count_result = db.session.execute(
                text("SELECT COUNT(*) FROM work_events")
            ).scalar()
            logger.debug(f"Total records in work_events table: {count_result}")

            users_result = db.session.execute(
                text("SELECT DISTINCT \"user\" FROM work_events")
            ).fetchall()
            user_list = [row[0] for row in users_result]
            logger.debug(f"Distinct users in work_events table: {user_list}")

        results = db.session.execute(text(sql_query), params).mappings().all()
        logger.debug(f"Database query executed. Found {len(results)} rows.")
        return results
    except SQLAlchemyError as e:
        logger.error(f"Error executing duration report query: {e}")
        # Re-raise so the calling endpoint can turn it into an error response
        raise
def fetch_user_activity(username, start_date, end_date):
    """
    Fetches detailed user activity logs for a specific date range.

    Each returned row pairs a 'working' event with the next 'stopped' event of
    the same user (no other event of that user in between) and carries the
    session length in hours. Rows are ordered by session start time.

    Args:
        username (str): Username to fetch activity for
        start_date (str): Start date in YYYY-MM-DD format (inclusive)
        end_date (str): End date in YYYY-MM-DD format (inclusive)

    Returns:
        list: Row mappings with the columns user, work_date, start_time,
            end_time and session_duration_hours

    Raises:
        SQLAlchemyError: Re-raised after logging when the query fails
    """
    current_app.logger.debug(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}")

    # The ORDER BY lives on the outer SELECT: ordering applied inside a CTE is
    # not guaranteed to carry over to the query that reads from it.
    sql_query = """
        WITH EventPairs AS (
            SELECT
                w1."user",
                DATE(w1.ts) AS work_date,
                w1.ts AS start_time,
                w2.ts AS end_time,
                EXTRACT(EPOCH FROM (w2.ts - w1.ts))/3600 AS session_duration_hours
            FROM
                work_events w1
            JOIN
                work_events w2 ON w1."user" = w2."user"
                AND w1.state = 'working'
                AND w2.state = 'stopped'
                AND w2.ts > w1.ts
                AND NOT EXISTS (
                    SELECT 1 FROM work_events w3
                    WHERE w3."user" = w1."user"
                    AND w3.ts > w1.ts AND w3.ts < w2.ts
                )
            WHERE
                w1."user" = :username
                AND DATE(w1.ts) BETWEEN :start_date AND :end_date
        )
        SELECT * FROM EventPairs
        ORDER BY start_time
    """
    params = {
        'username': username,
        'start_date': start_date,
        'end_date': end_date,
    }
    try:
        results = db.session.execute(text(sql_query), params).mappings().all()
        current_app.logger.debug(f"User activity query executed. Found {len(results)} rows.")
        return results
    except SQLAlchemyError as e:
        current_app.logger.error(f"Error executing user activity query: {e}")
        raise
@reports_bp.route('/reports/daily', methods=['GET'])
def get_daily_report():
    """
    Endpoint for retrieving daily report data.

    Query Parameters:
        user (str, optional): Filter results by username
        date (str, optional): Specific date in YYYY-MM-DD format; defaults to
            the server's current date

    Returns:
        JSON {"success": True, "data": [...]} on success, a 400 response when
        ``date`` is not a valid YYYY-MM-DD date, or a 500 response when the
        report cannot be generated.
    """
    current_app.logger.info("Daily report API requested.")
    try:
        current_app.logger.info("Fetching daily report data...")

        requested_date = request.args.get('date')
        if requested_date:
            try:
                # Parse and re-render so the value is a canonical, zero-padded
                # YYYY-MM-DD string; a partial value such as "2025" would
                # otherwise prefix-match every row of that year.
                selected_date = datetime.strptime(requested_date, '%Y-%m-%d').date().isoformat()
            except ValueError:
                current_app.logger.warning(f"Rejected invalid date parameter: {requested_date!r}")
                return jsonify({"success": False, "message": "Invalid date format, expected YYYY-MM-DD"}), 400
            current_app.logger.info(f"Using selected date: {selected_date}")
        else:
            selected_date = datetime.now().strftime('%Y-%m-%d')
            current_app.logger.info(f"No date provided, using today: {selected_date}")

        user_filter = request.args.get('user')
        results = fetch_duration_report('daily', user_filter)

        # Keep only the rows whose period starts on the selected date.
        filtered_results = []
        for row in results:
            row_date = row['period_start']
            if hasattr(row_date, 'isoformat'):
                # date/datetime values are compared through their ISO text form
                row_date = row_date.isoformat()
            if isinstance(row_date, str) and row_date.startswith(selected_date):
                filtered_results.append(row)

        current_app.logger.info(f"Raw results usernames for date {selected_date}: {[r['user'] for r in filtered_results]}")
        report = format_report_data(filtered_results, 'daily')
        current_app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}")
        current_app.logger.info(f"Successfully generated daily report for date: {selected_date}, user: {request.args.get('user', 'All')}. Found {len(filtered_results)} records.")
        return jsonify({"success": True, "data": report})
    except Exception as e:
        # Boundary handler: log with traceback, answer with a generic message
        current_app.logger.exception(f"Error generating daily report: {e}")
        return jsonify({"success": False, "message": "Error generating report"}), 500
def _parse_iso_datetime(value):
    """
    Converts an ISO-8601 string to a datetime; non-string values pass through.

    Returns:
        The parsed datetime, the original value when it is not a string, or
        None when the string cannot be parsed
    """
    if not isinstance(value, str):
        return value
    try:
        # fromisoformat() only accepts a trailing 'Z' from Python 3.11 on, so
        # spell the UTC offset out explicitly.
        return datetime.fromisoformat(value.replace('Z', '+00:00'))
    except ValueError:
        return None


def _earlier_login(current, candidate):
    """Returns whichever of two first-login values is earlier, ignoring missing or unparseable ones."""
    current_dt = _parse_iso_datetime(current)
    candidate_dt = _parse_iso_datetime(candidate)
    if not candidate_dt:
        return current
    if not current_dt:
        return candidate
    # Return the original (unparsed) value so the stored type is unchanged
    return candidate if candidate_dt < current_dt else current


@reports_bp.route('/reports/weekly', methods=['GET'])
def get_weekly_report():
    """
    Endpoint for retrieving weekly report data.

    Without ``day`` the report covers the current week (starting Monday, by
    the server's clock), one entry per user. With ``day`` it contains the
    daily rows of that single day instead.

    Query Parameters:
        user (str, optional): Filter results by username
        day (str, optional): Specific day in YYYY-MM-DD format

    Returns:
        JSON {"success": True, "data": [...]} on success, a 400 response when
        ``day`` is not a valid YYYY-MM-DD date, or a 500 response when the
        report cannot be generated.
    """
    current_app.logger.info("Weekly report API requested.")
    try:
        current_app.logger.info("Fetching weekly report data...")
        day_filter = request.args.get('day')
        user_filter = request.args.get('user')

        if day_filter:
            try:
                # Canonical zero-padded form, so the prefix match below cannot
                # be widened by a partial value such as "2025-05".
                day_filter = datetime.strptime(day_filter, '%Y-%m-%d').date().isoformat()
            except ValueError:
                current_app.logger.warning(f"Rejected invalid day parameter: {day_filter!r}")
                return jsonify({"success": False, "message": "Invalid day format, expected YYYY-MM-DD"}), 400
            current_app.logger.info(f"Filtering weekly report for specific day: {day_filter}")

            # A single day is served from the daily query, narrowed to that day
            results = []
            for row in fetch_duration_report('daily', user_filter):
                row_date = row['period_start']
                if hasattr(row_date, 'isoformat'):
                    row_date = row_date.isoformat()
                if isinstance(row_date, str) and row_date.startswith(day_filter):
                    results.append(row)
        else:
            # Monday of the current week
            now = datetime.now()
            current_week_start = (now - timedelta(days=now.weekday())).date()

            user_aggregated = {}
            for row in fetch_duration_report('weekly', user_filter):
                row_date = _parse_iso_datetime(row['period_start'])
                if not row_date:
                    continue
                # period_start may arrive as a datetime or as a plain date
                row_day = row_date.date() if hasattr(row_date, 'date') else row_date
                if row_day != current_week_start:
                    continue

                username = row['user']
                entry = user_aggregated.get(username)
                if entry is None:
                    # First entry for this user
                    user_aggregated[username] = {
                        'user': username,
                        'period_start': row_date,
                        'total_hours': row['total_hours'],
                        'first_login_time': row['first_login_time'],
                    }
                    continue

                extra_hours = row['total_hours']
                if extra_hours is not None:
                    if entry['total_hours'] is None:
                        entry['total_hours'] = extra_hours
                    else:
                        entry['total_hours'] += extra_hours
                entry['first_login_time'] = _earlier_login(
                    entry['first_login_time'], row['first_login_time']
                )

            # An empty current week yields an empty report; it must not fall
            # back to the rows of every other week.
            results = list(user_aggregated.values())

        current_app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}")
        report = format_report_data(results, 'weekly')
        current_app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}")
        current_app.logger.info(f"Successfully generated weekly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.")
        return jsonify({"success": True, "data": report})
    except Exception as e:
        # Boundary handler: log with traceback, answer with a generic message
        current_app.logger.exception(f"Error generating weekly report: {e}")
        return jsonify({"success": False, "message": "Error generating report"}), 500
@reports_bp.route('/reports/monthly', methods=['GET'])
def get_monthly_report():
    """
    Endpoint for retrieving monthly report data.

    The report covers the current calendar month (by the server's clock),
    with one entry per user.

    Query Parameters:
        user (str, optional): Filter results by username

    Returns:
        JSON {"success": True, "data": [...]} on success, or a 500 response
        when the report cannot be generated.
    """
    current_app.logger.info("Monthly report API requested.")
    try:
        current_app.logger.info("Fetching monthly report data...")
        now = datetime.now()
        current_month = (now.year, now.month)

        user_aggregated = {}
        for row in fetch_duration_report('monthly', request.args.get('user')):
            row_date = row['period_start']
            if isinstance(row_date, str):
                try:
                    # fromisoformat() only accepts a trailing 'Z' from
                    # Python 3.11 on, so spell the UTC offset out.
                    row_date = datetime.fromisoformat(row_date.replace('Z', '+00:00'))
                except ValueError:
                    continue
            if not row_date or (row_date.year, row_date.month) != current_month:
                continue

            username = row['user']
            entry = user_aggregated.get(username)
            if entry is None:
                # First entry for this user
                user_aggregated[username] = {
                    'user': username,
                    'period_start': row_date,
                    'total_hours': row['total_hours'],
                    'first_login_time': row['first_login_time'],
                }
                continue

            extra_hours = row['total_hours']
            if extra_hours is not None:
                if entry['total_hours'] is None:
                    entry['total_hours'] = extra_hours
                else:
                    entry['total_hours'] += extra_hours
            # Fill in a login time if the first row for this user had none
            if entry['first_login_time'] is None:
                entry['first_login_time'] = row['first_login_time']

        # An empty current month yields an empty report; it must not fall
        # back to the rows of every other month.
        results = list(user_aggregated.values())

        current_app.logger.info(f"Raw results usernames: {[r['user'] for r in results]}")
        report = format_report_data(results, 'monthly')
        current_app.logger.info(f"Formatted data usernames: {[r['user'] for r in report]}")
        current_app.logger.info(f"Successfully generated monthly report for user: {request.args.get('user', 'All')}. Found {len(results)} records.")
        return jsonify({"success": True, "data": report})
    except Exception as e:
        # Boundary handler: log with traceback, answer with a generic message
        current_app.logger.exception(f"Error generating monthly report: {e}")
        return jsonify({"success": False, "message": "Error generating report"}), 500
@reports_bp.route('/user-activity/<username>', methods=['GET'])
def get_user_activity(username):
    """
    Gets detailed activity logs for a specific user.

    Path Parameter:
        username: Username to fetch activity for

    Query Parameters:
        start_date (str, optional): Start date in YYYY-MM-DD format; defaults
            to the server's current date
        end_date (str, optional): End date in YYYY-MM-DD format; defaults to
            start_date

    Returns:
        JSON with username, start_date, end_date and the activity list on
        success, a 400 response when a date is not a valid YYYY-MM-DD date,
        or a 500 response when the logs cannot be retrieved.
    """
    current_app.logger.info(f"User activity logs requested for: {username}")

    # `or` (rather than a .get() default) so an empty ?start_date= also falls
    # back to the default instead of reaching the database as ''.
    raw_start = request.args.get('start_date') or datetime.now().strftime('%Y-%m-%d')
    raw_end = request.args.get('end_date') or raw_start
    try:
        # Validate here so malformed input is answered with 400 rather than
        # surfacing as a database error; re-render to canonical YYYY-MM-DD.
        start_date = datetime.strptime(raw_start, '%Y-%m-%d').date().isoformat()
        end_date = datetime.strptime(raw_end, '%Y-%m-%d').date().isoformat()
    except ValueError:
        current_app.logger.warning(f"Rejected invalid date range: {raw_start!r} - {raw_end!r}")
        return jsonify({"success": False, "message": "Invalid date format, expected YYYY-MM-DD"}), 400

    try:
        current_app.logger.info(f"Fetching activity logs for user: {username}, from: {start_date}, to: {end_date}")
        results = fetch_user_activity(username, start_date, end_date)
        activity_logs = format_user_activity(results)
        current_app.logger.info(f"Successfully retrieved {len(activity_logs)} activity records for user: {username}")
        return jsonify({
            "success": True,
            "data": {
                "username": username,
                "start_date": start_date,
                "end_date": end_date,
                "activities": activity_logs,
            },
        })
    except Exception as e:
        # Boundary handler: log with traceback, answer with a generic message
        current_app.logger.exception(f"Error retrieving user activity logs: {e}")
        return jsonify({"success": False, "message": "Error retrieving activity logs"}), 500