From 5bbecbf7c4194969731de58cef363cd140bfb260 Mon Sep 17 00:00:00 2001 From: Rohit Waghchaure Date: Tue, 27 Jan 2026 23:35:35 +0530 Subject: [PATCH] refactor: reposting for better peformance (cherry picked from commit 20787ef5da3a71e3b4a9970470ef035d7c225786) --- .../repost_item_valuation.py | 2 +- erpnext/stock/stock_ledger.py | 240 ++++++++++-------- 2 files changed, 142 insertions(+), 100 deletions(-) diff --git a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py index f5b4ef3e8f5..c68d3ed329a 100644 --- a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py +++ b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py @@ -435,7 +435,7 @@ def repost_sl_entries(doc): ) else: repost_future_sle( - args=[ + item_wh_to_repost=[ frappe._dict( { "item_code": doc.item_code, diff --git a/erpnext/stock/stock_ledger.py b/erpnext/stock/stock_ledger.py index 1e6cec59a5c..6810f45827c 100644 --- a/erpnext/stock/stock_ledger.py +++ b/erpnext/stock/stock_ledger.py @@ -16,6 +16,7 @@ from frappe.utils import ( cstr, flt, format_date, + get_datetime, get_link_to_form, getdate, now, @@ -242,15 +243,15 @@ def make_entry(args, allow_negative_stock=False, via_landed_cost_voucher=False): def repost_future_sle( - args=None, + item_wh_to_repost=None, voucher_type=None, voucher_no=None, allow_negative_stock=None, via_landed_cost_voucher=False, doc=None, ): - if not args: - args = [] # set args to empty list if None to avoid enumerate error + if not item_wh_to_repost: + item_wh_to_repost = [] # set args to empty list if None to avoid enumerate error reposting_data = {} if doc and doc.reposting_data_file: @@ -260,53 +261,34 @@ def repost_future_sle( voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data ) if items_to_be_repost: - args = items_to_be_repost - - distinct_item_warehouses = get_distinct_item_warehouse(args, doc, reposting_data=reposting_data) - affected_transactions = get_affected_transactions(doc, reposting_data=reposting_data) + item_wh_to_repost = items_to_be_repost + distinct_item_wh_sles = frappe._dict() + prev_sle_dict = frappe._dict() + vouchers_to_repost = set() + item_wh_idx = 0 i = get_current_index(doc) or 0 - while i < len(args): - validate_item_warehouse(args[i]) - obj = update_entries_after( + while i < len(item_wh_to_repost): + update_entries_after( { - "item_code": args[i].get("item_code"), - "warehouse": args[i].get("warehouse"), - "posting_date": args[i].get("posting_date"), - "posting_time": args[i].get("posting_time"), - "creation": args[i].get("creation"), - "distinct_item_warehouses": distinct_item_warehouses, - "items_to_be_repost": args, - "current_index": i, + "item_code": item_wh_to_repost[i].get("item_code"), + "warehouse": item_wh_to_repost[i].get("warehouse"), + "posting_date": item_wh_to_repost[i].get("posting_date"), + "posting_time": item_wh_to_repost[i].get("posting_time"), + "creation": item_wh_to_repost[i].get("creation"), + "distinct_item_wh_sles": distinct_item_wh_sles, + "prev_sle_dict": prev_sle_dict, + "vouchers_to_repost": vouchers_to_repost, + "item_wh_idx": item_wh_idx, + "current_idx": i, }, allow_negative_stock=allow_negative_stock, via_landed_cost_voucher=via_landed_cost_voucher, ) - affected_transactions.update(obj.affected_transactions) - key = (args[i].get("item_code"), args[i].get("warehouse")) - if distinct_item_warehouses.get(key): - distinct_item_warehouses[key].reposting_status = True - - if obj.new_items_found: - for _item_wh, data in distinct_item_warehouses.items(): - if ("args_idx" not in data and not data.reposting_status) or ( - data.sle_changed and data.reposting_status - ): - data.args_idx = len(args) - args.append(data.sle) - elif data.sle_changed and not data.reposting_status: - args[data.args_idx] = data.sle - - data.sle_changed = False i += 1 - if doc: - update_args_in_repost_item_valuation( - doc, i, args, distinct_item_warehouses, affected_transactions - ) - def get_reposting_data(file_path) -> dict: file_name = frappe.db.get_value( @@ -552,6 +534,11 @@ class update_entries_after: self.allow_zero_rate = allow_zero_rate self.via_landed_cost_voucher = via_landed_cost_voucher self.item_code = args.get("item_code") + self.distinct_item_wh_sles = args.get("distinct_item_wh_sles", frappe._dict()) + self.prev_sle_dict = args.get("prev_sle_dict", frappe._dict()) + self.vouchers_to_repost = args.get("vouchers_to_repost", set()) + self.item_wh_idx = args.get("item_wh_idx", 0) + self.current_idx = args.get("current_idx", 0) self.allow_negative_stock = allow_negative_stock or is_negative_stock_allowed( item_code=self.item_code @@ -566,7 +553,6 @@ class update_entries_after: self.valuation_method = get_valuation_method(self.item_code, self.company) self.new_items_found = False - self.distinct_item_warehouses = args.get("distinct_item_warehouses", frappe._dict()) self.affected_transactions: set[tuple[str, str]] = set() self.reserved_stock = self.get_reserved_stock() @@ -622,6 +608,8 @@ class update_entries_after: self.data.setdefault(args.warehouse, frappe._dict()) warehouse_dict = self.data[args.warehouse] previous_sle = get_previous_sle_of_current_voucher(args) + self.prev_sle_dict[(args.get("item_code"), args.get("warehouse"))] = previous_sle + warehouse_dict.previous_sle = previous_sle for key in ("qty_after_transaction", "valuation_rate", "stock_value"): @@ -642,28 +630,78 @@ class update_entries_after: self.process_sle_against_current_timestamp() if not future_sle_exists(self.args): self.update_bin() + elif self.vouchers_to_repost: + self.repost_vouchers() else: - entries_to_fix = self.get_future_entries_to_fix() + kwargs = frappe._dict( + { + "item_code": self.item_code, + "warehouse": self.args.warehouse, + "posting_date": self.args.posting_date, + "posting_time": self.args.posting_time, + "creation": self.args.creation, + } + ) - i = 0 - while i < len(entries_to_fix): - sle = entries_to_fix[i] - i += 1 + self.prepare_sles_to_repost(kwargs) + self.repost_vouchers() + self.update_bin() - self.process_sle(sle) - self.update_bin_data(sle) + def repost_vouchers(self): + vouchers = self.vouchers_to_repost or self.get_vouchere_to_repost() + for row in vouchers: + self.process_sle(row) - if sle.dependant_sle_voucher_detail_no: - entries_to_fix = self.get_dependent_entries_to_fix(entries_to_fix, sle) - if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no): - # for repack entries, we need to repost both source and target warehouses - self.update_distinct_item_warehouses_for_repack(sle) + def get_vouchere_to_repost(self): + if not self.distinct_item_wh_sles: + return [] - if self.exceptions: - self.raise_exceptions() + sorted_rows = sorted( + (row for rows in self.distinct_item_wh_sles.values() for row in rows), + key=lambda d: (get_datetime(d.get("posting_datetime")), get_datetime(d.get("creation"))), + ) - def update_distinct_item_warehouses_for_repack(self, sle): - sles = ( + return sorted_rows + + def prepare_sles_to_repost(self, kwargs): + sles = self.get_future_entries_to_repost(kwargs) + for sle in sles: + item_wh_key = (sle.item_code, sle.warehouse) + if item_wh_key not in self.prev_sle_dict: + prev_sle = get_previous_sle_of_current_voucher(kwargs) + self.prev_sle_dict[item_wh_key] = prev_sle + + key = (sle.item_code, sle.warehouse, sle.voucher_detail_no) + if key not in self.distinct_item_wh_sles: + self.distinct_item_wh_sles.setdefault(key, []).append(sle) + + if sle.dependant_sle_voucher_detail_no: + self.prepare_dependent_sles_to_repost(sle) + + def prepare_dependent_sles_to_repost(self, sle): + if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no): + sles = self.get_sles_for_repack(sle) + for repack_sle in sles: + key = (repack_sle.item_code, repack_sle.warehouse, repack_sle.voucher_detail_no) + if key not in self.distinct_item_wh_sles: + self.distinct_item_wh_sles.setdefault(key, []).append(repack_sle) + + self.prepare_sles_to_repost(repack_sle) + + elif sle.dependant_sle_voucher_detail_no: + dependant_sle = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no) + + key = (dependant_sle.item_code, dependant_sle.warehouse, dependant_sle.voucher_detail_no) + if key not in self.distinct_item_wh_sles: + self.distinct_item_wh_sles.setdefault(key, []).append(dependant_sle) + + self.prepare_sles_to_repost(dependant_sle) + + def get_future_entries_to_repost(self, kwargs): + return get_stock_ledger_entries(kwargs, ">=", "asc", for_update=True, check_serial_no=False) + + def get_sles_for_repack(self, sle): + return ( frappe.get_all( "Stock Ledger Entry", filters={ @@ -671,16 +709,13 @@ class update_entries_after: "voucher_no": sle.voucher_no, "actual_qty": (">", 0), "is_cancelled": 0, - "voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no), + "dependant_sle_voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no), }, fields=["*"], ) or [] ) - for dependant_sle in sles: - self.update_distinct_item_warehouses(dependant_sle) - def has_stock_reco_with_serial_batch(self, sle): if ( sle.voucher_type == "Stock Reconciliation" @@ -691,33 +726,11 @@ class update_entries_after: return False def process_sle_against_current_timestamp(self): - sl_entries = self.get_sle_against_current_voucher() + sl_entries = get_sle_against_current_voucher(self.args) for sle in sl_entries: sle["timestamp"] = sle.posting_datetime self.process_sle(sle) - def get_sle_against_current_voucher(self): - self.args["posting_datetime"] = get_combine_datetime(self.args.posting_date, self.args.posting_time) - doctype = frappe.qb.DocType("Stock Ledger Entry") - - query = ( - frappe.qb.from_(doctype) - .select("*") - .where( - (doctype.item_code == self.args.item_code) - & (doctype.warehouse == self.args.warehouse) - & (doctype.is_cancelled == 0) - & (doctype.posting_datetime == self.args.posting_datetime) - ) - .orderby(doctype.creation, order=Order.asc) - .for_update() - ) - - if not self.args.get("cancelled"): - query = query.where(doctype.creation == self.args.creation) - - return query.run(as_dict=True) - def get_future_entries_to_fix(self): # includes current entry! args = self.data[self.args.warehouse].previous_sle or frappe._dict( @@ -797,7 +810,7 @@ class update_entries_after: return self.distinct_item_warehouses[key].dependent_voucher_detail_nos def validate_previous_sle_qty(self, sle): - previous_sle = self.data[sle.warehouse].previous_sle + previous_sle = self.prev_sle_dict.get((sle.item_code, sle.warehouse)) if previous_sle and previous_sle.get("qty_after_transaction") < 0 and sle.get("actual_qty") > 0: frappe.msgprint( _( @@ -816,7 +829,22 @@ class update_entries_after: def process_sle(self, sle): # previous sle data for this warehouse - self.wh_data = self.data[sle.warehouse] + key = (sle.item_code, sle.warehouse) + self.wh_data = self.prev_sle_dict.get(key) or frappe._dict( + { + "qty_after_transaction": 0.0, + "valuation_rate": 0.0, + "stock_value": 0.0, + "prev_stock_value": 0.0, + "stock_queue": [], + } + ) + + if self.wh_data.stock_queue and isinstance(self.wh_data.stock_queue, str): + self.wh_data.stock_queue = json.loads(self.wh_data.stock_queue) + + if not self.wh_data.prev_stock_value: + self.wh_data.prev_stock_value = self.wh_data.stock_value self.validate_previous_sle_qty(sle) self.affected_transactions.add((sle.voucher_type, sle.voucher_no)) @@ -946,6 +974,8 @@ class update_entries_after: sle.modified = now() frappe.get_doc(sle).db_update() + self.prev_sle_dict[key] = sle + if not self.args.get("sle_id") or ( sle.serial_and_batch_bundle and sle.auto_created_serial_and_batch_bundle ): @@ -1728,8 +1758,8 @@ class update_entries_after: def update_bin(self): # update bin for each warehouse - for warehouse, data in self.data.items(): - bin_name = get_or_make_bin(self.item_code, warehouse) + for (item_code, warehouse), data in self.prev_sle_dict.items(): + bin_name = get_or_make_bin(item_code, warehouse) updated_values = {"actual_qty": data.qty_after_transaction, "stock_value": data.stock_value} if data.valuation_rate is not None: @@ -1737,6 +1767,29 @@ class update_entries_after: frappe.db.set_value("Bin", bin_name, updated_values, update_modified=True) +def get_sle_against_current_voucher(kwargs): + kwargs["posting_datetime"] = get_combine_datetime(kwargs.posting_date, kwargs.posting_time) + doctype = frappe.qb.DocType("Stock Ledger Entry") + + query = ( + frappe.qb.from_(doctype) + .select("*") + .where( + (doctype.item_code == kwargs.item_code) + & (doctype.warehouse == kwargs.warehouse) + & (doctype.is_cancelled == 0) + & (doctype.posting_datetime == kwargs.posting_datetime) + ) + .orderby(doctype.creation, order=Order.asc) + .for_update() + ) + + if not kwargs.get("cancelled"): + query = query.where(doctype.creation == kwargs.creation) + + return query.run(as_dict=True) + + def get_previous_sle_of_current_voucher(args, operator="<", exclude_current_voucher=False): """get stock ledger entries filtered by specific posting datetime conditions""" @@ -1893,18 +1946,7 @@ def get_sle_by_voucher_detail_no(voucher_detail_no, excluded_sle=None): return frappe.db.get_value( "Stock Ledger Entry", {"voucher_detail_no": voucher_detail_no, "name": ["!=", excluded_sle], "is_cancelled": 0}, - [ - "item_code", - "warehouse", - "actual_qty", - "qty_after_transaction", - "posting_date", - "posting_time", - "voucher_detail_no", - "posting_datetime as timestamp", - "voucher_type", - "voucher_no", - ], + ["*"], as_dict=1, )