fix: change in functionality

(cherry picked from commit c1874cb7d5)
This commit is contained in:
nishkagosalia
2026-03-24 12:02:09 +05:30
committed by Mergify
parent 8a5e2cc0a6
commit 6d92792634
3 changed files with 240 additions and 129 deletions

View File

@@ -28,16 +28,32 @@ frappe.query_reports["BOM Stock Analysis"] = {
default: false,
},
],
formatter: function (value, row, column, data, default_formatter) {
formatter(value, row, column, data, default_formatter) {
if (data && data.bold && column.fieldname === "item") {
return value ? `<b>${value}</b>` : "";
}
value = default_formatter(value, row, column, data);
if (column.id == "producible_fg_item") {
if (data["producible_fg_item"] >= data["required_qty"]) {
value = `<a style='color:green' href="/app/item/${data["producible_fg_item"]}" data-doctype="producible_fg_item">${data["producible_fg_item"]}</a>`;
} else {
value = `<a style='color:red' href="/app/item/${data["producible_fg_item"]}" data-doctype="producible_fg_item">${data["producible_fg_item"]}</a>`;
if (column.fieldname === "difference_qty" && value !== "" && value !== undefined) {
const numeric = parseFloat(value.replace(/,/g, "")) || 0;
if (numeric < 0) {
value = `<span style="color: red">${value}</span>`;
} else if (numeric > 0) {
value = `<span style="color: green">${value}</span>`;
}
}
if (data && data.bold) {
if (column.fieldname === "description" || column.fieldname === "item_name") {
const qty_to_make = frappe.query_report.get_filter_value("qty_to_make");
const producible = parseFloat(value) || 0;
const colour = qty_to_make && producible < qty_to_make ? "red" : "green";
return `<b style="color: ${colour}">${value}</b>`;
}
return `<b>${value}</b>`;
}
return value;
},
};

View File

@@ -4,74 +4,101 @@
import frappe
from frappe import _
from frappe.query_builder.functions import Floor, IfNull, Sum
from frappe.utils import flt, fmt_money
from frappe.utils.data import comma_and
from pypika.terms import ExistsCriterion
def execute(filters=None):
qty_to_make = filters.get("qty_to_make")
if qty_to_make:
if filters.get("qty_to_make"):
columns = get_columns_with_qty_to_make()
data = get_data_with_qty_to_make(filters)
return columns, data
else:
data = []
columns = get_columns_without_qty_to_make()
bom_data = get_producible_fg_items(filters)
for row in bom_data:
data.append(row)
data = get_data_without_qty_to_make(filters)
return columns, data
return columns, data
def fmt_qty(value):
"""Format a float quantity for display as a string, so blank rows stay blank."""
return frappe.utils.fmt_money(value, precision=2, currency=None)
def fmt_rate(value):
"""Format a currency rate for display as a string."""
currency = frappe.defaults.get_global_default("currency")
return frappe.utils.fmt_money(value, precision=2, currency=currency)
def get_data_with_qty_to_make(filters):
data = []
bom_data = get_bom_data(filters)
manufacture_details = get_manufacturer_records()
purchase_rates = batch_fetch_purchase_rates(bom_data)
qty_to_make = filters.get("qty_to_make")
data = []
for row in bom_data:
required_qty = filters.get("qty_to_make") * row.qty_per_unit
last_purchase_rate = frappe.db.get_value("Item", row.item_code, "last_purchase_rate")
qty_per_unit = flt(row.qty_per_unit) if row.qty_per_unit > 0 else 0
required_qty = qty_to_make * qty_per_unit
difference_qty = flt(row.actual_qty) - required_qty
rate = purchase_rates.get(row.item_code, 0)
data.append(get_report_data(last_purchase_rate, required_qty, row, manufacture_details))
data.append(
{
"item": row.item_code,
"description": row.description,
"from_bom_no": row.from_bom_no,
"manufacturer": comma_and(
manufacture_details.get(row.item_code, {}).get("manufacturer", []), add_quotes=False
),
"manufacturer_part_number": comma_and(
manufacture_details.get(row.item_code, {}).get("manufacturer_part", []), add_quotes=False
),
"qty_per_unit": fmt_qty(qty_per_unit),
"available_qty": fmt_qty(row.actual_qty),
"required_qty": fmt_qty(required_qty),
"difference_qty": fmt_qty(difference_qty),
"last_purchase_rate": fmt_rate(rate),
"_available_qty": flt(row.actual_qty),
"_qty_per_unit": qty_per_unit,
}
)
min_producible = (
min(int(r["_available_qty"] // r["_qty_per_unit"]) for r in data if r["_qty_per_unit"]) if data else 0
)
for row in data:
row.pop("_available_qty", None)
row.pop("_qty_per_unit", None)
# blank spacer row
data.append({})
data.append(
{
"item": _("Maximum Producible Items"),
"description": min_producible,
"from_bom_no": "",
"manufacturer": "",
"manufacturer_part_number": "",
"qty_per_unit": "",
"available_qty": "",
"required_qty": "",
"difference_qty": "",
"last_purchase_rate": "",
"bold": 1,
}
)
return data
def get_report_data(last_purchase_rate, required_qty, row, manufacture_details):
qty_per_unit = row.qty_per_unit if row.qty_per_unit > 0 else 0
difference_qty = row.actual_qty - required_qty
return [
row.item_code,
row.description,
row.from_bom_no,
comma_and(manufacture_details.get(row.item_code, {}).get("manufacturer", []), add_quotes=False),
comma_and(manufacture_details.get(row.item_code, {}).get("manufacturer_part", []), add_quotes=False),
qty_per_unit,
row.actual_qty,
required_qty,
difference_qty,
last_purchase_rate,
row.actual_qty // qty_per_unit if qty_per_unit else 0,
]
def get_columns_with_qty_to_make():
return [
{
"fieldname": "item",
"label": _("Item"),
"fieldtype": "Link",
"options": "Item",
"width": 120,
},
{
"fieldname": "description",
"label": _("Description"),
"fieldtype": "Data",
"width": 150,
},
{"fieldname": "item", "label": _("Item"), "fieldtype": "Link", "options": "Item", "width": 180},
{"fieldname": "description", "label": _("Description"), "fieldtype": "Data", "width": 160},
{
"fieldname": "from_bom_no",
"label": _("From BOM No"),
@@ -79,68 +106,89 @@ def get_columns_with_qty_to_make():
"options": "BOM",
"width": 150,
},
{
"fieldname": "manufacturer",
"label": _("Manufacturer"),
"fieldtype": "Data",
"width": 120,
},
{"fieldname": "manufacturer", "label": _("Manufacturer"), "fieldtype": "Data", "width": 130},
{
"fieldname": "manufacturer_part_number",
"label": _("Manufacturer Part Number"),
"fieldtype": "Data",
"width": 150,
},
{
"fieldname": "qty_per_unit",
"label": _("Qty Per Unit"),
"fieldtype": "Float",
"width": 110,
},
{
"fieldname": "available_qty",
"label": _("Available Qty"),
"fieldtype": "Float",
"width": 120,
},
{
"fieldname": "required_qty",
"label": _("Required Qty"),
"fieldtype": "Float",
"width": 120,
},
{
"fieldname": "difference_qty",
"label": _("Difference Qty"),
"fieldtype": "Float",
"width": 130,
"width": 170,
},
{"fieldname": "qty_per_unit", "label": _("Qty Per Unit"), "fieldtype": "Data", "width": 110},
{"fieldname": "available_qty", "label": _("Available Qty"), "fieldtype": "Data", "width": 120},
{"fieldname": "required_qty", "label": _("Required Qty"), "fieldtype": "Data", "width": 120},
{"fieldname": "difference_qty", "label": _("Difference Qty"), "fieldtype": "Data", "width": 130},
{
"fieldname": "last_purchase_rate",
"label": _("Last Purchase Rate"),
"fieldtype": "Float",
"fieldtype": "Data",
"width": 160,
},
{
"fieldname": "producible_fg_item",
"label": _("Producible FG Item"),
"fieldtype": "Float",
"width": 200,
},
]
def get_data_without_qty_to_make(filters):
raw_rows = get_producible_fg_items(filters)
data = []
for row in raw_rows:
data.append(
{
"item": row[0],
"description": row[1],
"from_bom_no": row[2],
"qty_per_unit": fmt_qty(row[3]),
"available_qty": fmt_qty(row[4]),
}
)
min_producible = min((row[5] or 0) for row in raw_rows) if raw_rows else 0
# blank spacer row
data.append({})
data.append(
{
"item": _("Maximum Producible Items"),
"description": min_producible,
"from_bom_no": "",
"qty_per_unit": "",
"available_qty": "",
"bold": 1,
}
)
return data
def get_columns_without_qty_to_make():
return [
_("Item") + ":Link/Item:150",
_("Item Name") + "::240",
_("Description") + "::300",
_("From BOM No") + "::200",
_("Required Qty") + ":Float:160",
_("Producible FG Item") + ":Float:200",
{"fieldname": "item", "label": _("Item"), "fieldtype": "Link", "options": "Item", "width": 180},
{"fieldname": "description", "label": _("Description"), "fieldtype": "Data", "width": 200},
{
"fieldname": "from_bom_no",
"label": _("From BOM No"),
"fieldtype": "Link",
"options": "BOM",
"width": 160,
},
{"fieldname": "qty_per_unit", "label": _("Qty Per Unit"), "fieldtype": "Data", "width": 120},
{"fieldname": "available_qty", "label": _("Available Qty"), "fieldtype": "Data", "width": 120},
]
def batch_fetch_purchase_rates(bom_data):
if not bom_data:
return {}
item_codes = [row.item_code for row in bom_data]
return {
r.name: r.last_purchase_rate
for r in frappe.get_all(
"Item",
filters={"name": ["in", item_codes]},
fields=["name", "last_purchase_rate"],
)
}
def get_bom_data(filters):
bom_item_table = "BOM Explosion Item" if filters.get("show_exploded_view") else "BOM Item"
@@ -167,7 +215,6 @@ def get_bom_data(filters):
warehouse_details = frappe.db.get_value(
"Warehouse", filters.get("warehouse"), ["lft", "rgt"], as_dict=1
)
if warehouse_details:
wh = frappe.qb.DocType("Warehouse")
query = query.where(
@@ -212,7 +259,6 @@ def explode_phantom_boms(data, filters):
data.pop(idx)
data[idx:idx] = children
filters["bom"] = original_bom
return data
@@ -220,13 +266,11 @@ def get_manufacturer_records():
details = frappe.get_all(
"Item Manufacturer", fields=["manufacturer", "manufacturer_part_no", "item_code"]
)
manufacture_details = frappe._dict()
for detail in details:
dic = manufacture_details.setdefault(detail.get("item_code"), {})
dic.setdefault("manufacturer", []).append(detail.get("manufacturer"))
dic.setdefault("manufacturer_part", []).append(detail.get("manufacturer_part_no"))
return manufacture_details
@@ -237,11 +281,11 @@ def get_producible_fg_items(filters):
WH = frappe.qb.DocType("Warehouse")
warehouse = filters.get("warehouse")
warehouse_details = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt"], as_dict=1)
if not warehouse:
frappe.throw(_("Warehouse is required to get producible FG Items"))
warehouse_details = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt"], as_dict=1)
if warehouse_details:
bin_subquery = (
frappe.qb.from_(BIN)
@@ -267,10 +311,10 @@ def get_producible_fg_items(filters):
.on(BOM_ITEM.item_code == bin_subquery.item_code)
.select(
BOM_ITEM.item_code,
BOM_ITEM.item_name,
BOM_ITEM.description,
BOM_ITEM.parent.as_("from_bom_no"),
(BOM_ITEM.stock_qty / BOM.quantity).as_("qty_per_unit"),
IfNull(bin_subquery.actual_qty, 0).as_("available_qty"),
Floor(bin_subquery.actual_qty / ((Sum(BOM_ITEM.stock_qty)) / BOM.quantity)),
)
.where((BOM_ITEM.parent == filters.get("bom")) & (BOM_ITEM.parenttype == "BOM"))
@@ -278,5 +322,4 @@ def get_producible_fg_items(filters):
.orderby(BOM_ITEM.idx)
)
data = query.run(as_list=True)
return data
return query.run(as_list=True)

View File

@@ -1,7 +1,7 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# Copyright (c) 2026, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import frappe
from frappe.utils import fmt_money
from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom
from erpnext.manufacturing.report.bom_stock_analysis.bom_stock_analysis import (
@@ -11,6 +11,15 @@ from erpnext.stock.doctype.item.test_item import make_item
from erpnext.tests.utils import ERPNextTestSuite
def fmt_qty(value):
return fmt_money(value, precision=2, currency=None)
def fmt_rate(value):
currency = frappe.defaults.get_global_default("currency")
return fmt_money(value, precision=2, currency=currency)
class TestBOMStockAnalysis(ERPNextTestSuite):
def setUp(self):
self.fg_item, self.rm_items = create_items()
@@ -20,34 +29,62 @@ class TestBOMStockAnalysis(ERPNextTestSuite):
qty_to_make = 10
# Case 1: When Item(s) Qty and Stock Qty are equal.
data = bom_stock_analysis_report(
raw_data = bom_stock_analysis_report(
filters={
"qty_to_make": qty_to_make,
"bom": self.boms[0].name,
}
)[1]
expected_data = get_expected_data(self.boms[0], qty_to_make)
self.assertSetEqual(set(tuple(x) for x in data), set(tuple(x) for x in expected_data))
data, footer = split_data_and_footer(raw_data)
expected_data, expected_min = get_expected_data(self.boms[0], qty_to_make)
self.assertSetEqual(
set(tuple(sorted(r.items())) for r in data),
set(tuple(sorted(r.items())) for r in expected_data),
)
self.assertEqual(footer.get("description"), expected_min)
# Case 2: When Item(s) Qty and Stock Qty are different and BOM Qty is 1.
data = bom_stock_analysis_report(
raw_data = bom_stock_analysis_report(
filters={
"qty_to_make": qty_to_make,
"bom": self.boms[1].name,
}
)[1]
expected_data = get_expected_data(self.boms[1], qty_to_make)
self.assertSetEqual(set(tuple(x) for x in data), set(tuple(x) for x in expected_data))
data, footer = split_data_and_footer(raw_data)
expected_data, expected_min = get_expected_data(self.boms[1], qty_to_make)
self.assertSetEqual(
set(tuple(sorted(r.items())) for r in data),
set(tuple(sorted(r.items())) for r in expected_data),
)
self.assertEqual(footer.get("description"), expected_min)
# Case 3: When Item(s) Qty and Stock Qty are different and BOM Qty is greater than 1.
data = bom_stock_analysis_report(
raw_data = bom_stock_analysis_report(
filters={
"qty_to_make": qty_to_make,
"bom": self.boms[2].name,
}
)[1]
expected_data = get_expected_data(self.boms[2], qty_to_make)
self.assertSetEqual(set(tuple(x) for x in data), set(tuple(x) for x in expected_data))
data, footer = split_data_and_footer(raw_data)
expected_data, expected_min = get_expected_data(self.boms[2], qty_to_make)
self.assertSetEqual(
set(tuple(sorted(r.items())) for r in data),
set(tuple(sorted(r.items())) for r in expected_data),
)
self.assertEqual(footer.get("description"), expected_min)
def split_data_and_footer(raw_data):
"""Separate component rows from the footer row. Skips blank spacer rows."""
data = [row for row in raw_data if row and not row.get("bold")]
footer = next((row for row in raw_data if row and row.get("bold")), {})
return data, footer
def create_items():
@@ -79,7 +116,6 @@ def create_boms(fg_item, rm_items):
for item in bom.items:
item.uom = uom
item.conversion_factor = conversion_factor
return bom
bom1 = make_bom(item=fg_item, quantity=1, raw_materials=rm_items, rm_qty=10)
@@ -98,22 +134,38 @@ def create_boms(fg_item, rm_items):
def get_expected_data(bom, qty_to_make):
"""
Returns (component_rows, min_producible).
Component rows are dicts matching what the report produces.
min_producible is the expected footer value.
"""
expected_data = []
producible_per_item = []
for idx, bom_item in enumerate(bom.items):
qty_per_unit = float(bom_item.stock_qty / bom.quantity)
available_qty = float(100 * (idx + 1))
required_qty = float(qty_to_make * qty_per_unit)
difference_qty = available_qty - required_qty
last_purchase_rate = float(100 * (idx + 1))
for idx in range(len(bom.items)):
expected_data.append(
[
bom.items[idx].item_code,
bom.items[idx].item_code,
bom.name,
"",
"",
float(bom.items[idx].stock_qty / bom.quantity),
float(100 * (idx + 1)),
float(qty_to_make * (bom.items[idx].stock_qty / bom.quantity)),
float((100 * (idx + 1)) - (qty_to_make * (bom.items[idx].stock_qty / bom.quantity))),
float(100 * (idx + 1)),
]
{
"item": bom_item.item_code,
"description": bom_item.item_code, # description falls back to item_code in test items
"from_bom_no": bom.name,
"manufacturer": "",
"manufacturer_part_number": "",
"qty_per_unit": fmt_qty(qty_per_unit),
"available_qty": fmt_qty(available_qty),
"required_qty": fmt_qty(required_qty),
"difference_qty": fmt_qty(difference_qty),
"last_purchase_rate": fmt_rate(last_purchase_rate),
}
)
return expected_data
producible_per_item.append(int(available_qty // qty_per_unit) if qty_per_unit else 0)
min_producible = min(producible_per_item) if producible_per_item else 0
return expected_data, min_producible