refactor: import functions in new report instead of redundant code

(cherry picked from commit 501f07803e)
This commit is contained in:
Mihir Kandoi
2025-03-07 12:05:31 +05:30
committed by Mergify
parent c472af87b2
commit 0ec026ec7e

View File

@@ -1,13 +1,10 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
from collections import defaultdict
import frappe
from frappe import _
from frappe.query_builder.functions import CombineDatetime, Sum
from frappe.utils import cint, flt, get_datetime
from frappe.query_builder.functions import Sum
from frappe.utils import cint, flt
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import get_inventory_dimensions
from erpnext.stock.doctype.serial_no.serial_no import (
@@ -15,7 +12,14 @@ from erpnext.stock.doctype.serial_no.serial_no import (
get_serial_nos_from_serial_and_batch_bundle,
)
from erpnext.stock.doctype.stock_reconciliation.stock_reconciliation import get_stock_balance_for
from erpnext.stock.doctype.warehouse.warehouse import apply_warehouse_filter
from erpnext.stock.report.stock_ledger.stock_ledger import (
check_inventory_dimension_filters_applied,
get_item_details,
get_item_group_condition,
get_opening_balance,
get_opening_balance_from_batch,
get_stock_ledger_entries,
)
from erpnext.stock.utils import (
is_reposting_item_valuation_in_progress,
update_included_uom_in_report,
@@ -288,91 +292,6 @@ def get_columns(filters):
return columns
def get_stock_ledger_entries(filters, items):
from_date = get_datetime(filters.from_date + " 00:00:00")
to_date = get_datetime(filters.to_date + " 23:59:59")
sle = frappe.qb.DocType("Stock Ledger Entry")
query = (
frappe.qb.from_(sle)
.select(
sle.item_code,
sle.posting_datetime.as_("date"),
sle.warehouse,
sle.posting_date,
sle.posting_time,
sle.actual_qty,
sle.incoming_rate,
sle.valuation_rate,
sle.company,
sle.voucher_type,
sle.qty_after_transaction,
sle.stock_value_difference,
sle.serial_and_batch_bundle,
sle.voucher_no,
sle.stock_value,
sle.batch_no,
sle.serial_no,
sle.project,
)
.where((sle.docstatus < 2) & (sle.is_cancelled == 0) & (sle.posting_datetime[from_date:to_date]))
.orderby(sle.posting_datetime)
.orderby(sle.creation)
)
inventory_dimension_fields = get_inventory_dimension_fields()
if inventory_dimension_fields:
for fieldname in inventory_dimension_fields:
query = query.select(fieldname)
if fieldname in filters and filters.get(fieldname):
query = query.where(sle[fieldname].isin(filters.get(fieldname)))
if items:
query = query.where(sle.item_code.isin(items))
for field in ["voucher_no", "project", "company"]:
if filters.get(field) and field not in inventory_dimension_fields:
query = query.where(sle[field] == filters.get(field))
if filters.get("batch_no"):
bundles = get_serial_and_batch_bundles(filters)
if bundles:
query = query.where(
(sle.serial_and_batch_bundle.isin(bundles)) | (sle.batch_no == filters.batch_no)
)
else:
query = query.where(sle.batch_no == filters.batch_no)
query = apply_warehouse_filter(query, sle, filters)
return query.run(as_dict=True)
def get_serial_and_batch_bundles(filters):
SBB = frappe.qb.DocType("Serial and Batch Bundle")
SBE = frappe.qb.DocType("Serial and Batch Entry")
query = (
frappe.qb.from_(SBE)
.inner_join(SBB)
.on(SBE.parent == SBB.name)
.select(SBE.parent)
.where(
(SBB.docstatus == 1)
& (SBB.has_batch_no == 1)
& (SBB.voucher_no.notnull())
& (SBE.batch_no == filters.batch_no)
)
)
return query.run(pluck=SBE.parent)
def get_inventory_dimension_fields():
return [dimension.fieldname for dimension in get_inventory_dimensions()]
def get_items(filters):
item = frappe.qb.DocType("Item")
query = frappe.qb.from_(item).select(item.name).where(item.has_serial_no == 1)
@@ -394,188 +313,3 @@ def get_items(filters):
items = [r[0] for r in query.run()]
return items
def get_item_details(items, sl_entries, include_uom):
item_details = {}
if not items:
items = list(set(d.item_code for d in sl_entries))
if not items:
return item_details
item = frappe.qb.DocType("Item")
query = (
frappe.qb.from_(item)
.select(item.name, item.item_name, item.description, item.item_group, item.brand, item.stock_uom)
.where(item.name.isin(items))
)
if include_uom:
ucd = frappe.qb.DocType("UOM Conversion Detail")
query = (
query.left_join(ucd)
.on((ucd.parent == item.name) & (ucd.uom == include_uom))
.select(ucd.conversion_factor)
)
res = query.run(as_dict=True)
for item in res:
item_details.setdefault(item.name, item)
return item_details
def get_sle_conditions(filters):
conditions = []
if filters.get("warehouse"):
warehouse_condition = get_warehouse_condition(filters.get("warehouse"))
if warehouse_condition:
conditions.append(warehouse_condition)
if filters.get("voucher_no"):
conditions.append("voucher_no=%(voucher_no)s")
if filters.get("batch_no"):
conditions.append("batch_no=%(batch_no)s")
if filters.get("project"):
conditions.append("project=%(project)s")
for dimension in get_inventory_dimensions():
if filters.get(dimension.fieldname):
conditions.append(f"{dimension.fieldname} in %({dimension.fieldname})s")
return "and {}".format(" and ".join(conditions)) if conditions else ""
def get_opening_balance_from_batch(filters, columns, sl_entries):
query_filters = {
"batch_no": filters.batch_no,
"docstatus": 1,
"is_cancelled": 0,
"posting_date": ("<", filters.from_date),
"company": filters.company,
}
for fields in ["item_code", "warehouse"]:
if filters.get(fields):
query_filters[fields] = filters.get(fields)
opening_data = frappe.get_all(
"Stock Ledger Entry",
fields=["sum(actual_qty) as qty_after_transaction", "sum(stock_value_difference) as stock_value"],
filters=query_filters,
)[0]
for field in ["qty_after_transaction", "stock_value", "valuation_rate"]:
if opening_data.get(field) is None:
opening_data[field] = 0.0
table = frappe.qb.DocType("Stock Ledger Entry")
sabb_table = frappe.qb.DocType("Serial and Batch Entry")
query = (
frappe.qb.from_(table)
.inner_join(sabb_table)
.on(table.serial_and_batch_bundle == sabb_table.parent)
.select(
Sum(sabb_table.qty).as_("qty"),
Sum(sabb_table.stock_value_difference).as_("stock_value"),
)
.where(
(sabb_table.batch_no == filters.batch_no)
& (sabb_table.docstatus == 1)
& (table.posting_date < filters.from_date)
& (table.is_cancelled == 0)
)
)
for field in ["item_code", "warehouse", "company"]:
if filters.get(field):
query = query.where(table[field] == filters.get(field))
bundle_data = query.run(as_dict=True)
if bundle_data:
opening_data.qty_after_transaction += flt(bundle_data[0].qty)
opening_data.stock_value += flt(bundle_data[0].stock_value)
if opening_data.qty_after_transaction:
opening_data.valuation_rate = flt(opening_data.stock_value) / flt(
opening_data.qty_after_transaction
)
return {
"item_code": _("'Opening'"),
"qty_after_transaction": opening_data.qty_after_transaction,
"valuation_rate": opening_data.valuation_rate,
"stock_value": opening_data.stock_value,
}
def get_opening_balance(filters, columns, sl_entries):
if not (filters.item_code and filters.warehouse and filters.from_date):
return
from erpnext.stock.stock_ledger import get_previous_sle
last_entry = get_previous_sle(
{
"item_code": filters.item_code,
"warehouse_condition": get_warehouse_condition(filters.warehouse),
"posting_date": filters.from_date,
"posting_time": "00:00:00",
}
)
# check if any SLEs are actually Opening Stock Reconciliation
for sle in list(sl_entries):
if (
sle.get("voucher_type") == "Stock Reconciliation"
and sle.posting_date == filters.from_date
and frappe.db.get_value("Stock Reconciliation", sle.voucher_no, "purpose") == "Opening Stock"
):
last_entry = sle
sl_entries.remove(sle)
row = {
"item_code": _("'Opening'"),
"qty_after_transaction": last_entry.get("qty_after_transaction", 0),
"valuation_rate": last_entry.get("valuation_rate", 0),
"stock_value": last_entry.get("stock_value", 0),
}
return row
def get_warehouse_condition(warehouse):
warehouse_details = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt"], as_dict=1)
if warehouse_details:
return f" exists (select name from `tabWarehouse` wh \
where wh.lft >= {warehouse_details.lft} and wh.rgt <= {warehouse_details.rgt} and warehouse = wh.name)"
return ""
def get_item_group_condition(item_group, item_table=None):
item_group_details = frappe.db.get_value("Item Group", item_group, ["lft", "rgt"], as_dict=1)
if item_group_details:
if item_table:
ig = frappe.qb.DocType("Item Group")
return item_table.item_group.isin(
frappe.qb.from_(ig)
.select(ig.name)
.where(
(ig.lft >= item_group_details.lft)
& (ig.rgt <= item_group_details.rgt)
& (item_table.item_group == ig.name)
)
)
else:
return f"item.item_group in (select ig.name from `tabItem Group` ig \
where ig.lft >= {item_group_details.lft} and ig.rgt <= {item_group_details.rgt} and item.item_group = ig.name)"
def check_inventory_dimension_filters_applied(filters) -> bool:
for dimension in get_inventory_dimensions():
if dimension.fieldname in filters and filters.get(dimension.fieldname):
return True
return False