diff --git a/erpnext/hooks.py b/erpnext/hooks.py index 1c4bbbc3fc1..7d7f65dfd7a 100644 --- a/erpnext/hooks.py +++ b/erpnext/hooks.py @@ -392,9 +392,12 @@ after_migrate = ["erpnext.setup.install.update_select_perm_after_install"] scheduler_events = { "cron": { + "0/5 * * * *": [ + "erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs", + ], "0/30 * * * *": [ "erpnext.utilities.doctype.video.video.update_youtube_data", - ] + ], }, "all": [ "erpnext.projects.doctype.project.project.project_status_update_reminder", diff --git a/erpnext/manufacturing/doctype/bom/bom.py b/erpnext/manufacturing/doctype/bom/bom.py index 6376359a705..631548b3099 100644 --- a/erpnext/manufacturing/doctype/bom/bom.py +++ b/erpnext/manufacturing/doctype/bom/bom.py @@ -1,11 +1,11 @@ -# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: GNU General Public License v3. See license.txt import functools import re from collections import deque from operator import itemgetter -from typing import List +from typing import Dict, List import frappe from frappe import _ @@ -189,6 +189,7 @@ class BOM(WebsiteGenerator): self.validate_transfer_against() self.set_routing_operations() self.validate_operations() + self.update_exploded_items(save=False) self.calculate_cost() self.update_stock_qty() self.update_cost(update_parent=False, from_child_bom=True, update_hour_rate=False, save=False) @@ -386,40 +387,14 @@ class BOM(WebsiteGenerator): existing_bom_cost = self.total_cost - for d in self.get("items"): - if not d.item_code: - continue - - rate = self.get_rm_rate( - { - "company": self.company, - "item_code": d.item_code, - "bom_no": d.bom_no, - "qty": d.qty, - "uom": d.uom, - "stock_uom": d.stock_uom, - "conversion_factor": d.conversion_factor, - "sourced_by_supplier": d.sourced_by_supplier, - } - ) - - if rate: - d.rate = rate - d.amount = flt(d.rate) * flt(d.qty) - d.base_rate = flt(d.rate) * flt(self.conversion_rate) - d.base_amount = flt(d.amount) * flt(self.conversion_rate) - - if save: - d.db_update() - if self.docstatus == 1: self.flags.ignore_validate_update_after_submit = True - self.calculate_cost(update_hour_rate) + + self.calculate_cost(save_updates=save, update_hour_rate=update_hour_rate) + if save: self.db_update() - self.update_exploded_items(save=save) - # update parent BOMs if self.total_cost != existing_bom_cost and update_parent: parent_boms = frappe.db.sql_list( @@ -608,11 +583,15 @@ class BOM(WebsiteGenerator): bom_list.reverse() return bom_list - def calculate_cost(self, update_hour_rate=False): + def calculate_cost(self, save_updates=False, update_hour_rate=False): """Calculate bom totals""" self.calculate_op_cost(update_hour_rate) - self.calculate_rm_cost() - self.calculate_sm_cost() + self.calculate_rm_cost(save=save_updates) + self.calculate_sm_cost(save=save_updates) + if save_updates: + # not via doc event, table is not regenerated and needs updation + self.calculate_exploded_cost() + self.total_cost = self.operating_cost + self.raw_material_cost - self.scrap_material_cost self.base_total_cost = ( self.base_operating_cost + self.base_raw_material_cost - self.base_scrap_material_cost @@ -654,12 +633,26 @@ class BOM(WebsiteGenerator): if update_hour_rate: row.db_update() - def calculate_rm_cost(self): + def calculate_rm_cost(self, save=False): """Fetch RM rate as per today's valuation rate and calculate totals""" total_rm_cost = 0 base_total_rm_cost = 0 for d in self.get("items"): + old_rate = d.rate + d.rate = self.get_rm_rate( + { + "company": self.company, + "item_code": d.item_code, + "bom_no": d.bom_no, + "qty": d.qty, + "uom": d.uom, + "stock_uom": d.stock_uom, + "conversion_factor": d.conversion_factor, + "sourced_by_supplier": d.sourced_by_supplier, + } + ) + d.base_rate = flt(d.rate) * flt(self.conversion_rate) d.amount = flt(d.rate, d.precision("rate")) * flt(d.qty, d.precision("qty")) d.base_amount = d.amount * flt(self.conversion_rate) @@ -669,11 +662,13 @@ class BOM(WebsiteGenerator): total_rm_cost += d.amount base_total_rm_cost += d.base_amount + if save and (old_rate != d.rate): + d.db_update() self.raw_material_cost = total_rm_cost self.base_raw_material_cost = base_total_rm_cost - def calculate_sm_cost(self): + def calculate_sm_cost(self, save=False): """Fetch RM rate as per today's valuation rate and calculate totals""" total_sm_cost = 0 base_total_sm_cost = 0 @@ -688,10 +683,45 @@ class BOM(WebsiteGenerator): ) total_sm_cost += d.amount base_total_sm_cost += d.base_amount + if save: + d.db_update() self.scrap_material_cost = total_sm_cost self.base_scrap_material_cost = base_total_sm_cost + def calculate_exploded_cost(self): + "Set exploded row cost from it's parent BOM." + rm_rate_map = self.get_rm_rate_map() + + for row in self.get("exploded_items"): + old_rate = flt(row.rate) + row.rate = rm_rate_map.get(row.item_code) + row.amount = flt(row.stock_qty) * flt(row.rate) + + if old_rate != row.rate: + # Only db_update if changed + row.db_update() + + def get_rm_rate_map(self) -> Dict[str, float]: + "Create Raw Material-Rate map for Exploded Items. Fetch rate from Items table or Subassembly BOM." + rm_rate_map = {} + + for item in self.get("items"): + if item.bom_no: + # Get Item-Rate from Subassembly BOM + explosion_items = frappe.get_all( + "BOM Explosion Item", + filters={"parent": item.bom_no}, + fields=["item_code", "rate"], + order_by=None, # to avoid sort index creation at db level (granular change) + ) + explosion_item_rate = {item.item_code: flt(item.rate) for item in explosion_items} + rm_rate_map.update(explosion_item_rate) + else: + rm_rate_map[item.item_code] = flt(item.base_rate) / flt(item.conversion_factor or 1.0) + + return rm_rate_map + def update_exploded_items(self, save=True): """Update Flat BOM, following will be correct data""" self.get_exploded_items() @@ -902,44 +932,46 @@ def get_bom_item_rate(args, bom_doc): return flt(rate) -def get_valuation_rate(args): - """Get weighted average of valuation rate from all warehouses""" +def get_valuation_rate(data): + """ + 1) Get average valuation rate from all warehouses + 2) If no value, get last valuation rate from SLE + 3) If no value, get valuation rate from Item + """ + from frappe.query_builder.functions import Sum - total_qty, total_value, valuation_rate = 0.0, 0.0, 0.0 - item_bins = frappe.db.sql( - """ - select - bin.actual_qty, bin.stock_value - from - `tabBin` bin, `tabWarehouse` warehouse - where - bin.item_code=%(item)s - and bin.warehouse = warehouse.name - and warehouse.company=%(company)s""", - {"item": args["item_code"], "company": args["company"]}, - as_dict=1, - ) + item_code, company = data.get("item_code"), data.get("company") + valuation_rate = 0.0 - for d in item_bins: - total_qty += flt(d.actual_qty) - total_value += flt(d.stock_value) + bin_table = frappe.qb.DocType("Bin") + wh_table = frappe.qb.DocType("Warehouse") + item_valuation = ( + frappe.qb.from_(bin_table) + .join(wh_table) + .on(bin_table.warehouse == wh_table.name) + .select((Sum(bin_table.stock_value) / Sum(bin_table.actual_qty)).as_("valuation_rate")) + .where((bin_table.item_code == item_code) & (wh_table.company == company)) + ).run(as_dict=True)[0] - if total_qty: - valuation_rate = total_value / total_qty + valuation_rate = item_valuation.get("valuation_rate") - if valuation_rate <= 0: - last_valuation_rate = frappe.db.sql( - """select valuation_rate - from `tabStock Ledger Entry` - where item_code = %s and valuation_rate > 0 and is_cancelled = 0 - order by posting_date desc, posting_time desc, creation desc limit 1""", - args["item_code"], - ) + if (valuation_rate is not None) and valuation_rate <= 0: + # Explicit null value check. If None, Bins don't exist, neither does SLE + sle = frappe.qb.DocType("Stock Ledger Entry") + last_val_rate = ( + frappe.qb.from_(sle) + .select(sle.valuation_rate) + .where((sle.item_code == item_code) & (sle.valuation_rate > 0) & (sle.is_cancelled == 0)) + .orderby(sle.posting_date, order=frappe.qb.desc) + .orderby(sle.posting_time, order=frappe.qb.desc) + .orderby(sle.creation, order=frappe.qb.desc) + .limit(1) + ).run(as_dict=True) - valuation_rate = flt(last_valuation_rate[0][0]) if last_valuation_rate else 0 + valuation_rate = flt(last_val_rate[0].get("valuation_rate")) if last_val_rate else 0 if not valuation_rate: - valuation_rate = frappe.db.get_value("Item", args["item_code"], "valuation_rate") + valuation_rate = frappe.db.get_value("Item", item_code, "valuation_rate") return flt(valuation_rate) @@ -1125,39 +1157,6 @@ def get_children(parent=None, is_root=False, **filters): return bom_items -def get_boms_in_bottom_up_order(bom_no=None): - def _get_parent(bom_no): - return frappe.db.sql_list( - """ - select distinct bom_item.parent from `tabBOM Item` bom_item - where bom_item.bom_no = %s and bom_item.docstatus=1 and bom_item.parenttype='BOM' - and exists(select bom.name from `tabBOM` bom where bom.name=bom_item.parent and bom.is_active=1) - """, - bom_no, - ) - - count = 0 - bom_list = [] - if bom_no: - bom_list.append(bom_no) - else: - # get all leaf BOMs - bom_list = frappe.db.sql_list( - """select name from `tabBOM` bom - where docstatus=1 and is_active=1 - and not exists(select bom_no from `tabBOM Item` - where parent=bom.name and ifnull(bom_no, '')!='')""" - ) - - while count < len(bom_list): - for child_bom in _get_parent(bom_list[count]): - if child_bom not in bom_list: - bom_list.append(child_bom) - count += 1 - - return bom_list - - def add_additional_cost(stock_entry, work_order): # Add non stock items cost in the additional cost stock_entry.additional_costs = [] diff --git a/erpnext/manufacturing/doctype/bom/test_bom.py b/erpnext/manufacturing/doctype/bom/test_bom.py index f235e449a3c..182a20c6bb7 100644 --- a/erpnext/manufacturing/doctype/bom/test_bom.py +++ b/erpnext/manufacturing/doctype/bom/test_bom.py @@ -11,7 +11,9 @@ from frappe.utils import cstr, flt from erpnext.buying.doctype.purchase_order.test_purchase_order import create_purchase_order from erpnext.manufacturing.doctype.bom.bom import BOMRecursionError, item_query, make_variant_bom -from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost +from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import ( + update_cost_in_all_boms_in_test, +) from erpnext.stock.doctype.item.test_item import make_item from erpnext.stock.doctype.stock_reconciliation.test_stock_reconciliation import ( create_stock_reconciliation, @@ -69,26 +71,31 @@ class TestBOM(FrappeTestCase): def test_update_bom_cost_in_all_boms(self): # get current rate for '_Test Item 2' - rm_rate = frappe.db.sql( - """select rate from `tabBOM Item` - where parent='BOM-_Test Item Home Desktop Manufactured-001' - and item_code='_Test Item 2' and docstatus=1 and parenttype='BOM'""" + bom_rates = frappe.db.get_values( + "BOM Item", + { + "parent": "BOM-_Test Item Home Desktop Manufactured-001", + "item_code": "_Test Item 2", + "docstatus": 1, + }, + fieldname=["rate", "base_rate"], + as_dict=True, ) - rm_rate = rm_rate[0][0] if rm_rate else 0 + rm_base_rate = bom_rates[0].get("base_rate") if bom_rates else 0 # Reset item valuation rate - reset_item_valuation_rate(item_code="_Test Item 2", qty=200, rate=rm_rate + 10) + reset_item_valuation_rate(item_code="_Test Item 2", qty=200, rate=rm_base_rate + 10) # update cost of all BOMs based on latest valuation rate - update_cost() + update_cost_in_all_boms_in_test() # check if new valuation rate updated in all BOMs for d in frappe.db.sql( - """select rate from `tabBOM Item` + """select base_rate from `tabBOM Item` where item_code='_Test Item 2' and docstatus=1 and parenttype='BOM'""", as_dict=1, ): - self.assertEqual(d.rate, rm_rate + 10) + self.assertEqual(d.base_rate, rm_base_rate + 10) def test_bom_cost(self): bom = frappe.copy_doc(test_records[2]) diff --git a/erpnext/manufacturing/doctype/bom/test_records.json b/erpnext/manufacturing/doctype/bom/test_records.json index 25730f9b9f4..507d319b515 100644 --- a/erpnext/manufacturing/doctype/bom/test_records.json +++ b/erpnext/manufacturing/doctype/bom/test_records.json @@ -32,6 +32,7 @@ "is_active": 1, "is_default": 1, "item": "_Test Item Home Desktop Manufactured", + "company": "_Test Company", "quantity": 1.0 }, { diff --git a/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json b/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json index f01d856e72a..9b1db63494b 100644 --- a/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json +++ b/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json @@ -169,13 +169,15 @@ "index_web_pages_for_search": 1, "istable": 1, "links": [], - "modified": "2020-10-08 16:21:29.386212", + "modified": "2022-05-27 13:42:23.305455", "modified_by": "Administrator", "module": "Manufacturing", "name": "BOM Explosion Item", + "naming_rule": "Random", "owner": "Administrator", "permissions": [], "sort_field": "modified", "sort_order": "DESC", + "states": [], "track_changes": 1 } \ No newline at end of file diff --git a/erpnext/manufacturing/doctype/bom_update_batch/__init__.py b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json new file mode 100644 index 00000000000..83b54d326cb --- /dev/null +++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json @@ -0,0 +1,55 @@ +{ + "actions": [], + "autoname": "autoincrement", + "creation": "2022-05-31 17:34:39.825537", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "level", + "batch_no", + "boms_updated", + "status" + ], + "fields": [ + { + "fieldname": "level", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Level" + }, + { + "fieldname": "batch_no", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Batch No." + }, + { + "fieldname": "boms_updated", + "fieldtype": "Long Text", + "hidden": 1, + "in_list_view": 1, + "label": "BOMs Updated" + }, + { + "fieldname": "status", + "fieldtype": "Select", + "in_list_view": 1, + "label": "Status", + "options": "Pending\nCompleted", + "read_only": 1 + } + ], + "index_web_pages_for_search": 1, + "istable": 1, + "links": [], + "modified": "2022-06-06 14:50:35.161062", + "modified_by": "Administrator", + "module": "Manufacturing", + "name": "BOM Update Batch", + "naming_rule": "Autoincrement", + "owner": "Administrator", + "permissions": [], + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} \ No newline at end of file diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py new file mode 100644 index 00000000000..f952e435e67 --- /dev/null +++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py @@ -0,0 +1,9 @@ +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors +# For license information, please see license.txt + +# import frappe +from frappe.model.document import Document + + +class BOMUpdateBatch(Document): + pass diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json index 98c1acb71ce..c32e383b08a 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json @@ -13,6 +13,10 @@ "update_type", "status", "error_log", + "progress_section", + "current_level", + "processed_boms", + "bom_batches", "amended_from" ], "fields": [ @@ -63,13 +67,36 @@ "fieldtype": "Link", "label": "Error Log", "options": "Error Log" + }, + { + "collapsible": 1, + "depends_on": "eval: doc.update_type == \"Update Cost\"", + "fieldname": "progress_section", + "fieldtype": "Section Break", + "label": "Progress" + }, + { + "fieldname": "processed_boms", + "fieldtype": "Long Text", + "hidden": 1, + "label": "Processed BOMs" + }, + { + "fieldname": "bom_batches", + "fieldtype": "Table", + "options": "BOM Update Batch" + }, + { + "fieldname": "current_level", + "fieldtype": "Int", + "label": "Current Level" } ], "in_create": 1, "index_web_pages_for_search": 1, "is_submittable": 1, "links": [], - "modified": "2022-03-31 12:51:44.885102", + "modified": "2022-06-06 15:15:23.883251", "modified_by": "Administrator", "module": "Manufacturing", "name": "BOM Update Log", diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py index c0770fac90a..9c9c24044aa 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py @@ -1,13 +1,20 @@ # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors # For license information, please see license.txt -from typing import Dict, List, Literal, Optional +import json +from typing import Any, Dict, List, Optional, Tuple, Union import frappe from frappe import _ from frappe.model.document import Document -from frappe.utils import cstr, flt +from frappe.utils import cint, cstr -from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost +from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import ( + get_leaf_boms, + get_next_higher_level_boms, + handle_exception, + replace_bom, + set_values_in_log, +) class BOMMissingError(frappe.ValidationError): @@ -20,6 +27,8 @@ class BOMUpdateLog(Document): self.validate_boms_are_specified() self.validate_same_bom() self.validate_bom_items() + else: + self.validate_bom_cost_update_in_progress() self.status = "Queued" @@ -42,123 +51,184 @@ class BOMUpdateLog(Document): if current_bom_item != new_bom_item: frappe.throw(_("The selected BOMs are not for the same item")) - def on_submit(self): - if frappe.flags.in_test: - return + def validate_bom_cost_update_in_progress(self): + "If another Cost Updation Log is still in progress, dont make new ones." + wip_log = frappe.get_all( + "BOM Update Log", + {"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]}, + limit_page_length=1, + ) + if wip_log: + log_link = frappe.utils.get_link_to_form("BOM Update Log", wip_log[0].name) + frappe.throw( + _("BOM Updation already in progress. Please wait until {0} is complete.").format(log_link), + title=_("Note"), + ) + + def on_submit(self): if self.update_type == "Replace BOM": boms = {"current_bom": self.current_bom, "new_bom": self.new_bom} frappe.enqueue( - method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job", + method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job", doc=self, boms=boms, timeout=40000, + now=frappe.flags.in_test, ) else: - frappe.enqueue( - method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job", - doc=self, - update_type="Update Cost", - timeout=40000, - ) + process_boms_cost_level_wise(self) -def replace_bom(boms: Dict) -> None: - """Replace current BOM with new BOM in parent BOMs.""" - current_bom = boms.get("current_bom") - new_bom = boms.get("new_bom") - - unit_cost = get_new_bom_unit_cost(new_bom) - update_new_bom_in_bom_items(unit_cost, current_bom, new_bom) - - frappe.cache().delete_key("bom_children") - parent_boms = get_parent_boms(new_bom) - - for bom in parent_boms: - bom_obj = frappe.get_doc("BOM", bom) - # this is only used for versioning and we do not want - # to make separate db calls by using load_doc_before_save - # which proves to be expensive while doing bulk replace - bom_obj._doc_before_save = bom_obj - bom_obj.update_exploded_items() - bom_obj.calculate_cost() - bom_obj.update_parent_cost() - bom_obj.db_update() - if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version: - bom_obj.save_version() - - -def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None: - bom_item = frappe.qb.DocType("BOM Item") - ( - frappe.qb.update(bom_item) - .set(bom_item.bom_no, new_bom) - .set(bom_item.rate, unit_cost) - .set(bom_item.amount, (bom_item.stock_qty * unit_cost)) - .where( - (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM") - ) - ).run() - - -def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List: - bom_list = bom_list or [] - bom_item = frappe.qb.DocType("BOM Item") - - parents = ( - frappe.qb.from_(bom_item) - .select(bom_item.parent) - .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")) - .run(as_dict=True) - ) - - for d in parents: - if new_bom == d.parent: - frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent)) - - bom_list.append(d.parent) - get_parent_boms(d.parent, bom_list) - - return list(set(bom_list)) - - -def get_new_bom_unit_cost(new_bom: str) -> float: - bom = frappe.qb.DocType("BOM") - new_bom_unitcost = ( - frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run() - ) - - return flt(new_bom_unitcost[0][0]) - - -def run_bom_job( +def run_replace_bom_job( doc: "BOMUpdateLog", boms: Optional[Dict[str, str]] = None, - update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM", ) -> None: try: doc.db_set("status", "In Progress") + if not frappe.flags.in_test: frappe.db.commit() frappe.db.auto_commit_on_many_writes = 1 - boms = frappe._dict(boms or {}) - - if update_type == "Replace BOM": - replace_bom(boms) - else: - update_cost() + replace_bom(boms, doc.name) doc.db_set("status", "Completed") - except Exception: - frappe.db.rollback() - error_log = doc.log_error("BOM Update Tool Error") - - doc.db_set("status", "Failed") - doc.db_set("error_log", error_log.name) - + handle_exception(doc) finally: frappe.db.auto_commit_on_many_writes = 0 - frappe.db.commit() # nosemgrep + + if not frappe.flags.in_test: + frappe.db.commit() # nosemgrep + + +def process_boms_cost_level_wise( + update_doc: "BOMUpdateLog", parent_boms: List[str] = None +) -> Union[None, Tuple]: + "Queue jobs at the start of new BOM Level in 'Update Cost' Jobs." + + current_boms = {} + values = {} + + if update_doc.status == "Queued": + # First level yet to process. On Submit. + current_level = 0 + current_boms = get_leaf_boms() + values = { + "processed_boms": json.dumps({}), + "status": "In Progress", + "current_level": current_level, + } + else: + # Resume next level. via Cron Job. + if not parent_boms: + return + + current_level = cint(update_doc.current_level) + 1 + + # Process the next level BOMs. Stage parents as current BOMs. + current_boms = parent_boms.copy() + values = {"current_level": current_level} + + set_values_in_log(update_doc.name, values, commit=True) + queue_bom_cost_jobs(current_boms, update_doc, current_level) + + +def queue_bom_cost_jobs( + current_boms_list: List[str], update_doc: "BOMUpdateLog", current_level: int +) -> None: + "Queue batches of 20k BOMs of the same level to process parallelly" + batch_no = 0 + + while current_boms_list: + batch_no += 1 + batch_size = 20_000 + boms_to_process = current_boms_list[:batch_size] # slice out batch of 20k BOMs + + # update list to exclude 20K (queued) BOMs + current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else [] + + batch_row = update_doc.append( + "bom_batches", {"level": current_level, "batch_no": batch_no, "status": "Pending"} + ) + batch_row.db_insert() + + frappe.enqueue( + method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level", + doc=update_doc, + bom_list=boms_to_process, + batch_name=batch_row.name, + queue="long", + now=frappe.flags.in_test, + ) + + +def resume_bom_cost_update_jobs(): + """ + 1. Checks for In Progress BOM Update Log. + 2. Checks if this job has completed the _current level_. + 3. If current level is complete, get parent BOMs and start next level. + 4. If no parents, mark as Complete. + 5. If current level is WIP, skip the Log. + + Called every 5 minutes via Cron job. + """ + + in_progress_logs = frappe.db.get_all( + "BOM Update Log", + {"update_type": "Update Cost", "status": "In Progress"}, + ["name", "processed_boms", "current_level"], + ) + if not in_progress_logs: + return + + for log in in_progress_logs: + # check if all log batches of current level are processed + bom_batches = frappe.db.get_all( + "BOM Update Batch", + {"parent": log.name, "level": log.current_level}, + ["name", "boms_updated", "status"], + ) + incomplete_level = any(row.get("status") == "Pending" for row in bom_batches) + if not bom_batches or incomplete_level: + continue + + # Prep parent BOMs & updated processed BOMs for next level + current_boms, processed_boms = get_processed_current_boms(log, bom_batches) + parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms) + + # Unset processed BOMs if log is complete, it is used for next level BOMs + set_values_in_log( + log.name, + values={ + "processed_boms": json.dumps([] if not parent_boms else processed_boms), + "status": "Completed" if not parent_boms else "In Progress", + }, + commit=True, + ) + + if parent_boms: # there is a next level to process + process_boms_cost_level_wise( + update_doc=frappe.get_doc("BOM Update Log", log.name), parent_boms=parent_boms + ) + + +def get_processed_current_boms( + log: Dict[str, Any], bom_batches: Dict[str, Any] +) -> Tuple[List[str], Dict[str, Any]]: + """ + Aggregate all BOMs from BOM Update Batch rows into 'processed_boms' field + and into current boms list. + """ + processed_boms = json.loads(log.processed_boms) if log.processed_boms else {} + current_boms = [] + + for row in bom_batches: + boms_updated = json.loads(row.boms_updated) + current_boms.extend(boms_updated) + boms_updated_dict = {bom: True for bom in boms_updated} + processed_boms.update(boms_updated_dict) + + return current_boms, processed_boms diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py new file mode 100644 index 00000000000..af115e3e421 --- /dev/null +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py @@ -0,0 +1,225 @@ +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors +# For license information, please see license.txt + +import copy +import json +from collections import defaultdict +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + +if TYPE_CHECKING: + from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog + +import frappe +from frappe import _ + + +def replace_bom(boms: Dict, log_name: str) -> None: + "Replace current BOM with new BOM in parent BOMs." + + current_bom = boms.get("current_bom") + new_bom = boms.get("new_bom") + + unit_cost = get_bom_unit_cost(new_bom) + update_new_bom_in_bom_items(unit_cost, current_bom, new_bom) + + frappe.cache().delete_key("bom_children") + parent_boms = get_ancestor_boms(new_bom) + + for bom in parent_boms: + bom_obj = frappe.get_doc("BOM", bom) + # this is only used for versioning and we do not want + # to make separate db calls by using load_doc_before_save + # which proves to be expensive while doing bulk replace + bom_obj._doc_before_save = copy.deepcopy(bom_obj) + bom_obj.update_exploded_items() + bom_obj.calculate_cost() + bom_obj.update_parent_cost() + bom_obj.db_update() + bom_obj.flags.updater_reference = { + "doctype": "BOM Update Log", + "docname": log_name, + "label": _("via BOM Update Tool"), + } + bom_obj.save_version() + + +def update_cost_in_level( + doc: "BOMUpdateLog", bom_list: List[str], batch_name: Union[int, str] +) -> None: + "Updates Cost for BOMs within a given level. Runs via background jobs." + + try: + status = frappe.db.get_value("BOM Update Log", doc.name, "status") + if status == "Failed": + return + + update_cost_in_boms(bom_list=bom_list) # main updation logic + + bom_batch = frappe.qb.DocType("BOM Update Batch") + ( + frappe.qb.update(bom_batch) + .set(bom_batch.boms_updated, json.dumps(bom_list)) + .set(bom_batch.status, "Completed") + .where(bom_batch.name == batch_name) + ).run() + except Exception: + handle_exception(doc) + finally: + if not frappe.flags.in_test: + frappe.db.commit() # nosemgrep + + +def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List: + "Recursively get all ancestors of BOM." + + bom_list = bom_list or [] + bom_item = frappe.qb.DocType("BOM Item") + + parents = ( + frappe.qb.from_(bom_item) + .select(bom_item.parent) + .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")) + .run(as_dict=True) + ) + + for d in parents: + if new_bom == d.parent: + frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent)) + + bom_list.append(d.parent) + get_ancestor_boms(d.parent, bom_list) + + return list(set(bom_list)) + + +def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None: + bom_item = frappe.qb.DocType("BOM Item") + ( + frappe.qb.update(bom_item) + .set(bom_item.bom_no, new_bom) + .set(bom_item.rate, unit_cost) + .set(bom_item.amount, (bom_item.stock_qty * unit_cost)) + .where( + (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM") + ) + ).run() + + +def get_bom_unit_cost(bom_name: str) -> float: + bom = frappe.qb.DocType("BOM") + new_bom_unitcost = ( + frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == bom_name).run() + ) + + return frappe.utils.flt(new_bom_unitcost[0][0]) + + +def update_cost_in_boms(bom_list: List[str]) -> None: + "Updates cost in given BOMs. Returns current and total updated BOMs." + + for index, bom in enumerate(bom_list): + bom_doc = frappe.get_doc("BOM", bom, for_update=True) + bom_doc.calculate_cost(save_updates=True, update_hour_rate=True) + bom_doc.db_update() + + if (index % 50 == 0) and not frappe.flags.in_test: + frappe.db.commit() # nosemgrep + + +def get_next_higher_level_boms( + child_boms: List[str], processed_boms: Dict[str, bool] +) -> List[str]: + "Generate immediate higher level dependants with no unresolved dependencies (children)." + + def _all_children_are_processed(parent_bom): + child_boms = dependency_map.get(parent_bom) + return all(processed_boms.get(bom) for bom in child_boms) + + dependants_map, dependency_map = _generate_dependence_map() + + dependants = [] + for bom in child_boms: + # generate list of immediate dependants + parents = dependants_map.get(bom) or [] + dependants.extend(parents) + + dependants = set(dependants) # remove duplicates + resolved_dependants = set() + + # consider only if children are all resolved + for parent_bom in dependants: + if _all_children_are_processed(parent_bom): + resolved_dependants.add(parent_bom) + + return list(resolved_dependants) + + +def get_leaf_boms() -> List[str]: + "Get BOMs that have no dependencies." + + return frappe.db.sql_list( + """select name from `tabBOM` bom + where docstatus=1 and is_active=1 + and not exists(select bom_no from `tabBOM Item` + where parent=bom.name and ifnull(bom_no, '')!='')""" + ) + + +def _generate_dependence_map() -> defaultdict: + """ + Generate maps such as: { BOM-1: [Dependant-BOM-1, Dependant-BOM-2, ..] }. + Here BOM-1 is the leaf/lower level node/dependency. + The list contains one level higher nodes/dependants that depend on BOM-1. + + Generate and return the reverse as well. + """ + + bom = frappe.qb.DocType("BOM") + bom_item = frappe.qb.DocType("BOM Item") + + bom_items = ( + frappe.qb.from_(bom_item) + .join(bom) + .on(bom_item.parent == bom.name) + .select(bom_item.bom_no, bom_item.parent) + .where( + (bom_item.bom_no.isnotnull()) + & (bom_item.bom_no != "") + & (bom.docstatus == 1) + & (bom.is_active == 1) + & (bom_item.parenttype == "BOM") + ) + ).run(as_dict=True) + + child_parent_map = defaultdict(list) + parent_child_map = defaultdict(list) + for row in bom_items: + child_parent_map[row.bom_no].append(row.parent) + parent_child_map[row.parent].append(row.bom_no) + + return child_parent_map, parent_child_map + + +def set_values_in_log(log_name: str, values: Dict[str, Any], commit: bool = False) -> None: + "Update BOM Update Log record." + + if not values: + return + + bom_update_log = frappe.qb.DocType("BOM Update Log") + query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name) + + for key, value in values.items(): + query = query.set(key, value) + query.run() + + if commit and not frappe.flags.in_test: + frappe.db.commit() # nosemgrep + + +def handle_exception(doc: "BOMUpdateLog") -> None: + "Rolls back and fails BOM Update Log." + + frappe.db.rollback() + error_log = doc.log_error("BOM Update Tool Error") + set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name}) diff --git a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py index 47efea961b4..b38fc8976b2 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py +++ b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py @@ -6,9 +6,12 @@ from frappe.tests.utils import FrappeTestCase from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import ( BOMMissingError, - run_bom_job, + resume_bom_cost_update_jobs, +) +from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import ( + enqueue_replace_bom, + enqueue_update_cost, ) -from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom test_records = frappe.get_test_records("BOM") @@ -31,17 +34,12 @@ class TestBOMUpdateLog(FrappeTestCase): def tearDown(self): frappe.db.rollback() - if self._testMethodName == "test_bom_update_log_completion": - # clear logs and delete BOM created via setUp - frappe.db.delete("BOM Update Log") - self.new_bom_doc.cancel() - self.new_bom_doc.delete() - - # explicitly commit and restore to original state - frappe.db.commit() # nosemgrep - def test_bom_update_log_validate(self): - "Test if BOM presence is validated." + """ + 1) Test if BOM presence is validated. + 2) Test if same BOMs are validated. + 3) Test of non-existent BOM is validated. + """ with self.assertRaises(BOMMissingError): enqueue_replace_bom(boms={}) @@ -52,45 +50,22 @@ class TestBOMUpdateLog(FrappeTestCase): with self.assertRaises(frappe.ValidationError): enqueue_replace_bom(boms=frappe._dict(current_bom=self.boms.new_bom, new_bom="Dummy BOM")) - def test_bom_update_log_queueing(self): - "Test if BOM Update Log is created and queued." - - log = enqueue_replace_bom( - boms=self.boms, - ) - - self.assertEqual(log.docstatus, 1) - self.assertEqual(log.status, "Queued") - def test_bom_update_log_completion(self): "Test if BOM Update Log handles job completion correctly." - log = enqueue_replace_bom( - boms=self.boms, - ) - - # Explicitly commits log, new bom (setUp) and replacement impact. - # Is run via background jobs IRL - run_bom_job( - doc=log, - boms=self.boms, - update_type="Replace BOM", - ) + log = enqueue_replace_bom(boms=self.boms) log.reload() - self.assertEqual(log.status, "Completed") - # teardown (undo replace impact) due to commit - boms = frappe._dict( - current_bom=self.boms.new_bom, - new_bom=self.boms.current_bom, - ) - log2 = enqueue_replace_bom( - boms=self.boms, - ) - run_bom_job( # Explicitly commits - doc=log2, - boms=boms, - update_type="Replace BOM", - ) - self.assertEqual(log2.status, "Completed") + +def update_cost_in_all_boms_in_test(): + """ + Utility to run 'Update Cost' job in tests without Cron job until fully complete. + """ + log = enqueue_update_cost() # create BOM Update Log + + while log.status != "Completed": + resume_bom_cost_update_jobs() # run cron job until complete + log.reload() + + return log diff --git a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py index b0e7da12017..d16fcd08326 100644 --- a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py +++ b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py @@ -10,8 +10,6 @@ if TYPE_CHECKING: import frappe from frappe.model.document import Document -from erpnext.manufacturing.doctype.bom.bom import get_boms_in_bottom_up_order - class BOMUpdateTool(Document): pass @@ -40,14 +38,13 @@ def enqueue_update_cost() -> "BOMUpdateLog": def auto_update_latest_price_in_all_boms() -> None: """Called via hooks.py.""" if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"): - update_cost() - - -def update_cost() -> None: - """Updates Cost for all BOMs from bottom to top.""" - bom_list = get_boms_in_bottom_up_order() - for bom in bom_list: - frappe.get_doc("BOM", bom).update_cost(update_parent=False, from_child_bom=True) + wip_log = frappe.get_all( + "BOM Update Log", + {"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]}, + limit_page_length=1, + ) + if not wip_log: + create_bom_update_log(update_type="Update Cost") def create_bom_update_log( diff --git a/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py index fae72a0f6f7..5dd557f8ab1 100644 --- a/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py +++ b/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py @@ -1,11 +1,13 @@ -# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: GNU General Public License v3. See license.txt import frappe from frappe.tests.utils import FrappeTestCase -from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import replace_bom -from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost +from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import ( + update_cost_in_all_boms_in_test, +) +from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom from erpnext.stock.doctype.item.test_item import create_item @@ -15,6 +17,9 @@ test_records = frappe.get_test_records("BOM") class TestBOMUpdateTool(FrappeTestCase): "Test major functions run via BOM Update Tool." + def tearDown(self): + frappe.db.rollback() + def test_replace_bom(self): current_bom = "BOM-_Test Item Home Desktop Manufactured-001" @@ -23,15 +28,10 @@ class TestBOMUpdateTool(FrappeTestCase): bom_doc.insert() boms = frappe._dict(current_bom=current_bom, new_bom=bom_doc.name) - replace_bom(boms) + enqueue_replace_bom(boms=boms) - self.assertFalse(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", current_bom)) - self.assertTrue(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", bom_doc.name)) - - # reverse, as it affects other testcases - boms.current_bom = bom_doc.name - boms.new_bom = current_bom - replace_bom(boms) + self.assertFalse(frappe.db.exists("BOM Item", {"bom_no": current_bom, "docstatus": 1})) + self.assertTrue(frappe.db.exists("BOM Item", {"bom_no": bom_doc.name, "docstatus": 1})) def test_bom_cost(self): for item in ["BOM Cost Test Item 1", "BOM Cost Test Item 2", "BOM Cost Test Item 3"]: @@ -52,13 +52,13 @@ class TestBOMUpdateTool(FrappeTestCase): self.assertEqual(doc.total_cost, 200) frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 200) - update_cost() + update_cost_in_all_boms_in_test() doc.load_from_db() self.assertEqual(doc.total_cost, 300) frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 100) - update_cost() + update_cost_in_all_boms_in_test() doc.load_from_db() self.assertEqual(doc.total_cost, 200) diff --git a/erpnext/manufacturing/doctype/production_plan/test_production_plan.py b/erpnext/manufacturing/doctype/production_plan/test_production_plan.py index 891a4978789..e88049d810d 100644 --- a/erpnext/manufacturing/doctype/production_plan/test_production_plan.py +++ b/erpnext/manufacturing/doctype/production_plan/test_production_plan.py @@ -798,7 +798,6 @@ def make_bom(**args): for item in args.raw_materials: item_doc = frappe.get_doc("Item", item) - bom.append( "items", { diff --git a/erpnext/manufacturing/doctype/work_order/test_work_order.py b/erpnext/manufacturing/doctype/work_order/test_work_order.py index 2aba48231be..27e7e24a823 100644 --- a/erpnext/manufacturing/doctype/work_order/test_work_order.py +++ b/erpnext/manufacturing/doctype/work_order/test_work_order.py @@ -417,7 +417,7 @@ class TestWorkOrder(FrappeTestCase): "doctype": "Item Price", "item_code": "_Test FG Non Stock Item", "price_list_rate": 1000, - "price_list": "Standard Buying", + "price_list": "_Test Price List India", } ).insert(ignore_permissions=True) @@ -426,8 +426,17 @@ class TestWorkOrder(FrappeTestCase): item_code="_Test FG Item", target="_Test Warehouse - _TC", qty=1, basic_rate=100 ) - if not frappe.db.get_value("BOM", {"item": fg_item}): - make_bom(item=fg_item, rate=1000, raw_materials=["_Test FG Item", "_Test FG Non Stock Item"]) + if not frappe.db.get_value("BOM", {"item": fg_item, "docstatus": 1}): + bom = make_bom( + item=fg_item, + rate=1000, + raw_materials=["_Test FG Item", "_Test FG Non Stock Item"], + do_not_save=True, + ) + bom.rm_cost_as_per = "Price List" # non stock item won't have valuation rate + bom.buying_price_list = "_Test Price List India" + bom.currency = "INR" + bom.save() wo = make_wo_order_test_record(production_item=fg_item)