From 2f9643d44d7119fa90e84c5d357b70081535413b Mon Sep 17 00:00:00 2001 From: Rohit Waghchaure Date: Tue, 27 Jan 2026 23:35:35 +0530 Subject: [PATCH] refactor: reposting for better peformance (cherry picked from commit 20787ef5da3a71e3b4a9970470ef035d7c225786) --- .../repost_item_valuation.js | 40 +- .../repost_item_valuation.json | 69 +- .../repost_item_valuation.py | 12 +- .../stock_ledger_variance.py | 7 +- erpnext/stock/stock_ledger.py | 712 ++++++++++-------- 5 files changed, 461 insertions(+), 379 deletions(-) diff --git a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.js b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.js index e6547ad6f35..c514b25c8ef 100644 --- a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.js +++ b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.js @@ -67,9 +67,15 @@ frappe.ui.form.on("Repost Item Valuation", { } if (frm.doc.status == "In Progress") { - frm.doc.current_index = data.current_index; - frm.doc.items_to_be_repost = data.items_to_be_repost; - frm.doc.total_reposting_count = data.total_reposting_count; + if (data.current_index) { + frm.doc.current_index = data.current_index; + frm.doc.items_to_be_repost = data.items_to_be_repost; + } + + if (data.vouchers_posted) { + frm.doc.total_vouchers = data.total_vouchers; + frm.doc.vouchers_posted = data.vouchers_posted; + } frm.dashboard.reset(); frm.trigger("show_reposting_progress"); @@ -104,15 +110,31 @@ frappe.ui.form.on("Repost Item Valuation", { show_reposting_progress: function (frm) { var bars = []; - + let title = ""; + let progress = 0.0; let total_count = frm.doc.items_to_be_repost ? JSON.parse(frm.doc.items_to_be_repost).length : 0; - if (frm.doc?.total_reposting_count) { - total_count = frm.doc.total_reposting_count; + if (total_count > 1) { + progress = flt((cint(frm.doc.current_index) / total_count) * 100, 2) || 0.5; + title = __("Reposting for Item-Wh Completed {0}%", [progress]); + + bars.push({ + title: title, + width: progress + "%", + progress_class: "progress-bar-success", + }); + + frm.dashboard.add_progress(__("Reposting Progress"), bars); } - let progress = flt((cint(frm.doc.current_index) / total_count) * 100, 2) || 0.5; - var title = __("Reposting Completed {0}%", [progress]); + if (!frm.doc.vouchers_posted) { + return; + } + + // Show voucher posting progress if vouchers are being reposted + bars = []; + progress = flt((cint(frm.doc.vouchers_posted) / cint(frm.doc.total_vouchers)) * 100, 2) || 0.5; + title = __("Reposting for Vouchers Completed {0}%", [progress]); bars.push({ title: title, @@ -120,7 +142,7 @@ frappe.ui.form.on("Repost Item Valuation", { progress_class: "progress-bar-success", }); - frm.dashboard.add_progress(__("Reposting Progress"), bars); + frm.dashboard.add_progress(__("Reposting Vouchers Progress"), bars); }, restart_reposting: function (frm) { diff --git a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json index bd70072e4bd..3affd1e4be9 100644 --- a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json +++ b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json @@ -24,14 +24,16 @@ "error_section", "error_log", "reposting_info_section", - "reposting_data_file", "items_to_be_repost", - "distinct_item_and_warehouse", "column_break_o1sj", "total_reposting_count", "current_index", "gl_reposting_index", - "affected_transactions" + "reposting_data_file", + "vouchers_based_on_item_and_warehouse_section", + "total_vouchers", + "column_break_yqwo", + "vouchers_posted" ], "fields": [ { @@ -164,15 +166,6 @@ "print_hide": 1, "read_only": 1 }, - { - "fieldname": "distinct_item_and_warehouse", - "fieldtype": "Code", - "hidden": 1, - "label": "Distinct Item and Warehouse", - "no_copy": 1, - "print_hide": 1, - "read_only": 1 - }, { "fieldname": "current_index", "fieldtype": "Int", @@ -182,14 +175,6 @@ "print_hide": 1, "read_only": 1 }, - { - "fieldname": "affected_transactions", - "fieldtype": "Code", - "hidden": 1, - "label": "Affected Transactions", - "no_copy": 1, - "read_only": 1 - }, { "default": "0", "fieldname": "gl_reposting_index", @@ -202,7 +187,7 @@ { "fieldname": "reposting_info_section", "fieldtype": "Section Break", - "label": "Reposting Info" + "label": "Reposting Item and Warehouse" }, { "fieldname": "column_break_o1sj", @@ -211,14 +196,7 @@ { "fieldname": "total_reposting_count", "fieldtype": "Int", - "label": "Total Reposting Count", - "no_copy": 1, - "read_only": 1 - }, - { - "fieldname": "reposting_data_file", - "fieldtype": "Attach", - "label": "Reposting Data File", + "label": "No of Items to Repost", "no_copy": 1, "read_only": 1 }, @@ -228,13 +206,44 @@ "fieldname": "recreate_stock_ledgers", "fieldtype": "Check", "label": "Recreate Stock Ledgers" + }, + { + "fieldname": "vouchers_based_on_item_and_warehouse_section", + "fieldtype": "Section Break", + "hidden": 1, + "label": "Reposting Vouchers" + }, + { + "fieldname": "total_vouchers", + "fieldtype": "Int", + "label": "Total Ledgers", + "no_copy": 1, + "read_only": 1 + }, + { + "fieldname": "column_break_yqwo", + "fieldtype": "Column Break" + }, + { + "fieldname": "vouchers_posted", + "fieldtype": "Int", + "label": "Ledgers Posted", + "no_copy": 1, + "read_only": 1 + }, + { + "fieldname": "reposting_data_file", + "fieldtype": "Attach", + "label": "Reposting Data File", + "no_copy": 1, + "read_only": 1 } ], "grid_page_length": 50, "index_web_pages_for_search": 1, "is_submittable": 1, "links": [], - "modified": "2025-03-31 12:38:20.566196", + "modified": "2026-03-27 19:59:58.637964", "modified_by": "Administrator", "module": "Stock", "name": "Repost Item Valuation", diff --git a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py index e3b1b330fad..16e0cac3b87 100644 --- a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py +++ b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py @@ -33,14 +33,12 @@ class RepostItemValuation(Document): if TYPE_CHECKING: from frappe.types import DF - affected_transactions: DF.Code | None allow_negative_stock: DF.Check allow_zero_rate: DF.Check amended_from: DF.Link | None based_on: DF.Literal["Transaction", "Item and Warehouse"] company: DF.Link | None current_index: DF.Int - distinct_item_and_warehouse: DF.Code | None error_log: DF.LongText | None gl_reposting_index: DF.Int item_code: DF.Link | None @@ -49,11 +47,14 @@ class RepostItemValuation(Document): posting_time: DF.Time | None recreate_stock_ledgers: DF.Check reposting_data_file: DF.Attach | None - status: DF.Literal["Queued", "In Progress", "Completed", "Skipped", "Failed"] + reposting_reference: DF.Data | None + status: DF.Literal["Queued", "In Progress", "Completed", "Skipped", "Failed", "Cancelled"] total_reposting_count: DF.Int + total_vouchers: DF.Int via_landed_cost_voucher: DF.Check voucher_no: DF.DynamicLink | None voucher_type: DF.Link | None + vouchers_posted: DF.Int warehouse: DF.Link | None # end: auto-generated types @@ -250,6 +251,9 @@ class RepostItemValuation(Document): self.distinct_item_and_warehouse = None self.items_to_be_repost = None self.gl_reposting_index = 0 + self.total_reposting_count = 0 + self.total_vouchers = 0 + self.vouchers_posted = 0 self.clear_attachment() self.db_update() @@ -381,7 +385,7 @@ def repost_sl_entries(doc): ) else: repost_future_sle( - args=[ + items_to_be_repost=[ frappe._dict( { "item_code": doc.item_code, diff --git a/erpnext/stock/report/stock_ledger_variance/stock_ledger_variance.py b/erpnext/stock/report/stock_ledger_variance/stock_ledger_variance.py index 808afadd05a..327f158e3f6 100644 --- a/erpnext/stock/report/stock_ledger_variance/stock_ledger_variance.py +++ b/erpnext/stock/report/stock_ledger_variance/stock_ledger_variance.py @@ -248,12 +248,7 @@ def get_item_warehouse_combinations(filters: dict | None = None) -> dict: bin.warehouse, item.valuation_method, ) - .where( - (item.is_stock_item == 1) - & (item.has_serial_no == 0) - & (warehouse.is_group == 0) - & (warehouse.company == filters.company) - ) + .where((item.is_stock_item == 1) & (warehouse.is_group == 0) & (warehouse.company == filters.company)) ) if filters.item_code: diff --git a/erpnext/stock/stock_ledger.py b/erpnext/stock/stock_ledger.py index d50d7fc5dba..89e572e6be3 100644 --- a/erpnext/stock/stock_ledger.py +++ b/erpnext/stock/stock_ledger.py @@ -4,16 +4,19 @@ import copy import gzip import json +from collections import deque import frappe from frappe import _, bold, scrub from frappe.model.meta import get_field_precision +from frappe.query_builder import Order from frappe.query_builder.functions import Sum from frappe.utils import ( cint, cstr, flt, format_date, + get_datetime, get_link_to_form, getdate, now, @@ -66,8 +69,8 @@ def make_sl_entries(sl_entries, allow_negative_stock=False, via_landed_cost_vouc from erpnext.controllers.stock_controller import future_sle_exists if sl_entries: - cancel = sl_entries[0].get("is_cancelled") - if cancel: + cancelled = sl_entries[0].get("is_cancelled") + if cancelled: validate_cancellation(sl_entries) set_as_cancel(sl_entries[0].get("voucher_type"), sl_entries[0].get("voucher_no")) @@ -75,10 +78,7 @@ def make_sl_entries(sl_entries, allow_negative_stock=False, via_landed_cost_vouc future_sle_exists(args, sl_entries) for sle in sl_entries: - if sle.serial_no and not via_landed_cost_voucher: - validate_serial_no(sle) - - if cancel: + if cancelled: sle["actual_qty"] = -flt(sle.get("actual_qty")) if sle["actual_qty"] < 0 and not sle.get("outgoing_rate"): @@ -155,35 +155,6 @@ def get_args_for_future_sle(row): ) -def validate_serial_no(sle): - from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos - - for sn in get_serial_nos(sle.serial_no): - args = copy.deepcopy(sle) - args.serial_no = sn - args.warehouse = "" - - vouchers = [] - for row in get_stock_ledger_entries(args, ">"): - voucher_type = frappe.bold(row.voucher_type) - voucher_no = frappe.bold(get_link_to_form(row.voucher_type, row.voucher_no)) - vouchers.append(f"{voucher_type} {voucher_no}") - - if vouchers: - serial_no = frappe.bold(sn) - msg = ( - f"""The serial no {serial_no} has been used in the future transactions so you need to cancel them first. - The list of the transactions are as below.""" - + "

" - - title = "Cannot Submit" if not sle.get("is_cancelled") else "Cannot Cancel" - frappe.throw(_(msg), title=_(title), exc=SerialNoExistsInFutureTransaction) - - def validate_cancellation(kargs): if kargs[0].get("is_cancelled"): repost_entry = frappe.db.get_value( @@ -237,146 +208,96 @@ def make_entry(args, allow_negative_stock=False, via_landed_cost_voucher=False): def repost_future_sle( - args=None, + items_to_be_repost=None, voucher_type=None, voucher_no=None, allow_negative_stock=None, via_landed_cost_voucher=False, doc=None, ): - if not args: - args = [] # set args to empty list if None to avoid enumerate error - reposting_data = {} + if not items_to_be_repost: + items_to_be_repost = get_items_to_be_repost( + voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data + ) + if doc and doc.reposting_data_file: reposting_data = get_reposting_data(doc.reposting_data_file) - items_to_be_repost = get_items_to_be_repost( - voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data + repost_affected_transaction = get_affected_transactions(doc, reposting_data) or set() + resume_item_wh_wise_last_posted_sle = ( + get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data) or {} ) - if items_to_be_repost: - args = items_to_be_repost - - distinct_item_warehouses = get_distinct_item_warehouse(args, doc, reposting_data=reposting_data) - affected_transactions = get_affected_transactions(doc, reposting_data=reposting_data) - - i = get_current_index(doc) or 0 - while i < len(args): - validate_item_warehouse(args[i]) + if not items_to_be_repost: + return + index = get_current_index(doc) or 0 + while index < len(items_to_be_repost): obj = update_entries_after( { - "item_code": args[i].get("item_code"), - "warehouse": args[i].get("warehouse"), - "posting_date": args[i].get("posting_date"), - "posting_time": args[i].get("posting_time"), - "creation": args[i].get("creation"), - "distinct_item_warehouses": distinct_item_warehouses, - "items_to_be_repost": args, - "current_index": i, + "item_code": items_to_be_repost[index].get("item_code"), + "warehouse": items_to_be_repost[index].get("warehouse"), + "posting_date": items_to_be_repost[index].get("posting_date"), + "posting_time": items_to_be_repost[index].get("posting_time"), + "creation": items_to_be_repost[index].get("creation"), + "current_idx": index, + "items_to_be_repost": items_to_be_repost, + "repost_doc": doc, + "repost_affected_transaction": repost_affected_transaction, + "item_wh_wise_last_posted_sle": resume_item_wh_wise_last_posted_sle, }, allow_negative_stock=allow_negative_stock, via_landed_cost_voucher=via_landed_cost_voucher, ) - affected_transactions.update(obj.affected_transactions) - key = (args[i].get("item_code"), args[i].get("warehouse")) - if distinct_item_warehouses.get(key): - distinct_item_warehouses[key].reposting_status = True + index += 1 - if obj.new_items_found: - for _item_wh, data in distinct_item_warehouses.items(): - if ("args_idx" not in data and not data.reposting_status) or ( - data.sle_changed and data.reposting_status - ): - data.args_idx = len(args) - args.append(data.sle) - elif data.sle_changed and not data.reposting_status: - args[data.args_idx] = data.sle - - data.sle_changed = False - i += 1 - - if doc: - update_args_in_repost_item_valuation( - doc, i, args, distinct_item_warehouses, affected_transactions - ) + resume_item_wh_wise_last_posted_sle = {} + repost_affected_transaction.update(obj.repost_affected_transaction) + update_args_in_repost_item_valuation(doc, index, items_to_be_repost, repost_affected_transaction) -def get_reposting_data(file_path) -> dict: - file_name = frappe.db.get_value( - "File", +def update_args_in_repost_item_valuation( + doc, + index, + items_to_be_repost, + repost_affected_transaction, + item_wh_wise_last_posted_sle=None, + only_affected_transaction=False, +): + file_name = "" + has_file = False + + if not item_wh_wise_last_posted_sle: + item_wh_wise_last_posted_sle = {} + + if doc.reposting_data_file: + has_file = True + + if doc.reposting_data_file: + file_name = get_reposting_file_name(doc.doctype, doc.name) + # frappe.delete_doc("File", file_name, ignore_permissions=True, delete_permanently=True) + + doc.reposting_data_file = create_json_gz_file( { - "file_url": file_path, - "attached_to_field": "reposting_data_file", + "repost_affected_transaction": repost_affected_transaction, + "item_wh_wise_last_posted_sle": {str(k): v for k, v in item_wh_wise_last_posted_sle.items()} + or {}, }, - "name", + doc, + file_name, ) - if not file_name: - return frappe._dict() - - attached_file = frappe.get_doc("File", file_name) - - content = attached_file.get_content() - if isinstance(content, str): - content = content.encode("utf-8") - - try: - data = gzip.decompress(content) - except Exception: - return frappe._dict() - - if data := json.loads(data.decode("utf-8")): - data = data - - return parse_json(data) - - -def validate_item_warehouse(args): - for field in ["item_code", "warehouse", "posting_date", "posting_time"]: - if args.get(field) in [None, ""]: - validation_msg = f"The field {frappe.unscrub(field)} is required for the reposting" - frappe.throw(_(validation_msg)) - - -def update_args_in_repost_item_valuation(doc, index, args, distinct_item_warehouses, affected_transactions): - if not doc.items_to_be_repost: - file_name = "" - if doc.reposting_data_file: - file_name = get_reposting_file_name(doc.doctype, doc.name) - # frappe.delete_doc("File", file_name, ignore_permissions=True, delete_permanently=True) - - doc.reposting_data_file = create_json_gz_file( - { - "items_to_be_repost": args, - "distinct_item_and_warehouse": {str(k): v for k, v in distinct_item_warehouses.items()}, - "affected_transactions": affected_transactions, - }, - doc, - file_name, - ) - + if not only_affected_transaction or not has_file: doc.db_set( { "current_index": index, - "total_reposting_count": len(args), + "items_to_be_repost": frappe.as_json(items_to_be_repost), + "total_reposting_count": len(items_to_be_repost), "reposting_data_file": doc.reposting_data_file, } ) - else: - doc.db_set( - { - "items_to_be_repost": json.dumps(args, default=str), - "distinct_item_and_warehouse": json.dumps( - {str(k): v for k, v in distinct_item_warehouses.items()}, default=str - ), - "current_index": index, - "affected_transactions": frappe.as_json(affected_transactions), - } - ) - if not frappe.flags.in_test: frappe.db.commit() @@ -384,9 +305,8 @@ def update_args_in_repost_item_valuation(doc, index, args, distinct_item_warehou "item_reposting_progress", { "name": doc.name, - "items_to_be_repost": json.dumps(args, default=str), "current_index": index, - "total_reposting_count": len(args), + "total_reposting_count": len(items_to_be_repost), }, doctype=doc.doctype, docname=doc.name, @@ -443,23 +363,27 @@ def create_file(doc, compressed_content): return _file.file_url -def get_items_to_be_repost(voucher_type=None, voucher_no=None, doc=None, reposting_data=None): - if not reposting_data and doc and doc.reposting_data_file: - reposting_data = get_reposting_data(doc.reposting_data_file) +def validate_item_warehouse(args): + for field in ["item_code", "warehouse", "posting_date", "posting_time"]: + if args.get(field) in [None, ""]: + validation_msg = f"The field {frappe.unscrub(field)} is required for the reposting" + frappe.throw(_(validation_msg)) + +def get_items_to_be_repost(voucher_type=None, voucher_no=None, doc=None, reposting_data=None): if reposting_data and reposting_data.items_to_be_repost: return reposting_data.items_to_be_repost items_to_be_repost = [] if doc and doc.items_to_be_repost: - items_to_be_repost = json.loads(doc.items_to_be_repost) or [] + items_to_be_repost = json.loads(doc.items_to_be_repost) if not items_to_be_repost and voucher_type and voucher_no: items_to_be_repost = frappe.db.get_all( "Stock Ledger Entry", filters={"voucher_type": voucher_type, "voucher_no": voucher_no}, - fields=["item_code", "warehouse", "posting_date", "posting_time", "creation"], + fields=["item_code", "warehouse", "posting_date", "posting_time", "creation", "posting_datetime"], order_by="creation asc", group_by="item_code, warehouse", ) @@ -467,51 +391,54 @@ def get_items_to_be_repost(voucher_type=None, voucher_no=None, doc=None, reposti return items_to_be_repost or [] -def get_distinct_item_warehouse(args=None, doc=None, reposting_data=None): - if not reposting_data and doc and doc.reposting_data_file: - reposting_data = get_reposting_data(doc.reposting_data_file) - - if reposting_data and reposting_data.distinct_item_and_warehouse: - return parse_distinct_items_and_warehouses(reposting_data.distinct_item_and_warehouse) - - distinct_item_warehouses = {} - - if doc and doc.distinct_item_and_warehouse: - distinct_item_warehouses = json.loads(doc.distinct_item_and_warehouse) - distinct_item_warehouses = { - frappe.safe_eval(k): frappe._dict(v) for k, v in distinct_item_warehouses.items() - } - else: - for i, d in enumerate(args): - distinct_item_warehouses.setdefault( - (d.item_code, d.warehouse), frappe._dict({"reposting_status": False, "sle": d, "args_idx": i}) - ) - - return distinct_item_warehouses - - -def parse_distinct_items_and_warehouses(distinct_items_and_warehouses): - new_dict = frappe._dict({}) - - # convert string keys to tuple - for k, v in distinct_items_and_warehouses.items(): - new_dict[frappe.safe_eval(k)] = frappe._dict(v) - - return new_dict - - def get_affected_transactions(doc, reposting_data=None) -> set[tuple[str, str]]: if not reposting_data and doc and doc.reposting_data_file: reposting_data = get_reposting_data(doc.reposting_data_file) - if reposting_data and reposting_data.affected_transactions: - return {tuple(transaction) for transaction in reposting_data.affected_transactions} + if reposting_data and reposting_data.repost_affected_transaction: + return {tuple(transaction) for transaction in reposting_data.repost_affected_transaction} - if not doc.affected_transactions: - return set() + return set() - transactions = frappe.parse_json(doc.affected_transactions) - return {tuple(transaction) for transaction in transactions} + +def get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data=None): + if not reposting_data and doc and doc.reposting_data_file: + reposting_data = get_reposting_data(doc.reposting_data_file) + + if reposting_data and reposting_data.item_wh_wise_last_posted_sle: + return frappe._dict(reposting_data.item_wh_wise_last_posted_sle) + + return frappe._dict() + + +def get_reposting_data(file_path) -> dict: + file_name = frappe.db.get_value( + "File", + { + "file_url": file_path, + "attached_to_field": "reposting_data_file", + }, + "name", + ) + + if not file_name: + return frappe._dict() + + attached_file = frappe.get_doc("File", file_name) + + content = attached_file.get_content() + if isinstance(content, str): + content = content.encode("utf-8") + + try: + data = gzip.decompress(content) + except Exception: + return frappe._dict() + + if data := json.loads(data.decode("utf-8")): + data = data + + return parse_json(data) def get_current_index(doc=None): @@ -547,6 +474,10 @@ class update_entries_after: self.allow_zero_rate = allow_zero_rate self.via_landed_cost_voucher = via_landed_cost_voucher self.item_code = args.get("item_code") + self.stock_ledgers_to_repost = [] + self.current_idx = args.get("current_idx", 0) + self.repost_doc = args.get("repost_doc") or None + self.items_to_be_repost = args.get("items_to_be_repost") or None self.allow_negative_stock = allow_negative_stock or is_negative_stock_allowed( item_code=self.item_code @@ -556,17 +487,20 @@ class update_entries_after: if self.args.sle_id: self.args["name"] = self.args.sle_id + self.prev_sle_dict = frappe._dict({}) self.company = frappe.get_cached_value("Warehouse", self.args.warehouse, "company") self.set_precision() self.valuation_method = get_valuation_method(self.item_code) + self.repost_affected_transaction = args.get("repost_affected_transaction") or set() self.new_items_found = False - self.distinct_item_warehouses = args.get("distinct_item_warehouses", frappe._dict()) - self.affected_transactions: set[tuple[str, str]] = set() self.reserved_stock = self.get_reserved_stock() self.data = frappe._dict() - self.initialize_previous_data(self.args) + + if not self.repost_doc or not self.args.get("item_wh_wise_last_posted_sle"): + self.initialize_previous_data(self.args) + self.build() def get_reserved_stock(self): @@ -613,7 +547,14 @@ class update_entries_after: """ self.data.setdefault(args.warehouse, frappe._dict()) warehouse_dict = self.data[args.warehouse] + + if self.stock_ledgers_to_repost: + return + previous_sle = get_previous_sle_of_current_voucher(args) + if previous_sle: + self.prev_sle_dict[(args.get("item_code"), args.get("warehouse"))] = previous_sle + warehouse_dict.previous_sle = previous_sle for key in ("qty_after_transaction", "valuation_rate", "stock_value"): @@ -635,27 +576,185 @@ class update_entries_after: if not future_sle_exists(self.args): self.update_bin() else: - entries_to_fix = self.get_future_entries_to_fix() + self.item_wh_wise_last_posted_sle = self.get_item_wh_wise_last_posted_sle() + _item_wh_sle = self.sort_sles(self.item_wh_wise_last_posted_sle.values()) - i = 0 - while i < len(entries_to_fix): - sle = entries_to_fix[i] - i += 1 + while _item_wh_sle: + self.initialize_reposting() + sle_dict = _item_wh_sle.pop(0) + self.repost_stock_ledgers(sle_dict) - self.process_sle(sle) - self.update_bin_data(sle) - - if sle.dependant_sle_voucher_detail_no: - entries_to_fix = self.get_dependent_entries_to_fix(entries_to_fix, sle) - if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no): - # for repack entries, we need to repost both source and target warehouses - self.update_distinct_item_warehouses_for_repack(sle) + self.update_bin() + self.reset_vouchers_and_idx() + self.update_data_in_repost() if self.exceptions: self.raise_exceptions() - def update_distinct_item_warehouses_for_repack(self, sle): - sles = ( + def initialize_reposting(self): + self._sles = [] + self.distinct_sles = set() + self.distinct_dependant_item_wh = set() + self.prev_sle_dict = frappe._dict({}) + + def get_item_wh_wise_last_posted_sle(self): + if self.args and self.args.get("item_wh_wise_last_posted_sle"): + _sles = {} + for key, sle in self.args.get("item_wh_wise_last_posted_sle").items(): + _sles[frappe.safe_eval(key)] = frappe._dict(sle) + + return _sles + + return { + (self.args.item_code, self.args.warehouse): frappe._dict( + { + "item_code": self.args.item_code, + "warehouse": self.args.warehouse, + "posting_datetime": get_combine_datetime(self.args.posting_date, self.args.posting_time), + "posting_date": self.args.posting_date, + "posting_time": self.args.posting_time, + "creation": self.args.creation, + } + ) + } + + def repost_stock_ledgers(self, sle_dict=None): + self._sles = self.get_future_entries_to_repost(sle_dict) + + if not isinstance(self._sles, deque): + self._sles = deque(self._sles) + + i = 0 + while self._sles: + sle = self._sles.popleft() + i += 1 + if sle.name in self.distinct_sles: + continue + + item_wh_key = (sle.item_code, sle.warehouse) + if item_wh_key not in self.prev_sle_dict: + self.prev_sle_dict[item_wh_key] = get_previous_sle_of_current_voucher(sle) + + self.repost_stock_ledger_entry(sle) + + # To avoid duplicate reposting of same sle in case of multiple dependant sle + self.distinct_sles.add(sle.name) + + if sle.dependant_sle_voucher_detail_no: + self.include_dependant_sle_in_reposting(sle) + self.update_item_wh_wise_last_posted_sle(sle) + + if i % 1000 == 0: + self.update_data_in_repost(len(self._sles), i) + + def sort_sles(self, sles): + return sorted( + sles, + key=lambda d: ( + get_datetime(d.posting_datetime), + get_datetime(d.creation), + ), + ) + + def include_dependant_sle_in_reposting(self, sle): + repost_dependant_sle = False + if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no): + repack_sles = self.get_sles_for_repack(sle) + for repack_sle in repack_sles: + if (repack_sle.item_code, repack_sle.warehouse) in self.distinct_dependant_item_wh: + continue + + repost_dependant_sle = True + self.distinct_dependant_item_wh.add((repack_sle.item_code, repack_sle.warehouse)) + self._sles.extend(self.get_future_entries_to_repost(repack_sle)) + else: + dependant_sles = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no) + for depend_sle in dependant_sles: + if (depend_sle.item_code, depend_sle.warehouse) in self.distinct_dependant_item_wh: + continue + + repost_dependant_sle = True + self.distinct_dependant_item_wh.add((depend_sle.item_code, depend_sle.warehouse)) + self._sles.extend(self.get_future_entries_to_repost(depend_sle)) + + if repost_dependant_sle: + self._sles = deque(self.sort_sles(self._sles)) + + def repost_stock_ledger_entry(self, sle): + if self.args.item_code != sle.item_code or self.args.warehouse != sle.warehouse: + self.repost_affected_transaction.add((sle.voucher_type, sle.voucher_no)) + + if isinstance(sle, dict): + sle = frappe._dict(sle) + + self.process_sle(sle) + self.update_item_wh_wise_last_posted_sle(sle) + + def update_item_wh_wise_last_posted_sle(self, sle): + if not self._sles: + self.item_wh_wise_last_posted_sle = frappe._dict() + return + + self.item_wh_wise_last_posted_sle[(sle.item_code, sle.warehouse)] = frappe._dict( + { + "item_code": sle.item_code, + "warehouse": sle.warehouse, + "posting_date": sle.posting_date, + "posting_time": sle.posting_time, + "posting_datetime": sle.posting_datetime + or get_combine_datetime(sle.posting_date, sle.posting_time), + "creation": sle.creation, + } + ) + + def reset_vouchers_and_idx(self): + self.stock_ledgers_to_repost = [] + self.prev_sle_dict = frappe._dict() + self.item_wh_wise_last_posted_sle = frappe._dict() + + def update_data_in_repost(self, total_sles=None, index=None): + if not self.repost_doc: + return + + values_to_update = { + "total_vouchers": cint(total_sles) + cint(index), + "vouchers_posted": index or 0, + } + + self.repost_doc.db_set(values_to_update) + + update_args_in_repost_item_valuation( + self.repost_doc, + self.current_idx, + self.items_to_be_repost, + self.repost_affected_transaction, + self.item_wh_wise_last_posted_sle, + only_affected_transaction=True, + ) + + if not frappe.flags.in_test: + # To maintain the state of the reposting, so if timeout happens, it can be resumed from the last posted voucher + frappe.db.commit() # nosemgrep + + self.publish_real_time_progress(total_sles=total_sles, index=index) + + def publish_real_time_progress(self, total_sles=None, index=None): + frappe.publish_realtime( + "item_reposting_progress", + { + "name": self.repost_doc.name, + "total_vouchers": cint(total_sles) + cint(index), + "vouchers_posted": index or 0, + }, + doctype=self.repost_doc.doctype, + docname=self.repost_doc.name, + ) + + def get_future_entries_to_repost(self, kwargs): + return get_stock_ledger_entries(kwargs, ">=", "asc", for_update=True, check_serial_no=False) + + def get_sles_for_repack(self, sle): + return ( frappe.get_all( "Stock Ledger Entry", filters={ @@ -663,16 +762,20 @@ class update_entries_after: "voucher_no": sle.voucher_no, "actual_qty": (">", 0), "is_cancelled": 0, - "voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no), + "dependant_sle_voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no), }, - fields=["*"], + fields=[ + "item_code", + "warehouse", + "posting_date", + "posting_time", + "posting_datetime", + "creation", + ], ) or [] ) - for dependant_sle in sles: - self.update_distinct_item_warehouses(dependant_sle) - def has_stock_reco_with_serial_batch(self, sle): if ( sle.voucher_type == "Stock Reconciliation" @@ -683,35 +786,10 @@ class update_entries_after: return False def process_sle_against_current_timestamp(self): - sl_entries = self.get_sle_against_current_voucher() + sl_entries = get_sle_against_current_voucher(self.args) for sle in sl_entries: self.process_sle(sle) - def get_sle_against_current_voucher(self): - self.args["posting_datetime"] = get_combine_datetime(self.args.posting_date, self.args.posting_time) - - return frappe.db.sql( - """ - select - *, posting_datetime as "timestamp" - from - `tabStock Ledger Entry` - where - item_code = %(item_code)s - and warehouse = %(warehouse)s - and is_cancelled = 0 - and ( - posting_datetime = %(posting_datetime)s - ) - and creation = %(creation)s - order by - creation ASC - for update - """, - self.args, - as_dict=1, - ) - def get_future_entries_to_fix(self): # includes current entry! args = self.data[self.args.warehouse].previous_sle or frappe._dict( @@ -720,78 +798,8 @@ class update_entries_after: return list(self.get_sle_after_datetime(args)) - def get_dependent_entries_to_fix(self, entries_to_fix, sle): - dependant_sle = get_sle_by_voucher_detail_no( - sle.dependant_sle_voucher_detail_no, excluded_sle=sle.name - ) - - if not dependant_sle: - return entries_to_fix - elif dependant_sle.item_code == self.item_code and dependant_sle.warehouse == self.args.warehouse: - return entries_to_fix - elif dependant_sle.item_code != self.item_code: - self.update_distinct_item_warehouses(dependant_sle) - return entries_to_fix - elif dependant_sle.item_code == self.item_code and dependant_sle.warehouse in self.data: - return entries_to_fix - else: - self.initialize_previous_data(dependant_sle) - self.update_distinct_item_warehouses(dependant_sle) - return entries_to_fix - - def update_distinct_item_warehouses(self, dependant_sle): - key = (dependant_sle.item_code, dependant_sle.warehouse) - val = frappe._dict({"sle": dependant_sle}) - - if key not in self.distinct_item_warehouses: - self.distinct_item_warehouses[key] = val - self.new_items_found = True - else: - existing_sle = self.distinct_item_warehouses[key].get("sle", {}) - if getdate(existing_sle.get("posting_date")) > getdate(dependant_sle.posting_date): - self.distinct_item_warehouses[key] = val - self.new_items_found = True - elif ( - dependant_sle.actual_qty > 0 - and dependant_sle.voucher_type == "Stock Entry" - and is_transfer_stock_entry(dependant_sle.voucher_no) - ): - if self.distinct_item_warehouses[key].get("transfer_entry_to_repost"): - return - - val["transfer_entry_to_repost"] = True - self.distinct_item_warehouses[key] = val - self.new_items_found = True - - def is_dependent_voucher_reposted(self, dependant_sle) -> bool: - # Return False if the dependent voucher is not reposted - - if self.args.items_to_be_repost and self.args.current_index: - index = self.args.current_index - while index < len(self.args.items_to_be_repost): - if ( - self.args.items_to_be_repost[index].get("item_code") == dependant_sle.item_code - and self.args.items_to_be_repost[index].get("warehouse") == dependant_sle.warehouse - ): - if getdate(self.args.items_to_be_repost[index].get("posting_date")) > getdate( - dependant_sle.posting_date - ): - self.args.items_to_be_repost[index]["posting_date"] = dependant_sle.posting_date - - return False - - index += 1 - - return True - - def get_dependent_voucher_detail_nos(self, key): - if "dependent_voucher_detail_nos" not in self.distinct_item_warehouses[key]: - self.distinct_item_warehouses[key].dependent_voucher_detail_nos = [] - - return self.distinct_item_warehouses[key].dependent_voucher_detail_nos - def validate_previous_sle_qty(self, sle): - previous_sle = self.data[sle.warehouse].previous_sle + previous_sle = self.prev_sle_dict.get((sle.item_code, sle.warehouse)) if previous_sle and previous_sle.get("qty_after_transaction") < 0 and sle.get("actual_qty") > 0: frappe.msgprint( _( @@ -810,10 +818,32 @@ class update_entries_after: def process_sle(self, sle): # previous sle data for this warehouse - self.wh_data = self.data[sle.warehouse] + key = (sle.item_code, sle.warehouse) + if key not in self.prev_sle_dict: + prev_sle = get_previous_sle_of_current_voucher(sle) + if prev_sle: + self.prev_sle_dict[key] = prev_sle + + if not self.prev_sle_dict.get(key): + self.prev_sle_dict[key] = frappe._dict( + { + "qty_after_transaction": 0.0, + "valuation_rate": 0.0, + "stock_value": 0.0, + "prev_stock_value": 0.0, + "stock_queue": [], + } + ) + + self.wh_data = self.prev_sle_dict.get(key) + + if self.wh_data.stock_queue and isinstance(self.wh_data.stock_queue, str): + self.wh_data.stock_queue = json.loads(self.wh_data.stock_queue) + + if not self.wh_data.prev_stock_value: + self.wh_data.prev_stock_value = self.wh_data.stock_value self.validate_previous_sle_qty(sle) - self.affected_transactions.add((sle.voucher_type, sle.voucher_no)) if (sle.serial_no and not self.via_landed_cost_voucher) or not cint(self.allow_negative_stock): # validate negative stock for serialized items, fifo valuation @@ -916,6 +946,7 @@ class update_entries_after: sle.stock_queue = json.dumps(self.wh_data.stock_queue) sle.stock_value_difference = stock_value_difference + if ( sle.is_adjustment_entry and flt(sle.qty_after_transaction, self.flt_precision) == 0 @@ -940,6 +971,8 @@ class update_entries_after: sle.modified = now() frappe.get_doc(sle).db_update() + self.prev_sle_dict[key] = sle + if not self.args.get("sle_id") or ( sle.serial_and_batch_bundle and sle.auto_created_serial_and_batch_bundle ): @@ -1713,15 +1746,42 @@ class update_entries_after: def update_bin(self): # update bin for each warehouse - for warehouse, data in self.data.items(): - bin_name = get_or_make_bin(self.item_code, warehouse) + for (item_code, warehouse), data in self.prev_sle_dict.items(): + bin_name = get_or_make_bin(item_code, warehouse) - updated_values = {"actual_qty": data.qty_after_transaction, "stock_value": data.stock_value} + updated_values = { + "actual_qty": flt(data.qty_after_transaction), + "stock_value": flt(data.stock_value), + } if data.valuation_rate is not None: - updated_values["valuation_rate"] = data.valuation_rate + updated_values["valuation_rate"] = flt(data.valuation_rate) + frappe.db.set_value("Bin", bin_name, updated_values, update_modified=True) +def get_sle_against_current_voucher(kwargs): + kwargs["posting_datetime"] = get_combine_datetime(kwargs.posting_date, kwargs.posting_time) + doctype = frappe.qb.DocType("Stock Ledger Entry") + + query = ( + frappe.qb.from_(doctype) + .select("*") + .where( + (doctype.item_code == kwargs.item_code) + & (doctype.warehouse == kwargs.warehouse) + & (doctype.is_cancelled == 0) + & (doctype.posting_datetime == kwargs.posting_datetime) + ) + .orderby(doctype.creation, order=Order.asc) + .for_update() + ) + + if not kwargs.get("cancelled"): + query = query.where(doctype.creation == kwargs.creation) + + return query.run(as_dict=True) + + def get_previous_sle_of_current_voucher(args, operator="<", exclude_current_voucher=False): """get stock ledger entries filtered by specific posting datetime conditions""" @@ -1874,23 +1934,15 @@ def get_stock_ledger_entries( ) -def get_sle_by_voucher_detail_no(voucher_detail_no, excluded_sle=None): - return frappe.db.get_value( +def get_sle_by_voucher_detail_no(voucher_detail_no): + return frappe.get_all( "Stock Ledger Entry", - {"voucher_detail_no": voucher_detail_no, "name": ["!=", excluded_sle], "is_cancelled": 0}, - [ - "item_code", - "warehouse", - "actual_qty", - "qty_after_transaction", - "posting_date", - "posting_time", - "voucher_detail_no", - "posting_datetime as timestamp", - "voucher_type", - "voucher_no", - ], - as_dict=1, + filters={ + "voucher_detail_no": voucher_detail_no, + "is_cancelled": 0, + "dependant_sle_voucher_detail_no": ("is", "not set"), + }, + fields=["item_code", "warehouse", "posting_date", "posting_time", "posting_datetime", "creation"], )