Merge pull request #49192 from diptanilsaha/sql_qb

fix: improve queries with query builder and input sanitization
This commit is contained in:
ruthra kumar
2025-08-19 10:30:13 +05:30
committed by GitHub
7 changed files with 117 additions and 90 deletions

View File

@@ -462,9 +462,8 @@ def unset_existing_data(company):
"Sales Taxes and Charges Template",
"Purchase Taxes and Charges Template",
]:
frappe.db.sql(
f'''delete from `tab{doctype}` where `company`="%s"''' % (company) # nosec
)
dt = frappe.qb.DocType(doctype)
frappe.qb.from_(dt).where(dt.company == company).delete().run()
def set_default_accounts(company):

View File

@@ -5,6 +5,7 @@
import frappe
from frappe import _
from frappe.model.document import Document
from frappe.query_builder.functions import Sum
from frappe.utils import flt, today
@@ -55,22 +56,30 @@ def get_loyalty_details(
if not expiry_date:
expiry_date = today()
condition = ""
if company:
condition = " and company=%s " % frappe.db.escape(company)
if not include_expired_entry:
condition += " and expiry_date>='%s' " % expiry_date
LoyaltyPointEntry = frappe.qb.DocType("Loyalty Point Entry")
loyalty_point_details = frappe.db.sql(
f"""select sum(loyalty_points) as loyalty_points,
sum(purchase_amount) as total_spent from `tabLoyalty Point Entry`
where customer=%s and loyalty_program=%s and posting_date <= %s
{condition}
group by customer""",
(customer, loyalty_program, expiry_date),
as_dict=1,
query = (
frappe.qb.from_(LoyaltyPointEntry)
.select(
Sum(LoyaltyPointEntry.loyalty_points).as_("loyalty_points"),
Sum(LoyaltyPointEntry.purchase_amount).as_("total_spent"),
)
.where(
(LoyaltyPointEntry.customer == customer)
& (LoyaltyPointEntry.loyalty_program == loyalty_program)
& (LoyaltyPointEntry.posting_date <= expiry_date)
)
.groupby(LoyaltyPointEntry.customer)
)
if company:
query = query.where(LoyaltyPointEntry.company == company)
if not include_expired_entry:
query = query.where(LoyaltyPointEntry.expiry_date >= expiry_date)
loyalty_point_details = query.run(as_dict=True)
if loyalty_point_details:
return loyalty_point_details[0]
else:

View File

@@ -9,6 +9,7 @@ from frappe import _
from frappe.core.doctype.communication.email import make
from frappe.desk.form.load import get_attachments
from frappe.model.mapper import get_mapped_doc
from frappe.query_builder import Order
from frappe.utils import get_url
from frappe.utils.print_format import download_pdf
from frappe.utils.user import get_user_fullname
@@ -582,35 +583,32 @@ def get_supplier_tag():
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_rfq_containing_supplier(doctype, txt, searchfield, start, page_len, filters):
conditions = ""
if txt:
conditions += "and rfq.name like '%%" + txt + "%%' "
rfq = frappe.qb.DocType("Request for Quotation")
rfq_supplier = frappe.qb.DocType("Request for Quotation Supplier")
if filters.get("transaction_date"):
conditions += "and rfq.transaction_date = '{}'".format(filters.get("transaction_date"))
rfq_data = frappe.db.sql(
f"""
select
distinct rfq.name, rfq.transaction_date,
rfq.company
from
`tabRequest for Quotation` rfq, `tabRequest for Quotation Supplier` rfq_supplier
where
rfq.name = rfq_supplier.parent
and rfq_supplier.supplier = %(supplier)s
and rfq.docstatus = 1
and rfq.company = %(company)s
{conditions}
order by rfq.transaction_date ASC
limit %(page_len)s offset %(start)s """,
{
"page_len": page_len,
"start": start,
"company": filters.get("company"),
"supplier": filters.get("supplier"),
},
as_dict=1,
query = (
frappe.qb.from_(rfq)
.from_(rfq_supplier)
.select(rfq.name)
.distinct()
.select(rfq.transaction_date, rfq.company)
.where(
(rfq.name == rfq_supplier.parent)
& (rfq_supplier.supplier == filters.get("supplier"))
& (rfq.docstatus == 1)
& (rfq.company == filters.get("company"))
)
.orderby(rfq.transaction_date, order=Order.asc)
.limit(page_len)
.offset(start)
)
if txt:
query = query.where(rfq.name.like(f"%%{txt}%%"))
if filters.get("transaction_date"):
query = query.where(rfq.transaction_date == filters.get("transaction_date"))
rfq_data = query.run(as_dict=1)
return rfq_data

View File

@@ -588,21 +588,27 @@ def get_account_list(doctype, txt, searchfield, start, page_len, filters):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_blanket_orders(doctype, txt, searchfield, start, page_len, filters):
return frappe.db.sql(
"""select distinct bo.name, bo.blanket_order_type, bo.to_date
from `tabBlanket Order` bo, `tabBlanket Order Item` boi
where
boi.parent = bo.name
and boi.item_code = {item_code}
and bo.blanket_order_type = '{blanket_order_type}'
and bo.company = {company}
and bo.docstatus = 1""".format(
item_code=frappe.db.escape(filters.get("item")),
blanket_order_type=filters.get("blanket_order_type"),
company=frappe.db.escape(filters.get("company")),
bo = frappe.qb.DocType("Blanket Order")
bo_item = frappe.qb.DocType("Blanket Order Item")
blanket_orders = (
frappe.qb.from_(bo)
.from_(bo_item)
.select(bo.name)
.distinct()
.select(bo.blanket_order_type, bo.to_date)
.where(
(bo_item.parent == bo.name)
& (bo_item.item_code == filters.get("item"))
& (bo.blanket_order_type == filters.get("blanket_order_type"))
& (bo.company == filters.get("company"))
& (bo.docstatus == 1)
)
.run()
)
return blanket_orders
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
@@ -620,7 +626,7 @@ def get_income_account(doctype, txt, searchfield, start, page_len, filters):
if filters.get("company"):
condition += "and tabAccount.company = %(company)s"
condition += f"and tabAccount.disabled = {filters.get('disabled', 0)}"
condition += " and tabAccount.disabled = %(disabled)s"
return frappe.db.sql(
f"""select tabAccount.name from `tabAccount`
@@ -630,7 +636,11 @@ def get_income_account(doctype, txt, searchfield, start, page_len, filters):
and tabAccount.`{searchfield}` LIKE %(txt)s
{condition} {get_match_cond(doctype)}
order by idx desc, name""",
{"txt": "%" + txt + "%", "company": filters.get("company", "")},
{
"txt": "%" + txt + "%",
"company": filters.get("company", ""),
"disabled": cint(filters.get("disabled", 0)),
},
)

View File

@@ -329,12 +329,16 @@ def get_projectwise_timesheet_data(project=None, parent=None, from_time=None, to
@frappe.whitelist()
def get_timesheet_detail_rate(timelog, currency):
timelog_detail = frappe.db.sql(
f"""SELECT tsd.billing_amount as billing_amount,
ts.currency as currency FROM `tabTimesheet Detail` tsd
INNER JOIN `tabTimesheet` ts ON ts.name=tsd.parent
WHERE tsd.name = '{timelog}'""",
as_dict=1,
ts = frappe.qb.DocType("Timesheet")
ts_detail = frappe.qb.DocType("Timesheet Detail")
timelog_detail = (
frappe.qb.from_(ts_detail)
.inner_join(ts)
.on(ts.name == ts_detail.parent)
.select(ts_detail.billing_amount.as_("billing_amount"), ts.currency.as_("currency"))
.where(ts_detail.name == timelog)
.run(as_dict=1)
)[0]
if timelog_detail.currency:

View File

@@ -11,6 +11,7 @@ import frappe
import frappe.defaults
from frappe import _, msgprint
from frappe.model.mapper import get_mapped_doc
from frappe.query_builder import Order
from frappe.query_builder.functions import Sum
from frappe.utils import cint, cstr, flt, get_link_to_form, getdate, new_line_sep, nowdate
@@ -624,39 +625,44 @@ def get_items_based_on_default_supplier(supplier):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_material_requests_based_on_supplier(doctype, txt, searchfield, start, page_len, filters):
conditions = ""
if txt:
conditions += "and mr.name like '%%" + txt + "%%' "
if filters.get("transaction_date"):
date = filters.get("transaction_date")[1]
conditions += f"and mr.transaction_date between '{date[0]}' and '{date[1]}' "
supplier = filters.get("supplier")
supplier_items = get_items_based_on_default_supplier(supplier)
if not supplier_items:
frappe.throw(_("{0} is not the default supplier for any items.").format(supplier))
material_requests = frappe.db.sql(
"""select distinct mr.name, transaction_date,company
from `tabMaterial Request` mr, `tabMaterial Request Item` mr_item
where mr.name = mr_item.parent
and mr_item.item_code in ({})
and mr.material_request_type = 'Purchase'
and mr.per_ordered < 99.99
and mr.docstatus = 1
and mr.status != 'Stopped'
and mr.company = %s
{}
order by mr_item.item_code ASC
limit {} offset {} """.format(
", ".join(["%s"] * len(supplier_items)), conditions, cint(page_len), cint(start)
),
(*tuple(supplier_items), filters.get("company")),
as_dict=1,
mr = frappe.qb.DocType("Material Request")
mr_item = frappe.qb.DocType("Material Request Item")
query = (
frappe.qb.from_(mr)
.from_(mr_item)
.select(mr.name)
.distinct()
.select(mr.transaction_date, mr.company)
.where(
(mr.name == mr_item.parent)
& (mr_item.item_code.isin(supplier_items))
& (mr.material_request_type == "Purchase")
& (mr.per_ordered < 99.99)
& (mr.docstatus == 1)
& (mr.status != "Stopped")
& (mr.company == filters.get("company"))
)
.orderby(mr_item.item_code, order=Order.asc)
.limit(cint(page_len))
.offset(cint(start))
)
if txt:
query = query.where(mr.name.like(f"%%{txt}%%"))
if filters.get("transaction_date"):
date = filters.get("transaction_date")[1]
query = query.where(mr.transaction_date[date[0] : date[1]])
material_requests = query.run(as_dict=True)
return material_requests

View File

@@ -126,8 +126,9 @@ def get_stock_balance(
extra_cond = ""
if inventory_dimensions_dict:
for field, value in inventory_dimensions_dict.items():
column = frappe.utils.sanitize_column(field)
args[field] = value
extra_cond += f" and {field} = %({field})s"
extra_cond += f" and {column} = %({field})s"
last_entry = get_previous_sle(args, extra_cond=extra_cond)