fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing

(cherry picked from commit 90b9ab0bc8)
This commit is contained in:
Rohit Waghchaure
2026-03-27 19:02:31 +05:30
committed by Mergify
parent 1c1369fea8
commit 8fbb86d53e
3 changed files with 159 additions and 187 deletions

View File

@@ -32,18 +32,11 @@
"total_reposting_count",
"current_index",
"gl_reposting_index",
"reposting_data_file",
"vouchers_based_on_item_and_warehouse_section",
"total_vouchers",
"column_break_yqwo",
"vouchers_posted",
"last_sle_posted_section",
"reposted_item_code",
"reposted_warehouse",
"reposting_data_file",
"column_break_miwc",
"sle_posting_date",
"sle_posting_time",
"reposted_sle_creation"
"vouchers_posted"
],
"fields": [
{
@@ -236,6 +229,7 @@
{
"fieldname": "vouchers_based_on_item_and_warehouse_section",
"fieldtype": "Section Break",
"hidden": 1,
"label": "Reposting Vouchers"
},
{
@@ -256,52 +250,6 @@
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "last_sle_posted_section",
"fieldtype": "Section Break",
"label": "Last SLE Posted"
},
{
"fieldname": "reposted_item_code",
"fieldtype": "Link",
"label": "Reposted Item Code",
"no_copy": 1,
"options": "Item",
"read_only": 1
},
{
"fieldname": "reposted_warehouse",
"fieldtype": "Link",
"label": "Reposted Warehouse",
"no_copy": 1,
"options": "Warehouse",
"read_only": 1
},
{
"fieldname": "column_break_miwc",
"fieldtype": "Column Break"
},
{
"fieldname": "reposted_sle_creation",
"fieldtype": "Datetime",
"label": "Reposted SLE Creation",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "sle_posting_date",
"fieldtype": "Date",
"label": "SLE Posting Date",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "sle_posting_time",
"fieldtype": "Time",
"label": "SLE Posting Time",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "reposting_data_file",
"fieldtype": "Attach",
@@ -314,7 +262,7 @@
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
"modified": "2026-03-26 13:52:51.895504",
"modified": "2026-03-27 18:59:58.637964",
"modified_by": "Administrator",
"module": "Stock",
"name": "Repost Item Valuation",

View File

@@ -49,13 +49,8 @@ class RepostItemValuation(Document):
posting_time: DF.Time | None
recreate_stock_ledgers: DF.Check
repost_only_accounting_ledgers: DF.Check
reposted_item_code: DF.Link | None
reposted_sle_creation: DF.Datetime | None
reposted_warehouse: DF.Link | None
reposting_data_file: DF.Attach | None
reposting_reference: DF.Data | None
sle_posting_date: DF.Date | None
sle_posting_time: DF.Time | None
status: DF.Literal["Queued", "In Progress", "Completed", "Skipped", "Failed", "Cancelled"]
total_reposting_count: DF.Int
total_vouchers: DF.Int
@@ -268,11 +263,6 @@ class RepostItemValuation(Document):
self.total_reposting_count = 0
self.total_vouchers = 0
self.vouchers_posted = 0
self.reposted_item_code = None
self.reposted_warehouse = None
self.sle_posting_date = None
self.sle_posting_time = None
self.reposted_sle_creation = None
self.clear_attachment()
self.db_update()

View File

@@ -4,6 +4,7 @@
import copy
import gzip
import json
from collections import deque
import frappe
from frappe import _, bold, scrub
@@ -224,7 +225,13 @@ def repost_future_sle(
voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data
)
repost_affected_transaction = get_affected_transactions(doc) or set()
if doc and doc.reposting_data_file:
reposting_data = get_reposting_data(doc.reposting_data_file)
repost_affected_transaction = get_affected_transactions(doc, reposting_data) or set()
resume_item_wh_wise_last_posted_sle = (
get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data) or {}
)
if not items_to_be_repost:
return
@@ -241,6 +248,7 @@ def repost_future_sle(
"items_to_be_repost": items_to_be_repost,
"repost_doc": doc,
"repost_affected_transaction": repost_affected_transaction,
"item_wh_wise_last_posted_sle": resume_item_wh_wise_last_posted_sle,
},
allow_negative_stock=allow_negative_stock,
via_landed_cost_voucher=via_landed_cost_voucher,
@@ -248,15 +256,25 @@ def repost_future_sle(
index += 1
resume_item_wh_wise_last_posted_sle = {}
repost_affected_transaction.update(obj.repost_affected_transaction)
update_args_in_repost_item_valuation(doc, index, items_to_be_repost, repost_affected_transaction)
def update_args_in_repost_item_valuation(
doc, index, items_to_be_repost, repost_affected_transaction, only_affected_transaction=False
doc,
index,
items_to_be_repost,
repost_affected_transaction,
item_wh_wise_last_posted_sle=None,
only_affected_transaction=False,
):
file_name = ""
has_file = False
if not item_wh_wise_last_posted_sle:
item_wh_wise_last_posted_sle = {}
if doc.reposting_data_file:
has_file = True
@@ -267,6 +285,8 @@ def update_args_in_repost_item_valuation(
doc.reposting_data_file = create_json_gz_file(
{
"repost_affected_transaction": repost_affected_transaction,
"item_wh_wise_last_posted_sle": {str(k): v for k, v in item_wh_wise_last_posted_sle.items()}
or {},
},
doc,
file_name,
@@ -385,6 +405,16 @@ def get_affected_transactions(doc, reposting_data=None) -> set[tuple[str, str]]:
return set()
def get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data=None):
if not reposting_data and doc and doc.reposting_data_file:
reposting_data = get_reposting_data(doc.reposting_data_file)
if reposting_data and reposting_data.item_wh_wise_last_posted_sle:
return frappe._dict(reposting_data.item_wh_wise_last_posted_sle)
return frappe._dict()
def get_reposting_data(file_path) -> dict:
file_name = frappe.db.get_value(
"File",
@@ -448,7 +478,6 @@ class update_entries_after:
self.allow_zero_rate = allow_zero_rate
self.via_landed_cost_voucher = via_landed_cost_voucher
self.item_code = args.get("item_code")
self.prev_sle_dict = frappe._dict({})
self.stock_ledgers_to_repost = []
self.current_idx = args.get("current_idx", 0)
self.repost_doc = args.get("repost_doc") or None
@@ -462,6 +491,7 @@ class update_entries_after:
if self.args.sle_id:
self.args["name"] = self.args.sle_id
self.prev_sle_dict = frappe._dict({})
self.company = frappe.get_cached_value("Warehouse", self.args.warehouse, "company")
self.set_precision()
self.valuation_method = get_valuation_method(self.item_code, self.company)
@@ -472,7 +502,7 @@ class update_entries_after:
self.data = frappe._dict()
if not self.repost_doc or not self.repost_doc.reposted_item_code:
if not self.repost_doc or not self.args.get("item_wh_wise_last_posted_sle"):
self.initialize_previous_data(self.args)
self.build()
@@ -553,12 +583,14 @@ class update_entries_after:
if not future_sle_exists(self.args):
self.update_bin()
else:
ledgers_to_repost = self.get_sles_to_repost()
if not ledgers_to_repost:
return
self.item_wh_wise_last_posted_sle = self.get_item_wh_wise_last_posted_sle()
_item_wh_sle = self.sort_sles(self.item_wh_wise_last_posted_sle.values())
while _item_wh_sle:
self.initialize_reposting()
sle_dict = _item_wh_sle.pop(0)
self.repost_stock_ledgers(sle_dict)
self.stock_ledgers_to_repost = ledgers_to_repost
self.repost_stock_ledger_entries()
self.update_bin()
self.reset_vouchers_and_idx()
self.update_data_in_repost()
@@ -566,134 +598,129 @@ class update_entries_after:
if self.exceptions:
self.raise_exceptions()
def get_sles_to_repost(self):
self.distinct_item_wh_sles = frappe._dict()
def initialize_reposting(self):
self._sles = []
self.distinct_sles = set()
self.distinct_dependant_sle = set()
self.prev_sle_dict = frappe._dict({})
sle_dict = self.get_items_to_be_repost()
self.prepare_sles_to_repost(sle_dict)
if not self.distinct_item_wh_sles:
return []
def get_item_wh_wise_last_posted_sle(self):
if self.args and self.args.get("item_wh_wise_last_posted_sle"):
_sles = {}
for key, sle in self.args.get("item_wh_wise_last_posted_sle").items():
_sles[frappe.safe_eval(key)] = frappe._dict(sle)
ledgers_to_repost = sorted(
(row for rows in self.distinct_item_wh_sles.values() for row in rows),
key=lambda d: (get_datetime(d.get("posting_datetime")), get_datetime(d.get("creation"))),
)
return _sles
return ledgers_to_repost
def prepare_sles_to_repost(self, sle_dict):
sles = self.get_future_entries_to_repost(sle_dict)
for sle in sles:
item_wh_key = (sle.item_code, sle.warehouse)
if item_wh_key not in self.prev_sle_dict:
prev_sle = get_previous_sle_of_current_voucher(sle)
self.prev_sle_dict[item_wh_key] = prev_sle
key = (sle.item_code, sle.warehouse, sle.voucher_detail_no, sle.name)
if key not in self.distinct_item_wh_sles:
self.distinct_item_wh_sles.setdefault(key, []).append(sle)
if sle.dependant_sle_voucher_detail_no:
self.prepare_dependent_sles_to_repost(sle)
def prepare_dependent_sles_to_repost(self, sle):
if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no):
sles = self.get_sles_for_repack(sle)
for repack_sle in sles:
key = (
repack_sle.item_code,
repack_sle.warehouse,
repack_sle.voucher_detail_no,
repack_sle.name,
)
if key not in self.distinct_item_wh_sles:
self.distinct_item_wh_sles.setdefault(key, []).append(repack_sle)
self.prepare_sles_to_repost(repack_sle)
elif sle.dependant_sle_voucher_detail_no:
dependant_sle = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no)
if not dependant_sle:
return
key = (
dependant_sle.item_code,
dependant_sle.warehouse,
dependant_sle.voucher_detail_no,
dependant_sle.name,
)
if key not in self.distinct_item_wh_sles:
self.distinct_item_wh_sles.setdefault(key, []).append(dependant_sle)
self.prepare_sles_to_repost(dependant_sle)
def get_items_to_be_repost(self):
if self.repost_doc and self.repost_doc.reposted_item_code:
return frappe._dict(
return {
(self.args.item_code, self.args.warehouse): frappe._dict(
{
"item_code": self.repost_doc.reposted_item_code,
"warehouse": self.repost_doc.reposted_warehouse,
"posting_date": self.repost_doc.sle_posting_date,
"posting_time": self.repost_doc.sle_posting_time,
"creation": self.repost_doc.reposted_sle_creation,
"item_code": self.args.item_code,
"warehouse": self.args.warehouse,
"posting_datetime": get_combine_datetime(self.args.posting_date, self.args.posting_time),
"posting_date": self.args.posting_date,
"posting_time": self.args.posting_time,
"creation": self.args.creation,
}
)
}
return frappe._dict(
{
"item_code": self.args.item_code,
"warehouse": self.args.warehouse,
"posting_date": self.args.posting_date,
"posting_time": self.args.posting_time,
"creation": self.args.creation,
}
def repost_stock_ledgers(self, sle_dict=None):
self._sles = self.get_future_entries_to_repost(sle_dict)
if not isinstance(self._sles, deque):
self._sles = deque(self._sles)
i = 0
while self._sles:
sle = self._sles.popleft()
i += 1
if sle.name in self.distinct_sles:
continue
item_wh_key = (sle.item_code, sle.warehouse)
if item_wh_key not in self.prev_sle_dict:
self.prev_sle_dict[item_wh_key] = get_previous_sle_of_current_voucher(sle)
if (
sle.dependant_sle_voucher_detail_no
and sle.dependant_sle_voucher_detail_no not in self.distinct_dependant_sle
):
self._sles.append(sle)
self.distinct_dependant_sle.add(sle.dependant_sle_voucher_detail_no)
self.include_dependant_sle_in_reposting(sle)
continue
self.repost_stock_ledger_entry(sle)
# To avoid duplicate reposting of same sle in case of multiple dependant sle
self.distinct_sles.add(sle.name)
# if i % 1000 == 0:
self.update_data_in_repost(len(self._sles), i)
def sort_sles(self, sles):
return sorted(
sles,
key=lambda d: (
get_datetime(d.posting_datetime),
get_datetime(d.creation),
),
)
def repost_stock_ledger_entries(self):
i = 0
while self.stock_ledgers_to_repost:
sle = self.stock_ledgers_to_repost.pop(0)
def include_dependant_sle_in_reposting(self, sle):
if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no):
repack_sles = self.get_sles_for_repack(sle)
for repack_sle in repack_sles:
self._sles.extend(self.get_future_entries_to_repost(repack_sle))
else:
dependant_sles = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no)
for depend_sle in dependant_sles:
self._sles.extend(self.get_future_entries_to_repost(depend_sle))
if self.args.item_code != sle.item_code or self.args.warehouse != sle.warehouse:
self.repost_affected_transaction.add((sle.voucher_type, sle.voucher_no))
self._sles = deque(self.sort_sles(self._sles))
if isinstance(sle, dict):
sle = frappe._dict(sle)
def repost_stock_ledger_entry(self, sle):
if self.args.item_code != sle.item_code or self.args.warehouse != sle.warehouse:
self.repost_affected_transaction.add((sle.voucher_type, sle.voucher_no))
self.process_sle(sle)
i += 1
if i % 500 == 0:
self.update_data_in_repost(sle, i)
if isinstance(sle, dict):
sle = frappe._dict(sle)
self.process_sle(sle)
self.update_item_wh_wise_last_posted_sle(sle)
def update_item_wh_wise_last_posted_sle(self, sle):
if not self._sles:
self.item_wh_wise_last_posted_sle = frappe._dict()
return
self.item_wh_wise_last_posted_sle[(sle.item_code, sle.warehouse)] = frappe._dict(
{
"item_code": sle.item_code,
"warehouse": sle.warehouse,
"posting_date": sle.posting_date,
"posting_time": sle.posting_time,
"posting_datetime": sle.posting_datetime
or get_combine_datetime(sle.posting_date, sle.posting_time),
"creation": sle.creation,
}
)
def reset_vouchers_and_idx(self):
self.stock_ledgers_to_repost = []
self.prev_sle_dict = frappe._dict()
self.item_wh_wise_last_posted_sle = frappe._dict()
def update_data_in_repost(self, sle=None, index=None):
def update_data_in_repost(self, total_sles=None, index=None):
if not self.repost_doc:
return
values_to_update = {
"total_vouchers": len(self.stock_ledgers_to_repost) + cint(index),
"total_vouchers": cint(total_sles) + cint(index),
"vouchers_posted": index or 0,
"reposted_item_code": None,
"reposted_warehouse": None,
"sle_posting_date": None,
"sle_posting_time": None,
"reposted_sle_creation": None,
}
if sle:
values_to_update.update(
{
"reposted_item_code": sle.item_code,
"reposted_warehouse": sle.warehouse,
"sle_posting_date": sle.posting_date,
"sle_posting_time": sle.posting_time,
"reposted_sle_creation": sle.creation,
}
)
self.repost_doc.db_set(values_to_update)
update_args_in_repost_item_valuation(
@@ -701,6 +728,7 @@ class update_entries_after:
self.current_idx,
self.items_to_be_repost,
self.repost_affected_transaction,
self.item_wh_wise_last_posted_sle,
only_affected_transaction=True,
)
@@ -708,14 +736,14 @@ class update_entries_after:
# To maintain the state of the reposting, so if timeout happens, it can be resumed from the last posted voucher
frappe.db.commit() # nosemgrep
self.publish_real_time_progress(index=index)
self.publish_real_time_progress(total_sles=total_sles, index=index)
def publish_real_time_progress(self, index=None):
def publish_real_time_progress(self, total_sles=None, index=None):
frappe.publish_realtime(
"item_reposting_progress",
{
"name": self.repost_doc.name,
"total_vouchers": len(self.stock_ledgers_to_repost) + cint(index),
"total_vouchers": cint(total_sles) + cint(index),
"vouchers_posted": index or 0,
},
doctype=self.repost_doc.doctype,
@@ -736,7 +764,14 @@ class update_entries_after:
"is_cancelled": 0,
"dependant_sle_voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no),
},
fields=["*"],
fields=[
"item_code",
"warehouse",
"posting_date",
"posting_time",
"posting_datetime",
"creation",
],
)
or []
)
@@ -1910,15 +1945,14 @@ def get_stock_ledger_entries(
def get_sle_by_voucher_detail_no(voucher_detail_no):
return frappe.db.get_value(
return frappe.get_all(
"Stock Ledger Entry",
{
filters={
"voucher_detail_no": voucher_detail_no,
"is_cancelled": 0,
"dependant_sle_voucher_detail_no": ("is", "not set"),
},
["*"],
as_dict=1,
fields=["item_code", "warehouse", "posting_date", "posting_time", "posting_datetime", "creation"],
)