From 3b2a8bf837b62b68a2422ddd01335e2f7045c3d1 Mon Sep 17 00:00:00 2001 From: marination Date: Thu, 2 Jun 2022 13:35:30 +0530 Subject: [PATCH] feat: Track progress in Log Batch/Job wise - This was done due to stale reads while the background jobs tried updating status of the log - Added a table where all bom jobs within log will be tracked with what level they are processing - Cron job will check if table jobs are all processed every 5 mins - If yes, it will prepare parents and call `process_boms_cost_level_wise` to start next level - If pending jobs, do nothing - Current BOM Level is being tracked that helps adding rows to the table - Individual bom cost jobs (that are queued) will process and update boms > will update BOM Update Batch table row with list of updated BOMs --- .../doctype/bom_update_batch/__init__.py | 0 .../bom_update_batch/bom_update_batch.json | 45 ++++++++ .../bom_update_batch/bom_update_batch.py | 9 ++ .../bom_update_log/bom_update_log.json | 21 ++-- .../doctype/bom_update_log/bom_update_log.py | 106 +++++++++++++----- .../bom_update_log/bom_updation_utils.py | 55 +-------- 6 files changed, 154 insertions(+), 82 deletions(-) create mode 100644 erpnext/manufacturing/doctype/bom_update_batch/__init__.py create mode 100644 erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json create mode 100644 erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py diff --git a/erpnext/manufacturing/doctype/bom_update_batch/__init__.py b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json new file mode 100644 index 00000000000..9938454ce4e --- /dev/null +++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json @@ -0,0 +1,45 @@ +{ + "actions": [], + "autoname": "autoincrement", + "creation": "2022-05-31 17:34:39.825537", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "level", + "batch_no", + "boms_updated" + ], + "fields": [ + { + "fieldname": "level", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Level" + }, + { + "fieldname": "batch_no", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Batch No." + }, + { + "fieldname": "boms_updated", + "fieldtype": "Long Text", + "in_list_view": 1, + "label": "BOMs Updated" + } + ], + "index_web_pages_for_search": 1, + "istable": 1, + "links": [], + "modified": "2022-05-31 23:36:13.628391", + "modified_by": "Administrator", + "module": "Manufacturing", + "name": "BOM Update Batch", + "naming_rule": "Autoincrement", + "owner": "Administrator", + "permissions": [], + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} \ No newline at end of file diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py new file mode 100644 index 00000000000..f952e435e67 --- /dev/null +++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py @@ -0,0 +1,9 @@ +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors +# For license information, please see license.txt + +# import frappe +from frappe.model.document import Document + + +class BOMUpdateBatch(Document): + pass diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json index bea3cf03733..b1c24ab9954 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json @@ -14,9 +14,10 @@ "status", "error_log", "progress_section", - "current_boms", + "current_level", "parent_boms", "processed_boms", + "bom_batches", "amended_from" ], "fields": [ @@ -70,15 +71,11 @@ }, { "collapsible": 1, + "depends_on": "eval: doc.update_type == \"Update Cost\"", "fieldname": "progress_section", "fieldtype": "Section Break", "label": "Progress" }, - { - "fieldname": "current_boms", - "fieldtype": "Long Text", - "label": "Current BOMs" - }, { "description": "Immediate parent BOMs", "fieldname": "parent_boms", @@ -89,13 +86,23 @@ "fieldname": "processed_boms", "fieldtype": "Long Text", "label": "Processed BOMs" + }, + { + "fieldname": "bom_batches", + "fieldtype": "Table", + "options": "BOM Update Batch" + }, + { + "fieldname": "current_level", + "fieldtype": "Int", + "label": "Current Level" } ], "in_create": 1, "index_web_pages_for_search": 1, "is_submittable": 1, "links": [], - "modified": "2022-05-27 17:03:34.712010", + "modified": "2022-05-31 20:20:06.370786", "modified_by": "Administrator", "module": "Manufacturing", "name": "BOM Update Log", diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py index f61f863c10a..bfae76c2b2e 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py @@ -1,15 +1,16 @@ # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors # For license information, please see license.txt import json -from typing import Dict, Optional +from typing import Any, Dict, List, Optional, Tuple import frappe from frappe import _ from frappe.model.document import Document -from frappe.utils import cstr +from frappe.utils import cint, cstr from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import ( get_leaf_boms, + get_next_higher_level_boms, handle_exception, replace_bom, set_values_in_log, @@ -111,55 +112,110 @@ def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None: if update_doc.status == "Queued": # First level yet to process. On Submit. - current_boms = {bom: False for bom in get_leaf_boms()} + current_level = 0 + current_boms = get_leaf_boms() values = { - "current_boms": json.dumps(current_boms), "parent_boms": "[]", "processed_boms": json.dumps({}), "status": "In Progress", + "current_level": current_level, } else: - # status is Paused, resume. via Cron Job. - current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads( - update_doc.parent_boms - ) - if not current_boms: - # Process the next level BOMs. Stage parents as current BOMs. - current_boms = {bom: False for bom in parent_boms} - values = { - "current_boms": json.dumps(current_boms), - "parent_boms": "[]", - "status": "In Progress", - } + # Resume next level. via Cron Job. + current_level = cint(update_doc.current_level) + 1 + parent_boms = json.loads(update_doc.parent_boms) + + # Process the next level BOMs. Stage parents as current BOMs. + current_boms = parent_boms.copy() + values = {"parent_boms": "[]", "current_level": current_level} set_values_in_log(update_doc.name, values, commit=True) - queue_bom_cost_jobs(current_boms, update_doc) + queue_bom_cost_jobs(current_boms, update_doc, current_level) -def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None: +def queue_bom_cost_jobs( + current_boms_list: List, update_doc: "BOMUpdateLog", current_level: int +) -> None: "Queue batches of 20k BOMs of the same level to process parallelly" - current_boms_list = [bom for bom in current_boms] + batch_no = 0 while current_boms_list: + batch_no += 1 batch_size = 20_000 boms_to_process = current_boms_list[:batch_size] # slice out batch of 20k BOMs # update list to exclude 20K (queued) BOMs current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else [] + + batch_row = update_doc.append("bom_batches", {"level": current_level, "batch_no": batch_no}) + batch_row.db_insert() + frappe.enqueue( method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level", doc=update_doc, bom_list=boms_to_process, + batch_name=batch_row.name, timeout=40000, ) def resume_bom_cost_update_jobs(): - "Called every 10 minutes via Cron job." - paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"}) - if not paused_jobs: + """ + 1. Checks for In Progress BOM Update Log. + 2. Checks if this job has completed the _current level_. + 3. If current level is complete, get parent BOMs and start next level. + 4. If no parents, mark as Complete. + 5. If current level is WIP, skip the Log. + + Called every 5 minutes via Cron job. + """ + + in_progress_logs = frappe.db.get_all( + "BOM Update Log", + {"update_type": "Update Cost", "status": "In Progress"}, + ["name", "processed_boms", "current_level"], + ) + if not in_progress_logs: return - for job in paused_jobs: - # resume from next level - process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name)) + for log in in_progress_logs: + # check if all log batches of current level are processed + bom_batches = frappe.db.get_all( + "BOM Update Batch", {"parent": log.name, "level": log.current_level}, ["name", "boms_updated"] + ) + incomplete_level = any(not row.get("boms_updated") for row in bom_batches) + if not bom_batches or incomplete_level: + continue + + # Prep parent BOMs & updated processed BOMs for next level + current_boms, processed_boms = get_processed_current_boms(log, bom_batches) + parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms) + + set_values_in_log( + log.name, + values={ + "processed_boms": json.dumps(processed_boms), + "parent_boms": json.dumps(parent_boms), + "status": "Completed" if not parent_boms else "In Progress", + }, + commit=True, + ) + + if parent_boms: # there is a next level to process + process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", log.name)) + + +def get_processed_current_boms( + log: Dict[str, Any], bom_batches: Dict[str, Any] +) -> Tuple[List[str], Dict[str, Any]]: + "Aggregate all BOMs from BOM Update Batch rows into 'processed_boms' field and into current boms list." + processed_boms = json.loads(log.processed_boms) if log.processed_boms else {} + current_boms = [] + + for row in bom_batches: + boms_updated = json.loads(row.boms_updated) + current_boms.extend(boms_updated) + boms_updated_dict = {bom: True for bom in boms_updated} + processed_boms.update(boms_updated_dict) + + return current_boms, processed_boms diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py index 93a15deb154..6f36f2e985b 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py @@ -38,7 +38,7 @@ def replace_bom(boms: Dict) -> None: bom_obj.save_version() -def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None: +def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str], batch_name: int) -> None: "Updates Cost for BOMs within a given level. Runs via background jobs." try: @@ -47,19 +47,9 @@ def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None: return frappe.db.auto_commit_on_many_writes = 1 - # main updation logic - job_data = update_cost_in_boms(bom_list=bom_list, docname=doc.name) - set_values_in_log( - doc.name, - values={ - "current_boms": json.dumps(job_data.get("current_boms")), - "processed_boms": json.dumps(job_data.get("processed_boms")), - }, - commit=True, - ) - - process_if_level_is_complete(doc.name, job_data["current_boms"], job_data["processed_boms"]) + update_cost_in_boms(bom_list=bom_list) # main updation logic + frappe.db.set_value("BOM Update Batch", batch_name, "boms_updated", json.dumps(bom_list)) except Exception: handle_exception(doc) finally: @@ -112,48 +102,13 @@ def get_bom_unit_cost(bom_name: str) -> float: return frappe.utils.flt(new_bom_unitcost[0][0]) -def update_cost_in_boms(bom_list: List[str], docname: str) -> Dict[str, Dict]: +def update_cost_in_boms(bom_list: List[str]) -> None: "Updates cost in given BOMs. Returns current and total updated BOMs." - updated_boms = {} # current boms that have been updated - for bom in bom_list: bom_doc = frappe.get_cached_doc("BOM", bom) bom_doc.calculate_cost(save_updates=True, update_hour_rate=True) bom_doc.db_update() - updated_boms[bom] = True - - # Update processed BOMs in Log - log_data = frappe.db.get_values( - "BOM Update Log", docname, ["current_boms", "processed_boms"], as_dict=True - )[0] - - for field in ("current_boms", "processed_boms"): - log_data[field] = json.loads(log_data.get(field)) - log_data[field].update(updated_boms) - - return log_data - - -def process_if_level_is_complete( - docname: str, current_boms: Dict[str, bool], processed_boms: Dict[str, bool] -) -> None: - "Prepare and set higher level BOMs/dependants in Log if current level is complete." - - processing_complete = all(current_boms.get(bom) for bom in current_boms) - if not processing_complete: - return - - parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms) - set_values_in_log( - docname, - values={ - "current_boms": json.dumps({}), - "parent_boms": json.dumps(parent_boms), - "status": "Completed" if not parent_boms else "Paused", - }, - commit=True, - ) def get_next_higher_level_boms( @@ -244,7 +199,7 @@ def set_values_in_log(log_name: str, values: Dict[str, Any], commit: bool = Fals query.run() if commit: - frappe.db.commit() + frappe.db.commit() # nosemgrep def handle_exception(doc: "BOMUpdateLog") -> None: