work-tracing/app/services/work_hours_service.py
"""
Service layer for calculating and storing real work hours.
"""
from app import db
from app.models import WorkEvent, UserRealWorkSummary
from sqlalchemy import func, and_, extract
from sqlalchemy.dialects.postgresql import insert as pg_insert # For PostgreSQL UPSERT
from sqlalchemy.dialects.sqlite import insert as sqlite_insert # For SQLite UPSERT
import datetime
from flask import current_app
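
# Illustrative usage (a sketch, not part of this module): the scheduler job and the
# CLI command mentioned in the docstring below might be wired up roughly like this,
# assuming a Flask-APScheduler instance named `scheduler` and a Click-based CLI:
#
#     scheduler.add_job(
#         id='real_work_hours',
#         func=calculate_and_store_real_work_hours,
#         args=[app],
#         trigger='interval',
#         minutes=5,
#     )
#
#     @app.cli.command('recalculate-work-hours')
#     def recalculate_work_hours():
#         calculate_and_store_real_work_hours(current_app._get_current_object(), force_recalculate=True)
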
def calculate_and_store_real_work_hours(app, force_recalculate=False):
"""
Calculates real work hours (blocks of 40 consecutive working minutes)
from WorkEvent data and stores/updates them in UserRealWorkSummary.
This function is intended to be called periodically by a scheduler
and can also be triggered manually via a CLI command.
Args:
app: The Flask application instance
force_recalculate: If True, ignores last_processed_event_id and recalculates from scratch
"""
    # The app instance is passed in explicitly because this job runs outside of a
    # request context (from the scheduler or a CLI command). current_app is only a
    # proxy, so we push our own application context for the duration of this job
    # to make the logger and database session available.
    with app.app_context():
        current_app.logger.info(f"Starting real work hours calculation... Force recalculate: {force_recalculate}")
        processed_users_count = 0
        total_hours_logged_session = 0
        try:
            # Get all distinct users who have work events
            users_with_events = db.session.query(WorkEvent.user).distinct().all()
            user_list = [u[0] for u in users_with_events]

            for username in user_list:
                current_app.logger.debug(f"Processing user: {username}")
                # For each user, determine the starting point for fetching events.
                # We process day by day to correctly attribute hours and manage last_processed_event_id.

                # Get the minimum date for events for this user to establish a processing range.
                min_event_date_result = db.session.query(func.min(func.date(WorkEvent.ts)))\
                    .filter(WorkEvent.user == username)\
                    .first()
                if not min_event_date_result or not min_event_date_result[0]:
                    current_app.logger.debug(f"No events found for user: {username}")
                    continue
                # Get the last processed event ID - set to 0 if force_recalculate is True
                last_processed_id = 0
                prior_event_completed_block = False
                last_processed_event_object = None
                if not force_recalculate:
                    # Only check for last processed ID if not forcing a recalculation
                    last_summary_for_user = UserRealWorkSummary.query\
                        .filter(UserRealWorkSummary.username == username)\
                        .order_by(UserRealWorkSummary.work_date.desc(), UserRealWorkSummary.last_processed_event_id.desc())\
                        .first()
                    last_processed_id = last_summary_for_user.last_processed_event_id if last_summary_for_user else 0
                    # Read the flag indicating if the very last processed event completed a block
                    prior_event_completed_block = last_summary_for_user.last_event_completed_block if last_summary_for_user else False
                    # Fetch the actual last processed event object to check its state
                    if last_processed_id > 0:
                        last_processed_event_object = WorkEvent.query.get(last_processed_id)

                current_app.logger.debug(f"User {username}: Overall last processed event ID: {last_processed_id}, Prior event completed block: {prior_event_completed_block}, Force recalculate: {force_recalculate}")
                # LOOKBACK MECHANISM:
                # If last_processed_id > 0 and not force_recalculate, look back up to 40 events to catch potential streaks.
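                # The window is 40 events because a "real hour" block needs 40 consecutive
                # 'working' minute-events; re-reading at most 40 prior events is intended to
                # be enough to rebuild any streak still in progress at the last processed event.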
                lookback_event_id = 0
                if last_processed_id > 0 and not force_recalculate:
                    # Find the event ID to look back to (at least 40 events back if possible)
                    lookback_query = WorkEvent.query\
                        .filter(WorkEvent.user == username)\
                        .filter(WorkEvent.id <= last_processed_id)\
                        .order_by(WorkEvent.id.desc())\
                        .limit(40)
                    lookback_events = lookback_query.all()
                    if lookback_events:
                        # Get the ID of the oldest event in our lookback window
                        lookback_event_id = lookback_events[-1].id
                        current_app.logger.debug(f"Looking back from event ID {last_processed_id} to {lookback_event_id} for potential streaks")

                # If we found a valid lookback ID, use it; otherwise, use last_processed_id
                events_query = WorkEvent.query\
                    .filter(WorkEvent.user == username)
                if force_recalculate:
                    # Start from the beginning if force_recalculate is True
                    current_app.logger.debug(f"Force recalculate set to True, processing all events for user {username}")
                elif lookback_event_id > 0:
                    # Use the lookback ID to catch potential streaks
                    events_query = events_query.filter(WorkEvent.id >= lookback_event_id)
                    current_app.logger.debug(f"Using lookback ID {lookback_event_id} to catch potential streaks")
                else:
                    # Original logic: only get events after the last processed ID
                    events_query = events_query.filter(WorkEvent.id > last_processed_id)

                events_query = events_query.order_by(WorkEvent.ts.asc())
                user_events = events_query.all()

                if not user_events:
                    current_app.logger.debug(f"No new events to process for user: {username} since event ID {last_processed_id}")
                    continue
                else:
                    current_app.logger.info(f"Fetched {len(user_events)} new events for user: {username} since event ID {lookback_event_id if lookback_event_id > 0 else last_processed_id}")

                processed_users_count += 1
                consecutive_working_minutes = 0
                # last_event_ts = None  # We need to maintain this relative to the streak
                current_streak_start_ts = None
                max_event_id_in_batch = last_processed_id

                # Group events by date to process streaks within each day accurately
                events_by_date = {}
                for event in user_events:
                    # Skip events we've already processed (due to lookback), but only if not force_recalculate
                    if lookback_event_id > 0 and event.id <= last_processed_id and not force_recalculate:
                        # Skip updates but still count these events for streak calculation
                        current_app.logger.debug(f"Already processed event ID {event.id}, using for streak calculation only")
                        # Still track this event for streak calculations
                        event_date = event.ts.date()
                        if event_date not in events_by_date:
                            events_by_date[event_date] = []
                        events_by_date[event_date].append((event, True))  # True indicates already processed
                    else:
                        # New event we need to process fully
                        event_date = event.ts.date()
                        if event_date not in events_by_date:
                            events_by_date[event_date] = []
                        events_by_date[event_date].append((event, False if force_recalculate else (event.id <= last_processed_id)))  # Mark as needing processing when force_recalculate

                # If force_recalculate, clear all summary records for this user first
                if force_recalculate:
                    delete_count = UserRealWorkSummary.query.filter_by(username=username).delete()
                    current_app.logger.info(f"Force recalculate: Deleted {delete_count} existing summary records for user {username}")

                for work_date_obj, daily_events_with_flag in sorted(events_by_date.items()):
                    current_app.logger.debug(f"Processing date {work_date_obj} for user {username}")
                    consecutive_working_minutes = 0
                    current_streak_start_ts = None  # Keep for potential future duration logic

                    # Extract just the events from the (event, flag) tuples
                    daily_events = [item[0] for item in daily_events_with_flag]
                    already_processed_flags = [item[1] for item in daily_events_with_flag]

                    # Initialize consecutive_working_minutes based on the last processed event.
                    if not force_recalculate and last_processed_event_object and last_processed_event_object.state == 'working':
                        # Check if the last processed event is relevant to the current work_date_obj
                        # and is close in time to the first event in daily_events.
                        # Only continue a streak if the last processed event did NOT complete a block.
                        if not prior_event_completed_block:
                            is_same_day_continuation = (
                                last_processed_event_object.ts.date() == work_date_obj and
                                daily_events and
                                (daily_events[0].ts - last_processed_event_object.ts).total_seconds() < 120  # e.g., within 2 minutes
                            )
                            is_cross_midnight_continuation = (
                                last_processed_event_object.ts.date() == work_date_obj - datetime.timedelta(days=1) and
                                daily_events and
                                (daily_events[0].ts - last_processed_event_object.ts).total_seconds() < 120 and  # Within 2 minutes
                                last_processed_event_object.ts.time() >= datetime.time(23, 58)  # Ended late on the previous day
                            )
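                            # Continuation heuristic: the first event of this day extends the prior
                            # streak only if it follows the last processed event within ~2 minutes,
                            # either on the same calendar day or just after midnight (prior event
                            # at 23:58 or later on the previous day).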
                            if is_same_day_continuation or is_cross_midnight_continuation:
                                current_app.logger.debug(f"User {username} on {work_date_obj}: Continuing streak from event ID {last_processed_event_object.id} (which did not complete a block).")
                                consecutive_working_minutes = 1  # Start with 1; the current event will make it 2 if 'working'
                                current_streak_start_ts = last_processed_event_object.ts
                        else:
                            current_app.logger.debug(f"User {username} on {work_date_obj}: Last processed event ID {last_processed_event_object.id} completed a block. Not continuing streak from it.")
                            # consecutive_working_minutes remains 0, current_streak_start_ts remains None

                    last_event_in_day_id = None
                    current_event_completed_block_for_day = False  # Flag for the current day's summary

                    for i, event in enumerate(daily_events):  # daily_events are already sorted by ts
                        max_event_id_in_batch = max(max_event_id_in_batch, event.id)
                        last_event_in_day_id = event.id
                        already_processed = already_processed_flags[i]
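
                        # Each event is treated as one minute of work: a 'working' event extends the
                        # current streak by one minute, and 40 consecutive 'working' events count as
                        # one real hour (checked further below).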
                        if event.state == 'working':
                            if current_streak_start_ts is None:  # Start of a new potential streak
                                current_streak_start_ts = event.ts
                                consecutive_working_minutes = 1
                                current_app.logger.debug(f"User {username} on {work_date_obj}: New 'working' streak started at {event.ts}, consecutive_minutes: {consecutive_working_minutes}")
                            else:
                                # Simplification: if the previous event in the list was 'working' and part of the streak, increment.
                                consecutive_working_minutes += 1
                                current_app.logger.debug(f"User {username} on {work_date_obj}: Continued 'working' streak, event at {event.ts}, consecutive_minutes: {consecutive_working_minutes}")
                        else:  # state is 'stopped' or another non-working state
                            if consecutive_working_minutes > 0:
                                current_app.logger.debug(f"User {username} on {work_date_obj}: Streak of {consecutive_working_minutes} mins ended by non-working state ('{event.state}') at {event.ts}.")
                            consecutive_working_minutes = 0
                            current_streak_start_ts = None

                        # Reset the flag for the current event; it is set to True below if this event completes a block
                        event_completes_this_block = False

                        if consecutive_working_minutes == 40:  # 40 consecutive 'working' minutes
                            logged_hours = 1  # Count as 1 real hour
                            current_app.logger.info(f"User {username} completed a {consecutive_working_minutes}-minute work block on {work_date_obj} at {event.ts}. Logging {logged_hours} real hour(s).")

                            # Only increment the total and perform database updates if this is a NEW event or force_recalculate is set
                            if not already_processed or force_recalculate:
                                total_hours_logged_session += logged_hours
                                event_completes_this_block = True  # This event completed a block

                                # UPSERT logic for UserRealWorkSummary
                                summary_data = {
                                    'username': username,
                                    'work_date': work_date_obj,
                                    'real_hours_counted': logged_hours,
                                    'last_processed_event_id': event.id,
                                    'last_event_completed_block': True,
                                }
                                current_app.logger.debug(f"User {username} on {work_date_obj}: Preparing UPSERT data: {summary_data}")
                                # Determine dialect for UPSERT
                                dialect = db.engine.dialect.name
                                if dialect == 'postgresql':
                                    stmt = pg_insert(UserRealWorkSummary).values(**summary_data)
                                    stmt = stmt.on_conflict_do_update(
                                        index_elements=['username', 'work_date'],
                                        set_=dict(
                                            real_hours_counted=UserRealWorkSummary.real_hours_counted + logged_hours,
                                            last_processed_event_id=event.id,
                                            last_event_completed_block=True,
                                        )
                                    )
                                    current_app.logger.debug(f"User {username} on {work_date_obj}: PostgreSQL UPSERT statement created.")
                                elif dialect == 'sqlite':
                                    stmt = sqlite_insert(UserRealWorkSummary).values(**summary_data)
                                    stmt = stmt.on_conflict_do_update(
                                        index_elements=['username', 'work_date'],
                                        set_=dict(
                                            real_hours_counted=UserRealWorkSummary.real_hours_counted + logged_hours,
                                            last_processed_event_id=event.id,
                                            last_event_completed_block=True,
                                        )
                                    )
                                    current_app.logger.debug(f"User {username} on {work_date_obj}: SQLite UPSERT statement created.")
                                else:
                                    # Generic fallback: query first, then insert or update. This is not
                                    # atomic and may be slower, but PostgreSQL and SQLite (the common
                                    # backends here) are handled by the dialect-specific UPSERTs above.
                                    current_app.logger.warning(f"User {username} on {work_date_obj}: Dialect {dialect} not explicitly handled for UPSERT, attempting merge.")
                                    summary_rec = UserRealWorkSummary.query.filter_by(username=username, work_date=work_date_obj).first()
                                    if summary_rec:
                                        current_app.logger.debug(f"User {username} on {work_date_obj}: Found existing summary record, ID: {summary_rec.id}, hours: {summary_rec.real_hours_counted}. Will add {logged_hours} hours.")
                                        summary_rec.real_hours_counted += logged_hours
                                        summary_rec.last_processed_event_id = event.id
                                        summary_rec.last_event_completed_block = True
                                    else:
                                        summary_rec = UserRealWorkSummary(**summary_data)  # summary_data already includes the flag
                                        db.session.add(summary_rec)
                                        current_app.logger.debug(f"User {username} on {work_date_obj}: No existing summary record found. Creating new one with {logged_hours} hours.")
                                    db.session.commit()  # Commit here for the fallback path
                                    current_app.logger.info(f"User {username} on {work_date_obj}: Fallback UPSERT committed. Hours: {summary_rec.real_hours_counted}, Last Event ID: {summary_rec.last_processed_event_id}")

                                if dialect in ['postgresql', 'sqlite']:
                                    db.session.execute(stmt)
                                    current_app.logger.info(f"User {username} on {work_date_obj}: Dialect-specific UPSERT executed. Last Event ID: {event.id}. Hours increment: {logged_hours}.")
                            else:
                                current_app.logger.debug(f"User {username} on {work_date_obj}: Skipping database update for already processed event ID {event.id} but counting streak.")

                            consecutive_working_minutes = 0  # Reset streak
                            current_streak_start_ts = None
                            current_app.logger.debug(f"User {username} on {work_date_obj}: Streak reset after logging hours.")

                    # After processing all events for this date, record the last processed event ID
                    # for the day and whether its last event completed a block. Block completions are
                    # already written by the UPSERT above; this update covers days whose last event
                    # did not complete a block.
                    current_event_completed_block_for_day = event_completes_this_block

                    # Skip this update if we didn't process any new events (all were from lookback and not force_recalculate)
                    if last_event_in_day_id is not None and (any(not flag for flag in already_processed_flags) or force_recalculate):
                        summary_rec_for_day = UserRealWorkSummary.query.filter_by(username=username, work_date=work_date_obj).first()
                        if summary_rec_for_day:
                            summary_rec_for_day.last_processed_event_id = last_event_in_day_id
                            # Update the flag based on whether the last event processed for this day completed a block
                            summary_rec_for_day.last_event_completed_block = current_event_completed_block_for_day
                            current_app.logger.debug(f"User {username} on {work_date_obj}: Updating existing summary. Last Event ID: {last_event_in_day_id}, Completed Block: {current_event_completed_block_for_day}")
                        else:
                            # No hours were logged, but events were processed for this day.
                            db.session.merge(UserRealWorkSummary(
                                username=username,
                                work_date=work_date_obj,
                                real_hours_counted=0,
                                last_processed_event_id=last_event_in_day_id,
                                last_event_completed_block=current_event_completed_block_for_day
                            ))
                            current_app.logger.debug(f"User {username} on {work_date_obj}: Merged new summary record for day with 0 hours. Last Event ID: {last_event_in_day_id}, Completed Block: {current_event_completed_block_for_day}")

                db.session.commit()  # Commit all changes for the current user
                current_app.logger.info(f"Committed all pending DB changes for user: {username}. Overall max event ID processed in this batch for user: {max_event_id_in_batch}")

            current_app.logger.info(f"Real work hours calculation completed. Processed {processed_users_count} users. Logged {total_hours_logged_session} new real hour blocks this session.")
        except Exception as e:
            db.session.rollback()
            current_app.logger.error(f"Error during real work hours calculation: {e}", exc_info=True)
        finally:
            db.session.remove()  # Ensure the session is cleaned up