feat: add routing/sequencing to work order operations (#46975)

* feat: add routing/sequencing to work order operations

* fix: add validation and remove reorderin for non sequence id operations

* chore: readability

* fix: logical error

* fix: logical error

* chore: added row number in error message
This commit is contained in:
Mihir Kandoi
2025-05-13 17:14:57 +05:30
committed by GitHub
parent 0b1a316ad9
commit f1159b6ea6
4 changed files with 113 additions and 12 deletions

View File

@@ -2851,6 +2851,81 @@ class TestWorkOrder(IntegrationTestCase):
)
frappe.db.set_single_value("Stock Settings", "pick_serial_and_batch_based_on", original_based_on)
def test_operations_time_planning_calculation(self):
from erpnext.manufacturing.doctype.routing.test_routing import create_routing, setup_operations
operations = [
{"operation": "Test Operation A", "workstation": "Test Workstation A", "time_in_mins": 1},
{"operation": "Test Operation B", "workstation": "Test Workstation A", "time_in_mins": 4},
{"operation": "Test Operation C", "workstation": "Test Workstation A", "time_in_mins": 3},
{"operation": "Test Operation D", "workstation": "Test Workstation A", "time_in_mins": 2},
]
setup_operations(operations)
routing_doc = create_routing(routing_name="Testing Route", operations=operations)
bom = make_bom(
item="_Test FG Item", raw_materials=["_Test Item"], with_operations=1, routing=routing_doc.name
)
wo = make_wo_order_test_record(
item="_Test FG Item",
bom_no=bom.name,
qty=5,
source_warehouse="_Test Warehouse 1 - _TC",
skip_transfer=1,
fg_warehouse="_Test Warehouse 2 - _TC",
)
# Initial check
self.assertEqual(wo.operations[0].operation, "Test Operation A")
self.assertEqual(wo.operations[1].operation, "Test Operation B")
self.assertEqual(wo.operations[2].operation, "Test Operation C")
self.assertEqual(wo.operations[3].operation, "Test Operation D")
wo = frappe.copy_doc(wo)
wo.operations[3].sequence_id = 2
wo.submit()
# Test 2 : Sort line items in child table based on sequence ID
self.assertEqual(wo.operations[0].operation, "Test Operation A")
self.assertEqual(wo.operations[1].operation, "Test Operation B")
self.assertEqual(wo.operations[2].operation, "Test Operation D")
self.assertEqual(wo.operations[3].operation, "Test Operation C")
wo = frappe.copy_doc(wo)
wo.operations[3].sequence_id = 1
wo.submit()
self.assertEqual(wo.operations[0].operation, "Test Operation A")
self.assertEqual(wo.operations[1].operation, "Test Operation C")
self.assertEqual(wo.operations[2].operation, "Test Operation B")
self.assertEqual(wo.operations[3].operation, "Test Operation D")
wo = frappe.copy_doc(wo)
wo.operations[0].sequence_id = 3
wo.submit()
self.assertEqual(wo.operations[0].operation, "Test Operation C")
self.assertEqual(wo.operations[1].operation, "Test Operation B")
self.assertEqual(wo.operations[2].operation, "Test Operation D")
self.assertEqual(wo.operations[3].operation, "Test Operation A")
wo = frappe.copy_doc(wo)
wo.operations[1].sequence_id = 0
# Test 3 - Error should be thrown if any one operation does not have sequence id but others do
self.assertRaises(frappe.ValidationError, wo.submit)
workstation = frappe.get_doc("Workstation", "Test Workstation A")
workstation.production_capacity = 4
workstation.save()
wo = frappe.copy_doc(wo)
wo.operations[1].sequence_id = 2
wo.submit()
# Test 4 - If Sequence ID is same then planned start time for both operations should be same
self.assertEqual(wo.operations[1].planned_start_time, wo.operations[2].planned_start_time)
def make_stock_in_entries_and_get_batches(rm_item, source_warehouse, wip_warehouse):
from erpnext.stock.doctype.stock_entry.test_stock_entry import (

View File

@@ -2,7 +2,7 @@
"actions": [],
"allow_import": 1,
"autoname": "naming_series:",
"creation": "2013-01-10 16:34:16",
"creation": "2025-04-09 12:09:40.634472",
"doctype": "DocType",
"document_type": "Setup",
"engine": "InnoDB",

View File

@@ -704,19 +704,30 @@ class WorkOrder(Document):
enable_capacity_planning = not cint(manufacturing_settings_doc.disable_capacity_planning)
plan_days = cint(manufacturing_settings_doc.capacity_planning_for_days) or 30
for index, row in enumerate(self.operations):
if all([op.sequence_id for op in self.operations]):
self.operations = sorted(self.operations, key=lambda op: op.sequence_id)
for idx, op in enumerate(self.operations):
op.idx = idx + 1
elif any([op.sequence_id for op in self.operations]):
frappe.throw(
_(
"Row #{0}: Incorrect Sequence ID. If any single operation has a Sequence ID then all other operations must have one too."
).format(next((op.idx for op in self.operations if not op.sequence_id), None))
)
for idx, row in enumerate(self.operations):
qty = self.qty
while qty > 0:
qty = split_qty_based_on_batch_size(self, row, qty)
if row.job_card_qty > 0:
self.prepare_data_for_job_card(row, index, plan_days, enable_capacity_planning)
self.prepare_data_for_job_card(row, idx, plan_days, enable_capacity_planning)
planned_end_date = self.operations and self.operations[-1].planned_end_time
if planned_end_date:
self.db_set("planned_end_date", planned_end_date)
def prepare_data_for_job_card(self, row, index, plan_days, enable_capacity_planning):
self.set_operation_start_end_time(index, row)
def prepare_data_for_job_card(self, row, idx, plan_days, enable_capacity_planning):
self.set_operation_start_end_time(row, idx)
job_card_doc = create_job_card(
self, row, auto_create=True, enable_capacity_planning=enable_capacity_planning
@@ -741,12 +752,24 @@ class WorkOrder(Document):
row.db_update()
def set_operation_start_end_time(self, idx, row):
def set_operation_start_end_time(self, row, idx):
"""Set start and end time for given operation. If first operation, set start as
`planned_start_date`, else add time diff to end time of earlier operation."""
if idx == 0:
# first operation at planned_start date
row.planned_start_time = self.planned_start_date
elif self.operations[idx - 1].sequence_id:
if self.operations[idx - 1].sequence_id == row.sequence_id:
row.planned_start_time = self.operations[idx - 1].planned_start_time
else:
last_ops_with_same_sequence_ids = sorted(
[op for op in self.operations if op.sequence_id == self.operations[idx - 1].sequence_id],
key=lambda op: get_datetime(op.planned_end_time),
)
row.planned_start_time = (
get_datetime(last_ops_with_same_sequence_ids[-1].planned_end_time)
+ get_mins_between_operations()
)
else:
row.planned_start_time = (
get_datetime(self.operations[idx - 1].planned_end_time) + get_mins_between_operations()

View File

@@ -1,6 +1,6 @@
{
"actions": [],
"creation": "2014-10-16 14:35:41.950175",
"creation": "2025-04-09 12:12:19.824560",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
@@ -110,13 +110,15 @@
"fieldname": "planned_start_time",
"fieldtype": "Datetime",
"label": "Planned Start Time",
"no_copy": 1
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "planned_end_time",
"fieldtype": "Datetime",
"label": "Planned End Time",
"no_copy": 1
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "column_break_10",
@@ -199,7 +201,6 @@
{
"fieldname": "sequence_id",
"fieldtype": "Int",
"hidden": 1,
"label": "Sequence ID",
"print_hide": 1
},
@@ -294,17 +295,19 @@
"read_only": 1
}
],
"grid_page_length": 50,
"index_web_pages_for_search": 1,
"istable": 1,
"links": [],
"modified": "2024-06-03 15:57:17.958543",
"modified": "2025-04-09 16:21:47.110564",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "Work Order Operation",
"owner": "Administrator",
"permissions": [],
"row_format": "Dynamic",
"sort_field": "creation",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
}