From 7752f703d2c0851c1bc4468281f0d7320c7f6e09 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sun, 24 May 2026 18:30:45 +0530 Subject: [PATCH] refactor: stock ageing report (backport #55231) (#55237) Co-authored-by: Mihir Kandoi --- .../stock/report/stock_ageing/stock_ageing.py | 981 +++++++++++------- .../report/stock_ageing/test_stock_ageing.py | 27 + 2 files changed, 605 insertions(+), 403 deletions(-) diff --git a/erpnext/stock/report/stock_ageing/stock_ageing.py b/erpnext/stock/report/stock_ageing/stock_ageing.py index 0bf6b3df9a3..bbe5392f4c3 100644 --- a/erpnext/stock/report/stock_ageing/stock_ageing.py +++ b/erpnext/stock/report/stock_ageing/stock_ageing.py @@ -15,10 +15,25 @@ from erpnext.stock.valuation import round_off_if_near_zero Filters = frappe._dict +FIFO_POSTING_DATE_INDEX = -2 +FIFO_QTY_INDEX = 0 +FIFO_DATE_INDEX = 1 +FIFO_VALUE_INDEX = 2 + +BATCH_SLOT_SIZE = 5 +BATCH_SLOT_BATCH_INDEX = 0 +BATCH_SLOT_VALUATION_INDEX = 1 +BATCH_SLOT_QTY_INDEX = 2 +BATCH_SLOT_DATE_INDEX = 3 +BATCH_SLOT_VALUE_INDEX = 4 + +AVERAGE_AGE_COLUMN = 6 +MAX_CHART_ITEMS = 10 + def execute(filters: Filters = None) -> tuple: to_date = filters["to_date"] - filters.ranges = [num.strip() for num in filters.range.split(",") if num.strip().isdigit()] + filters.ranges = get_age_ranges(filters.range) columns = get_columns(filters) item_details = FIFOSlots(filters).generate() @@ -29,95 +44,125 @@ def execute(filters: Filters = None) -> tuple: return columns, data, None, chart_data -def format_report_data(filters: Filters, item_details: dict, to_date: str) -> list[dict]: +def get_age_ranges(age_range: str) -> list[str]: + return [num.strip() for num in age_range.split(",") if num.strip().isdigit()] + + +def get_float_precision() -> int: + return cint(frappe.db.get_single_value("System Settings", "float_precision", cache=True)) + + +def format_report_data(filters: Filters, item_details: dict, to_date: str) -> list[list]: "Returns ordered, formatted data with ranges." - _func = itemgetter(-2) data = [] - precision = cint(frappe.db.get_single_value("System Settings", "float_precision", cache=True)) + precision = get_float_precision() for _item, item_dict in item_details.items(): if not flt(item_dict.get("total_qty"), precision): continue - earliest_age, latest_age = 0, 0 details = item_dict["details"] - - fifo_queue = sorted(filter(_func, item_dict["fifo_queue"]), key=_func) - + fifo_queue = get_report_fifo_queue(item_dict["fifo_queue"], details.has_batch_no) if not fifo_queue: continue - if details.has_batch_no: - fifo_queue = [slot[2:] if len(slot) == 5 else slot for slot in fifo_queue] - - average_age = get_average_age(fifo_queue, to_date) - earliest_age = date_diff(to_date, fifo_queue[0][1]) - latest_age = date_diff(to_date, fifo_queue[-1][1]) - range_values = get_range_age(filters, fifo_queue, to_date, item_dict) - - row = [details.name, details.item_name, details.description, details.item_group, details.brand] - - if filters.get("show_warehouse_wise_stock"): - row.append(details.warehouse) - - row.extend( - [ - flt(item_dict.get("total_qty"), precision), - average_age, - *range_values, - earliest_age, - latest_age, - details.stock_uom, - ] - ) - - data.append(row) + data.append(get_report_row(filters, item_dict, fifo_queue, to_date, precision)) return data -def get_average_age(fifo_queue: list, to_date: str) -> float: - batch_age = age_qty = total_qty = 0.0 - for batch in fifo_queue: - batch_age = date_diff(to_date, batch[1]) +def get_report_fifo_queue(fifo_queue: list, has_batch_no: bool) -> list: + get_posting_date = itemgetter(FIFO_POSTING_DATE_INDEX) + fifo_queue = sorted([slot for slot in fifo_queue if get_posting_date(slot)], key=get_posting_date) - if isinstance(batch[0], int | float): - age_qty += batch_age * batch[0] - total_qty += batch[0] - else: - age_qty += batch_age * 1 - total_qty += 1 + if has_batch_no: + return [get_batch_report_slot(slot) for slot in fifo_queue] + + return fifo_queue + + +def get_batch_report_slot(slot: list) -> list: + if is_batch_slot(slot): + return slot[BATCH_SLOT_QTY_INDEX:] + + return slot + + +def get_report_row(filters: Filters, item_dict: dict, fifo_queue: list, to_date: str, precision: int) -> list: + details = item_dict["details"] + range_values = get_range_age(filters, fifo_queue, to_date, item_dict, precision) + row = [details.name, details.item_name, details.description, details.item_group, details.brand] + + if filters.get("show_warehouse_wise_stock"): + row.append(details.warehouse) + + row.extend( + [ + flt(item_dict.get("total_qty"), precision), + get_average_age(fifo_queue, to_date), + *range_values, + date_diff(to_date, fifo_queue[0][FIFO_DATE_INDEX]), + date_diff(to_date, fifo_queue[-1][FIFO_DATE_INDEX]), + details.stock_uom, + ] + ) + + return row + + +def get_average_age(fifo_queue: list, to_date: str) -> float: + age_qty = total_qty = 0.0 + for slot in fifo_queue: + qty = get_slot_qty(slot) + age_qty += date_diff(to_date, slot[FIFO_DATE_INDEX]) * qty + total_qty += qty return flt(age_qty / total_qty, 2) if total_qty else 0.0 -def get_range_age(filters: Filters, fifo_queue: list, to_date: str, item_dict: dict) -> list: - precision = cint(frappe.db.get_single_value("System Settings", "float_precision", cache=True)) +def get_slot_qty(slot: list) -> float: + if is_qty_slot(slot): + return slot[FIFO_QTY_INDEX] + + return 1.0 + + +def get_range_age( + filters: Filters, fifo_queue: list, to_date: str, item_dict: dict, precision: int | None = None +) -> list: + precision = precision if precision is not None else get_float_precision() range_values = [0.0] * ((len(filters.ranges) * 2) + 2) - for item in fifo_queue: - age = flt(date_diff(to_date, item[1])) - qty = flt(item[0]) if not item_dict["has_serial_no"] else 1.0 - stock_value = flt(item[2]) - - for i, age_limit in enumerate(filters.ranges): - if age <= flt(age_limit): - i *= 2 - range_values[i] = flt(range_values[i] + qty, precision) - range_values[i + 1] = flt(range_values[i + 1] + stock_value, precision) - if range_values[i] == 0.0 and round_off_if_near_zero(range_values[i + 1], 2) == 0: - range_values[i + 1] = 0.0 - break - else: - range_values[-2] = flt(range_values[-2] + qty, precision) - range_values[-1] = flt(range_values[-1] + stock_value, precision) - if range_values[-2] == 0.0 and round_off_if_near_zero(range_values[-1], 2) == 0: - range_values[-1] = 0.0 + for slot in fifo_queue: + bucket_index = get_age_bucket_index(filters.ranges, slot, to_date) + qty = 1.0 if item_dict["has_serial_no"] else flt(slot[FIFO_QTY_INDEX]) + stock_value = flt(slot[FIFO_VALUE_INDEX]) + add_to_range_bucket(range_values, bucket_index, qty, stock_value, precision) return range_values +def get_age_bucket_index(age_ranges: list, slot: list, to_date: str) -> int: + age = flt(date_diff(to_date, slot[FIFO_DATE_INDEX])) + + for index, age_limit in enumerate(age_ranges): + if age <= flt(age_limit): + return index * 2 + + return len(age_ranges) * 2 + + +def add_to_range_bucket( + range_values: list, bucket_index: int, qty: float, stock_value: float, precision: int +) -> None: + range_values[bucket_index] = flt(range_values[bucket_index] + qty, precision) + range_values[bucket_index + 1] = flt(range_values[bucket_index + 1] + stock_value, precision) + + if range_values[bucket_index] == 0.0 and round_off_if_near_zero(range_values[bucket_index + 1], 2) == 0: + range_values[bucket_index + 1] = 0.0 + + def get_columns(filters: Filters) -> list[dict]: range_columns = [] setup_ageing_columns(filters, range_columns) @@ -185,14 +230,14 @@ def get_chart_data(data: list, filters: Filters) -> dict: if filters.get("show_warehouse_wise_stock"): return {} - data.sort(key=lambda row: row[6], reverse=True) + data.sort(key=lambda row: row[AVERAGE_AGE_COLUMN], reverse=True) - if len(data) > 10: - data = data[:10] + if len(data) > MAX_CHART_ITEMS: + data = data[:MAX_CHART_ITEMS] for row in data: labels.append(row[0]) - datapoints.append(row[6]) + datapoints.append(row[AVERAGE_AGE_COLUMN]) return { "data": {"labels": labels, "datasets": [{"name": _("Average Age"), "values": datapoints}]}, @@ -203,9 +248,9 @@ def get_chart_data(data: list, filters: Filters) -> dict: def setup_ageing_columns(filters: Filters, range_columns: list): prev_range_value = 0 ranges = [] - for range in filters.ranges: - ranges.append(f"{prev_range_value} - {range}") - prev_range_value = cint(range) + 1 + for age_range in filters.ranges: + ranges.append(f"{prev_range_value} - {age_range}") + prev_range_value = cint(age_range) + 1 ranges.append(f"{prev_range_value} - Above") @@ -219,6 +264,14 @@ def add_column(range_columns: list, label: str, fieldname: str, fieldtype: str = range_columns.append(dict(label=label, fieldname=fieldname, fieldtype=fieldtype, width=width)) +def is_batch_slot(slot: list) -> bool: + return len(slot) == BATCH_SLOT_SIZE + + +def is_qty_slot(slot: list) -> bool: + return isinstance(slot[FIFO_QTY_INDEX], int | float) + + class FIFOSlots: "Returns FIFO computed slots of inwarded stock as per date." @@ -233,7 +286,7 @@ class FIFOSlots: def generate(self) -> dict: """ - Returns dict of the foll.g structure: + Returns dict of the following structure: Key = Item A / (Item A, Warehouse A) Key: { 'details' -> Dict: ** item details **, @@ -241,105 +294,128 @@ class FIFOSlots: consumed/updated and maintained via FIFO. ** } """ - from erpnext.stock.serial_batch_bundle import get_serial_nos_from_bundle - stock_ledger_entries = self.sle - - bundle_wise_serial_nos = frappe._dict({}) - bundle_wise_batch_nos = frappe._dict({}) - if stock_ledger_entries is None: - bundle_wise_serial_nos = self.__get_bundle_wise_serial_nos() - bundle_wise_batch_nos = self.__get_bundle_wise_batch_nos() + bundle_wise_serial_nos, bundle_wise_batch_nos = self._get_bundle_wise_details(stock_ledger_entries) # prepare single sle voucher detail lookup self.prepare_stock_reco_voucher_wise_count() with frappe.db.unbuffered_cursor(): if stock_ledger_entries is None: - stock_ledger_entries = self.__get_stock_ledger_entries() + stock_ledger_entries = self._get_stock_ledger_entries() - for d in stock_ledger_entries: - key, fifo_queue, transferred_item_key = self.__init_key_stores(d) - prev_balance_qty = self.item_details[key].get("qty_after_transaction", 0) - - if d.voucher_type == "Stock Reconciliation" and ( - not d.batch_no or d.serial_no or d.serial_and_batch_bundle - ): - if d.voucher_detail_no in self.stock_reco_voucher_wise_count: - # for legacy recon with single sle has qty_after_transaction and stock_value_difference without outward entry - # for exisitng handle emptying the existing queue and details. - d.stock_value_difference = flt(d.qty_after_transaction * d.valuation_rate) - d.actual_qty = d.qty_after_transaction - self.item_details[key]["qty_after_transaction"] = 0 - self.item_details[key]["total_qty"] = 0 - fifo_queue.clear() - else: - d.actual_qty = flt(d.qty_after_transaction) - flt(prev_balance_qty) - - elif d.voucher_type == "Stock Reconciliation": - # get difference in qty shift as actual qty - d.actual_qty = flt(d.qty_after_transaction) - flt(prev_balance_qty) - - serial_nos = get_serial_nos(d.serial_no) if d.serial_no else [] - batch_nos = ( - [ - [ - d.batch_no.upper(), - self.__get_batchwise_valuation(d.batch_no), - abs(d.actual_qty), - abs(d.stock_value_difference), - ] - ] - if d.batch_no - else [] - ) - if d.serial_and_batch_bundle: - if d.has_serial_no: - if bundle_wise_serial_nos: - serial_nos = bundle_wise_serial_nos.get(d.serial_and_batch_bundle) or [] - else: - serial_nos = sorted(get_serial_nos_from_bundle(d.serial_and_batch_bundle)) or [] - elif d.has_batch_no: - if bundle_wise_batch_nos: - batch_nos = bundle_wise_batch_nos.get(d.serial_and_batch_bundle) or [] - else: - batch_nos = ( - self.__get_bundle_wise_batch_nos(d.serial_and_batch_bundle).get( - d.serial_and_batch_bundle - ) - or [] - ) - - serial_nos = self.uppercase_serial_nos(serial_nos) - if d.actual_qty > 0: - self.__compute_incoming_stock(d, fifo_queue, transferred_item_key, serial_nos, batch_nos) - else: - self.__compute_outgoing_stock(d, fifo_queue, transferred_item_key, serial_nos, batch_nos) - - self.__update_balances(d, key) - - # handle serial nos misconsumption - if d.has_serial_no: - qty_after = cint(self.item_details[key]["qty_after_transaction"]) - if qty_after <= 0: - fifo_queue.clear() - elif len(fifo_queue) > qty_after: - fifo_queue[:] = fifo_queue[:qty_after] + for row in stock_ledger_entries: + self._process_stock_ledger_entry(row, bundle_wise_serial_nos, bundle_wise_batch_nos) # Note that stock_ledger_entries is an iterator, you can not reuse it like a list del stock_ledger_entries if not self.filters.get("show_warehouse_wise_stock"): # (Item 1, WH 1), (Item 1, WH 2) => (Item 1) - self.item_details = self.__aggregate_details_by_item(self.item_details) + self.item_details = self._aggregate_details_by_item(self.item_details) return self.item_details + def _get_bundle_wise_details(self, stock_ledger_entries: list | None) -> tuple[dict, dict]: + if stock_ledger_entries is not None: + return frappe._dict({}), frappe._dict({}) + + return self._get_bundle_wise_serial_nos(), self._get_bundle_wise_batch_nos() + + def _process_stock_ledger_entry( + self, row: dict, bundle_wise_serial_nos: dict, bundle_wise_batch_nos: dict + ) -> None: + key, fifo_queue, transferred_item_key = self._init_key_stores(row) + prev_balance_qty = self.item_details[key].get("qty_after_transaction", 0) + + self._set_stock_reconciliation_actual_qty(row, key, fifo_queue, prev_balance_qty) + serial_nos, batch_nos = self._get_serial_and_batch_nos( + row, bundle_wise_serial_nos, bundle_wise_batch_nos + ) + + if row.actual_qty > 0: + self._compute_incoming_stock(row, fifo_queue, transferred_item_key, serial_nos, batch_nos) + else: + self._compute_outgoing_stock(row, fifo_queue, transferred_item_key, serial_nos, batch_nos) + + self._update_balances(row, key) + self._trim_serial_fifo_queue(row, key, fifo_queue) + + def _set_stock_reconciliation_actual_qty( + self, row: dict, key: tuple, fifo_queue: list, prev_balance_qty: float + ) -> None: + if row.voucher_type != "Stock Reconciliation": + return + + if not row.batch_no or row.serial_no or row.serial_and_batch_bundle: + if row.voucher_detail_no in self.stock_reco_voucher_wise_count: + # Legacy reconciliation with a single SLE has qty_after_transaction and + # stock_value_difference without an outward entry, so reset the queue first. + row.stock_value_difference = flt(row.qty_after_transaction * row.valuation_rate) + row.actual_qty = row.qty_after_transaction + self.item_details[key]["qty_after_transaction"] = 0 + self.item_details[key]["total_qty"] = 0 + fifo_queue.clear() + return + + # Stock reconciliation stores the final balance; FIFO needs the movement delta. + row.actual_qty = flt(row.qty_after_transaction) - flt(prev_balance_qty) + + def _get_serial_and_batch_nos( + self, row: dict, bundle_wise_serial_nos: dict, bundle_wise_batch_nos: dict + ) -> tuple[list, list]: + from erpnext.stock.serial_batch_bundle import get_serial_nos_from_bundle + + serial_nos = get_serial_nos(row.serial_no) if row.serial_no else [] + batch_nos = self._get_row_batch_nos(row) + + if row.serial_and_batch_bundle: + if row.has_serial_no: + if bundle_wise_serial_nos: + serial_nos = bundle_wise_serial_nos.get(row.serial_and_batch_bundle) or [] + else: + serial_nos = sorted(get_serial_nos_from_bundle(row.serial_and_batch_bundle)) or [] + elif row.has_batch_no: + if bundle_wise_batch_nos: + batch_nos = bundle_wise_batch_nos.get(row.serial_and_batch_bundle) or [] + else: + batch_nos = ( + self._get_bundle_wise_batch_nos(row.serial_and_batch_bundle).get( + row.serial_and_batch_bundle + ) + or [] + ) + + return self.uppercase_serial_nos(serial_nos), batch_nos + + def _get_row_batch_nos(self, row: dict) -> list: + if not row.batch_no: + return [] + + return [ + [ + row.batch_no.upper(), + self._get_batchwise_valuation(row.batch_no), + abs(row.actual_qty), + abs(row.stock_value_difference), + ] + ] + + def _trim_serial_fifo_queue(self, row: dict, key: tuple, fifo_queue: list) -> None: + if not row.has_serial_no: + return + + qty_after = cint(self.item_details[key]["qty_after_transaction"]) + if qty_after <= 0: + fifo_queue.clear() + elif len(fifo_queue) > qty_after: + fifo_queue[:] = fifo_queue[:qty_after] + def uppercase_serial_nos(self, serial_nos): "Convert serial nos to uppercase for uniformity." return [sn.upper() for sn in serial_nos] - def __get_batchwise_valuation(self, batch_no: str): + def _get_batchwise_valuation(self, batch_no: str): if batch_no not in self.batchwise_valuation_by_batch: self.batchwise_valuation_by_batch[batch_no] = frappe.db.get_value( "Batch", batch_no, "use_batchwise_valuation" @@ -347,7 +423,7 @@ class FIFOSlots: return self.batchwise_valuation_by_batch[batch_no] - def __init_key_stores(self, row: dict) -> tuple: + def _init_key_stores(self, row: dict) -> tuple: "Initialise keys and FIFO Queue." key = (row.name, row.warehouse) @@ -359,290 +435,389 @@ class FIFOSlots: return key, fifo_queue, transferred_item_key - def __compute_incoming_stock( + def _compute_incoming_stock( self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list, batch_nos: list ): "Update FIFO Queue on inward stock." - - def set_fifo_queue_for_serial_items(): - valuation = row.stock_value_difference / row.actual_qty - for serial_no in serial_nos: - if self.serial_no_details.get(serial_no): - fifo_queue.append([serial_no, self.serial_no_details.get(serial_no), valuation]) - else: - self.serial_no_details.setdefault(serial_no, row.posting_date) - fifo_queue.append([serial_no, row.posting_date, valuation]) - - def set_fifo_queue_for_batch_items(): - for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos: - qty, stock_value_difference = neutralize_negative_batch_stock( - batch_no, use_batchwise_valuation, qty, stock_value_difference - ) - - if not qty: - continue - - if self.batch_no_details.get(batch_no): - fifo_queue.append( - [ - batch_no, - use_batchwise_valuation, - qty, - self.batch_no_details.get(batch_no), - stock_value_difference, - ] - ) - else: - self.batch_no_details.setdefault(batch_no, row.posting_date) - fifo_queue.append( - [batch_no, use_batchwise_valuation, qty, row.posting_date, stock_value_difference] - ) - - def neutralize_negative_batch_stock(batch_no, use_batchwise_valuation, qty, stock_value_difference): - qty = flt(qty) - stock_value_difference = flt(stock_value_difference) - - if not qty: - return qty, stock_value_difference - - for slot in list(fifo_queue): - if ( - len(slot) != 5 - or slot[0] != batch_no - or slot[1] != use_batchwise_valuation - or flt(slot[2]) >= 0 - ): - continue - - qty_to_adjust = min(qty, abs(flt(slot[2]))) - value_to_adjust = ( - stock_value_difference - if qty_to_adjust == qty - else flt(stock_value_difference * (qty_to_adjust / qty)) - ) - - slot[2] = flt(slot[2]) + qty_to_adjust - slot[3] = row.posting_date - slot[4] = flt(slot[4]) + value_to_adjust - - qty = flt(qty - qty_to_adjust) - stock_value_difference = flt(stock_value_difference - value_to_adjust) - - if not flt(slot[2]) and not flt(slot[4]): - fifo_queue.remove(slot) - - if not qty: - break - - return qty, stock_value_difference - transfer_data = self.transferred_item_details.get(transfer_key) if transfer_data: # inward/outward from same voucher, item & warehouse # eg: Repack with same item, Stock reco for batch item # consume transfer data and add stock to fifo queue - self.__adjust_incoming_transfer_qty(transfer_data, fifo_queue, row, batch_nos) + self._adjust_incoming_transfer_qty( + transfer_data, + fifo_queue, + row, + batch_nos, + serial_nos=serial_nos if row.get("has_serial_no") else None, + ) + elif serial_nos and row.get("has_serial_no"): + self._add_serial_fifo_slots(row, fifo_queue, serial_nos) + elif batch_nos and row.get("has_batch_no"): + self._add_batch_fifo_slots(row, fifo_queue, batch_nos) + elif fifo_queue and flt(fifo_queue[0][FIFO_QTY_INDEX]) <= 0: + self._add_to_negative_fifo_head(row, fifo_queue) else: - if serial_nos and row.get("has_serial_no"): - set_fifo_queue_for_serial_items() - elif batch_nos and row.get("has_batch_no"): - set_fifo_queue_for_batch_items() - elif fifo_queue and flt(fifo_queue[0][0]) <= 0: - # neutralize 0/negative stock by adding positive stock - fifo_queue[0][0] += flt(row.actual_qty) - fifo_queue[0][1] = row.posting_date - fifo_queue[0][2] += flt(row.stock_value_difference) - else: - fifo_queue.append([flt(row.actual_qty), row.posting_date, flt(row.stock_value_difference)]) + fifo_queue.append([flt(row.actual_qty), row.posting_date, flt(row.stock_value_difference)]) - def __compute_outgoing_stock( + def _add_serial_fifo_slots(self, row: dict, fifo_queue: list, serial_nos: list) -> None: + valuation = row.stock_value_difference / row.actual_qty + for serial_no in serial_nos: + posting_date = self.serial_no_details.setdefault(serial_no, row.posting_date) + fifo_queue.append([serial_no, posting_date, valuation]) + + def _add_batch_fifo_slots(self, row: dict, fifo_queue: list, batch_nos: list) -> None: + for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos: + qty, stock_value_difference = self._neutralize_negative_batch_stock( + fifo_queue, row, batch_no, use_batchwise_valuation, qty, stock_value_difference + ) + + if not qty: + continue + + posting_date = self.batch_no_details.setdefault(batch_no, row.posting_date) + fifo_queue.append([batch_no, use_batchwise_valuation, qty, posting_date, stock_value_difference]) + + def _neutralize_negative_batch_stock( + self, + fifo_queue: list, + row: dict, + batch_no: str, + use_batchwise_valuation: bool, + qty: float, + stock_value_difference: float, + ) -> tuple[float, float]: + qty = flt(qty) + stock_value_difference = flt(stock_value_difference) + + if not qty: + return qty, stock_value_difference + + for slot in list(fifo_queue): + if not self._is_matching_negative_batch_slot(slot, batch_no, use_batchwise_valuation): + continue + + qty_to_adjust = min(qty, abs(flt(slot[BATCH_SLOT_QTY_INDEX]))) + value_to_adjust = ( + stock_value_difference + if qty_to_adjust == qty + else flt(stock_value_difference * (qty_to_adjust / qty)) + ) + + slot[BATCH_SLOT_QTY_INDEX] = flt(slot[BATCH_SLOT_QTY_INDEX]) + qty_to_adjust + slot[BATCH_SLOT_DATE_INDEX] = row.posting_date + slot[BATCH_SLOT_VALUE_INDEX] = flt(slot[BATCH_SLOT_VALUE_INDEX]) + value_to_adjust + + qty = flt(qty - qty_to_adjust) + stock_value_difference = flt(stock_value_difference - value_to_adjust) + + if not flt(slot[BATCH_SLOT_QTY_INDEX]) and not flt(slot[BATCH_SLOT_VALUE_INDEX]): + fifo_queue.remove(slot) + + if not qty: + break + + return qty, stock_value_difference + + def _is_matching_negative_batch_slot( + self, slot: list, batch_no: str, use_batchwise_valuation: bool, include_zero_qty: bool = False + ) -> bool: + if not is_batch_slot(slot): + return False + + qty = flt(slot[BATCH_SLOT_QTY_INDEX]) + + return ( + slot[BATCH_SLOT_BATCH_INDEX] == batch_no + and slot[BATCH_SLOT_VALUATION_INDEX] == use_batchwise_valuation + and (qty <= 0 if include_zero_qty else qty < 0) + ) + + def _add_to_negative_fifo_head(self, row: dict, fifo_queue: list) -> None: + fifo_queue[0][FIFO_QTY_INDEX] += flt(row.actual_qty) + fifo_queue[0][FIFO_DATE_INDEX] = row.posting_date + fifo_queue[0][FIFO_VALUE_INDEX] += flt(row.stock_value_difference) + + def _compute_outgoing_stock( self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list, batch_nos: list ): "Update FIFO Queue on outward stock." if serial_nos: - fifo_queue[:] = [serial_no for serial_no in fifo_queue if serial_no[0] not in serial_nos] + self._consume_serial_fifo_slots(fifo_queue, serial_nos) elif batch_nos: - for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos: - items_to_remove = [] - - for slot in fifo_queue: - slot_batch_no, slot_use_batchwise_valuation, slot_qty, _, slot_stock_value = slot - - if flt(slot_qty) <= 0: - continue - - # Batchwise valuation: consume only from same batch - if use_batchwise_valuation: - if slot_batch_no != batch_no: - continue - # Non-batchwise valuation: consume from any non-batchwise batch - else: - if slot_use_batchwise_valuation: - continue - - if flt(slot_qty) <= qty: - qty -= flt(slot_qty) - stock_value_difference -= flt(slot_stock_value) - self.transferred_item_details[transfer_key].append( - [flt(slot_qty), slot[3], flt(slot_stock_value)] - ) - items_to_remove.append(slot) - else: - slot[2] = flt(slot_qty) - qty - # preserve ledger valuation (moving average / SLE value), not slot proportional value - slot[4] = flt(slot_stock_value) - stock_value_difference - self.transferred_item_details[transfer_key].append( - [qty, slot[3], stock_value_difference] - ) - qty = 0 - stock_value_difference = 0 - break - - for item in items_to_remove: - fifo_queue.remove(item) - - if qty: - fifo_queue.append( - [ - batch_no, - use_batchwise_valuation, - -(qty), - row.posting_date, - -(stock_value_difference), - ] - ) - self.transferred_item_details[transfer_key].append( - [qty, row.posting_date, stock_value_difference] - ) + self._consume_batch_fifo_slots(row, fifo_queue, transfer_key, batch_nos) else: - qty_to_pop = abs(row.actual_qty) - stock_value = abs(row.stock_value_difference) + self._consume_fifo_slots(row, fifo_queue, transfer_key) - while qty_to_pop: - slot = fifo_queue[0] if fifo_queue else [0, None, 0] - if 0 < flt(slot[0]) <= qty_to_pop: - # qty to pop >= slot qty - # if +ve and not enough or exactly same balance in current slot, consume whole slot - qty_to_pop -= flt(slot[0]) - stock_value -= flt(slot[2]) - self.transferred_item_details[transfer_key].append(fifo_queue.pop(0)) - elif not fifo_queue: - # negative stock, no balance but qty yet to consume - fifo_queue.append([-(qty_to_pop), row.posting_date, -(stock_value)]) + def _consume_serial_fifo_slots(self, fifo_queue: list, serial_nos: list) -> None: + fifo_queue[:] = [slot for slot in fifo_queue if slot[FIFO_QTY_INDEX] not in serial_nos] + + def _consume_batch_fifo_slots( + self, row: dict, fifo_queue: list, transfer_key: tuple, batch_nos: list + ) -> None: + for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos: + items_to_remove = [] + + for slot in fifo_queue: + if not self._can_consume_batch_slot(slot, batch_no, use_batchwise_valuation): + continue + + slot_qty = flt(slot[BATCH_SLOT_QTY_INDEX]) + slot_stock_value = flt(slot[BATCH_SLOT_VALUE_INDEX]) + + if slot_qty <= qty: + qty -= slot_qty + stock_value_difference -= slot_stock_value self.transferred_item_details[transfer_key].append( - [qty_to_pop, row.posting_date, stock_value] + [slot_qty, slot[BATCH_SLOT_DATE_INDEX], slot_stock_value] ) - qty_to_pop = 0 - stock_value = 0 + items_to_remove.append(slot) else: - # qty to pop < slot qty, ample balance - # consume actual_qty from first slot - slot[0] = flt(slot[0]) - qty_to_pop - slot[2] = flt(slot[2]) - stock_value - self.transferred_item_details[transfer_key].append([qty_to_pop, slot[1], stock_value]) - qty_to_pop = 0 - stock_value = 0 + slot[BATCH_SLOT_QTY_INDEX] = slot_qty - qty + # Preserve ledger valuation (moving average / SLE value), not slot proportional value. + slot[BATCH_SLOT_VALUE_INDEX] = slot_stock_value - stock_value_difference + self.transferred_item_details[transfer_key].append( + [qty, slot[BATCH_SLOT_DATE_INDEX], stock_value_difference] + ) + qty = 0 + stock_value_difference = 0 + break - def __adjust_incoming_transfer_qty( - self, transfer_data: dict, fifo_queue: list, row: dict, batch_nos: list | None = None + for item in items_to_remove: + fifo_queue.remove(item) + + if qty: + self._append_negative_batch_slot( + row, + fifo_queue, + transfer_key, + batch_no, + use_batchwise_valuation, + qty, + stock_value_difference, + ) + + def _can_consume_batch_slot(self, slot: list, batch_no: str, use_batchwise_valuation: bool) -> bool: + if not is_batch_slot(slot): + return False + + if flt(slot[BATCH_SLOT_QTY_INDEX]) <= 0: + return False + + if use_batchwise_valuation: + return slot[BATCH_SLOT_BATCH_INDEX] == batch_no + + return not slot[BATCH_SLOT_VALUATION_INDEX] + + def _append_negative_batch_slot( + self, + row: dict, + fifo_queue: list, + transfer_key: tuple, + batch_no: str, + use_batchwise_valuation: bool, + qty: float, + stock_value_difference: float, + ) -> None: + fifo_queue.append( + [batch_no, use_batchwise_valuation, -(qty), row.posting_date, -(stock_value_difference)] + ) + self.transferred_item_details[transfer_key].append([qty, row.posting_date, stock_value_difference]) + + def _consume_fifo_slots(self, row: dict, fifo_queue: list, transfer_key: tuple) -> None: + qty_to_pop = abs(row.actual_qty) + stock_value = abs(row.stock_value_difference) + + while qty_to_pop: + slot = fifo_queue[0] if fifo_queue else [0, None, 0] + slot_qty = flt(slot[FIFO_QTY_INDEX]) + slot_value = flt(slot[FIFO_VALUE_INDEX]) + + if 0 < slot_qty <= qty_to_pop: + qty_to_pop -= slot_qty + stock_value -= slot_value + self.transferred_item_details[transfer_key].append(fifo_queue.pop(0)) + elif not fifo_queue: + fifo_queue.append([-(qty_to_pop), row.posting_date, -(stock_value)]) + self.transferred_item_details[transfer_key].append( + [qty_to_pop, row.posting_date, stock_value] + ) + qty_to_pop = 0 + stock_value = 0 + else: + slot[FIFO_QTY_INDEX] = slot_qty - qty_to_pop + slot[FIFO_VALUE_INDEX] = slot_value - stock_value + self.transferred_item_details[transfer_key].append( + [qty_to_pop, slot[FIFO_DATE_INDEX], stock_value] + ) + qty_to_pop = 0 + stock_value = 0 + + def _adjust_incoming_transfer_qty( + self, + transfer_data: dict, + fifo_queue: list, + row: dict, + batch_nos: list | None = None, + serial_nos: list | None = None, ): "Add previously removed stock back to FIFO Queue." transfer_qty_to_pop = flt(row.actual_qty) stock_value = flt(row.stock_value_difference) batch_nos = [list(batch_no) for batch_no in batch_nos or []] - - def is_batch_slot(slot): - return len(slot) == 5 - - def get_incoming_slots(qty, posting_date, value): - if not batch_nos: - return [[qty, posting_date, value]] - - incoming_slots = [] - remaining_qty = flt(qty) - remaining_value = flt(value) - - while remaining_qty and batch_nos: - batch_no, use_batchwise_valuation, batch_qty, _ = batch_nos[0] - batch_qty = flt(batch_qty) - slot_qty = min(batch_qty, remaining_qty) - slot_value = ( - remaining_value - if slot_qty == remaining_qty - else flt(remaining_value * (slot_qty / remaining_qty)) - ) - - incoming_slots.append([batch_no, use_batchwise_valuation, slot_qty, posting_date, slot_value]) - - batch_nos[0][2] = flt(batch_qty - slot_qty) - if not batch_nos[0][2]: - batch_nos.pop(0) - - remaining_qty = flt(remaining_qty - slot_qty) - remaining_value = flt(remaining_value - slot_value) - - if remaining_qty: - incoming_slots.append([remaining_qty, posting_date, remaining_value]) - - return incoming_slots - - def add_to_fifo_queue(slot): - matching_negative_batch_slot = next( - ( - existing_slot - for existing_slot in fifo_queue - if is_batch_slot(existing_slot) - and is_batch_slot(slot) - and flt(existing_slot[2]) <= 0 - and existing_slot[0] == slot[0] - and existing_slot[1] == slot[1] - ), - None, - ) - - if ( - fifo_queue - and not is_batch_slot(fifo_queue[0]) - and not is_batch_slot(slot) - and flt(fifo_queue[0][0]) <= 0 - ): - fifo_queue[0][0] += flt(slot[0]) - fifo_queue[0][1] = slot[1] - fifo_queue[0][2] += flt(slot[2]) - elif matching_negative_batch_slot: - matching_negative_batch_slot[2] += flt(slot[2]) - matching_negative_batch_slot[3] = slot[3] - matching_negative_batch_slot[4] += flt(slot[4]) - else: - fifo_queue.append(slot) + serial_nos = list(serial_nos or []) while transfer_qty_to_pop: - if transfer_data and 0 < flt(transfer_data[0][0]) <= transfer_qty_to_pop: + if transfer_data and 0 < flt(transfer_data[0][FIFO_QTY_INDEX]) <= transfer_qty_to_pop: # bucket qty is not enough, consume whole - transfer_qty = flt(transfer_data[0][0]) - transfer_date = transfer_data[0][1] - transfer_value = flt(transfer_data[0][2]) + transfer_qty = flt(transfer_data[0][FIFO_QTY_INDEX]) + transfer_date = transfer_data[0][FIFO_DATE_INDEX] + transfer_value = flt(transfer_data[0][FIFO_VALUE_INDEX]) transfer_qty_to_pop -= transfer_qty stock_value -= transfer_value - for slot in get_incoming_slots(transfer_qty, transfer_date, transfer_value): - add_to_fifo_queue(slot) + self._add_incoming_transfer_slots( + fifo_queue, batch_nos, transfer_qty, transfer_date, transfer_value, serial_nos + ) transfer_data.pop(0) elif not transfer_data: # transfer bucket is empty, extra incoming qty - for slot in get_incoming_slots(transfer_qty_to_pop, row.posting_date, stock_value): - add_to_fifo_queue(slot) + self._add_incoming_transfer_slots( + fifo_queue, batch_nos, transfer_qty_to_pop, row.posting_date, stock_value, serial_nos + ) transfer_qty_to_pop = 0 stock_value = 0 else: # ample bucket qty to consume - transfer_data[0][0] -= transfer_qty_to_pop - transfer_data[0][2] -= stock_value - for slot in get_incoming_slots(transfer_qty_to_pop, transfer_data[0][1], stock_value): - add_to_fifo_queue(slot) + transfer_data[0][FIFO_QTY_INDEX] -= transfer_qty_to_pop + transfer_data[0][FIFO_VALUE_INDEX] -= stock_value + self._add_incoming_transfer_slots( + fifo_queue, + batch_nos, + transfer_qty_to_pop, + transfer_data[0][FIFO_DATE_INDEX], + stock_value, + serial_nos, + ) transfer_qty_to_pop = 0 stock_value = 0 - def __update_balances(self, row: dict, key: tuple | str): + def _add_incoming_transfer_slots( + self, + fifo_queue: list, + batch_nos: list, + qty: float, + posting_date: str, + value: float, + serial_nos: list | None = None, + ) -> None: + for slot in self._get_incoming_transfer_slots(batch_nos, qty, posting_date, value, serial_nos): + self._add_transfer_slot_to_fifo_queue(fifo_queue, slot) + + def _get_incoming_transfer_slots( + self, + batch_nos: list, + qty: float, + posting_date: str, + value: float, + serial_nos: list | None = None, + ) -> list: + if serial_nos: + return self._get_serial_incoming_transfer_slots(serial_nos, qty, posting_date, value) + + if not batch_nos: + return [[qty, posting_date, value]] + + incoming_slots = [] + remaining_qty = flt(qty) + remaining_value = flt(value) + + while remaining_qty and batch_nos: + batch_no, use_batchwise_valuation, batch_qty, _ = batch_nos[0] + batch_qty = flt(batch_qty) + slot_qty = min(batch_qty, remaining_qty) + slot_value = ( + remaining_value + if slot_qty == remaining_qty + else flt(remaining_value * (slot_qty / remaining_qty)) + ) + + incoming_slots.append([batch_no, use_batchwise_valuation, slot_qty, posting_date, slot_value]) + + batch_nos[0][2] = flt(batch_qty - slot_qty) + if not batch_nos[0][2]: + batch_nos.pop(0) + + remaining_qty = flt(remaining_qty - slot_qty) + remaining_value = flt(remaining_value - slot_value) + + if remaining_qty: + incoming_slots.append([remaining_qty, posting_date, remaining_value]) + + return incoming_slots + + def _get_serial_incoming_transfer_slots( + self, serial_nos: list, qty: float, posting_date: str, value: float + ) -> list: + incoming_slots = [] + remaining_value = flt(value) + serial_count = min(cint(qty), len(serial_nos)) + + for index in range(serial_count): + serial_no = serial_nos.pop(0) + serial_value = remaining_value if index == serial_count - 1 else flt(value / serial_count) + serial_posting_date = self.serial_no_details.setdefault(serial_no, posting_date) + + incoming_slots.append([serial_no, serial_posting_date, serial_value]) + remaining_value = flt(remaining_value - serial_value) + + return incoming_slots + + def _add_transfer_slot_to_fifo_queue(self, fifo_queue: list, slot: list) -> None: + matching_negative_batch_slot = self._get_matching_negative_batch_slot(fifo_queue, slot) + + if ( + fifo_queue + and is_qty_slot(fifo_queue[0]) + and is_qty_slot(slot) + and flt(fifo_queue[0][FIFO_QTY_INDEX]) <= 0 + ): + fifo_queue[0][FIFO_QTY_INDEX] += flt(slot[FIFO_QTY_INDEX]) + fifo_queue[0][FIFO_DATE_INDEX] = slot[FIFO_DATE_INDEX] + fifo_queue[0][FIFO_VALUE_INDEX] += flt(slot[FIFO_VALUE_INDEX]) + elif matching_negative_batch_slot: + matching_negative_batch_slot[BATCH_SLOT_QTY_INDEX] += flt(slot[BATCH_SLOT_QTY_INDEX]) + matching_negative_batch_slot[BATCH_SLOT_DATE_INDEX] = slot[BATCH_SLOT_DATE_INDEX] + matching_negative_batch_slot[BATCH_SLOT_VALUE_INDEX] += flt(slot[BATCH_SLOT_VALUE_INDEX]) + if self._is_empty_batch_slot(matching_negative_batch_slot): + fifo_queue.remove(matching_negative_batch_slot) + else: + fifo_queue.append(slot) + + def _is_empty_batch_slot(self, slot: list) -> bool: + return ( + not flt(slot[BATCH_SLOT_QTY_INDEX]) + and round_off_if_near_zero(slot[BATCH_SLOT_VALUE_INDEX], 2) == 0 + ) + + def _get_matching_negative_batch_slot(self, fifo_queue: list, slot: list) -> list | None: + if not is_batch_slot(slot): + return None + + return next( + ( + existing_slot + for existing_slot in fifo_queue + if self._is_matching_negative_batch_slot( + existing_slot, + slot[BATCH_SLOT_BATCH_INDEX], + slot[BATCH_SLOT_VALUATION_INDEX], + include_zero_qty=True, + ) + ), + None, + ) + + def _update_balances(self, row: dict, key: tuple | str): self.item_details[key]["qty_after_transaction"] = row.qty_after_transaction if "total_qty" not in self.item_details[key]: self.item_details[key]["total_qty"] = row.actual_qty @@ -653,7 +828,7 @@ class FIFOSlots: self.item_details[key]["has_batch_no"] = row.has_batch_no self.item_details[key]["details"].valuation_rate = row.valuation_rate - def __aggregate_details_by_item(self, wh_wise_data: dict) -> dict: + def _aggregate_details_by_item(self, wh_wise_data: dict) -> dict: "Aggregate Item-Wh wise data into single Item entry." item_aggregated_data = {} for key, row in wh_wise_data.items(): @@ -678,9 +853,9 @@ class FIFOSlots: return item_aggregated_data - def __get_stock_ledger_entries(self) -> Iterator[dict]: + def _get_stock_ledger_entries(self) -> Iterator[dict]: sle = frappe.qb.DocType("Stock Ledger Entry") - item = self.__get_item_query() # used as derived table in sle query + item = self._get_item_query() # used as derived table in sle query to_date = get_datetime(self.filters.get("to_date") + " 23:59:59") sle_query = ( @@ -717,7 +892,7 @@ class FIFOSlots: ) if self.filters.get("warehouse"): - sle_query = self.__get_warehouse_conditions(sle, sle_query) + sle_query = self._get_warehouse_conditions(sle, sle_query) elif self.filters.get("warehouse_type"): warehouses = frappe.get_all( "Warehouse", @@ -732,7 +907,7 @@ class FIFOSlots: return sle_query.run(as_dict=True, as_iterator=True) - def __get_bundle_wise_serial_nos(self) -> dict: + def _get_bundle_wise_serial_nos(self) -> dict: bundle = frappe.qb.DocType("Serial and Batch Bundle") entry = frappe.qb.DocType("Serial and Batch Entry") @@ -755,7 +930,7 @@ class FIFOSlots: query = query.where(bundle[field] == self.filters.get(field)) if self.filters.get("warehouse"): - query = self.__get_warehouse_conditions(bundle, query) + query = self._get_warehouse_conditions(bundle, query) bundle_wise_serial_nos = frappe._dict({}) for bundle_name, serial_no in query.run(): @@ -763,7 +938,7 @@ class FIFOSlots: return bundle_wise_serial_nos - def __get_bundle_wise_batch_nos(self, sabb_name=None) -> dict: + def _get_bundle_wise_batch_nos(self, sabb_name=None) -> dict: bundle = frappe.qb.DocType("Serial and Batch Bundle") entry = frappe.qb.DocType("Serial and Batch Entry") batch = frappe.qb.DocType("Batch") @@ -795,7 +970,7 @@ class FIFOSlots: query = query.where(bundle[field] == self.filters.get(field)) if self.filters.get("warehouse"): - query = self.__get_warehouse_conditions(bundle, query) + query = self._get_warehouse_conditions(bundle, query) if sabb_name: query = query.where(bundle.name == sabb_name) @@ -808,7 +983,7 @@ class FIFOSlots: return bundle_wise_batch_nos - def __get_item_query(self) -> str: + def _get_item_query(self) -> str: item_table = frappe.qb.DocType("Item") item = frappe.qb.from_("Item").select( @@ -830,7 +1005,7 @@ class FIFOSlots: return item - def __get_warehouse_conditions(self, sle, sle_query) -> str: + def _get_warehouse_conditions(self, sle, sle_query) -> str: warehouse = frappe.qb.DocType("Warehouse") lft, rgt = frappe.db.get_value("Warehouse", self.filters.get("warehouse"), ["lft", "rgt"]) diff --git a/erpnext/stock/report/stock_ageing/test_stock_ageing.py b/erpnext/stock/report/stock_ageing/test_stock_ageing.py index dee556e1e88..3fdae7ca281 100644 --- a/erpnext/stock/report/stock_ageing/test_stock_ageing.py +++ b/erpnext/stock/report/stock_ageing/test_stock_ageing.py @@ -893,6 +893,33 @@ class TestStockAgeing(ERPNextTestSuite): self.assertEqual(report_data[0][7:15], [8.0, 80.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + def test_serial_transfer_replay_preserves_serial_slots(self): + fifo_slots = FIFOSlots(self.filters, []) + transfer_key = ("001", "Serial Item", "WH 1") + fifo_slots.transferred_item_details[transfer_key] = [[2, "2021-12-01", 20]] + + row = frappe._dict( + name="Serial Item", + actual_qty=2, + stock_value_difference=20, + posting_date="2021-12-05", + has_serial_no=True, + ) + fifo_queue = [] + + fifo_slots._compute_incoming_stock(row, fifo_queue, transfer_key, ["SN-A", "SN-B"], []) + + self.assertEqual(fifo_queue, [["SN-A", "2021-12-01", 10.0], ["SN-B", "2021-12-01", 10.0]]) + self.assertFalse(fifo_slots.transferred_item_details[transfer_key]) + + def test_batch_transfer_replay_removes_zeroed_negative_slot(self): + fifo_slots = FIFOSlots(self.filters, []) + fifo_queue = [["SA-ZERO-BATCH", 1, -4, "2021-12-01", -40]] + + fifo_slots._add_transfer_slot_to_fifo_queue(fifo_queue, ["SA-ZERO-BATCH", 1, 4, "2021-12-02", 40]) + + self.assertEqual(fifo_queue, []) + def test_batchwise_valuation(self): from erpnext.stock.doctype.item.test_item import make_item