refactor: replace db.sql with frappe.qb

This commit is contained in:
khushi8112
2026-06-03 02:48:56 +05:30
parent cd7fa56ec4
commit 41884cfd2a

View File

@@ -3,6 +3,7 @@
import frappe
from frappe import _
from frappe.query_builder import CustomFunction
from frappe.utils import add_months, flt, formatdate
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import get_dimensions
@@ -42,39 +43,29 @@ def validate_filters(filters):
def get_budget_records(filters, dimensions):
budget_against_field = frappe.scrub(filters["budget_against"])
budget = frappe.qb.DocType("Budget")
return frappe.db.sql(
f"""
SELECT
b.name,
b.account,
b.{budget_against_field} AS dimension,
b.budget_amount,
b.from_fiscal_year,
b.to_fiscal_year,
b.budget_start_date,
b.budget_end_date
FROM
`tabBudget` b
WHERE
b.company = %s
AND b.docstatus = 1
AND b.budget_against = %s
AND b.{budget_against_field} IN ({", ".join(["%s"] * len(dimensions))})
AND (
b.from_fiscal_year <= %s
AND b.to_fiscal_year >= %s
)
""",
(
filters.company,
filters.budget_against,
*dimensions,
filters.to_fiscal_year,
filters.from_fiscal_year,
),
as_dict=True,
)
return (
frappe.qb.from_(budget)
.select(
budget.name,
budget.account,
budget[budget_against_field].as_("dimension"),
budget.budget_amount,
budget.from_fiscal_year,
budget.to_fiscal_year,
budget.budget_start_date,
budget.budget_end_date,
)
.where(
(budget.company == filters.company)
& (budget.docstatus == 1)
& (budget.budget_against == filters.budget_against)
& (budget[budget_against_field].isin(dimensions))
& (budget.from_fiscal_year <= filters.to_fiscal_year)
& (budget.to_fiscal_year >= filters.from_fiscal_year)
)
).run(as_dict=True)
def build_budget_map(budget_records, filters):
@@ -122,52 +113,41 @@ def build_budget_map(budget_records, filters):
def get_actual_transactions(dimension_name, filters):
budget_against = frappe.scrub(filters.get("budget_against"))
cost_center_filter = ""
budget_gl_dimension_join = f"and b.{budget_against} = gl.{budget_against}"
monthname = CustomFunction("MONTHNAME", ["date"])
gle = frappe.qb.DocType("GL Entry")
budget = frappe.qb.DocType("Budget")
query = (
frappe.qb.from_(gle)
.from_(budget)
.select(
gle.account,
gle.debit,
gle.credit,
gle.fiscal_year,
monthname(gle.posting_date).as_("month_name"),
budget[budget_against].as_("budget_against"),
)
.where(
(budget.docstatus == 1)
& (budget.account == gle.account)
& (gle.fiscal_year >= filters.from_fiscal_year)
& (gle.fiscal_year <= filters.to_fiscal_year)
& (gle.is_cancelled == 0)
& (budget[budget_against] == dimension_name)
)
.groupby(gle.name)
.orderby(gle.fiscal_year)
)
if filters.get("budget_against") == "Cost Center" and dimension_name:
cc_lft, cc_rgt = frappe.db.get_value("Cost Center", dimension_name, ["lft", "rgt"])
cost_center_filter = f"""
and lft >= "{cc_lft}"
and rgt <= "{cc_rgt}"
"""
budget_gl_dimension_join = ""
cost_centers = get_cost_center_with_children([dimension_name])
query = query.where(gle.cost_center.isin(cost_centers))
else:
query = query.where(budget[budget_against] == gle[budget_against])
actual_transactions = frappe.db.sql(
f"""
select
gl.account,
gl.debit,
gl.credit,
gl.fiscal_year,
MONTHNAME(gl.posting_date) as month_name,
b.{budget_against} as budget_against
from
`tabGL Entry` gl,
`tabBudget` b
where
b.docstatus = 1
and b.account=gl.account
{budget_gl_dimension_join}
and gl.fiscal_year between %s and %s
and gl.is_cancelled = 0
and b.{budget_against} = %s
and exists(
select
name
from
`tab{filters.budget_against}`
where
name = gl.{budget_against}
{cost_center_filter}
)
group by
gl.name
order by gl.fiscal_year
""",
(filters.from_fiscal_year, filters.to_fiscal_year, dimension_name),
as_dict=1,
)
actual_transactions = query.run(as_dict=True)
actual_transactions_map = {}
for transaction in actual_transactions:
@@ -388,47 +368,35 @@ def get_fiscal_years(filters):
def get_cost_center_with_children(cost_centers):
"""Expand each cost center to include itself and all its descendants."""
cc = frappe.qb.DocType("Cost Center")
all_cost_centers = set()
for cost_center in cost_centers:
result = frappe.db.get_value("Cost Center", cost_center, ["lft", "rgt"])
if not result:
continue
lft, rgt = result
children = frappe.db.sql_list(
"SELECT name FROM `tabCost Center` WHERE lft >= %s AND rgt <= %s",
(lft, rgt),
children = (
frappe.qb.from_(cc).select(cc.name).where((cc.lft >= lft) & (cc.rgt <= rgt)).run(pluck="name")
)
all_cost_centers.update(children)
return list(all_cost_centers)
def get_budget_dimensions(filters):
order_by = ""
if filters.get("budget_against") == "Cost Center":
order_by = "order by lft"
budget_against = filters.get("budget_against")
dimension = frappe.qb.DocType(budget_against)
if filters.get("budget_against") in ["Cost Center", "Project"]:
return frappe.db.sql_list(
"""
select
name
from
`tab{tab}`
where
company = %s
{order_by}
""".format(tab=filters.get("budget_against"), order_by=order_by),
filters.get("company"),
if budget_against in ["Cost Center", "Project"]:
query = (
frappe.qb.from_(dimension)
.select(dimension.name)
.where(dimension.company == filters.get("company"))
)
if budget_against == "Cost Center":
query = query.orderby(dimension.lft)
return query.run(pluck="name")
else:
return frappe.db.sql_list(
"""
select
name
from
`tab{tab}`
""".format(tab=filters.get("budget_against"))
) # nosec
return frappe.qb.from_(dimension).select(dimension.name).run(pluck="name")
def validate_budget_dimensions(filters):