fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing

(cherry picked from commit 90b9ab0bc8)
(cherry picked from commit 8fbb86d53e)

# Conflicts:
#	erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.json
#	erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py
#	erpnext/stock/stock_ledger.py
This commit is contained in:
Rohit Waghchaure
2026-03-27 19:02:31 +05:30
committed by Mergify
parent 372fc96f0c
commit 466da99595
3 changed files with 362 additions and 1 deletions

View File

@@ -34,7 +34,15 @@
"total_reposting_count",
"current_index",
"gl_reposting_index",
<<<<<<< HEAD
"affected_transactions"
=======
"reposting_data_file",
"vouchers_based_on_item_and_warehouse_section",
"total_vouchers",
"column_break_yqwo",
"vouchers_posted"
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
],
"fields": [
{
@@ -247,13 +255,51 @@
"fieldname": "repost_only_accounting_ledgers",
"fieldtype": "Check",
"label": "Repost Only Accounting Ledgers"
<<<<<<< HEAD
=======
},
{
"fieldname": "vouchers_based_on_item_and_warehouse_section",
"fieldtype": "Section Break",
"hidden": 1,
"label": "Reposting Vouchers"
},
{
"fieldname": "total_vouchers",
"fieldtype": "Int",
"label": "Total Ledgers",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "column_break_yqwo",
"fieldtype": "Column Break"
},
{
"fieldname": "vouchers_posted",
"fieldtype": "Int",
"label": "Ledgers Posted",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "reposting_data_file",
"fieldtype": "Attach",
"label": "Reposting Data File",
"no_copy": 1,
"read_only": 1
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
}
],
"grid_page_length": 50,
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
<<<<<<< HEAD
"modified": "2026-02-25 14:22:21.681549",
=======
"modified": "2026-03-27 18:59:58.637964",
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
"modified_by": "Administrator",
"module": "Stock",
"name": "Repost Item Valuation",

View File

@@ -261,6 +261,11 @@ class RepostItemValuation(Document):
self.items_to_be_repost = None
self.gl_reposting_index = 0
self.total_reposting_count = 0
<<<<<<< HEAD
=======
self.total_vouchers = 0
self.vouchers_posted = 0
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
self.clear_attachment()
self.db_update()

View File

@@ -4,6 +4,7 @@
import copy
import gzip
import json
from collections import deque
import frappe
from frappe import _, bold, scrub
@@ -256,6 +257,7 @@ def repost_future_sle(
if doc and doc.reposting_data_file:
reposting_data = get_reposting_data(doc.reposting_data_file)
<<<<<<< HEAD
items_to_be_repost = get_items_to_be_repost(
voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data
)
@@ -268,9 +270,21 @@ def repost_future_sle(
i = get_current_index(doc) or 0
while i < len(args):
validate_item_warehouse(args[i])
=======
if doc and doc.reposting_data_file:
reposting_data = get_reposting_data(doc.reposting_data_file)
repost_affected_transaction = get_affected_transactions(doc, reposting_data) or set()
resume_item_wh_wise_last_posted_sle = (
get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data) or {}
)
if not items_to_be_repost:
return
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
obj = update_entries_after(
{
<<<<<<< HEAD
"item_code": args[i].get("item_code"),
"warehouse": args[i].get("warehouse"),
"posting_date": args[i].get("posting_date"),
@@ -279,6 +293,18 @@ def repost_future_sle(
"distinct_item_warehouses": distinct_item_warehouses,
"items_to_be_repost": args,
"current_index": i,
=======
"item_code": items_to_be_repost[index].get("item_code"),
"warehouse": items_to_be_repost[index].get("warehouse"),
"posting_date": items_to_be_repost[index].get("posting_date"),
"posting_time": items_to_be_repost[index].get("posting_time"),
"creation": items_to_be_repost[index].get("creation"),
"current_idx": index,
"items_to_be_repost": items_to_be_repost,
"repost_doc": doc,
"repost_affected_transaction": repost_affected_transaction,
"item_wh_wise_last_posted_sle": resume_item_wh_wise_last_posted_sle,
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
},
allow_negative_stock=allow_negative_stock,
via_landed_cost_voucher=via_landed_cost_voucher,
@@ -289,6 +315,7 @@ def repost_future_sle(
if distinct_item_warehouses.get(key):
distinct_item_warehouses[key].reposting_status = True
<<<<<<< HEAD
if obj.new_items_found:
for _item_wh, data in distinct_item_warehouses.items():
if ("args_idx" not in data and not data.reposting_status) or (
@@ -314,6 +341,39 @@ def get_reposting_data(file_path) -> dict:
{
"file_url": file_path,
"attached_to_field": "reposting_data_file",
=======
resume_item_wh_wise_last_posted_sle = {}
repost_affected_transaction.update(obj.repost_affected_transaction)
update_args_in_repost_item_valuation(doc, index, items_to_be_repost, repost_affected_transaction)
def update_args_in_repost_item_valuation(
doc,
index,
items_to_be_repost,
repost_affected_transaction,
item_wh_wise_last_posted_sle=None,
only_affected_transaction=False,
):
file_name = ""
has_file = False
if not item_wh_wise_last_posted_sle:
item_wh_wise_last_posted_sle = {}
if doc.reposting_data_file:
has_file = True
if doc.reposting_data_file:
file_name = get_reposting_file_name(doc.doctype, doc.name)
# frappe.delete_doc("File", file_name, ignore_permissions=True, delete_permanently=True)
doc.reposting_data_file = create_json_gz_file(
{
"repost_affected_transaction": repost_affected_transaction,
"item_wh_wise_last_posted_sle": {str(k): v for k, v in item_wh_wise_last_posted_sle.items()}
or {},
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
},
"name",
)
@@ -515,8 +575,50 @@ def get_affected_transactions(doc, reposting_data=None) -> set[tuple[str, str]]:
if not doc.affected_transactions:
return set()
<<<<<<< HEAD
transactions = frappe.parse_json(doc.affected_transactions)
return {tuple(transaction) for transaction in transactions}
=======
def get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data=None):
if not reposting_data and doc and doc.reposting_data_file:
reposting_data = get_reposting_data(doc.reposting_data_file)
if reposting_data and reposting_data.item_wh_wise_last_posted_sle:
return frappe._dict(reposting_data.item_wh_wise_last_posted_sle)
return frappe._dict()
def get_reposting_data(file_path) -> dict:
file_name = frappe.db.get_value(
"File",
{
"file_url": file_path,
"attached_to_field": "reposting_data_file",
},
"name",
)
if not file_name:
return frappe._dict()
attached_file = frappe.get_doc("File", file_name)
content = attached_file.get_content()
if isinstance(content, str):
content = content.encode("utf-8")
try:
data = gzip.decompress(content)
except Exception:
return frappe._dict()
if data := json.loads(data.decode("utf-8")):
data = data
return parse_json(data)
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
def get_current_index(doc=None):
@@ -552,6 +654,13 @@ class update_entries_after:
self.allow_zero_rate = allow_zero_rate
self.via_landed_cost_voucher = via_landed_cost_voucher
self.item_code = args.get("item_code")
<<<<<<< HEAD
=======
self.stock_ledgers_to_repost = []
self.current_idx = args.get("current_idx", 0)
self.repost_doc = args.get("repost_doc") or None
self.items_to_be_repost = args.get("items_to_be_repost") or None
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
self.allow_negative_stock = allow_negative_stock or is_negative_stock_allowed(
item_code=self.item_code
@@ -561,6 +670,7 @@ class update_entries_after:
if self.args.sle_id:
self.args["name"] = self.args.sle_id
self.prev_sle_dict = frappe._dict({})
self.company = frappe.get_cached_value("Warehouse", self.args.warehouse, "company")
self.set_precision()
self.valuation_method = get_valuation_method(self.item_code, self.company)
@@ -571,7 +681,14 @@ class update_entries_after:
self.reserved_stock = self.get_reserved_stock()
self.data = frappe._dict()
<<<<<<< HEAD
self.initialize_previous_data(self.args)
=======
if not self.repost_doc or not self.args.get("item_wh_wise_last_posted_sle"):
self.initialize_previous_data(self.args)
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
self.build()
def get_reserved_stock(self):
@@ -643,6 +760,7 @@ class update_entries_after:
if not future_sle_exists(self.args):
self.update_bin()
else:
<<<<<<< HEAD
entries_to_fix = self.get_future_entries_to_fix()
i = 0
@@ -658,12 +776,185 @@ class update_entries_after:
if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no):
# for repack entries, we need to repost both source and target warehouses
self.update_distinct_item_warehouses_for_repack(sle)
=======
self.item_wh_wise_last_posted_sle = self.get_item_wh_wise_last_posted_sle()
_item_wh_sle = self.sort_sles(self.item_wh_wise_last_posted_sle.values())
while _item_wh_sle:
self.initialize_reposting()
sle_dict = _item_wh_sle.pop(0)
self.repost_stock_ledgers(sle_dict)
self.update_bin()
self.reset_vouchers_and_idx()
self.update_data_in_repost()
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
if self.exceptions:
self.raise_exceptions()
<<<<<<< HEAD
def update_distinct_item_warehouses_for_repack(self, sle):
sles = (
=======
def initialize_reposting(self):
self._sles = []
self.distinct_sles = set()
self.distinct_dependant_sle = set()
self.prev_sle_dict = frappe._dict({})
def get_item_wh_wise_last_posted_sle(self):
if self.args and self.args.get("item_wh_wise_last_posted_sle"):
_sles = {}
for key, sle in self.args.get("item_wh_wise_last_posted_sle").items():
_sles[frappe.safe_eval(key)] = frappe._dict(sle)
return _sles
return {
(self.args.item_code, self.args.warehouse): frappe._dict(
{
"item_code": self.args.item_code,
"warehouse": self.args.warehouse,
"posting_datetime": get_combine_datetime(self.args.posting_date, self.args.posting_time),
"posting_date": self.args.posting_date,
"posting_time": self.args.posting_time,
"creation": self.args.creation,
}
)
}
def repost_stock_ledgers(self, sle_dict=None):
self._sles = self.get_future_entries_to_repost(sle_dict)
if not isinstance(self._sles, deque):
self._sles = deque(self._sles)
i = 0
while self._sles:
sle = self._sles.popleft()
i += 1
if sle.name in self.distinct_sles:
continue
item_wh_key = (sle.item_code, sle.warehouse)
if item_wh_key not in self.prev_sle_dict:
self.prev_sle_dict[item_wh_key] = get_previous_sle_of_current_voucher(sle)
if (
sle.dependant_sle_voucher_detail_no
and sle.dependant_sle_voucher_detail_no not in self.distinct_dependant_sle
):
self._sles.append(sle)
self.distinct_dependant_sle.add(sle.dependant_sle_voucher_detail_no)
self.include_dependant_sle_in_reposting(sle)
continue
self.repost_stock_ledger_entry(sle)
# To avoid duplicate reposting of same sle in case of multiple dependant sle
self.distinct_sles.add(sle.name)
# if i % 1000 == 0:
self.update_data_in_repost(len(self._sles), i)
def sort_sles(self, sles):
return sorted(
sles,
key=lambda d: (
get_datetime(d.posting_datetime),
get_datetime(d.creation),
),
)
def include_dependant_sle_in_reposting(self, sle):
if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no):
repack_sles = self.get_sles_for_repack(sle)
for repack_sle in repack_sles:
self._sles.extend(self.get_future_entries_to_repost(repack_sle))
else:
dependant_sles = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no)
for depend_sle in dependant_sles:
self._sles.extend(self.get_future_entries_to_repost(depend_sle))
self._sles = deque(self.sort_sles(self._sles))
def repost_stock_ledger_entry(self, sle):
if self.args.item_code != sle.item_code or self.args.warehouse != sle.warehouse:
self.repost_affected_transaction.add((sle.voucher_type, sle.voucher_no))
if isinstance(sle, dict):
sle = frappe._dict(sle)
self.process_sle(sle)
self.update_item_wh_wise_last_posted_sle(sle)
def update_item_wh_wise_last_posted_sle(self, sle):
if not self._sles:
self.item_wh_wise_last_posted_sle = frappe._dict()
return
self.item_wh_wise_last_posted_sle[(sle.item_code, sle.warehouse)] = frappe._dict(
{
"item_code": sle.item_code,
"warehouse": sle.warehouse,
"posting_date": sle.posting_date,
"posting_time": sle.posting_time,
"posting_datetime": sle.posting_datetime
or get_combine_datetime(sle.posting_date, sle.posting_time),
"creation": sle.creation,
}
)
def reset_vouchers_and_idx(self):
self.stock_ledgers_to_repost = []
self.prev_sle_dict = frappe._dict()
self.item_wh_wise_last_posted_sle = frappe._dict()
def update_data_in_repost(self, total_sles=None, index=None):
if not self.repost_doc:
return
values_to_update = {
"total_vouchers": cint(total_sles) + cint(index),
"vouchers_posted": index or 0,
}
self.repost_doc.db_set(values_to_update)
update_args_in_repost_item_valuation(
self.repost_doc,
self.current_idx,
self.items_to_be_repost,
self.repost_affected_transaction,
self.item_wh_wise_last_posted_sle,
only_affected_transaction=True,
)
if not frappe.in_test:
# To maintain the state of the reposting, so if timeout happens, it can be resumed from the last posted voucher
frappe.db.commit() # nosemgrep
self.publish_real_time_progress(total_sles=total_sles, index=index)
def publish_real_time_progress(self, total_sles=None, index=None):
frappe.publish_realtime(
"item_reposting_progress",
{
"name": self.repost_doc.name,
"total_vouchers": cint(total_sles) + cint(index),
"vouchers_posted": index or 0,
},
doctype=self.repost_doc.doctype,
docname=self.repost_doc.name,
)
def get_future_entries_to_repost(self, kwargs):
return get_stock_ledger_entries(kwargs, ">=", "asc", for_update=True, check_serial_no=False)
def get_sles_for_repack(self, sle):
return (
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
frappe.get_all(
"Stock Ledger Entry",
filters={
@@ -673,7 +964,14 @@ class update_entries_after:
"is_cancelled": 0,
"voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no),
},
fields=["*"],
fields=[
"item_code",
"warehouse",
"posting_date",
"posting_time",
"posting_datetime",
"creation",
],
)
or []
)
@@ -1889,6 +2187,7 @@ def get_stock_ledger_entries(
)
<<<<<<< HEAD
def get_sle_by_voucher_detail_no(voucher_detail_no, excluded_sle=None):
return frappe.db.get_value(
"Stock Ledger Entry",
@@ -1906,6 +2205,17 @@ def get_sle_by_voucher_detail_no(voucher_detail_no, excluded_sle=None):
"voucher_no",
],
as_dict=1,
=======
def get_sle_by_voucher_detail_no(voucher_detail_no):
return frappe.get_all(
"Stock Ledger Entry",
filters={
"voucher_detail_no": voucher_detail_no,
"is_cancelled": 0,
"dependant_sle_voucher_detail_no": ("is", "not set"),
},
fields=["item_code", "warehouse", "posting_date", "posting_time", "posting_datetime", "creation"],
>>>>>>> 8fbb86d53e (fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing)
)