diff --git a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json index e1ae6d00cd9..7bdc4e2f462 100644 --- a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json +++ b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json @@ -34,7 +34,15 @@ "total_reposting_count", "current_index", "gl_reposting_index", +<<<<<<< HEAD "affected_transactions" +======= + "reposting_data_file", + "vouchers_based_on_item_and_warehouse_section", + "total_vouchers", + "column_break_yqwo", + "vouchers_posted" +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) ], "fields": [ { @@ -247,13 +255,51 @@ "fieldname": "repost_only_accounting_ledgers", "fieldtype": "Check", "label": "Repost Only Accounting Ledgers" +<<<<<<< HEAD +======= + }, + { + "fieldname": "vouchers_based_on_item_and_warehouse_section", + "fieldtype": "Section Break", + "hidden": 1, + "label": "Reposting Vouchers" + }, + { + "fieldname": "total_vouchers", + "fieldtype": "Int", + "label": "Total Ledgers", + "no_copy": 1, + "read_only": 1 + }, + { + "fieldname": "column_break_yqwo", + "fieldtype": "Column Break" + }, + { + "fieldname": "vouchers_posted", + "fieldtype": "Int", + "label": "Ledgers Posted", + "no_copy": 1, + "read_only": 1 + }, + { + "fieldname": "reposting_data_file", + "fieldtype": "Attach", + "label": "Reposting Data File", + "no_copy": 1, + "read_only": 1 +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) } ], "grid_page_length": 50, "index_web_pages_for_search": 1, "is_submittable": 1, "links": [], +<<<<<<< HEAD "modified": "2026-02-25 14:22:21.681549", +======= + "modified": "2026-03-27 18:59:58.637964", +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) "modified_by": "Administrator", "module": "Stock", "name": "Repost Item Valuation", diff --git a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py index f5b4ef3e8f5..9220dcaf5ee 100644 --- a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py +++ b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py @@ -261,6 +261,11 @@ class RepostItemValuation(Document): self.items_to_be_repost = None self.gl_reposting_index = 0 self.total_reposting_count = 0 +<<<<<<< HEAD +======= + self.total_vouchers = 0 + self.vouchers_posted = 0 +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) self.clear_attachment() self.db_update() diff --git a/erpnext/stock/stock_ledger.py b/erpnext/stock/stock_ledger.py index 1e6cec59a5c..f24131fd91e 100644 --- a/erpnext/stock/stock_ledger.py +++ b/erpnext/stock/stock_ledger.py @@ -4,6 +4,7 @@ import copy import gzip import json +from collections import deque import frappe from frappe import _, bold, scrub @@ -256,6 +257,7 @@ def repost_future_sle( if doc and doc.reposting_data_file: reposting_data = get_reposting_data(doc.reposting_data_file) +<<<<<<< HEAD items_to_be_repost = get_items_to_be_repost( voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data ) @@ -268,9 +270,21 @@ def repost_future_sle( i = get_current_index(doc) or 0 while i < len(args): validate_item_warehouse(args[i]) +======= + if doc and doc.reposting_data_file: + reposting_data = get_reposting_data(doc.reposting_data_file) + + repost_affected_transaction = get_affected_transactions(doc, reposting_data) or set() + resume_item_wh_wise_last_posted_sle = ( + get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data) or {} + ) + if not items_to_be_repost: + return +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) obj = update_entries_after( { +<<<<<<< HEAD "item_code": args[i].get("item_code"), "warehouse": args[i].get("warehouse"), "posting_date": args[i].get("posting_date"), @@ -279,6 +293,18 @@ def repost_future_sle( "distinct_item_warehouses": distinct_item_warehouses, "items_to_be_repost": args, "current_index": i, +======= + "item_code": items_to_be_repost[index].get("item_code"), + "warehouse": items_to_be_repost[index].get("warehouse"), + "posting_date": items_to_be_repost[index].get("posting_date"), + "posting_time": items_to_be_repost[index].get("posting_time"), + "creation": items_to_be_repost[index].get("creation"), + "current_idx": index, + "items_to_be_repost": items_to_be_repost, + "repost_doc": doc, + "repost_affected_transaction": repost_affected_transaction, + "item_wh_wise_last_posted_sle": resume_item_wh_wise_last_posted_sle, +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) }, allow_negative_stock=allow_negative_stock, via_landed_cost_voucher=via_landed_cost_voucher, @@ -289,6 +315,7 @@ def repost_future_sle( if distinct_item_warehouses.get(key): distinct_item_warehouses[key].reposting_status = True +<<<<<<< HEAD if obj.new_items_found: for _item_wh, data in distinct_item_warehouses.items(): if ("args_idx" not in data and not data.reposting_status) or ( @@ -314,6 +341,39 @@ def get_reposting_data(file_path) -> dict: { "file_url": file_path, "attached_to_field": "reposting_data_file", +======= + resume_item_wh_wise_last_posted_sle = {} + repost_affected_transaction.update(obj.repost_affected_transaction) + update_args_in_repost_item_valuation(doc, index, items_to_be_repost, repost_affected_transaction) + + +def update_args_in_repost_item_valuation( + doc, + index, + items_to_be_repost, + repost_affected_transaction, + item_wh_wise_last_posted_sle=None, + only_affected_transaction=False, +): + file_name = "" + has_file = False + + if not item_wh_wise_last_posted_sle: + item_wh_wise_last_posted_sle = {} + + if doc.reposting_data_file: + has_file = True + + if doc.reposting_data_file: + file_name = get_reposting_file_name(doc.doctype, doc.name) + # frappe.delete_doc("File", file_name, ignore_permissions=True, delete_permanently=True) + + doc.reposting_data_file = create_json_gz_file( + { + "repost_affected_transaction": repost_affected_transaction, + "item_wh_wise_last_posted_sle": {str(k): v for k, v in item_wh_wise_last_posted_sle.items()} + or {}, +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) }, "name", ) @@ -515,8 +575,50 @@ def get_affected_transactions(doc, reposting_data=None) -> set[tuple[str, str]]: if not doc.affected_transactions: return set() +<<<<<<< HEAD transactions = frappe.parse_json(doc.affected_transactions) return {tuple(transaction) for transaction in transactions} +======= + +def get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data=None): + if not reposting_data and doc and doc.reposting_data_file: + reposting_data = get_reposting_data(doc.reposting_data_file) + + if reposting_data and reposting_data.item_wh_wise_last_posted_sle: + return frappe._dict(reposting_data.item_wh_wise_last_posted_sle) + + return frappe._dict() + + +def get_reposting_data(file_path) -> dict: + file_name = frappe.db.get_value( + "File", + { + "file_url": file_path, + "attached_to_field": "reposting_data_file", + }, + "name", + ) + + if not file_name: + return frappe._dict() + + attached_file = frappe.get_doc("File", file_name) + + content = attached_file.get_content() + if isinstance(content, str): + content = content.encode("utf-8") + + try: + data = gzip.decompress(content) + except Exception: + return frappe._dict() + + if data := json.loads(data.decode("utf-8")): + data = data + + return parse_json(data) +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) def get_current_index(doc=None): @@ -552,6 +654,13 @@ class update_entries_after: self.allow_zero_rate = allow_zero_rate self.via_landed_cost_voucher = via_landed_cost_voucher self.item_code = args.get("item_code") +<<<<<<< HEAD +======= + self.stock_ledgers_to_repost = [] + self.current_idx = args.get("current_idx", 0) + self.repost_doc = args.get("repost_doc") or None + self.items_to_be_repost = args.get("items_to_be_repost") or None +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) self.allow_negative_stock = allow_negative_stock or is_negative_stock_allowed( item_code=self.item_code @@ -561,6 +670,7 @@ class update_entries_after: if self.args.sle_id: self.args["name"] = self.args.sle_id + self.prev_sle_dict = frappe._dict({}) self.company = frappe.get_cached_value("Warehouse", self.args.warehouse, "company") self.set_precision() self.valuation_method = get_valuation_method(self.item_code, self.company) @@ -571,7 +681,14 @@ class update_entries_after: self.reserved_stock = self.get_reserved_stock() self.data = frappe._dict() +<<<<<<< HEAD self.initialize_previous_data(self.args) +======= + + if not self.repost_doc or not self.args.get("item_wh_wise_last_posted_sle"): + self.initialize_previous_data(self.args) + +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) self.build() def get_reserved_stock(self): @@ -643,6 +760,7 @@ class update_entries_after: if not future_sle_exists(self.args): self.update_bin() else: +<<<<<<< HEAD entries_to_fix = self.get_future_entries_to_fix() i = 0 @@ -658,12 +776,185 @@ class update_entries_after: if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no): # for repack entries, we need to repost both source and target warehouses self.update_distinct_item_warehouses_for_repack(sle) +======= + self.item_wh_wise_last_posted_sle = self.get_item_wh_wise_last_posted_sle() + _item_wh_sle = self.sort_sles(self.item_wh_wise_last_posted_sle.values()) + + while _item_wh_sle: + self.initialize_reposting() + sle_dict = _item_wh_sle.pop(0) + self.repost_stock_ledgers(sle_dict) + + self.update_bin() + self.reset_vouchers_and_idx() + self.update_data_in_repost() +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) if self.exceptions: self.raise_exceptions() +<<<<<<< HEAD def update_distinct_item_warehouses_for_repack(self, sle): sles = ( +======= + def initialize_reposting(self): + self._sles = [] + self.distinct_sles = set() + self.distinct_dependant_sle = set() + self.prev_sle_dict = frappe._dict({}) + + def get_item_wh_wise_last_posted_sle(self): + if self.args and self.args.get("item_wh_wise_last_posted_sle"): + _sles = {} + for key, sle in self.args.get("item_wh_wise_last_posted_sle").items(): + _sles[frappe.safe_eval(key)] = frappe._dict(sle) + + return _sles + + return { + (self.args.item_code, self.args.warehouse): frappe._dict( + { + "item_code": self.args.item_code, + "warehouse": self.args.warehouse, + "posting_datetime": get_combine_datetime(self.args.posting_date, self.args.posting_time), + "posting_date": self.args.posting_date, + "posting_time": self.args.posting_time, + "creation": self.args.creation, + } + ) + } + + def repost_stock_ledgers(self, sle_dict=None): + self._sles = self.get_future_entries_to_repost(sle_dict) + + if not isinstance(self._sles, deque): + self._sles = deque(self._sles) + + i = 0 + while self._sles: + sle = self._sles.popleft() + i += 1 + if sle.name in self.distinct_sles: + continue + + item_wh_key = (sle.item_code, sle.warehouse) + if item_wh_key not in self.prev_sle_dict: + self.prev_sle_dict[item_wh_key] = get_previous_sle_of_current_voucher(sle) + + if ( + sle.dependant_sle_voucher_detail_no + and sle.dependant_sle_voucher_detail_no not in self.distinct_dependant_sle + ): + self._sles.append(sle) + self.distinct_dependant_sle.add(sle.dependant_sle_voucher_detail_no) + self.include_dependant_sle_in_reposting(sle) + continue + + self.repost_stock_ledger_entry(sle) + + # To avoid duplicate reposting of same sle in case of multiple dependant sle + self.distinct_sles.add(sle.name) + + # if i % 1000 == 0: + self.update_data_in_repost(len(self._sles), i) + + def sort_sles(self, sles): + return sorted( + sles, + key=lambda d: ( + get_datetime(d.posting_datetime), + get_datetime(d.creation), + ), + ) + + def include_dependant_sle_in_reposting(self, sle): + if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no): + repack_sles = self.get_sles_for_repack(sle) + for repack_sle in repack_sles: + self._sles.extend(self.get_future_entries_to_repost(repack_sle)) + else: + dependant_sles = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no) + for depend_sle in dependant_sles: + self._sles.extend(self.get_future_entries_to_repost(depend_sle)) + + self._sles = deque(self.sort_sles(self._sles)) + + def repost_stock_ledger_entry(self, sle): + if self.args.item_code != sle.item_code or self.args.warehouse != sle.warehouse: + self.repost_affected_transaction.add((sle.voucher_type, sle.voucher_no)) + + if isinstance(sle, dict): + sle = frappe._dict(sle) + + self.process_sle(sle) + self.update_item_wh_wise_last_posted_sle(sle) + + def update_item_wh_wise_last_posted_sle(self, sle): + if not self._sles: + self.item_wh_wise_last_posted_sle = frappe._dict() + return + + self.item_wh_wise_last_posted_sle[(sle.item_code, sle.warehouse)] = frappe._dict( + { + "item_code": sle.item_code, + "warehouse": sle.warehouse, + "posting_date": sle.posting_date, + "posting_time": sle.posting_time, + "posting_datetime": sle.posting_datetime + or get_combine_datetime(sle.posting_date, sle.posting_time), + "creation": sle.creation, + } + ) + + def reset_vouchers_and_idx(self): + self.stock_ledgers_to_repost = [] + self.prev_sle_dict = frappe._dict() + self.item_wh_wise_last_posted_sle = frappe._dict() + + def update_data_in_repost(self, total_sles=None, index=None): + if not self.repost_doc: + return + + values_to_update = { + "total_vouchers": cint(total_sles) + cint(index), + "vouchers_posted": index or 0, + } + + self.repost_doc.db_set(values_to_update) + + update_args_in_repost_item_valuation( + self.repost_doc, + self.current_idx, + self.items_to_be_repost, + self.repost_affected_transaction, + self.item_wh_wise_last_posted_sle, + only_affected_transaction=True, + ) + + if not frappe.in_test: + # To maintain the state of the reposting, so if timeout happens, it can be resumed from the last posted voucher + frappe.db.commit() # nosemgrep + + self.publish_real_time_progress(total_sles=total_sles, index=index) + + def publish_real_time_progress(self, total_sles=None, index=None): + frappe.publish_realtime( + "item_reposting_progress", + { + "name": self.repost_doc.name, + "total_vouchers": cint(total_sles) + cint(index), + "vouchers_posted": index or 0, + }, + doctype=self.repost_doc.doctype, + docname=self.repost_doc.name, + ) + + def get_future_entries_to_repost(self, kwargs): + return get_stock_ledger_entries(kwargs, ">=", "asc", for_update=True, check_serial_no=False) + + def get_sles_for_repack(self, sle): + return ( +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) frappe.get_all( "Stock Ledger Entry", filters={ @@ -673,7 +964,14 @@ class update_entries_after: "is_cancelled": 0, "voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no), }, - fields=["*"], + fields=[ + "item_code", + "warehouse", + "posting_date", + "posting_time", + "posting_datetime", + "creation", + ], ) or [] ) @@ -1889,6 +2187,7 @@ def get_stock_ledger_entries( ) +<<<<<<< HEAD def get_sle_by_voucher_detail_no(voucher_detail_no, excluded_sle=None): return frappe.db.get_value( "Stock Ledger Entry", @@ -1906,6 +2205,17 @@ def get_sle_by_voucher_detail_no(voucher_detail_no, excluded_sle=None): "voucher_no", ], as_dict=1, +======= +def get_sle_by_voucher_detail_no(voucher_detail_no): + return frappe.get_all( + "Stock Ledger Entry", + filters={ + "voucher_detail_no": voucher_detail_no, + "is_cancelled": 0, + "dependant_sle_voucher_detail_no": ("is", "not set"), + }, + fields=["item_code", "warehouse", "posting_date", "posting_time", "posting_datetime", "creation"], +>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing) )