mirror of
https://github.com/frappe/erpnext.git
synced 2026-05-06 15:00:27 +00:00
refactor: Overlapping validation for Shift Request
- commonify time overlap function between request and assignment - add tests for shift request overlap
This commit is contained in:
@@ -32,7 +32,7 @@ class ShiftAssignment(Document):
|
|||||||
overlapping_dates = self.get_overlapping_dates()
|
overlapping_dates = self.get_overlapping_dates()
|
||||||
if len(overlapping_dates):
|
if len(overlapping_dates):
|
||||||
# if dates are overlapping, check if timings are overlapping, else allow
|
# if dates are overlapping, check if timings are overlapping, else allow
|
||||||
overlapping_timings = self.has_overlapping_timings(overlapping_dates[0].shift_type)
|
overlapping_timings = has_overlapping_timings(self.shift_type, overlapping_dates[0].shift_type)
|
||||||
if overlapping_timings:
|
if overlapping_timings:
|
||||||
self.throw_overlap_error(overlapping_dates[0])
|
self.throw_overlap_error(overlapping_dates[0])
|
||||||
|
|
||||||
@@ -43,9 +43,7 @@ class ShiftAssignment(Document):
|
|||||||
shift = frappe.qb.DocType("Shift Assignment")
|
shift = frappe.qb.DocType("Shift Assignment")
|
||||||
query = (
|
query = (
|
||||||
frappe.qb.from_(shift)
|
frappe.qb.from_(shift)
|
||||||
.select(
|
.select(shift.name, shift.shift_type, shift.docstatus, shift.status)
|
||||||
shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status
|
|
||||||
)
|
|
||||||
.where(
|
.where(
|
||||||
(shift.employee == self.employee)
|
(shift.employee == self.employee)
|
||||||
& (shift.docstatus == 1)
|
& (shift.docstatus == 1)
|
||||||
@@ -81,55 +79,46 @@ class ShiftAssignment(Document):
|
|||||||
|
|
||||||
return query.run(as_dict=True)
|
return query.run(as_dict=True)
|
||||||
|
|
||||||
def has_overlapping_timings(self, overlapping_shift):
|
|
||||||
curr_shift = frappe.db.get_value(
|
|
||||||
"Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True
|
|
||||||
)
|
|
||||||
overlapping_shift = frappe.db.get_value(
|
|
||||||
"Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True
|
|
||||||
)
|
|
||||||
|
|
||||||
if (
|
|
||||||
(
|
|
||||||
curr_shift.start_time > overlapping_shift.start_time
|
|
||||||
and curr_shift.start_time < overlapping_shift.end_time
|
|
||||||
)
|
|
||||||
or (
|
|
||||||
curr_shift.end_time > overlapping_shift.start_time
|
|
||||||
and curr_shift.end_time < overlapping_shift.end_time
|
|
||||||
)
|
|
||||||
or (
|
|
||||||
curr_shift.start_time <= overlapping_shift.start_time
|
|
||||||
and curr_shift.end_time >= overlapping_shift.end_time
|
|
||||||
)
|
|
||||||
):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def throw_overlap_error(self, shift_details):
|
def throw_overlap_error(self, shift_details):
|
||||||
shift_details = frappe._dict(shift_details)
|
shift_details = frappe._dict(shift_details)
|
||||||
msg = None
|
|
||||||
if shift_details.docstatus == 1 and shift_details.status == "Active":
|
if shift_details.docstatus == 1 and shift_details.status == "Active":
|
||||||
if shift_details.start_date and shift_details.end_date:
|
msg = _(
|
||||||
msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format(
|
"Employee {0} already has an active Shift {1}: {2} that overlaps within this period."
|
||||||
frappe.bold(self.employee),
|
).format(
|
||||||
frappe.bold(self.shift_type),
|
frappe.bold(self.employee),
|
||||||
get_link_to_form("Shift Assignment", shift_details.name),
|
frappe.bold(shift_details.shift_type),
|
||||||
getdate(self.start_date).strftime("%d-%m-%Y"),
|
get_link_to_form("Shift Assignment", shift_details.name),
|
||||||
getdate(self.end_date).strftime("%d-%m-%Y"),
|
)
|
||||||
)
|
|
||||||
else:
|
|
||||||
msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format(
|
|
||||||
frappe.bold(self.employee),
|
|
||||||
frappe.bold(self.shift_type),
|
|
||||||
get_link_to_form("Shift Assignment", shift_details.name),
|
|
||||||
getdate(self.start_date).strftime("%d-%m-%Y"),
|
|
||||||
)
|
|
||||||
|
|
||||||
if msg:
|
|
||||||
frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError)
|
frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError)
|
||||||
|
|
||||||
|
|
||||||
|
def has_overlapping_timings(shift_1: str, shift_2: str) -> bool:
|
||||||
|
"""
|
||||||
|
Accepts two shift types and checks whether their timings are overlapping
|
||||||
|
"""
|
||||||
|
curr_shift = frappe.db.get_value("Shift Type", shift_1, ["start_time", "end_time"], as_dict=True)
|
||||||
|
overlapping_shift = frappe.db.get_value(
|
||||||
|
"Shift Type", shift_2, ["start_time", "end_time"], as_dict=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
(
|
||||||
|
curr_shift.start_time > overlapping_shift.start_time
|
||||||
|
and curr_shift.start_time < overlapping_shift.end_time
|
||||||
|
)
|
||||||
|
or (
|
||||||
|
curr_shift.end_time > overlapping_shift.start_time
|
||||||
|
and curr_shift.end_time < overlapping_shift.end_time
|
||||||
|
)
|
||||||
|
or (
|
||||||
|
curr_shift.start_time <= overlapping_shift.start_time
|
||||||
|
and curr_shift.end_time >= overlapping_shift.end_time
|
||||||
|
)
|
||||||
|
):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@frappe.whitelist()
|
@frappe.whitelist()
|
||||||
def get_events(start, end, filters=None):
|
def get_events(start, end, filters=None):
|
||||||
events = []
|
events = []
|
||||||
|
|||||||
@@ -103,7 +103,7 @@ class TestShiftAssignment(FrappeTestCase):
|
|||||||
# shift with end date
|
# shift with end date
|
||||||
make_shift_assignment(shift_type.name, employee, date, add_days(date, 30))
|
make_shift_assignment(shift_type.name, employee, date, add_days(date, 30))
|
||||||
|
|
||||||
# shift setup for 13-15
|
# shift setup for 11-15
|
||||||
shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
|
shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
|
||||||
date = getdate()
|
date = getdate()
|
||||||
|
|
||||||
@@ -127,7 +127,7 @@ class TestShiftAssignment(FrappeTestCase):
|
|||||||
date = getdate()
|
date = getdate()
|
||||||
make_shift_assignment(shift_type.name, employee, date)
|
make_shift_assignment(shift_type.name, employee, date)
|
||||||
|
|
||||||
# shift setup for 13-15
|
# shift setup for 11-15
|
||||||
shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
|
shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
|
||||||
date = getdate()
|
date = getdate()
|
||||||
|
|
||||||
|
|||||||
@@ -5,12 +5,14 @@
|
|||||||
import frappe
|
import frappe
|
||||||
from frappe import _
|
from frappe import _
|
||||||
from frappe.model.document import Document
|
from frappe.model.document import Document
|
||||||
from frappe.utils import formatdate, getdate
|
from frappe.query_builder import Criterion
|
||||||
|
from frappe.utils import formatdate, get_link_to_form, getdate
|
||||||
|
|
||||||
|
from erpnext.hr.doctype.shift_assignment.shift_assignment import has_overlapping_timings
|
||||||
from erpnext.hr.utils import share_doc_with_approver, validate_active_employee
|
from erpnext.hr.utils import share_doc_with_approver, validate_active_employee
|
||||||
|
|
||||||
|
|
||||||
class OverlapError(frappe.ValidationError):
|
class OverlappingShiftRequestError(frappe.ValidationError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -18,7 +20,7 @@ class ShiftRequest(Document):
|
|||||||
def validate(self):
|
def validate(self):
|
||||||
validate_active_employee(self.employee)
|
validate_active_employee(self.employee)
|
||||||
self.validate_dates()
|
self.validate_dates()
|
||||||
self.validate_shift_request_overlap_dates()
|
self.validate_overlapping_shift_requests()
|
||||||
self.validate_approver()
|
self.validate_approver()
|
||||||
self.validate_default_shift()
|
self.validate_default_shift()
|
||||||
|
|
||||||
@@ -79,37 +81,60 @@ class ShiftRequest(Document):
|
|||||||
if self.from_date and self.to_date and (getdate(self.to_date) < getdate(self.from_date)):
|
if self.from_date and self.to_date and (getdate(self.to_date) < getdate(self.from_date)):
|
||||||
frappe.throw(_("To date cannot be before from date"))
|
frappe.throw(_("To date cannot be before from date"))
|
||||||
|
|
||||||
def validate_shift_request_overlap_dates(self):
|
def validate_overlapping_shift_requests(self):
|
||||||
|
overlapping_dates = self.get_overlapping_dates()
|
||||||
|
if len(overlapping_dates):
|
||||||
|
# if dates are overlapping, check if timings are overlapping, else allow
|
||||||
|
overlapping_timings = has_overlapping_timings(self.shift_type, overlapping_dates[0].shift_type)
|
||||||
|
if overlapping_timings:
|
||||||
|
self.throw_overlap_error(overlapping_dates[0])
|
||||||
|
|
||||||
|
def get_overlapping_dates(self):
|
||||||
if not self.name:
|
if not self.name:
|
||||||
self.name = "New Shift Request"
|
self.name = "New Shift Request"
|
||||||
|
|
||||||
d = frappe.db.sql(
|
shift = frappe.qb.DocType("Shift Request")
|
||||||
"""
|
query = (
|
||||||
select
|
frappe.qb.from_(shift)
|
||||||
name, shift_type, from_date, to_date
|
.select(shift.name, shift.shift_type)
|
||||||
from `tabShift Request`
|
.where((shift.employee == self.employee) & (shift.docstatus < 2) & (shift.name != self.name))
|
||||||
where employee = %(employee)s and docstatus < 2
|
|
||||||
and ((%(from_date)s >= from_date
|
|
||||||
and %(from_date)s <= to_date) or
|
|
||||||
( %(to_date)s >= from_date
|
|
||||||
and %(to_date)s <= to_date ))
|
|
||||||
and name != %(name)s""",
|
|
||||||
{
|
|
||||||
"employee": self.employee,
|
|
||||||
"shift_type": self.shift_type,
|
|
||||||
"from_date": self.from_date,
|
|
||||||
"to_date": self.to_date,
|
|
||||||
"name": self.name,
|
|
||||||
},
|
|
||||||
as_dict=1,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for date_overlap in d:
|
if self.to_date:
|
||||||
if date_overlap["name"]:
|
query = query.where(
|
||||||
self.throw_overlap_error(date_overlap)
|
Criterion.any(
|
||||||
|
[
|
||||||
|
Criterion.any(
|
||||||
|
[
|
||||||
|
shift.to_date.isnull(),
|
||||||
|
((self.from_date >= shift.from_date) & (self.from_date <= shift.to_date)),
|
||||||
|
]
|
||||||
|
),
|
||||||
|
Criterion.any(
|
||||||
|
[
|
||||||
|
((self.to_date >= shift.from_date) & (self.to_date <= shift.to_date)),
|
||||||
|
shift.from_date.between(self.from_date, self.to_date),
|
||||||
|
]
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
query = query.where(
|
||||||
|
shift.to_date.isnull()
|
||||||
|
| ((self.from_date >= shift.from_date) & (self.from_date <= shift.to_date))
|
||||||
|
)
|
||||||
|
|
||||||
def throw_overlap_error(self, d):
|
return query.run(as_dict=True)
|
||||||
msg = _("Employee {0} has already applied for {1} between {2} and {3} : ").format(
|
|
||||||
self.employee, d["shift_type"], formatdate(d["from_date"]), formatdate(d["to_date"])
|
def throw_overlap_error(self, shift_details):
|
||||||
) + """ <b><a href="/app/Form/Shift Request/{0}">{0}</a></b>""".format(d["name"])
|
shift_details = frappe._dict(shift_details)
|
||||||
frappe.throw(msg, OverlapError)
|
msg = _(
|
||||||
|
"Employee {0} has already applied for Shift {1}: {2} that overlaps within this period"
|
||||||
|
).format(
|
||||||
|
frappe.bold(self.employee),
|
||||||
|
frappe.bold(shift_details.shift_type),
|
||||||
|
get_link_to_form("Shift Request", shift_details.name),
|
||||||
|
)
|
||||||
|
|
||||||
|
frappe.throw(msg, title=_("Overlapping Shift Requests"), exc=OverlappingShiftRequestError)
|
||||||
|
|||||||
@@ -4,23 +4,24 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
import frappe
|
import frappe
|
||||||
|
from frappe.tests.utils import FrappeTestCase
|
||||||
from frappe.utils import add_days, nowdate
|
from frappe.utils import add_days, nowdate
|
||||||
|
|
||||||
from erpnext.hr.doctype.employee.test_employee import make_employee
|
from erpnext.hr.doctype.employee.test_employee import make_employee
|
||||||
|
from erpnext.hr.doctype.shift_request.shift_request import OverlappingShiftRequestError
|
||||||
|
from erpnext.hr.doctype.shift_type.test_shift_type import setup_shift_type
|
||||||
|
|
||||||
test_dependencies = ["Shift Type"]
|
test_dependencies = ["Shift Type"]
|
||||||
|
|
||||||
|
|
||||||
class TestShiftRequest(unittest.TestCase):
|
class TestShiftRequest(FrappeTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
for doctype in ["Shift Request", "Shift Assignment"]:
|
for doctype in ["Shift Request", "Shift Assignment", "Shift Type"]:
|
||||||
frappe.db.sql("delete from `tab{doctype}`".format(doctype=doctype))
|
frappe.db.delete(doctype)
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
frappe.db.rollback()
|
|
||||||
|
|
||||||
def test_make_shift_request(self):
|
def test_make_shift_request(self):
|
||||||
"Test creation/updation of Shift Assignment from Shift Request."
|
"Test creation/updation of Shift Assignment from Shift Request."
|
||||||
|
setup_shift_type(shift_type="Day Shift")
|
||||||
department = frappe.get_value("Employee", "_T-Employee-00001", "department")
|
department = frappe.get_value("Employee", "_T-Employee-00001", "department")
|
||||||
set_shift_approver(department)
|
set_shift_approver(department)
|
||||||
approver = frappe.db.sql(
|
approver = frappe.db.sql(
|
||||||
@@ -48,6 +49,7 @@ class TestShiftRequest(unittest.TestCase):
|
|||||||
self.assertEqual(shift_assignment_docstatus, 2)
|
self.assertEqual(shift_assignment_docstatus, 2)
|
||||||
|
|
||||||
def test_shift_request_approver_perms(self):
|
def test_shift_request_approver_perms(self):
|
||||||
|
setup_shift_type(shift_type="Day Shift")
|
||||||
employee = frappe.get_doc("Employee", "_T-Employee-00001")
|
employee = frappe.get_doc("Employee", "_T-Employee-00001")
|
||||||
user = "test_approver_perm_emp@example.com"
|
user = "test_approver_perm_emp@example.com"
|
||||||
make_employee(user, "_Test Company")
|
make_employee(user, "_Test Company")
|
||||||
@@ -87,6 +89,145 @@ class TestShiftRequest(unittest.TestCase):
|
|||||||
employee.shift_request_approver = ""
|
employee.shift_request_approver = ""
|
||||||
employee.save()
|
employee.save()
|
||||||
|
|
||||||
|
def test_overlap_for_request_without_to_date(self):
|
||||||
|
# shift should be Ongoing if Only from_date is present
|
||||||
|
user = "test_shift_request@example.com"
|
||||||
|
employee = make_employee(user, company="_Test Company", shift_request_approver=user)
|
||||||
|
setup_shift_type(shift_type="Day Shift")
|
||||||
|
|
||||||
|
shift_request = frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": "Day Shift",
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": nowdate(),
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
).submit()
|
||||||
|
|
||||||
|
shift_request = frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": "Day Shift",
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": add_days(nowdate(), 2),
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertRaises(OverlappingShiftRequestError, shift_request.save)
|
||||||
|
|
||||||
|
def test_overlap_for_request_with_from_and_to_dates(self):
|
||||||
|
user = "test_shift_request@example.com"
|
||||||
|
employee = make_employee(user, company="_Test Company", shift_request_approver=user)
|
||||||
|
setup_shift_type(shift_type="Day Shift")
|
||||||
|
|
||||||
|
shift_request = frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": "Day Shift",
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": nowdate(),
|
||||||
|
"to_date": add_days(nowdate(), 30),
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
).submit()
|
||||||
|
|
||||||
|
shift_request = frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": "Day Shift",
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": add_days(nowdate(), 10),
|
||||||
|
"to_date": add_days(nowdate(), 35),
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertRaises(OverlappingShiftRequestError, shift_request.save)
|
||||||
|
|
||||||
|
def test_overlapping_for_a_fixed_period_shift_and_ongoing_shift(self):
|
||||||
|
user = "test_shift_request@example.com"
|
||||||
|
employee = make_employee(user, company="_Test Company", shift_request_approver=user)
|
||||||
|
|
||||||
|
# shift setup for 8-12
|
||||||
|
shift_type = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="12:00:00")
|
||||||
|
date = nowdate()
|
||||||
|
|
||||||
|
# shift with end date
|
||||||
|
frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": shift_type.name,
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": date,
|
||||||
|
"to_date": add_days(date, 30),
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
).submit()
|
||||||
|
|
||||||
|
# shift setup for 11-15
|
||||||
|
shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
|
||||||
|
shift2 = frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": shift_type.name,
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": date,
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertRaises(OverlappingShiftRequestError, shift2.insert)
|
||||||
|
|
||||||
|
def test_allow_non_overlapping_shift_requests_for_same_day(self):
|
||||||
|
user = "test_shift_request@example.com"
|
||||||
|
employee = make_employee(user, company="_Test Company", shift_request_approver=user)
|
||||||
|
|
||||||
|
# shift setup for 8-12
|
||||||
|
shift_type = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="12:00:00")
|
||||||
|
date = nowdate()
|
||||||
|
|
||||||
|
# shift with end date
|
||||||
|
frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": shift_type.name,
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": date,
|
||||||
|
"to_date": add_days(date, 30),
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
).submit()
|
||||||
|
|
||||||
|
# shift setup for 13-15
|
||||||
|
shift_type = setup_shift_type(shift_type="Shift 2", start_time="13:00:00", end_time="15:00:00")
|
||||||
|
frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "Shift Request",
|
||||||
|
"shift_type": shift_type.name,
|
||||||
|
"company": "_Test Company",
|
||||||
|
"employee": employee,
|
||||||
|
"from_date": date,
|
||||||
|
"approver": user,
|
||||||
|
"status": "Approved",
|
||||||
|
}
|
||||||
|
).submit()
|
||||||
|
|
||||||
|
|
||||||
def set_shift_approver(department):
|
def set_shift_approver(department):
|
||||||
department_doc = frappe.get_doc("Department", department)
|
department_doc = frappe.get_doc("Department", department)
|
||||||
|
|||||||
Reference in New Issue
Block a user