From 625a9f69f592be8c50c9b1bd1a16e0b7b9157988 Mon Sep 17 00:00:00 2001 From: Rucha Mahabal Date: Sun, 20 Feb 2022 22:10:52 +0530 Subject: [PATCH] refactor: consider timeslots in `get_employee_shift` --- .../shift_assignment/shift_assignment.py | 245 ++++++++++-------- 1 file changed, 133 insertions(+), 112 deletions(-) diff --git a/erpnext/hr/doctype/shift_assignment/shift_assignment.py b/erpnext/hr/doctype/shift_assignment/shift_assignment.py index f51a860c929..86564e012bf 100644 --- a/erpnext/hr/doctype/shift_assignment/shift_assignment.py +++ b/erpnext/hr/doctype/shift_assignment/shift_assignment.py @@ -7,7 +7,7 @@ from datetime import datetime, timedelta import frappe from frappe import _ from frappe.model.document import Document -from frappe.query_builder import Criterion +from frappe.query_builder import Criterion, Column from frappe.utils import cstr, get_link_to_form, getdate, now_datetime, nowdate from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee @@ -171,102 +171,124 @@ def get_shift_type_timing(shift_types): return shift_timing_map -def get_employee_shift( - employee, for_date=None, consider_default_shift=False, next_shift_direction=None -): +def get_shift_for_time(shifts, for_timestamp): + for entry in shifts: + shift_details = get_shift_details(entry.shift_type, for_date=for_timestamp.date()) + if shift_details.actual_start <= for_timestamp <= shift_details.actual_end: + return shift_details + + +def get_shifts_for_date(employee, for_timestamp): + assignment = frappe.qb.DocType('Shift Assignment') + + return ( + frappe.qb.from_(assignment) + .select(assignment.name, assignment.shift_type) + .where( + (assignment.employee == employee) + & (assignment.docstatus == 1) + & (assignment.status == 'Active') + & (assignment.start_date <= getdate(for_timestamp.date())) + & ( + Criterion.any([ + assignment.end_date.isnull(), + (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date)) + ]) + ) + ) + ).run(as_dict=True) + + +def get_shift_for_timestamp(employee, for_timestamp): + shifts = get_shifts_for_date(employee, for_timestamp) + if shifts: + return get_shift_for_time(shifts, for_timestamp) + return None + + +def get_employee_shift(employee, for_timestamp=None, consider_default_shift=False, next_shift_direction=None): """Returns a Shift Type for the given employee on the given date. (excluding the holidays) :param employee: Employee for which shift is required. - :param for_date: Date on which shift are required + :param for_timestamp: DateTime on which shift is required :param consider_default_shift: If set to true, default shift is taken when no shift assignment is found. :param next_shift_direction: One of: None, 'forward', 'reverse'. Direction to look for next shift if shift not found on given date. """ - if for_date is None: - for_date = nowdate() - default_shift = frappe.db.get_value("Employee", employee, "default_shift") - shift_type_name = None - shift_assignment_details = frappe.db.get_value( - "Shift Assignment", - {"employee": employee, "start_date": ("<=", for_date), "docstatus": "1", "status": "Active"}, - ["shift_type", "end_date"], - ) + if for_timestamp is None: + for_timestamp = now_datetime() - if shift_assignment_details: - shift_type_name = shift_assignment_details[0] + shift_details = get_shift_for_timestamp(employee, for_timestamp) - # if end_date present means that shift is over after end_date else it is a ongoing shift. - if shift_assignment_details[1] and for_date >= shift_assignment_details[1]: - shift_type_name = None + # if shift assignment is not found, consider default shift + default_shift = frappe.db.get_value('Employee', employee, 'default_shift') + if not shift_details and consider_default_shift: + shift_details = get_shift_details(default_shift, for_timestamp.date()) - if not shift_type_name and consider_default_shift: - shift_type_name = default_shift - if shift_type_name: - holiday_list_name = frappe.db.get_value("Shift Type", shift_type_name, "holiday_list") - if not holiday_list_name: - holiday_list_name = get_holiday_list_for_employee(employee, False) - if holiday_list_name and is_holiday(holiday_list_name, for_date): - shift_type_name = None + # if its a holiday, reset + if shift_details and is_holiday_date(employee, shift_details): + shift_details = None - if not shift_type_name and next_shift_direction: - MAX_DAYS = 366 - if consider_default_shift and default_shift: - direction = -1 if next_shift_direction == "reverse" else +1 - for i in range(MAX_DAYS): - date = for_date + timedelta(days=direction * (i + 1)) - shift_details = get_employee_shift(employee, date, consider_default_shift, None) + # if no shift is found, find next or prev shift based on direction + if not shift_details and next_shift_direction: + shift_details = get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction) + + return shift_details + + +def get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction): + MAX_DAYS = 366 + shift_details = None + + if consider_default_shift and default_shift: + direction = -1 if next_shift_direction == 'reverse' else 1 + for i in range(MAX_DAYS): + date = for_timestamp + timedelta(days=direction*(i+1)) + shift_details = get_employee_shift(employee, date, consider_default_shift, None) + if shift_details: + break + else: + direction = '<' if next_shift_direction == 'reverse' else '>' + sort_order = 'desc' if next_shift_direction == 'reverse' else 'asc' + dates = frappe.db.get_all('Shift Assignment', + ['start_date', 'end_date'], + {'employee':employee, 'start_date': (direction, for_timestamp.date()), 'docstatus': '1', "status": "Active"}, + as_list=True, + limit=MAX_DAYS, order_by='start_date ' + sort_order) + + if dates: + for date in dates: + if date[1] and date[1] < for_timestamp.date(): + continue + shift_details = get_employee_shift(employee, datetime.combine(date, for_timestamp.time()), consider_default_shift, None) if shift_details: - shift_type_name = shift_details.shift_type.name - for_date = date break - else: - direction = "<" if next_shift_direction == "reverse" else ">" - sort_order = "desc" if next_shift_direction == "reverse" else "asc" - dates = frappe.db.get_all( - "Shift Assignment", - ["start_date", "end_date"], - { - "employee": employee, - "start_date": (direction, for_date), - "docstatus": "1", - "status": "Active", - }, - as_list=True, - limit=MAX_DAYS, - order_by="start_date " + sort_order, - ) - if dates: - for date in dates: - if date[1] and date[1] < for_date: - continue - shift_details = get_employee_shift(employee, date[0], consider_default_shift, None) - if shift_details: - shift_type_name = shift_details.shift_type.name - for_date = date[0] - break + return shift_details - return get_shift_details(shift_type_name, for_date) + +def is_holiday_date(employee, shift_details): + holiday_list_name = frappe.db.get_value('Shift Type', shift_details.shift_type.name, 'holiday_list') + + if not holiday_list_name: + holiday_list_name = get_holiday_list_for_employee(employee, False) + + return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date()) def get_employee_shift_timings(employee, for_timestamp=None, consider_default_shift=False): """Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee""" if for_timestamp is None: for_timestamp = now_datetime() + # write and verify a test case for midnight shift. prev_shift = curr_shift = next_shift = None - curr_shift = get_employee_shift(employee, for_timestamp.date(), consider_default_shift, "forward") + curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, 'forward') if curr_shift: - next_shift = get_employee_shift( - employee, - curr_shift.start_datetime.date() + timedelta(days=1), - consider_default_shift, - "forward", - ) - prev_shift = get_employee_shift( - employee, for_timestamp.date() + timedelta(days=-1), consider_default_shift, "reverse" - ) + next_shift = get_employee_shift(employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, 'forward') + prev_shift = get_employee_shift(employee, for_timestamp + timedelta(days=-1), consider_default_shift, 'reverse') if curr_shift: + # adjust actual start and end times if they are overlapping with grace period (before start and after end) if prev_shift: curr_shift.actual_start = ( prev_shift.end_datetime @@ -292,6 +314,38 @@ def get_employee_shift_timings(employee, for_timestamp=None, consider_default_sh return prev_shift, curr_shift, next_shift +def get_actual_start_end_datetime_of_shift(employee, for_datetime, consider_default_shift=False): + """Takes a datetime and returns the 'actual' start datetime and end datetime of the shift in which the timestamp belongs. + Here 'actual' means - taking in to account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time". + None is returned if the timestamp is outside any actual shift timings. + Shift Details is also returned(current/upcoming i.e. if timestamp not in any actual shift then details of next shift returned) + """ + actual_shift_start = actual_shift_end = shift_details = None + shift_timings_as_per_timestamp = get_employee_shift_timings(employee, for_datetime, consider_default_shift) + timestamp_list = [] + + for shift in shift_timings_as_per_timestamp: + if shift: + timestamp_list.extend([shift.actual_start, shift.actual_end]) + else: + timestamp_list.extend([None, None]) + + timestamp_index = None + for index, timestamp in enumerate(timestamp_list): + if timestamp and for_datetime <= timestamp: + timestamp_index = index + break + + if timestamp_index and timestamp_index%2 == 1: + shift_details = shift_timings_as_per_timestamp[int((timestamp_index-1)/2)] + actual_shift_start = shift_details.actual_start + actual_shift_end = shift_details.actual_end + elif timestamp_index: + shift_details = shift_timings_as_per_timestamp[int(timestamp_index/2)] + + return actual_shift_start, actual_shift_end, shift_details + + def get_shift_details(shift_type_name, for_date=None): """Returns Shift Details which contain some additional information as described below. 'shift_details' contains the following keys: @@ -319,43 +373,10 @@ def get_shift_details(shift_type_name, for_date=None): ) actual_end = end_datetime + timedelta(minutes=shift_type.allow_check_out_after_shift_end_time) - return frappe._dict( - { - "shift_type": shift_type, - "start_datetime": start_datetime, - "end_datetime": end_datetime, - "actual_start": actual_start, - "actual_end": actual_end, - } - ) - - -def get_actual_start_end_datetime_of_shift(employee, for_datetime, consider_default_shift=False): - """Takes a datetime and returns the 'actual' start datetime and end datetime of the shift in which the timestamp belongs. - Here 'actual' means - taking in to account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time". - None is returned if the timestamp is outside any actual shift timings. - Shift Details is also returned(current/upcoming i.e. if timestamp not in any actual shift then details of next shift returned) - """ - actual_shift_start = actual_shift_end = shift_details = None - shift_timings_as_per_timestamp = get_employee_shift_timings( - employee, for_datetime, consider_default_shift - ) - timestamp_list = [] - for shift in shift_timings_as_per_timestamp: - if shift: - timestamp_list.extend([shift.actual_start, shift.actual_end]) - else: - timestamp_list.extend([None, None]) - timestamp_index = None - for index, timestamp in enumerate(timestamp_list): - if timestamp and for_datetime <= timestamp: - timestamp_index = index - break - if timestamp_index and timestamp_index % 2 == 1: - shift_details = shift_timings_as_per_timestamp[int((timestamp_index - 1) / 2)] - actual_shift_start = shift_details.actual_start - actual_shift_end = shift_details.actual_end - elif timestamp_index: - shift_details = shift_timings_as_per_timestamp[int(timestamp_index / 2)] - - return actual_shift_start, actual_shift_end, shift_details + return frappe._dict({ + 'shift_type': shift_type, + 'start_datetime': start_datetime, + 'end_datetime': end_datetime, + 'actual_start': actual_start, + 'actual_end': actual_end + })