fix: corrected logic to retry reposting if timeout occurs after dependent SLE processing

(cherry picked from commit 90b9ab0bc8)
This commit is contained in:
Rohit Waghchaure
2026-03-27 19:02:31 +05:30
committed by Mergify
parent 1c1369fea8
commit 8fbb86d53e
3 changed files with 159 additions and 187 deletions

View File

@@ -32,18 +32,11 @@
"total_reposting_count", "total_reposting_count",
"current_index", "current_index",
"gl_reposting_index", "gl_reposting_index",
"reposting_data_file",
"vouchers_based_on_item_and_warehouse_section", "vouchers_based_on_item_and_warehouse_section",
"total_vouchers", "total_vouchers",
"column_break_yqwo", "column_break_yqwo",
"vouchers_posted", "vouchers_posted"
"last_sle_posted_section",
"reposted_item_code",
"reposted_warehouse",
"reposting_data_file",
"column_break_miwc",
"sle_posting_date",
"sle_posting_time",
"reposted_sle_creation"
], ],
"fields": [ "fields": [
{ {
@@ -236,6 +229,7 @@
{ {
"fieldname": "vouchers_based_on_item_and_warehouse_section", "fieldname": "vouchers_based_on_item_and_warehouse_section",
"fieldtype": "Section Break", "fieldtype": "Section Break",
"hidden": 1,
"label": "Reposting Vouchers" "label": "Reposting Vouchers"
}, },
{ {
@@ -256,52 +250,6 @@
"no_copy": 1, "no_copy": 1,
"read_only": 1 "read_only": 1
}, },
{
"fieldname": "last_sle_posted_section",
"fieldtype": "Section Break",
"label": "Last SLE Posted"
},
{
"fieldname": "reposted_item_code",
"fieldtype": "Link",
"label": "Reposted Item Code",
"no_copy": 1,
"options": "Item",
"read_only": 1
},
{
"fieldname": "reposted_warehouse",
"fieldtype": "Link",
"label": "Reposted Warehouse",
"no_copy": 1,
"options": "Warehouse",
"read_only": 1
},
{
"fieldname": "column_break_miwc",
"fieldtype": "Column Break"
},
{
"fieldname": "reposted_sle_creation",
"fieldtype": "Datetime",
"label": "Reposted SLE Creation",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "sle_posting_date",
"fieldtype": "Date",
"label": "SLE Posting Date",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "sle_posting_time",
"fieldtype": "Time",
"label": "SLE Posting Time",
"no_copy": 1,
"read_only": 1
},
{ {
"fieldname": "reposting_data_file", "fieldname": "reposting_data_file",
"fieldtype": "Attach", "fieldtype": "Attach",
@@ -314,7 +262,7 @@
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"is_submittable": 1, "is_submittable": 1,
"links": [], "links": [],
"modified": "2026-03-26 13:52:51.895504", "modified": "2026-03-27 18:59:58.637964",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Stock", "module": "Stock",
"name": "Repost Item Valuation", "name": "Repost Item Valuation",

View File

@@ -49,13 +49,8 @@ class RepostItemValuation(Document):
posting_time: DF.Time | None posting_time: DF.Time | None
recreate_stock_ledgers: DF.Check recreate_stock_ledgers: DF.Check
repost_only_accounting_ledgers: DF.Check repost_only_accounting_ledgers: DF.Check
reposted_item_code: DF.Link | None
reposted_sle_creation: DF.Datetime | None
reposted_warehouse: DF.Link | None
reposting_data_file: DF.Attach | None reposting_data_file: DF.Attach | None
reposting_reference: DF.Data | None reposting_reference: DF.Data | None
sle_posting_date: DF.Date | None
sle_posting_time: DF.Time | None
status: DF.Literal["Queued", "In Progress", "Completed", "Skipped", "Failed", "Cancelled"] status: DF.Literal["Queued", "In Progress", "Completed", "Skipped", "Failed", "Cancelled"]
total_reposting_count: DF.Int total_reposting_count: DF.Int
total_vouchers: DF.Int total_vouchers: DF.Int
@@ -268,11 +263,6 @@ class RepostItemValuation(Document):
self.total_reposting_count = 0 self.total_reposting_count = 0
self.total_vouchers = 0 self.total_vouchers = 0
self.vouchers_posted = 0 self.vouchers_posted = 0
self.reposted_item_code = None
self.reposted_warehouse = None
self.sle_posting_date = None
self.sle_posting_time = None
self.reposted_sle_creation = None
self.clear_attachment() self.clear_attachment()
self.db_update() self.db_update()

View File

@@ -4,6 +4,7 @@
import copy import copy
import gzip import gzip
import json import json
from collections import deque
import frappe import frappe
from frappe import _, bold, scrub from frappe import _, bold, scrub
@@ -224,7 +225,13 @@ def repost_future_sle(
voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data voucher_type=voucher_type, voucher_no=voucher_no, doc=doc, reposting_data=reposting_data
) )
repost_affected_transaction = get_affected_transactions(doc) or set() if doc and doc.reposting_data_file:
reposting_data = get_reposting_data(doc.reposting_data_file)
repost_affected_transaction = get_affected_transactions(doc, reposting_data) or set()
resume_item_wh_wise_last_posted_sle = (
get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data) or {}
)
if not items_to_be_repost: if not items_to_be_repost:
return return
@@ -241,6 +248,7 @@ def repost_future_sle(
"items_to_be_repost": items_to_be_repost, "items_to_be_repost": items_to_be_repost,
"repost_doc": doc, "repost_doc": doc,
"repost_affected_transaction": repost_affected_transaction, "repost_affected_transaction": repost_affected_transaction,
"item_wh_wise_last_posted_sle": resume_item_wh_wise_last_posted_sle,
}, },
allow_negative_stock=allow_negative_stock, allow_negative_stock=allow_negative_stock,
via_landed_cost_voucher=via_landed_cost_voucher, via_landed_cost_voucher=via_landed_cost_voucher,
@@ -248,15 +256,25 @@ def repost_future_sle(
index += 1 index += 1
resume_item_wh_wise_last_posted_sle = {}
repost_affected_transaction.update(obj.repost_affected_transaction) repost_affected_transaction.update(obj.repost_affected_transaction)
update_args_in_repost_item_valuation(doc, index, items_to_be_repost, repost_affected_transaction) update_args_in_repost_item_valuation(doc, index, items_to_be_repost, repost_affected_transaction)
def update_args_in_repost_item_valuation( def update_args_in_repost_item_valuation(
doc, index, items_to_be_repost, repost_affected_transaction, only_affected_transaction=False doc,
index,
items_to_be_repost,
repost_affected_transaction,
item_wh_wise_last_posted_sle=None,
only_affected_transaction=False,
): ):
file_name = "" file_name = ""
has_file = False has_file = False
if not item_wh_wise_last_posted_sle:
item_wh_wise_last_posted_sle = {}
if doc.reposting_data_file: if doc.reposting_data_file:
has_file = True has_file = True
@@ -267,6 +285,8 @@ def update_args_in_repost_item_valuation(
doc.reposting_data_file = create_json_gz_file( doc.reposting_data_file = create_json_gz_file(
{ {
"repost_affected_transaction": repost_affected_transaction, "repost_affected_transaction": repost_affected_transaction,
"item_wh_wise_last_posted_sle": {str(k): v for k, v in item_wh_wise_last_posted_sle.items()}
or {},
}, },
doc, doc,
file_name, file_name,
@@ -385,6 +405,16 @@ def get_affected_transactions(doc, reposting_data=None) -> set[tuple[str, str]]:
return set() return set()
def get_item_wh_wise_last_posted_sle_from_reposting_data(doc, reposting_data=None):
if not reposting_data and doc and doc.reposting_data_file:
reposting_data = get_reposting_data(doc.reposting_data_file)
if reposting_data and reposting_data.item_wh_wise_last_posted_sle:
return frappe._dict(reposting_data.item_wh_wise_last_posted_sle)
return frappe._dict()
def get_reposting_data(file_path) -> dict: def get_reposting_data(file_path) -> dict:
file_name = frappe.db.get_value( file_name = frappe.db.get_value(
"File", "File",
@@ -448,7 +478,6 @@ class update_entries_after:
self.allow_zero_rate = allow_zero_rate self.allow_zero_rate = allow_zero_rate
self.via_landed_cost_voucher = via_landed_cost_voucher self.via_landed_cost_voucher = via_landed_cost_voucher
self.item_code = args.get("item_code") self.item_code = args.get("item_code")
self.prev_sle_dict = frappe._dict({})
self.stock_ledgers_to_repost = [] self.stock_ledgers_to_repost = []
self.current_idx = args.get("current_idx", 0) self.current_idx = args.get("current_idx", 0)
self.repost_doc = args.get("repost_doc") or None self.repost_doc = args.get("repost_doc") or None
@@ -462,6 +491,7 @@ class update_entries_after:
if self.args.sle_id: if self.args.sle_id:
self.args["name"] = self.args.sle_id self.args["name"] = self.args.sle_id
self.prev_sle_dict = frappe._dict({})
self.company = frappe.get_cached_value("Warehouse", self.args.warehouse, "company") self.company = frappe.get_cached_value("Warehouse", self.args.warehouse, "company")
self.set_precision() self.set_precision()
self.valuation_method = get_valuation_method(self.item_code, self.company) self.valuation_method = get_valuation_method(self.item_code, self.company)
@@ -472,7 +502,7 @@ class update_entries_after:
self.data = frappe._dict() self.data = frappe._dict()
if not self.repost_doc or not self.repost_doc.reposted_item_code: if not self.repost_doc or not self.args.get("item_wh_wise_last_posted_sle"):
self.initialize_previous_data(self.args) self.initialize_previous_data(self.args)
self.build() self.build()
@@ -553,12 +583,14 @@ class update_entries_after:
if not future_sle_exists(self.args): if not future_sle_exists(self.args):
self.update_bin() self.update_bin()
else: else:
ledgers_to_repost = self.get_sles_to_repost() self.item_wh_wise_last_posted_sle = self.get_item_wh_wise_last_posted_sle()
if not ledgers_to_repost: _item_wh_sle = self.sort_sles(self.item_wh_wise_last_posted_sle.values())
return
while _item_wh_sle:
self.initialize_reposting()
sle_dict = _item_wh_sle.pop(0)
self.repost_stock_ledgers(sle_dict)
self.stock_ledgers_to_repost = ledgers_to_repost
self.repost_stock_ledger_entries()
self.update_bin() self.update_bin()
self.reset_vouchers_and_idx() self.reset_vouchers_and_idx()
self.update_data_in_repost() self.update_data_in_repost()
@@ -566,134 +598,129 @@ class update_entries_after:
if self.exceptions: if self.exceptions:
self.raise_exceptions() self.raise_exceptions()
def get_sles_to_repost(self): def initialize_reposting(self):
self.distinct_item_wh_sles = frappe._dict() self._sles = []
self.distinct_sles = set()
self.distinct_dependant_sle = set()
self.prev_sle_dict = frappe._dict({})
sle_dict = self.get_items_to_be_repost() def get_item_wh_wise_last_posted_sle(self):
self.prepare_sles_to_repost(sle_dict) if self.args and self.args.get("item_wh_wise_last_posted_sle"):
if not self.distinct_item_wh_sles: _sles = {}
return [] for key, sle in self.args.get("item_wh_wise_last_posted_sle").items():
_sles[frappe.safe_eval(key)] = frappe._dict(sle)
ledgers_to_repost = sorted( return _sles
(row for rows in self.distinct_item_wh_sles.values() for row in rows),
key=lambda d: (get_datetime(d.get("posting_datetime")), get_datetime(d.get("creation"))),
)
return ledgers_to_repost return {
(self.args.item_code, self.args.warehouse): frappe._dict(
def prepare_sles_to_repost(self, sle_dict):
sles = self.get_future_entries_to_repost(sle_dict)
for sle in sles:
item_wh_key = (sle.item_code, sle.warehouse)
if item_wh_key not in self.prev_sle_dict:
prev_sle = get_previous_sle_of_current_voucher(sle)
self.prev_sle_dict[item_wh_key] = prev_sle
key = (sle.item_code, sle.warehouse, sle.voucher_detail_no, sle.name)
if key not in self.distinct_item_wh_sles:
self.distinct_item_wh_sles.setdefault(key, []).append(sle)
if sle.dependant_sle_voucher_detail_no:
self.prepare_dependent_sles_to_repost(sle)
def prepare_dependent_sles_to_repost(self, sle):
if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no):
sles = self.get_sles_for_repack(sle)
for repack_sle in sles:
key = (
repack_sle.item_code,
repack_sle.warehouse,
repack_sle.voucher_detail_no,
repack_sle.name,
)
if key not in self.distinct_item_wh_sles:
self.distinct_item_wh_sles.setdefault(key, []).append(repack_sle)
self.prepare_sles_to_repost(repack_sle)
elif sle.dependant_sle_voucher_detail_no:
dependant_sle = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no)
if not dependant_sle:
return
key = (
dependant_sle.item_code,
dependant_sle.warehouse,
dependant_sle.voucher_detail_no,
dependant_sle.name,
)
if key not in self.distinct_item_wh_sles:
self.distinct_item_wh_sles.setdefault(key, []).append(dependant_sle)
self.prepare_sles_to_repost(dependant_sle)
def get_items_to_be_repost(self):
if self.repost_doc and self.repost_doc.reposted_item_code:
return frappe._dict(
{ {
"item_code": self.repost_doc.reposted_item_code, "item_code": self.args.item_code,
"warehouse": self.repost_doc.reposted_warehouse, "warehouse": self.args.warehouse,
"posting_date": self.repost_doc.sle_posting_date, "posting_datetime": get_combine_datetime(self.args.posting_date, self.args.posting_time),
"posting_time": self.repost_doc.sle_posting_time, "posting_date": self.args.posting_date,
"creation": self.repost_doc.reposted_sle_creation, "posting_time": self.args.posting_time,
"creation": self.args.creation,
} }
) )
}
return frappe._dict( def repost_stock_ledgers(self, sle_dict=None):
{ self._sles = self.get_future_entries_to_repost(sle_dict)
"item_code": self.args.item_code,
"warehouse": self.args.warehouse, if not isinstance(self._sles, deque):
"posting_date": self.args.posting_date, self._sles = deque(self._sles)
"posting_time": self.args.posting_time,
"creation": self.args.creation, i = 0
} while self._sles:
sle = self._sles.popleft()
i += 1
if sle.name in self.distinct_sles:
continue
item_wh_key = (sle.item_code, sle.warehouse)
if item_wh_key not in self.prev_sle_dict:
self.prev_sle_dict[item_wh_key] = get_previous_sle_of_current_voucher(sle)
if (
sle.dependant_sle_voucher_detail_no
and sle.dependant_sle_voucher_detail_no not in self.distinct_dependant_sle
):
self._sles.append(sle)
self.distinct_dependant_sle.add(sle.dependant_sle_voucher_detail_no)
self.include_dependant_sle_in_reposting(sle)
continue
self.repost_stock_ledger_entry(sle)
# To avoid duplicate reposting of same sle in case of multiple dependant sle
self.distinct_sles.add(sle.name)
# if i % 1000 == 0:
self.update_data_in_repost(len(self._sles), i)
def sort_sles(self, sles):
return sorted(
sles,
key=lambda d: (
get_datetime(d.posting_datetime),
get_datetime(d.creation),
),
) )
def repost_stock_ledger_entries(self): def include_dependant_sle_in_reposting(self, sle):
i = 0 if sle.voucher_type == "Stock Entry" and is_repack_entry(sle.voucher_no):
while self.stock_ledgers_to_repost: repack_sles = self.get_sles_for_repack(sle)
sle = self.stock_ledgers_to_repost.pop(0) for repack_sle in repack_sles:
self._sles.extend(self.get_future_entries_to_repost(repack_sle))
else:
dependant_sles = get_sle_by_voucher_detail_no(sle.dependant_sle_voucher_detail_no)
for depend_sle in dependant_sles:
self._sles.extend(self.get_future_entries_to_repost(depend_sle))
if self.args.item_code != sle.item_code or self.args.warehouse != sle.warehouse: self._sles = deque(self.sort_sles(self._sles))
self.repost_affected_transaction.add((sle.voucher_type, sle.voucher_no))
if isinstance(sle, dict): def repost_stock_ledger_entry(self, sle):
sle = frappe._dict(sle) if self.args.item_code != sle.item_code or self.args.warehouse != sle.warehouse:
self.repost_affected_transaction.add((sle.voucher_type, sle.voucher_no))
self.process_sle(sle) if isinstance(sle, dict):
i += 1 sle = frappe._dict(sle)
if i % 500 == 0:
self.update_data_in_repost(sle, i) self.process_sle(sle)
self.update_item_wh_wise_last_posted_sle(sle)
def update_item_wh_wise_last_posted_sle(self, sle):
if not self._sles:
self.item_wh_wise_last_posted_sle = frappe._dict()
return
self.item_wh_wise_last_posted_sle[(sle.item_code, sle.warehouse)] = frappe._dict(
{
"item_code": sle.item_code,
"warehouse": sle.warehouse,
"posting_date": sle.posting_date,
"posting_time": sle.posting_time,
"posting_datetime": sle.posting_datetime
or get_combine_datetime(sle.posting_date, sle.posting_time),
"creation": sle.creation,
}
)
def reset_vouchers_and_idx(self): def reset_vouchers_and_idx(self):
self.stock_ledgers_to_repost = [] self.stock_ledgers_to_repost = []
self.prev_sle_dict = frappe._dict() self.prev_sle_dict = frappe._dict()
self.item_wh_wise_last_posted_sle = frappe._dict()
def update_data_in_repost(self, sle=None, index=None): def update_data_in_repost(self, total_sles=None, index=None):
if not self.repost_doc: if not self.repost_doc:
return return
values_to_update = { values_to_update = {
"total_vouchers": len(self.stock_ledgers_to_repost) + cint(index), "total_vouchers": cint(total_sles) + cint(index),
"vouchers_posted": index or 0, "vouchers_posted": index or 0,
"reposted_item_code": None,
"reposted_warehouse": None,
"sle_posting_date": None,
"sle_posting_time": None,
"reposted_sle_creation": None,
} }
if sle:
values_to_update.update(
{
"reposted_item_code": sle.item_code,
"reposted_warehouse": sle.warehouse,
"sle_posting_date": sle.posting_date,
"sle_posting_time": sle.posting_time,
"reposted_sle_creation": sle.creation,
}
)
self.repost_doc.db_set(values_to_update) self.repost_doc.db_set(values_to_update)
update_args_in_repost_item_valuation( update_args_in_repost_item_valuation(
@@ -701,6 +728,7 @@ class update_entries_after:
self.current_idx, self.current_idx,
self.items_to_be_repost, self.items_to_be_repost,
self.repost_affected_transaction, self.repost_affected_transaction,
self.item_wh_wise_last_posted_sle,
only_affected_transaction=True, only_affected_transaction=True,
) )
@@ -708,14 +736,14 @@ class update_entries_after:
# To maintain the state of the reposting, so if timeout happens, it can be resumed from the last posted voucher # To maintain the state of the reposting, so if timeout happens, it can be resumed from the last posted voucher
frappe.db.commit() # nosemgrep frappe.db.commit() # nosemgrep
self.publish_real_time_progress(index=index) self.publish_real_time_progress(total_sles=total_sles, index=index)
def publish_real_time_progress(self, index=None): def publish_real_time_progress(self, total_sles=None, index=None):
frappe.publish_realtime( frappe.publish_realtime(
"item_reposting_progress", "item_reposting_progress",
{ {
"name": self.repost_doc.name, "name": self.repost_doc.name,
"total_vouchers": len(self.stock_ledgers_to_repost) + cint(index), "total_vouchers": cint(total_sles) + cint(index),
"vouchers_posted": index or 0, "vouchers_posted": index or 0,
}, },
doctype=self.repost_doc.doctype, doctype=self.repost_doc.doctype,
@@ -736,7 +764,14 @@ class update_entries_after:
"is_cancelled": 0, "is_cancelled": 0,
"dependant_sle_voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no), "dependant_sle_voucher_detail_no": ("!=", sle.dependant_sle_voucher_detail_no),
}, },
fields=["*"], fields=[
"item_code",
"warehouse",
"posting_date",
"posting_time",
"posting_datetime",
"creation",
],
) )
or [] or []
) )
@@ -1910,15 +1945,14 @@ def get_stock_ledger_entries(
def get_sle_by_voucher_detail_no(voucher_detail_no): def get_sle_by_voucher_detail_no(voucher_detail_no):
return frappe.db.get_value( return frappe.get_all(
"Stock Ledger Entry", "Stock Ledger Entry",
{ filters={
"voucher_detail_no": voucher_detail_no, "voucher_detail_no": voucher_detail_no,
"is_cancelled": 0, "is_cancelled": 0,
"dependant_sle_voucher_detail_no": ("is", "not set"), "dependant_sle_voucher_detail_no": ("is", "not set"),
}, },
["*"], fields=["item_code", "warehouse", "posting_date", "posting_time", "posting_datetime", "creation"],
as_dict=1,
) )