Files
erpnext/erpnext/stock/report/stock_ageing/stock_ageing.py
mergify[bot] a1457c759d fix: consider batchwise valuation in stock ageing report (backport #54919) (#55230)
Co-authored-by: Mihir Kandoi <kandoimihir@gmail.com>
fix: consider batchwise valuation in stock ageing report (#54919)
2026-05-24 07:18:07 +00:00

879 lines
27 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
from collections.abc import Iterator
from operator import itemgetter
import frappe
from frappe import _
from frappe.query_builder.functions import Abs, Count
from frappe.utils import cint, date_diff, flt, get_datetime
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.valuation import round_off_if_near_zero
Filters = frappe._dict
def execute(filters: Filters = None) -> tuple:
to_date = filters["to_date"]
filters.ranges = [num.strip() for num in filters.range.split(",") if num.strip().isdigit()]
columns = get_columns(filters)
item_details = FIFOSlots(filters).generate()
data = format_report_data(filters, item_details, to_date)
chart_data = get_chart_data(data, filters)
return columns, data, None, chart_data
def format_report_data(filters: Filters, item_details: dict, to_date: str) -> list[dict]:
"Returns ordered, formatted data with ranges."
_func = itemgetter(-2)
data = []
precision = cint(frappe.db.get_single_value("System Settings", "float_precision", cache=True))
for _item, item_dict in item_details.items():
if not flt(item_dict.get("total_qty"), precision):
continue
earliest_age, latest_age = 0, 0
details = item_dict["details"]
fifo_queue = sorted(filter(_func, item_dict["fifo_queue"]), key=_func)
if not fifo_queue:
continue
if details.has_batch_no:
fifo_queue = [slot[2:] if len(slot) == 5 else slot for slot in fifo_queue]
average_age = get_average_age(fifo_queue, to_date)
earliest_age = date_diff(to_date, fifo_queue[0][1])
latest_age = date_diff(to_date, fifo_queue[-1][1])
range_values = get_range_age(filters, fifo_queue, to_date, item_dict)
row = [details.name, details.item_name, details.description, details.item_group, details.brand]
if filters.get("show_warehouse_wise_stock"):
row.append(details.warehouse)
row.extend(
[
flt(item_dict.get("total_qty"), precision),
average_age,
*range_values,
earliest_age,
latest_age,
details.stock_uom,
]
)
data.append(row)
return data
def get_average_age(fifo_queue: list, to_date: str) -> float:
batch_age = age_qty = total_qty = 0.0
for batch in fifo_queue:
batch_age = date_diff(to_date, batch[1])
if isinstance(batch[0], int | float):
age_qty += batch_age * batch[0]
total_qty += batch[0]
else:
age_qty += batch_age * 1
total_qty += 1
return flt(age_qty / total_qty, 2) if total_qty else 0.0
def get_range_age(filters: Filters, fifo_queue: list, to_date: str, item_dict: dict) -> list:
precision = cint(frappe.db.get_single_value("System Settings", "float_precision", cache=True))
range_values = [0.0] * ((len(filters.ranges) * 2) + 2)
for item in fifo_queue:
age = flt(date_diff(to_date, item[1]))
qty = flt(item[0]) if not item_dict["has_serial_no"] else 1.0
stock_value = flt(item[2])
for i, age_limit in enumerate(filters.ranges):
if age <= flt(age_limit):
i *= 2
range_values[i] = flt(range_values[i] + qty, precision)
range_values[i + 1] = flt(range_values[i + 1] + stock_value, precision)
if range_values[i] == 0.0 and round_off_if_near_zero(range_values[i + 1], 2) == 0:
range_values[i + 1] = 0.0
break
else:
range_values[-2] = flt(range_values[-2] + qty, precision)
range_values[-1] = flt(range_values[-1] + stock_value, precision)
if range_values[-2] == 0.0 and round_off_if_near_zero(range_values[-1], 2) == 0:
range_values[-1] = 0.0
return range_values
def get_columns(filters: Filters) -> list[dict]:
range_columns = []
setup_ageing_columns(filters, range_columns)
columns = [
{
"label": _("Item Code"),
"fieldname": "item_code",
"fieldtype": "Link",
"options": "Item",
"width": 100,
},
{"label": _("Item Name"), "fieldname": "item_name", "fieldtype": "Data", "width": 100},
{"label": _("Description"), "fieldname": "description", "fieldtype": "Data", "width": 200},
{
"label": _("Item Group"),
"fieldname": "item_group",
"fieldtype": "Link",
"options": "Item Group",
"width": 100,
},
{
"label": _("Brand"),
"fieldname": "brand",
"fieldtype": "Link",
"options": "Brand",
"width": 100,
},
]
if filters.get("show_warehouse_wise_stock"):
columns += [
{
"label": _("Warehouse"),
"fieldname": "warehouse",
"fieldtype": "Link",
"options": "Warehouse",
"width": 100,
}
]
columns.extend(
[
{"label": _("Available Qty"), "fieldname": "qty", "fieldtype": "Float", "width": 100},
{"label": _("Average Age"), "fieldname": "average_age", "fieldtype": "Float", "width": 100},
]
)
columns.extend(range_columns)
columns.extend(
[
{"label": _("Earliest"), "fieldname": "earliest", "fieldtype": "Int", "width": 80},
{"label": _("Latest"), "fieldname": "latest", "fieldtype": "Int", "width": 80},
{"label": _("UOM"), "fieldname": "uom", "fieldtype": "Link", "options": "UOM", "width": 100},
]
)
return columns
def get_chart_data(data: list, filters: Filters) -> dict:
if not data:
return []
labels, datapoints = [], []
if filters.get("show_warehouse_wise_stock"):
return {}
data.sort(key=lambda row: row[6], reverse=True)
if len(data) > 10:
data = data[:10]
for row in data:
labels.append(row[0])
datapoints.append(row[6])
return {
"data": {"labels": labels, "datasets": [{"name": _("Average Age"), "values": datapoints}]},
"type": "bar",
}
def setup_ageing_columns(filters: Filters, range_columns: list):
prev_range_value = 0
ranges = []
for range in filters.ranges:
ranges.append(f"{prev_range_value} - {range}")
prev_range_value = cint(range) + 1
ranges.append(f"{prev_range_value} - Above")
for i, label in enumerate(ranges):
fieldname = "range" + str(i + 1)
add_column(range_columns, label=_("Age ({0})").format(label), fieldname=fieldname)
add_column(range_columns, label=_("Value ({0})").format(label), fieldname=fieldname + "value")
def add_column(range_columns: list, label: str, fieldname: str, fieldtype: str = "Float", width: int = 140):
range_columns.append(dict(label=label, fieldname=fieldname, fieldtype=fieldtype, width=width))
class FIFOSlots:
"Returns FIFO computed slots of inwarded stock as per date."
def __init__(self, filters: dict | None = None, sle: list | None = None):
self.item_details = {}
self.transferred_item_details = {}
self.serial_no_details = {}
self.batch_no_details = {}
self.batchwise_valuation_by_batch = {}
self.filters = filters
self.sle = sle
def generate(self) -> dict:
"""
Returns dict of the foll.g structure:
Key = Item A / (Item A, Warehouse A)
Key: {
'details' -> Dict: ** item details **,
'fifo_queue' -> List: ** list of lists containing entries/slots for existing stock,
consumed/updated and maintained via FIFO. **
}
"""
from erpnext.stock.serial_batch_bundle import get_serial_nos_from_bundle
stock_ledger_entries = self.sle
bundle_wise_serial_nos = frappe._dict({})
bundle_wise_batch_nos = frappe._dict({})
if stock_ledger_entries is None:
bundle_wise_serial_nos = self.__get_bundle_wise_serial_nos()
bundle_wise_batch_nos = self.__get_bundle_wise_batch_nos()
# prepare single sle voucher detail lookup
self.prepare_stock_reco_voucher_wise_count()
with frappe.db.unbuffered_cursor():
if stock_ledger_entries is None:
stock_ledger_entries = self.__get_stock_ledger_entries()
for d in stock_ledger_entries:
key, fifo_queue, transferred_item_key = self.__init_key_stores(d)
prev_balance_qty = self.item_details[key].get("qty_after_transaction", 0)
if d.voucher_type == "Stock Reconciliation" and (
not d.batch_no or d.serial_no or d.serial_and_batch_bundle
):
if d.voucher_detail_no in self.stock_reco_voucher_wise_count:
# for legacy recon with single sle has qty_after_transaction and stock_value_difference without outward entry
# for exisitng handle emptying the existing queue and details.
d.stock_value_difference = flt(d.qty_after_transaction * d.valuation_rate)
d.actual_qty = d.qty_after_transaction
self.item_details[key]["qty_after_transaction"] = 0
self.item_details[key]["total_qty"] = 0
fifo_queue.clear()
else:
d.actual_qty = flt(d.qty_after_transaction) - flt(prev_balance_qty)
elif d.voucher_type == "Stock Reconciliation":
# get difference in qty shift as actual qty
d.actual_qty = flt(d.qty_after_transaction) - flt(prev_balance_qty)
serial_nos = get_serial_nos(d.serial_no) if d.serial_no else []
batch_nos = (
[
[
d.batch_no.upper(),
self.__get_batchwise_valuation(d.batch_no),
abs(d.actual_qty),
abs(d.stock_value_difference),
]
]
if d.batch_no
else []
)
if d.serial_and_batch_bundle:
if d.has_serial_no:
if bundle_wise_serial_nos:
serial_nos = bundle_wise_serial_nos.get(d.serial_and_batch_bundle) or []
else:
serial_nos = sorted(get_serial_nos_from_bundle(d.serial_and_batch_bundle)) or []
elif d.has_batch_no:
if bundle_wise_batch_nos:
batch_nos = bundle_wise_batch_nos.get(d.serial_and_batch_bundle) or []
else:
batch_nos = (
self.__get_bundle_wise_batch_nos(d.serial_and_batch_bundle).get(
d.serial_and_batch_bundle
)
or []
)
serial_nos = self.uppercase_serial_nos(serial_nos)
if d.actual_qty > 0:
self.__compute_incoming_stock(d, fifo_queue, transferred_item_key, serial_nos, batch_nos)
else:
self.__compute_outgoing_stock(d, fifo_queue, transferred_item_key, serial_nos, batch_nos)
self.__update_balances(d, key)
# handle serial nos misconsumption
if d.has_serial_no:
qty_after = cint(self.item_details[key]["qty_after_transaction"])
if qty_after <= 0:
fifo_queue.clear()
elif len(fifo_queue) > qty_after:
fifo_queue[:] = fifo_queue[:qty_after]
# Note that stock_ledger_entries is an iterator, you can not reuse it like a list
del stock_ledger_entries
if not self.filters.get("show_warehouse_wise_stock"):
# (Item 1, WH 1), (Item 1, WH 2) => (Item 1)
self.item_details = self.__aggregate_details_by_item(self.item_details)
return self.item_details
def uppercase_serial_nos(self, serial_nos):
"Convert serial nos to uppercase for uniformity."
return [sn.upper() for sn in serial_nos]
def __get_batchwise_valuation(self, batch_no: str):
if batch_no not in self.batchwise_valuation_by_batch:
self.batchwise_valuation_by_batch[batch_no] = frappe.db.get_value(
"Batch", batch_no, "use_batchwise_valuation"
)
return self.batchwise_valuation_by_batch[batch_no]
def __init_key_stores(self, row: dict) -> tuple:
"Initialise keys and FIFO Queue."
key = (row.name, row.warehouse)
self.item_details.setdefault(key, {"details": row, "fifo_queue": []})
fifo_queue = self.item_details[key]["fifo_queue"]
transferred_item_key = (row.voucher_no, row.name, row.warehouse)
self.transferred_item_details.setdefault(transferred_item_key, [])
return key, fifo_queue, transferred_item_key
def __compute_incoming_stock(
self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list, batch_nos: list
):
"Update FIFO Queue on inward stock."
def set_fifo_queue_for_serial_items():
valuation = row.stock_value_difference / row.actual_qty
for serial_no in serial_nos:
if self.serial_no_details.get(serial_no):
fifo_queue.append([serial_no, self.serial_no_details.get(serial_no), valuation])
else:
self.serial_no_details.setdefault(serial_no, row.posting_date)
fifo_queue.append([serial_no, row.posting_date, valuation])
def set_fifo_queue_for_batch_items():
for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos:
qty, stock_value_difference = neutralize_negative_batch_stock(
batch_no, use_batchwise_valuation, qty, stock_value_difference
)
if not qty:
continue
if self.batch_no_details.get(batch_no):
fifo_queue.append(
[
batch_no,
use_batchwise_valuation,
qty,
self.batch_no_details.get(batch_no),
stock_value_difference,
]
)
else:
self.batch_no_details.setdefault(batch_no, row.posting_date)
fifo_queue.append(
[batch_no, use_batchwise_valuation, qty, row.posting_date, stock_value_difference]
)
def neutralize_negative_batch_stock(batch_no, use_batchwise_valuation, qty, stock_value_difference):
qty = flt(qty)
stock_value_difference = flt(stock_value_difference)
if not qty:
return qty, stock_value_difference
for slot in list(fifo_queue):
if (
len(slot) != 5
or slot[0] != batch_no
or slot[1] != use_batchwise_valuation
or flt(slot[2]) >= 0
):
continue
qty_to_adjust = min(qty, abs(flt(slot[2])))
value_to_adjust = (
stock_value_difference
if qty_to_adjust == qty
else flt(stock_value_difference * (qty_to_adjust / qty))
)
slot[2] = flt(slot[2]) + qty_to_adjust
slot[3] = row.posting_date
slot[4] = flt(slot[4]) + value_to_adjust
qty = flt(qty - qty_to_adjust)
stock_value_difference = flt(stock_value_difference - value_to_adjust)
if not flt(slot[2]) and not flt(slot[4]):
fifo_queue.remove(slot)
if not qty:
break
return qty, stock_value_difference
transfer_data = self.transferred_item_details.get(transfer_key)
if transfer_data:
# inward/outward from same voucher, item & warehouse
# eg: Repack with same item, Stock reco for batch item
# consume transfer data and add stock to fifo queue
self.__adjust_incoming_transfer_qty(transfer_data, fifo_queue, row, batch_nos)
else:
if serial_nos and row.get("has_serial_no"):
set_fifo_queue_for_serial_items()
elif batch_nos and row.get("has_batch_no"):
set_fifo_queue_for_batch_items()
elif fifo_queue and flt(fifo_queue[0][0]) <= 0:
# neutralize 0/negative stock by adding positive stock
fifo_queue[0][0] += flt(row.actual_qty)
fifo_queue[0][1] = row.posting_date
fifo_queue[0][2] += flt(row.stock_value_difference)
else:
fifo_queue.append([flt(row.actual_qty), row.posting_date, flt(row.stock_value_difference)])
def __compute_outgoing_stock(
self, row: dict, fifo_queue: list, transfer_key: tuple, serial_nos: list, batch_nos: list
):
"Update FIFO Queue on outward stock."
if serial_nos:
fifo_queue[:] = [serial_no for serial_no in fifo_queue if serial_no[0] not in serial_nos]
elif batch_nos:
for batch_no, use_batchwise_valuation, qty, stock_value_difference in batch_nos:
items_to_remove = []
for slot in fifo_queue:
slot_batch_no, slot_use_batchwise_valuation, slot_qty, _, slot_stock_value = slot
if flt(slot_qty) <= 0:
continue
# Batchwise valuation: consume only from same batch
if use_batchwise_valuation:
if slot_batch_no != batch_no:
continue
# Non-batchwise valuation: consume from any non-batchwise batch
else:
if slot_use_batchwise_valuation:
continue
if flt(slot_qty) <= qty:
qty -= flt(slot_qty)
stock_value_difference -= flt(slot_stock_value)
self.transferred_item_details[transfer_key].append(
[flt(slot_qty), slot[3], flt(slot_stock_value)]
)
items_to_remove.append(slot)
else:
slot[2] = flt(slot_qty) - qty
# preserve ledger valuation (moving average / SLE value), not slot proportional value
slot[4] = flt(slot_stock_value) - stock_value_difference
self.transferred_item_details[transfer_key].append(
[qty, slot[3], stock_value_difference]
)
qty = 0
stock_value_difference = 0
break
for item in items_to_remove:
fifo_queue.remove(item)
if qty:
fifo_queue.append(
[
batch_no,
use_batchwise_valuation,
-(qty),
row.posting_date,
-(stock_value_difference),
]
)
self.transferred_item_details[transfer_key].append(
[qty, row.posting_date, stock_value_difference]
)
else:
qty_to_pop = abs(row.actual_qty)
stock_value = abs(row.stock_value_difference)
while qty_to_pop:
slot = fifo_queue[0] if fifo_queue else [0, None, 0]
if 0 < flt(slot[0]) <= qty_to_pop:
# qty to pop >= slot qty
# if +ve and not enough or exactly same balance in current slot, consume whole slot
qty_to_pop -= flt(slot[0])
stock_value -= flt(slot[2])
self.transferred_item_details[transfer_key].append(fifo_queue.pop(0))
elif not fifo_queue:
# negative stock, no balance but qty yet to consume
fifo_queue.append([-(qty_to_pop), row.posting_date, -(stock_value)])
self.transferred_item_details[transfer_key].append(
[qty_to_pop, row.posting_date, stock_value]
)
qty_to_pop = 0
stock_value = 0
else:
# qty to pop < slot qty, ample balance
# consume actual_qty from first slot
slot[0] = flt(slot[0]) - qty_to_pop
slot[2] = flt(slot[2]) - stock_value
self.transferred_item_details[transfer_key].append([qty_to_pop, slot[1], stock_value])
qty_to_pop = 0
stock_value = 0
def __adjust_incoming_transfer_qty(
self, transfer_data: dict, fifo_queue: list, row: dict, batch_nos: list | None = None
):
"Add previously removed stock back to FIFO Queue."
transfer_qty_to_pop = flt(row.actual_qty)
stock_value = flt(row.stock_value_difference)
batch_nos = [list(batch_no) for batch_no in batch_nos or []]
def is_batch_slot(slot):
return len(slot) == 5
def get_incoming_slots(qty, posting_date, value):
if not batch_nos:
return [[qty, posting_date, value]]
incoming_slots = []
remaining_qty = flt(qty)
remaining_value = flt(value)
while remaining_qty and batch_nos:
batch_no, use_batchwise_valuation, batch_qty, _ = batch_nos[0]
batch_qty = flt(batch_qty)
slot_qty = min(batch_qty, remaining_qty)
slot_value = (
remaining_value
if slot_qty == remaining_qty
else flt(remaining_value * (slot_qty / remaining_qty))
)
incoming_slots.append([batch_no, use_batchwise_valuation, slot_qty, posting_date, slot_value])
batch_nos[0][2] = flt(batch_qty - slot_qty)
if not batch_nos[0][2]:
batch_nos.pop(0)
remaining_qty = flt(remaining_qty - slot_qty)
remaining_value = flt(remaining_value - slot_value)
if remaining_qty:
incoming_slots.append([remaining_qty, posting_date, remaining_value])
return incoming_slots
def add_to_fifo_queue(slot):
matching_negative_batch_slot = next(
(
existing_slot
for existing_slot in fifo_queue
if is_batch_slot(existing_slot)
and is_batch_slot(slot)
and flt(existing_slot[2]) <= 0
and existing_slot[0] == slot[0]
and existing_slot[1] == slot[1]
),
None,
)
if (
fifo_queue
and not is_batch_slot(fifo_queue[0])
and not is_batch_slot(slot)
and flt(fifo_queue[0][0]) <= 0
):
fifo_queue[0][0] += flt(slot[0])
fifo_queue[0][1] = slot[1]
fifo_queue[0][2] += flt(slot[2])
elif matching_negative_batch_slot:
matching_negative_batch_slot[2] += flt(slot[2])
matching_negative_batch_slot[3] = slot[3]
matching_negative_batch_slot[4] += flt(slot[4])
else:
fifo_queue.append(slot)
while transfer_qty_to_pop:
if transfer_data and 0 < flt(transfer_data[0][0]) <= transfer_qty_to_pop:
# bucket qty is not enough, consume whole
transfer_qty = flt(transfer_data[0][0])
transfer_date = transfer_data[0][1]
transfer_value = flt(transfer_data[0][2])
transfer_qty_to_pop -= transfer_qty
stock_value -= transfer_value
for slot in get_incoming_slots(transfer_qty, transfer_date, transfer_value):
add_to_fifo_queue(slot)
transfer_data.pop(0)
elif not transfer_data:
# transfer bucket is empty, extra incoming qty
for slot in get_incoming_slots(transfer_qty_to_pop, row.posting_date, stock_value):
add_to_fifo_queue(slot)
transfer_qty_to_pop = 0
stock_value = 0
else:
# ample bucket qty to consume
transfer_data[0][0] -= transfer_qty_to_pop
transfer_data[0][2] -= stock_value
for slot in get_incoming_slots(transfer_qty_to_pop, transfer_data[0][1], stock_value):
add_to_fifo_queue(slot)
transfer_qty_to_pop = 0
stock_value = 0
def __update_balances(self, row: dict, key: tuple | str):
self.item_details[key]["qty_after_transaction"] = row.qty_after_transaction
if "total_qty" not in self.item_details[key]:
self.item_details[key]["total_qty"] = row.actual_qty
else:
self.item_details[key]["total_qty"] += row.actual_qty
self.item_details[key]["has_serial_no"] = row.has_serial_no
self.item_details[key]["has_batch_no"] = row.has_batch_no
self.item_details[key]["details"].valuation_rate = row.valuation_rate
def __aggregate_details_by_item(self, wh_wise_data: dict) -> dict:
"Aggregate Item-Wh wise data into single Item entry."
item_aggregated_data = {}
for key, row in wh_wise_data.items():
item = key[0]
if not item_aggregated_data.get(item):
item_aggregated_data.setdefault(
item,
{
"details": frappe._dict(),
"fifo_queue": [],
"qty_after_transaction": 0.0,
"total_qty": 0.0,
},
)
item_row = item_aggregated_data.get(item)
item_row["details"].update(row["details"])
item_row["fifo_queue"].extend(row["fifo_queue"])
item_row["qty_after_transaction"] += flt(row["qty_after_transaction"])
item_row["total_qty"] += flt(row["total_qty"])
item_row["has_serial_no"] = row["has_serial_no"]
item_row["has_batch_no"] = row["has_batch_no"]
return item_aggregated_data
def __get_stock_ledger_entries(self) -> Iterator[dict]:
sle = frappe.qb.DocType("Stock Ledger Entry")
item = self.__get_item_query() # used as derived table in sle query
to_date = get_datetime(self.filters.get("to_date") + " 23:59:59")
sle_query = (
frappe.qb.from_(sle)
.from_(item)
.select(
item.name,
item.item_name,
item.item_group,
item.brand,
item.description,
item.stock_uom,
item.has_batch_no,
item.has_serial_no,
sle.actual_qty,
sle.stock_value_difference,
sle.valuation_rate,
sle.posting_date,
sle.voucher_type,
sle.voucher_no,
sle.voucher_detail_no,
sle.serial_no,
sle.batch_no,
sle.qty_after_transaction,
sle.serial_and_batch_bundle,
sle.warehouse,
)
.where(
(sle.item_code == item.name)
& (sle.company == self.filters.get("company"))
& (sle.posting_datetime <= to_date)
& (sle.is_cancelled != 1)
)
)
if self.filters.get("warehouse"):
sle_query = self.__get_warehouse_conditions(sle, sle_query)
elif self.filters.get("warehouse_type"):
warehouses = frappe.get_all(
"Warehouse",
filters={"warehouse_type": self.filters.get("warehouse_type"), "is_group": 0},
pluck="name",
)
if warehouses:
sle_query = sle_query.where(sle.warehouse.isin(warehouses))
sle_query = sle_query.orderby(sle.posting_datetime, sle.creation)
return sle_query.run(as_dict=True, as_iterator=True)
def __get_bundle_wise_serial_nos(self) -> dict:
bundle = frappe.qb.DocType("Serial and Batch Bundle")
entry = frappe.qb.DocType("Serial and Batch Entry")
to_date = get_datetime(self.filters.get("to_date") + " 23:59:59")
query = (
frappe.qb.from_(bundle)
.join(entry)
.on(bundle.name == entry.parent)
.select(bundle.name, entry.serial_no)
.where(
(bundle.docstatus == 1)
& (entry.serial_no.isnotnull())
& (bundle.company == self.filters.get("company"))
& (bundle.posting_datetime <= to_date)
)
)
for field in ["item_code"]:
if self.filters.get(field):
query = query.where(bundle[field] == self.filters.get(field))
if self.filters.get("warehouse"):
query = self.__get_warehouse_conditions(bundle, query)
bundle_wise_serial_nos = frappe._dict({})
for bundle_name, serial_no in query.run():
bundle_wise_serial_nos.setdefault(bundle_name, []).append(serial_no)
return bundle_wise_serial_nos
def __get_bundle_wise_batch_nos(self, sabb_name=None) -> dict:
bundle = frappe.qb.DocType("Serial and Batch Bundle")
entry = frappe.qb.DocType("Serial and Batch Entry")
batch = frappe.qb.DocType("Batch")
to_date = get_datetime(self.filters.get("to_date") + " 23:59:59")
query = (
frappe.qb.from_(bundle)
.join(entry)
.on(bundle.name == entry.parent)
.join(batch)
.on(entry.batch_no == batch.name)
.select(
bundle.name,
entry.batch_no,
batch.use_batchwise_valuation,
Abs(entry.qty).as_("qty"),
Abs(entry.stock_value_difference).as_("stock_value_difference"),
)
.where(
(bundle.docstatus == 1)
& (entry.batch_no.isnotnull())
& (bundle.company == self.filters.get("company"))
& (bundle.posting_datetime <= to_date)
)
)
for field in ["item_code"]:
if self.filters.get(field):
query = query.where(bundle[field] == self.filters.get(field))
if self.filters.get("warehouse"):
query = self.__get_warehouse_conditions(bundle, query)
if sabb_name:
query = query.where(bundle.name == sabb_name)
bundle_wise_batch_nos = frappe._dict({})
for bundle_name, batch_no, use_batchwise_valuation, qty, stock_value_difference in query.run():
bundle_wise_batch_nos.setdefault(bundle_name, []).append(
[batch_no.upper(), use_batchwise_valuation, qty, stock_value_difference]
)
return bundle_wise_batch_nos
def __get_item_query(self) -> str:
item_table = frappe.qb.DocType("Item")
item = frappe.qb.from_("Item").select(
"name",
"item_name",
"description",
"stock_uom",
"brand",
"item_group",
"has_serial_no",
"has_batch_no",
)
if self.filters.get("item_code"):
item = item.where(item_table.item_code == self.filters.get("item_code"))
if self.filters.get("brand"):
item = item.where(item_table.brand == self.filters.get("brand"))
return item
def __get_warehouse_conditions(self, sle, sle_query) -> str:
warehouse = frappe.qb.DocType("Warehouse")
lft, rgt = frappe.db.get_value("Warehouse", self.filters.get("warehouse"), ["lft", "rgt"])
warehouse_results = (
frappe.qb.from_(warehouse)
.select("name")
.where((warehouse.lft >= lft) & (warehouse.rgt <= rgt))
.run()
)
warehouse_results = [x[0] for x in warehouse_results]
return sle_query.where(sle.warehouse.isin(warehouse_results))
def prepare_stock_reco_voucher_wise_count(self):
self.stock_reco_voucher_wise_count = frappe._dict()
doctype = frappe.qb.DocType("Stock Ledger Entry")
item = frappe.qb.DocType("Item")
query = (
frappe.qb.from_(doctype)
.inner_join(item)
.on(doctype.item_code == item.name)
.select(doctype.voucher_detail_no, Count(doctype.name).as_("count"))
.where(
(doctype.voucher_type == "Stock Reconciliation")
& (doctype.docstatus < 2)
& (doctype.is_cancelled == 0)
)
.groupby(doctype.voucher_detail_no)
)
data = query.run(as_dict=True)
if not data:
return
for row in data:
if row.count != 1:
continue
sr_item = frappe.db.get_value(
"Stock Reconciliation Item", row.voucher_detail_no, ["current_qty", "qty"], as_dict=True
)
if sr_item and sr_item.qty and sr_item.current_qty:
self.stock_reco_voucher_wise_count[row.voucher_detail_no] = sr_item.current_qty