fix: consider batchwise valuation in stock ageing report (backport #54919) (#55229)

Co-authored-by: Mihir Kandoi <kandoimihir@gmail.com>
This commit is contained in:
mergify[bot]
2026-05-24 09:48:12 +00:00
committed by GitHub
parent 304474d2f7
commit 418a7fb301
4 changed files with 913 additions and 95 deletions

View File

@@ -7,7 +7,7 @@ from operator import itemgetter
import frappe
from frappe import _
from frappe.query_builder.functions import Count
from frappe.query_builder.functions import Abs, Count
from frappe.utils import cint, date_diff, flt, get_datetime
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
@@ -31,7 +31,7 @@ def execute(filters: Filters = None) -> tuple:
def format_report_data(filters: Filters, item_details: dict, to_date: str) -> list[dict]:
"Returns ordered, formatted data with ranges."
_func = itemgetter(1)
_func = itemgetter(-2)
data = []
precision = cint(frappe.db.get_single_value("System Settings", "float_precision", cache=True))
@@ -48,15 +48,13 @@ def format_report_data(filters: Filters, item_details: dict, to_date: str) -> li
if not fifo_queue:
continue
fifo_queue = normalize_fifo_queue(fifo_queue)
average_age = get_average_age(fifo_queue, to_date)
earliest_age = date_diff(to_date, fifo_queue[0][1])
latest_age = date_diff(to_date, fifo_queue[-1][1])
range_values = get_range_age(filters, fifo_queue, to_date, item_dict)
check_and_replace_valuations_if_moving_average(
range_values, details.valuation_method, details.valuation_rate
)
row = [details.name, details.item_name, details.description, details.item_group, details.brand]
if filters.get("show_warehouse_wise_stock"):
@@ -78,18 +76,14 @@ def format_report_data(filters: Filters, item_details: dict, to_date: str) -> li
return data
def check_and_replace_valuations_if_moving_average(range_values, item_valuation_method, valuation_rate):
if item_valuation_method == "Moving Average" or (
not item_valuation_method
and frappe.db.get_single_value("Stock Settings", "valuation_method") == "Moving Average"
):
for i in range(0, len(range_values), 2):
range_values[i + 1] = range_values[i] * valuation_rate
def normalize_fifo_queue(fifo_queue: list) -> list:
"""Convert batch valuation slots to the standard [qty, posting_date, value] shape."""
return [slot[2:] if len(slot) == 5 else slot for slot in fifo_queue]
def get_average_age(fifo_queue: list, to_date: str) -> float:
batch_age = age_qty = total_qty = 0.0
for batch in fifo_queue:
for batch in normalize_fifo_queue(fifo_queue):
batch_age = date_diff(to_date, batch[1])
if isinstance(batch[0], int | float):
@@ -235,7 +229,9 @@ class FIFOSlots:
def __init__(self, filters: dict | None = None, sle: list | None = None):
self.item_details = {}
self.transferred_item_details = {}
self.serial_no_batch_purchase_details = {}
self.serial_no_details = {}
self.batch_no_details = {}
self.batchwise_valuation_by_batch = {}
self.filters = filters
self.sle = sle
@@ -252,10 +248,13 @@ class FIFOSlots:
from erpnext.stock.serial_batch_bundle import get_serial_nos_from_bundle
stock_ledger_entries = self.sle
use_prefetched_bundle_data = stock_ledger_entries is None
bundle_wise_serial_nos = frappe._dict({})
if stock_ledger_entries is None:
bundle_wise_batch_nos = frappe._dict({})
if use_prefetched_bundle_data:
bundle_wise_serial_nos = self.__get_bundle_wise_serial_nos()
bundle_wise_batch_nos = self.__get_bundle_wise_batch_nos()
# prepare single sle voucher detail lookup
self.prepare_stock_reco_voucher_wise_count()
@@ -287,17 +286,37 @@ class FIFOSlots:
d.actual_qty = flt(d.qty_after_transaction) - flt(prev_balance_qty)
serial_nos = get_serial_nos(d.serial_no) if d.serial_no else []
if d.serial_and_batch_bundle and d.has_serial_no:
if bundle_wise_serial_nos:
serial_nos = bundle_wise_serial_nos.get(d.serial_and_batch_bundle) or []
else:
serial_nos = sorted(get_serial_nos_from_bundle(d.serial_and_batch_bundle)) or []
batch_nos = (
[
[
d.batch_no.upper(),
self.__get_batchwise_valuation(d.batch_no),
abs(d.actual_qty),
abs(d.stock_value_difference),
]
]
if d.batch_no
else []
)
if d.serial_and_batch_bundle:
if d.has_serial_no:
if use_prefetched_bundle_data:
serial_nos = bundle_wise_serial_nos.get(d.serial_and_batch_bundle) or []
else:
serial_nos = sorted(get_serial_nos_from_bundle(d.serial_and_batch_bundle)) or []
elif d.has_batch_no:
if use_prefetched_bundle_data:
batch_nos = bundle_wise_batch_nos.get(d.serial_and_batch_bundle) or []
else:
batch_nos = self.__get_bundle_wise_batch_nos(d.serial_and_batch_bundle).get(
d.serial_and_batch_bundle, []
)
serial_nos = self.uppercase_serial_nos(serial_nos)
if d.actual_qty > 0:
self.__compute_incoming_stock(d, fifo_queue, transferred_item_key, serial_nos)
self.__compute_incoming_stock(d, fifo_queue, transferred_item_key, serial_nos, batch_nos)
else:
self.__compute_outgoing_stock(d, fifo_queue, transferred_item_key, serial_nos)
self.__compute_outgoing_stock(d, fifo_queue, transferred_item_key, serial_nos, batch_nos)
self.__update_balances(d, key)
@@ -322,6 +341,14 @@ class FIFOSlots:
"Convert serial nos to uppercase for uniformity."
return [sn.upper() for sn in serial_nos]
def __get_batchwise_valuation(self, batch_no: str):
if batch_no not in self.batchwise_valuation_by_batch:
self.batchwise_valuation_by_batch[batch_no] = frappe.db.get_value(
"Batch", batch_no, "use_batchwise_valuation"
)
return self.batchwise_valuation_by_batch[batch_no]
def __init_key_stores(self, row: dict) -> tuple:
"Initialise keys and FIFO Queue."
@@ -334,102 +361,286 @@ class FIFOSlots:
return key, fifo_queue, transferred_item_key
def __compute_incoming_stock(self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list):
def __compute_incoming_stock(
self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list, batch_nos: list
):
"Update FIFO Queue on inward stock."
def set_fifo_queue_for_serial_items():
valuation = row.stock_value_difference / row.actual_qty
for serial_no in serial_nos:
if self.serial_no_details.get(serial_no):
fifo_queue.append([serial_no, self.serial_no_details.get(serial_no), valuation])
else:
self.serial_no_details.setdefault(serial_no, row.posting_date)
fifo_queue.append([serial_no, row.posting_date, valuation])
def set_fifo_queue_for_batch_items():
for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos:
qty, stock_value_difference = neutralize_negative_batch_stock(
batch_no, use_batchwise_valuation, qty, stock_value_difference
)
if not qty:
continue
if self.batch_no_details.get(batch_no):
fifo_queue.append(
[
batch_no,
use_batchwise_valuation,
qty,
self.batch_no_details.get(batch_no),
stock_value_difference,
]
)
else:
self.batch_no_details.setdefault(batch_no, row.posting_date)
fifo_queue.append(
[batch_no, use_batchwise_valuation, qty, row.posting_date, stock_value_difference]
)
def neutralize_negative_batch_stock(batch_no, use_batchwise_valuation, qty, stock_value_difference):
qty = flt(qty)
stock_value_difference = flt(stock_value_difference)
if not qty:
return qty, stock_value_difference
for slot in list(fifo_queue):
if (
len(slot) != 5
or slot[0] != batch_no
or slot[1] != use_batchwise_valuation
or flt(slot[2]) >= 0
):
continue
qty_to_adjust = min(qty, abs(flt(slot[2])))
value_to_adjust = (
stock_value_difference
if qty_to_adjust == qty
else flt(stock_value_difference * (qty_to_adjust / qty))
)
slot[2] = flt(slot[2]) + qty_to_adjust
slot[3] = row.posting_date
slot[4] = flt(slot[4]) + value_to_adjust
qty = flt(qty - qty_to_adjust)
stock_value_difference = flt(stock_value_difference - value_to_adjust)
if not flt(slot[2]) and not flt(slot[4]):
fifo_queue.remove(slot)
if not qty:
break
return qty, stock_value_difference
transfer_data = self.transferred_item_details.get(transfer_key)
if transfer_data:
# inward/outward from same voucher, item & warehouse
# eg: Repack with same item, Stock reco for batch item
# consume transfer data and add stock to fifo queue
self.__adjust_incoming_transfer_qty(transfer_data, fifo_queue, row)
self.__adjust_incoming_transfer_qty(transfer_data, fifo_queue, row, batch_nos)
else:
if not serial_nos and not row.get("has_serial_no"):
if fifo_queue and flt(fifo_queue[0][0]) <= 0:
# neutralize 0/negative stock by adding positive stock
fifo_queue[0][0] += flt(row.actual_qty)
fifo_queue[0][1] = row.posting_date
fifo_queue[0][2] += flt(row.stock_value_difference)
else:
fifo_queue.append(
[flt(row.actual_qty), row.posting_date, flt(row.stock_value_difference)]
)
return
if serial_nos and row.get("has_serial_no"):
set_fifo_queue_for_serial_items()
elif batch_nos and row.get("has_batch_no"):
set_fifo_queue_for_batch_items()
elif fifo_queue and flt(fifo_queue[0][0]) <= 0:
# neutralize 0/negative stock by adding positive stock
fifo_queue[0][0] += flt(row.actual_qty)
fifo_queue[0][1] = row.posting_date
fifo_queue[0][2] += flt(row.stock_value_difference)
else:
fifo_queue.append([flt(row.actual_qty), row.posting_date, flt(row.stock_value_difference)])
valuation = row.stock_value_difference / row.actual_qty
for serial_no in serial_nos:
if self.serial_no_batch_purchase_details.get(serial_no):
fifo_queue.append(
[serial_no, self.serial_no_batch_purchase_details.get(serial_no), valuation]
)
else:
self.serial_no_batch_purchase_details.setdefault(serial_no, row.posting_date)
fifo_queue.append([serial_no, row.posting_date, valuation])
def __compute_outgoing_stock(self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list):
def __compute_outgoing_stock(
self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list, batch_nos: list
):
"Update FIFO Queue on outward stock."
if serial_nos:
fifo_queue[:] = [serial_no for serial_no in fifo_queue if serial_no[0] not in serial_nos]
return
elif batch_nos:
for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos:
items_to_remove = []
qty_to_pop = abs(row.actual_qty)
stock_value = abs(row.stock_value_difference)
for slot in fifo_queue:
slot_batch_no, slot_use_batchwise_valuation, slot_qty, _, slot_stock_value = slot
while qty_to_pop:
slot = fifo_queue[0] if fifo_queue else [0, None, 0]
if 0 < flt(slot[0]) <= qty_to_pop:
# qty to pop >= slot qty
# if +ve and not enough or exactly same balance in current slot, consume whole slot
qty_to_pop -= flt(slot[0])
stock_value -= flt(slot[2])
self.transferred_item_details[transfer_key].append(fifo_queue.pop(0))
elif not fifo_queue:
# negative stock, no balance but qty yet to consume
fifo_queue.append([-(qty_to_pop), row.posting_date, -(stock_value)])
self.transferred_item_details[transfer_key].append(
[qty_to_pop, row.posting_date, stock_value]
)
qty_to_pop = 0
stock_value = 0
else:
# qty to pop < slot qty, ample balance
# consume actual_qty from first slot
slot[0] = flt(slot[0]) - qty_to_pop
slot[2] = flt(slot[2]) - stock_value
self.transferred_item_details[transfer_key].append([qty_to_pop, slot[1], stock_value])
qty_to_pop = 0
stock_value = 0
if flt(slot_qty) <= 0:
continue
def __adjust_incoming_transfer_qty(self, transfer_data: dict, fifo_queue: list, row: dict):
# Batchwise valuation: consume only from same batch
if use_batchwise_valuation:
if slot_batch_no != batch_no:
continue
# Non-batchwise valuation: consume from any non-batchwise batch
else:
if slot_use_batchwise_valuation:
continue
if flt(slot_qty) <= qty:
qty -= flt(slot_qty)
stock_value_difference -= flt(slot_stock_value)
self.transferred_item_details[transfer_key].append(
[flt(slot_qty), slot[3], flt(slot_stock_value)]
)
items_to_remove.append(slot)
else:
slot[2] = flt(slot_qty) - qty
# preserve ledger valuation (moving average / SLE value), not slot proportional value
slot[4] = flt(slot_stock_value) - stock_value_difference
self.transferred_item_details[transfer_key].append(
[qty, slot[3], stock_value_difference]
)
qty = 0
stock_value_difference = 0
break
for item in items_to_remove:
fifo_queue.remove(item)
if qty:
fifo_queue.append(
[
batch_no,
use_batchwise_valuation,
-(qty),
row.posting_date,
-(stock_value_difference),
]
)
self.transferred_item_details[transfer_key].append(
[qty, row.posting_date, stock_value_difference]
)
else:
qty_to_pop = abs(row.actual_qty)
stock_value = abs(row.stock_value_difference)
while qty_to_pop:
slot = fifo_queue[0] if fifo_queue else [0, None, 0]
if 0 < flt(slot[0]) <= qty_to_pop:
# qty to pop >= slot qty
# if +ve and not enough or exactly same balance in current slot, consume whole slot
qty_to_pop -= flt(slot[0])
stock_value -= flt(slot[2])
self.transferred_item_details[transfer_key].append(fifo_queue.pop(0))
elif not fifo_queue:
# negative stock, no balance but qty yet to consume
fifo_queue.append([-(qty_to_pop), row.posting_date, -(stock_value)])
self.transferred_item_details[transfer_key].append(
[qty_to_pop, row.posting_date, stock_value]
)
qty_to_pop = 0
stock_value = 0
else:
# qty to pop < slot qty, ample balance
# consume actual_qty from first slot
slot[0] = flt(slot[0]) - qty_to_pop
slot[2] = flt(slot[2]) - stock_value
self.transferred_item_details[transfer_key].append([qty_to_pop, slot[1], stock_value])
qty_to_pop = 0
stock_value = 0
def __adjust_incoming_transfer_qty(
self, transfer_data: dict, fifo_queue: list, row: dict, batch_nos: list | None = None
):
"Add previously removed stock back to FIFO Queue."
transfer_qty_to_pop = flt(row.actual_qty)
stock_value = flt(row.stock_value_difference)
batch_nos = [list(batch_no) for batch_no in batch_nos or []]
def is_batch_slot(slot):
return len(slot) == 5
def get_incoming_slots(qty, posting_date, value):
if not batch_nos:
return [[qty, posting_date, value]]
incoming_slots = []
remaining_qty = flt(qty)
remaining_value = flt(value)
while remaining_qty and batch_nos:
batch_no, use_batchwise_valuation, batch_qty, _ = batch_nos[0]
batch_qty = flt(batch_qty)
slot_qty = min(batch_qty, remaining_qty)
slot_value = (
remaining_value
if slot_qty == remaining_qty
else flt(remaining_value * (slot_qty / remaining_qty))
)
incoming_slots.append([batch_no, use_batchwise_valuation, slot_qty, posting_date, slot_value])
batch_nos[0][2] = flt(batch_qty - slot_qty)
if not batch_nos[0][2]:
batch_nos.pop(0)
remaining_qty = flt(remaining_qty - slot_qty)
remaining_value = flt(remaining_value - slot_value)
if remaining_qty:
incoming_slots.append([remaining_qty, posting_date, remaining_value])
return incoming_slots
def add_to_fifo_queue(slot):
if fifo_queue and flt(fifo_queue[0][0]) <= 0:
# neutralize 0/negative stock by adding positive stock
matching_negative_batch_slot = next(
(
existing_slot
for existing_slot in fifo_queue
if is_batch_slot(existing_slot)
and is_batch_slot(slot)
and flt(existing_slot[2]) <= 0
and existing_slot[0] == slot[0]
and existing_slot[1] == slot[1]
),
None,
)
if (
fifo_queue
and not is_batch_slot(fifo_queue[0])
and not is_batch_slot(slot)
and flt(fifo_queue[0][0]) <= 0
):
fifo_queue[0][0] += flt(slot[0])
fifo_queue[0][1] = slot[1]
fifo_queue[0][2] += flt(slot[2])
elif matching_negative_batch_slot:
matching_negative_batch_slot[2] += flt(slot[2])
matching_negative_batch_slot[3] = slot[3]
matching_negative_batch_slot[4] += flt(slot[4])
else:
fifo_queue.append(slot)
while transfer_qty_to_pop:
if transfer_data and 0 < transfer_data[0][0] <= transfer_qty_to_pop:
if transfer_data and 0 < flt(transfer_data[0][0]) <= transfer_qty_to_pop:
# bucket qty is not enough, consume whole
transfer_qty_to_pop -= transfer_data[0][0]
stock_value -= transfer_data[0][2]
add_to_fifo_queue(transfer_data.pop(0))
transfer_qty = flt(transfer_data[0][0])
transfer_date = transfer_data[0][1]
transfer_value = flt(transfer_data[0][2])
transfer_qty_to_pop -= transfer_qty
stock_value -= transfer_value
for slot in get_incoming_slots(transfer_qty, transfer_date, transfer_value):
add_to_fifo_queue(slot)
transfer_data.pop(0)
elif not transfer_data:
# transfer bucket is empty, extra incoming qty
add_to_fifo_queue([transfer_qty_to_pop, row.posting_date, stock_value])
for slot in get_incoming_slots(transfer_qty_to_pop, row.posting_date, stock_value):
add_to_fifo_queue(slot)
transfer_qty_to_pop = 0
stock_value = 0
else:
# ample bucket qty to consume
transfer_data[0][0] -= transfer_qty_to_pop
transfer_data[0][2] -= stock_value
add_to_fifo_queue([transfer_qty_to_pop, transfer_data[0][1], stock_value])
for slot in get_incoming_slots(transfer_qty_to_pop, transfer_data[0][1], stock_value):
add_to_fifo_queue(slot)
transfer_qty_to_pop = 0
stock_value = 0
@@ -441,6 +652,7 @@ class FIFOSlots:
self.item_details[key]["total_qty"] += row.actual_qty
self.item_details[key]["has_serial_no"] = row.has_serial_no
self.item_details[key]["has_batch_no"] = row.has_batch_no
self.item_details[key]["details"].valuation_rate = row.valuation_rate
def __aggregate_details_by_item(self, wh_wise_data: dict) -> dict:
@@ -464,6 +676,7 @@ class FIFOSlots:
item_row["qty_after_transaction"] += flt(row["qty_after_transaction"])
item_row["total_qty"] += flt(row["total_qty"])
item_row["has_serial_no"] = row["has_serial_no"]
item_row["has_batch_no"] = row["has_batch_no"]
return item_aggregated_data
@@ -482,8 +695,8 @@ class FIFOSlots:
item.brand,
item.description,
item.stock_uom,
item.has_batch_no,
item.has_serial_no,
item.valuation_method,
sle.actual_qty,
sle.stock_value_difference,
sle.valuation_rate,
@@ -524,33 +737,97 @@ class FIFOSlots:
def __get_bundle_wise_serial_nos(self) -> dict:
bundle = frappe.qb.DocType("Serial and Batch Bundle")
entry = frappe.qb.DocType("Serial and Batch Entry")
sle = frappe.qb.DocType("Stock Ledger Entry")
query = (
frappe.qb.from_(bundle)
.join(entry)
.on(bundle.name == entry.parent)
.select(bundle.name, entry.serial_no)
.where((bundle.docstatus == 1) & (entry.serial_no.isnotnull()))
)
to_date = get_datetime(self.filters.get("to_date") + " 23:59:59")
query = (
query.join(sle)
.on(sle.serial_and_batch_bundle == bundle.name)
.where(
(bundle.docstatus == 1)
& (entry.serial_no.isnotnull())
& (bundle.company == self.filters.get("company"))
& (bundle.posting_date <= self.filters.get("to_date"))
(sle.company == self.filters.get("company"))
& (sle.posting_datetime <= to_date)
& (sle.is_cancelled != 1)
)
)
for field in ["item_code"]:
if self.filters.get(field):
query = query.where(bundle[field] == self.filters.get(field))
query = query.where(sle[field] == self.filters.get(field))
if self.filters.get("warehouse"):
query = self.__get_warehouse_conditions(bundle, query)
query = self.__get_warehouse_conditions(sle, query)
bundle_wise_serial_nos = frappe._dict({})
for bundle_name, serial_no in query.run():
for bundle_name, serial_no in query.distinct().run():
bundle_wise_serial_nos.setdefault(bundle_name, []).append(serial_no)
return bundle_wise_serial_nos
def __get_bundle_wise_batch_nos(self, sabb_name=None) -> dict:
bundle = frappe.qb.DocType("Serial and Batch Bundle")
entry = frappe.qb.DocType("Serial and Batch Entry")
batch = frappe.qb.DocType("Batch")
sle = frappe.qb.DocType("Stock Ledger Entry")
query = (
frappe.qb.from_(bundle)
.join(entry)
.on(bundle.name == entry.parent)
.join(batch)
.on(entry.batch_no == batch.name)
.select(
bundle.name,
entry.batch_no,
batch.use_batchwise_valuation,
Abs(entry.qty).as_("qty"),
Abs(entry.stock_value_difference).as_("stock_value_difference"),
)
.where((bundle.docstatus == 1) & (entry.batch_no.isnotnull()))
)
if sabb_name:
query = query.where(bundle.name == sabb_name)
else:
to_date = get_datetime(self.filters.get("to_date") + " 23:59:59")
query = (
query.join(sle)
.on(sle.serial_and_batch_bundle == bundle.name)
.where(
(sle.company == self.filters.get("company"))
& (sle.posting_datetime <= to_date)
& (sle.is_cancelled != 1)
)
)
for field in ["item_code"]:
if self.filters.get(field):
query = query.where(sle[field] == self.filters.get(field))
if self.filters.get("warehouse"):
query = self.__get_warehouse_conditions(sle, query)
bundle_wise_batch_nos = frappe._dict({})
for (
bundle_name,
batch_no,
use_batchwise_valuation,
qty,
stock_value_difference,
) in query.distinct().run():
bundle_wise_batch_nos.setdefault(bundle_name, []).append(
[batch_no.upper(), use_batchwise_valuation, qty, stock_value_difference]
)
return bundle_wise_batch_nos
def __get_item_query(self) -> str:
item_table = frappe.qb.DocType("Item")
@@ -562,7 +839,7 @@ class FIFOSlots:
"brand",
"item_group",
"has_serial_no",
"valuation_method",
"has_batch_no",
)
if self.filters.get("item_code"):

View File

@@ -4,7 +4,11 @@
import frappe
from frappe.tests.utils import FrappeTestCase
from erpnext.stock.report.stock_ageing.stock_ageing import FIFOSlots, format_report_data
from erpnext.stock.report.stock_ageing.stock_ageing import (
FIFOSlots,
format_report_data,
get_average_age,
)
class TestStockAgeing(FrappeTestCase):
@@ -868,6 +872,533 @@ class TestStockAgeing(FrappeTestCase):
range_valuations = range_values[1::2]
self.assertEqual(range_valuations, [15, 7.5, 20, 5])
def test_batch_item_report_formatting_preserves_mixed_fifo_slots(self):
item_details = {
"Batch Mixed Item": {
"details": frappe._dict(
name="Batch Mixed Item",
item_name="Batch Mixed Item",
description="Batch Mixed Item",
item_group=None,
brand=None,
has_batch_no=True,
stock_uom="Nos",
),
"fifo_queue": [
["SA-BATCH-MIXED-SLOT", 1, 5.0, "2021-12-01", 50.0],
[3.0, "2021-12-02", 30.0],
],
"has_serial_no": False,
"total_qty": 8.0,
}
}
report_data = format_report_data(self.filters, item_details, self.filters["to_date"])
self.assertEqual(report_data[0][7:15], [8.0, 80.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
def test_average_age_accepts_batchwise_valuation_slots(self):
fifo_queue = [["SA-BATCH-SLOT", 1, 5.0, "2021-12-01", 50.0]]
self.assertEqual(get_average_age(fifo_queue, self.filters["to_date"]), 9.0)
def test_batchwise_valuation(self):
from erpnext.stock.doctype.item.test_item import make_item
item_code = make_item(
"Test Stock Ageing Batchwise Valuation",
{
"is_stock_item": 1,
"has_batch_no": 1,
"valuation_method": "FIFO",
},
).name
def make_batch(batch_id, use_batchwise_valuation):
if not frappe.db.exists("Batch", batch_id):
frappe.get_doc(
{
"doctype": "Batch",
"batch_id": batch_id,
"item": item_code,
}
).insert(ignore_permissions=True)
frappe.db.set_value("Batch", batch_id, "use_batchwise_valuation", use_batchwise_valuation)
batchwise_above_90 = "SA-BATCHWISE-ABOVE-90"
non_batchwise_above_90 = "SA-NON-BATCHWISE-ABOVE-90"
batchwise_61_90 = "SA-BATCHWISE-61-90"
non_batchwise_61_90 = "SA-NON-BATCHWISE-61-90"
batchwise_31_60 = "SA-BATCHWISE-31-60"
non_batchwise_31_60 = "SA-NON-BATCHWISE-31-60"
batchwise_0_30 = "SA-BATCHWISE-0-30"
non_batchwise_0_30 = "SA-NON-BATCHWISE-0-30"
for batch_id, use_batchwise_valuation in {
batchwise_above_90: 1,
non_batchwise_above_90: 0,
batchwise_61_90: 1,
non_batchwise_61_90: 0,
batchwise_31_60: 1,
non_batchwise_31_60: 0,
batchwise_0_30: 1,
non_batchwise_0_30: 0,
}.items():
make_batch(batch_id, use_batchwise_valuation)
qty_after_transaction = 0
def make_sle(posting_date, voucher_no, batch_no, actual_qty, stock_value_difference):
nonlocal qty_after_transaction
qty_after_transaction += actual_qty
return frappe._dict(
name=item_code,
actual_qty=actual_qty,
qty_after_transaction=qty_after_transaction,
stock_value_difference=stock_value_difference,
warehouse="WH 1",
posting_date=posting_date,
voucher_type="Stock Entry",
voucher_no=voucher_no,
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=batch_no,
valuation_rate=10,
)
sle = [
make_sle("2021-08-01", "001", batchwise_above_90, 50, 500),
make_sle("2021-08-10", "002", non_batchwise_above_90, 60, 600),
make_sle("2021-08-20", "003", batchwise_above_90, -10, -100),
make_sle("2021-09-01", "004", non_batchwise_above_90, -15, -150),
make_sle("2021-09-20", "005", batchwise_61_90, 40, 400),
make_sle("2021-09-25", "006", non_batchwise_61_90, 50, 500),
make_sle("2021-09-30", "007", batchwise_61_90, -5, -50),
make_sle("2021-10-05", "008", non_batchwise_above_90, -20, -200),
make_sle("2021-10-20", "009", batchwise_31_60, 30, 300),
make_sle("2021-10-25", "010", non_batchwise_31_60, 40, 400),
make_sle("2021-10-30", "011", batchwise_31_60, -8, -80),
make_sle("2021-11-05", "012", non_batchwise_above_90, -25, -250),
make_sle("2021-11-20", "013", batchwise_0_30, 20, 200),
make_sle("2021-11-25", "014", non_batchwise_0_30, 30, 300),
make_sle("2021-11-30", "015", batchwise_0_30, -6, -60),
make_sle("2021-12-01", "016", non_batchwise_61_90, -10, -100),
]
slots = FIFOSlots(self.filters, sle).generate()
item_result = slots[item_code]
self.assertEqual(item_result["qty_after_transaction"], item_result["total_qty"])
self.assertEqual(item_result["total_qty"], 221.0)
self.assertEqual(
item_result["fifo_queue"],
[
[batchwise_above_90, 1, 40.0, "2021-08-01", 400.0],
[batchwise_61_90, 1, 35.0, "2021-09-20", 350.0],
[non_batchwise_61_90, 0, 40.0, "2021-09-25", 400.0],
[batchwise_31_60, 1, 22.0, "2021-10-20", 220.0],
[non_batchwise_31_60, 0, 40, "2021-10-25", 400],
[batchwise_0_30, 1, 14.0, "2021-11-20", 140.0],
[non_batchwise_0_30, 0, 30, "2021-11-25", 300],
],
)
report_data = format_report_data(self.filters, slots, self.filters["to_date"])
range_values = report_data[0][7:15]
self.assertEqual(range_values, [44.0, 440.0, 62.0, 620.0, 75.0, 750.0, 40.0, 400.0])
def test_batchwise_valuation_same_voucher_transfer(self):
from erpnext.stock.doctype.item.test_item import make_item
item_code = make_item(
"Test Stock Ageing Batchwise Transfer",
{
"is_stock_item": 1,
"has_batch_no": 1,
"valuation_method": "FIFO",
},
).name
def make_batch(batch_id):
if not frappe.db.exists("Batch", batch_id):
frappe.get_doc(
{
"doctype": "Batch",
"batch_id": batch_id,
"item": item_code,
}
).insert(ignore_permissions=True)
frappe.db.set_value("Batch", batch_id, "use_batchwise_valuation", 1)
source_batch = "SA-BATCHWISE-TRANSFER-SOURCE"
target_batch = "SA-BATCHWISE-TRANSFER-TARGET"
make_batch(source_batch)
make_batch(target_batch)
sle = [
frappe._dict(
name=item_code,
actual_qty=20,
qty_after_transaction=20,
stock_value_difference=200,
warehouse="WH 1",
posting_date="2021-09-01",
voucher_type="Stock Entry",
voucher_no="001",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=source_batch,
valuation_rate=10,
),
frappe._dict(
name=item_code,
actual_qty=-15,
qty_after_transaction=5,
stock_value_difference=-150,
warehouse="WH 1",
posting_date="2021-10-01",
voucher_type="Stock Entry",
voucher_no="002",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=source_batch,
valuation_rate=10,
),
frappe._dict(
name=item_code,
actual_qty=10,
qty_after_transaction=15,
stock_value_difference=100,
warehouse="WH 1",
posting_date="2021-10-01",
voucher_type="Stock Entry",
voucher_no="002",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=target_batch,
valuation_rate=10,
),
]
fifo_slots = FIFOSlots(self.filters, sle)
slots = fifo_slots.generate()
item_result = slots[item_code]
self.assertEqual(item_result["total_qty"], 15.0)
self.assertEqual(
item_result["fifo_queue"],
[
[source_batch, 1, 5.0, "2021-09-01", 50.0],
[target_batch, 1, 10.0, "2021-09-01", 100.0],
],
)
self.assertEqual(
fifo_slots.transferred_item_details[("002", item_code, "WH 1")],
[[5.0, "2021-09-01", 50.0]],
)
def test_batchwise_valuation_negative_stock_same_voucher(self):
from erpnext.stock.doctype.item.test_item import make_item
item_code = make_item(
"Test Stock Ageing Batchwise Negative Stock",
{
"is_stock_item": 1,
"has_batch_no": 1,
"valuation_method": "FIFO",
},
).name
batch_no = "SA-BATCHWISE-NEGATIVE-STOCK"
if not frappe.db.exists("Batch", batch_no):
frappe.get_doc(
{
"doctype": "Batch",
"batch_id": batch_no,
"item": item_code,
}
).insert(ignore_permissions=True)
frappe.db.set_value("Batch", batch_no, "use_batchwise_valuation", 1)
sle = [
frappe._dict(
name=item_code,
actual_qty=-10,
qty_after_transaction=-10,
stock_value_difference=-100,
warehouse="WH 1",
posting_date="2021-12-01",
voucher_type="Stock Entry",
voucher_no="001",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=batch_no,
valuation_rate=10,
)
]
fifo_slots = FIFOSlots(self.filters, sle)
slots = fifo_slots.generate()
item_result = slots[item_code]
self.assertEqual(item_result["fifo_queue"], [[batch_no, 1, -10, "2021-12-01", -100]])
self.assertEqual(
fifo_slots.transferred_item_details[("001", item_code, "WH 1")], [[10, "2021-12-01", 100]]
)
sle.append(
frappe._dict(
name=item_code,
actual_qty=6,
qty_after_transaction=-4,
stock_value_difference=60,
warehouse="WH 1",
posting_date="2021-12-01",
voucher_type="Stock Entry",
voucher_no="001",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=batch_no,
valuation_rate=10,
)
)
fifo_slots = FIFOSlots(self.filters, sle)
slots = fifo_slots.generate()
item_result = slots[item_code]
self.assertEqual(item_result["fifo_queue"], [[batch_no, 1, -4.0, "2021-12-01", -40.0]])
self.assertEqual(
fifo_slots.transferred_item_details[("001", item_code, "WH 1")],
[[4.0, "2021-12-01", 40.0]],
)
def test_batchwise_valuation_neutralizes_non_head_negative_batch(self):
from erpnext.stock.doctype.item.test_item import make_item
item_code = make_item(
"Test Stock Ageing Batchwise Negative Non Head",
{
"is_stock_item": 1,
"has_batch_no": 1,
"valuation_method": "FIFO",
},
).name
buffer_batch = "SA-BATCHWISE-NEGATIVE-BUFFER"
negative_batch = "SA-BATCHWISE-NEGATIVE-NON-HEAD"
for batch_no in [buffer_batch, negative_batch]:
if not frappe.db.exists("Batch", batch_no):
frappe.get_doc(
{
"doctype": "Batch",
"batch_id": batch_no,
"item": item_code,
}
).insert(ignore_permissions=True)
frappe.db.set_value("Batch", batch_no, "use_batchwise_valuation", 1)
sle = [
frappe._dict(
name=item_code,
actual_qty=5,
qty_after_transaction=5,
stock_value_difference=50,
warehouse="WH 1",
posting_date="2021-11-30",
voucher_type="Stock Entry",
voucher_no="001",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=buffer_batch,
valuation_rate=10,
),
frappe._dict(
name=item_code,
actual_qty=-10,
qty_after_transaction=-5,
stock_value_difference=-100,
warehouse="WH 1",
posting_date="2021-12-01",
voucher_type="Stock Entry",
voucher_no="002",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=negative_batch,
valuation_rate=10,
),
frappe._dict(
name=item_code,
actual_qty=6,
qty_after_transaction=1,
stock_value_difference=60,
warehouse="WH 1",
posting_date="2021-12-01",
voucher_type="Stock Entry",
voucher_no="002",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=negative_batch,
valuation_rate=10,
),
]
fifo_slots = FIFOSlots(self.filters, sle)
slots = fifo_slots.generate()
item_result = slots[item_code]
self.assertEqual(item_result["qty_after_transaction"], item_result["total_qty"])
self.assertEqual(
item_result["fifo_queue"],
[
[buffer_batch, 1, 5, "2021-11-30", 50],
[negative_batch, 1, -4.0, "2021-12-01", -40.0],
],
)
self.assertEqual(
fifo_slots.transferred_item_details[("002", item_code, "WH 1")],
[[4.0, "2021-12-01", 40.0]],
)
def test_batchwise_valuation_negative_stock_later_voucher(self):
from erpnext.stock.doctype.item.test_item import make_item
item_code = make_item(
"Test Stock Ageing Batchwise Negative Later Voucher",
{
"is_stock_item": 1,
"has_batch_no": 1,
"valuation_method": "FIFO",
},
).name
batch_no = "SA-BATCHWISE-NEGATIVE-LATER-VOUCHER"
if not frappe.db.exists("Batch", batch_no):
frappe.get_doc(
{
"doctype": "Batch",
"batch_id": batch_no,
"item": item_code,
}
).insert(ignore_permissions=True)
frappe.db.set_value("Batch", batch_no, "use_batchwise_valuation", 1)
sle = [
frappe._dict(
name=item_code,
actual_qty=-10,
qty_after_transaction=-10,
stock_value_difference=-100,
warehouse="WH 1",
posting_date="2021-11-01",
voucher_type="Stock Entry",
voucher_no="001",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=batch_no,
valuation_rate=10,
),
frappe._dict(
name=item_code,
actual_qty=6,
qty_after_transaction=-4,
stock_value_difference=60,
warehouse="WH 1",
posting_date="2021-11-10",
voucher_type="Stock Entry",
voucher_no="002",
has_serial_no=False,
has_batch_no=True,
serial_no=None,
batch_no=batch_no,
valuation_rate=10,
),
]
slots = FIFOSlots(self.filters, sle).generate()
item_result = slots[item_code]
self.assertEqual(item_result["qty_after_transaction"], item_result["total_qty"])
self.assertEqual(item_result["total_qty"], -4.0)
self.assertEqual(item_result["fifo_queue"], [[batch_no, 1, -4.0, "2021-11-10", -40.0]])
def test_batchwise_valuation_stock_reconciliation_with_bundle(self):
from frappe.utils import add_days, getdate, nowdate
from erpnext.stock.doctype.item.test_item import make_item
from erpnext.stock.doctype.serial_and_batch_bundle.test_serial_and_batch_bundle import (
get_batch_from_bundle,
)
from erpnext.stock.doctype.stock_reconciliation.test_stock_reconciliation import (
create_stock_reconciliation,
)
suffix = frappe.generate_hash(length=8).upper()
item_code = make_item(
f"Test Stock Ageing Batch Reco {suffix}",
{
"is_stock_item": 1,
"has_batch_no": 1,
"create_new_batch": 1,
"batch_number_series": f"SA-RECO-{suffix}-.###",
"valuation_method": "FIFO",
},
).name
warehouse = "_Test Warehouse - _TC"
base_date = nowdate()
opening_reco = create_stock_reconciliation(
item_code=item_code,
warehouse=warehouse,
qty=12,
rate=10,
posting_date=add_days(base_date, -2),
posting_time="10:00:00",
)
batch_no = get_batch_from_bundle(opening_reco.items[0].serial_and_batch_bundle)
frappe.db.set_value("Batch", batch_no, "use_batchwise_valuation", 1)
create_stock_reconciliation(
item_code=item_code,
warehouse=warehouse,
qty=5,
rate=10,
batch_no=batch_no,
posting_date=add_days(base_date, -1),
posting_time="10:00:00",
)
filters = frappe._dict(
company="_Test Company",
to_date=base_date,
ranges=["30", "60", "90"],
item_code=item_code,
)
slots = FIFOSlots(filters).generate()
item_result = slots[item_code]
self.assertEqual(item_result["qty_after_transaction"], item_result["total_qty"])
self.assertEqual(item_result["total_qty"], 5.0)
self.assertEqual(
item_result["fifo_queue"], [[batch_no.upper(), 1, 5.0, getdate(add_days(base_date, -2)), 50.0]]
)
def generate_item_and_item_wh_wise_slots(filters, sle):
"Return results with and without 'show_warehouse_wise_stock'"

View File

@@ -15,7 +15,11 @@ from frappe.utils.nestedset import get_descendants_of
import erpnext
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import get_inventory_dimensions
from erpnext.stock.doctype.warehouse.warehouse import apply_warehouse_filter
from erpnext.stock.report.stock_ageing.stock_ageing import FIFOSlots, get_average_age
from erpnext.stock.report.stock_ageing.stock_ageing import (
FIFOSlots,
get_average_age,
normalize_fifo_queue,
)
from erpnext.stock.utils import add_additional_uom_columns
@@ -123,6 +127,7 @@ class StockBalanceReport:
stock_ageing_data = {"average_age": 0, "earliest_age": 0, "latest_age": 0}
if opening_fifo_queue:
fifo_queue = sorted(filter(_func, opening_fifo_queue), key=_func)
fifo_queue = normalize_fifo_queue(fifo_queue)
if not fifo_queue:
continue

View File

@@ -10,7 +10,11 @@ from frappe import _
from frappe.query_builder.functions import Count
from frappe.utils import cint, flt, getdate
from erpnext.stock.report.stock_ageing.stock_ageing import FIFOSlots, get_average_age
from erpnext.stock.report.stock_ageing.stock_ageing import (
FIFOSlots,
get_average_age,
normalize_fifo_queue,
)
from erpnext.stock.report.stock_analytics.stock_analytics import (
get_item_details,
get_items,
@@ -68,6 +72,7 @@ def execute(filters=None):
fifo_queue = item_ageing[item]["fifo_queue"]
average_age = 0.00
if fifo_queue:
fifo_queue = normalize_fifo_queue(fifo_queue)
average_age = get_average_age(fifo_queue, filters["to_date"])
row += [average_age]