refactor: Sales Partner Commission Summary and Sales Partner Transaction Summary report (backport #54268) (#54430)

Co-authored-by: diptanilsaha <diptanil@frappe.io>
This commit is contained in:
mergify[bot]
2026-05-04 11:22:32 +05:30
committed by GitHub
parent 581529fd00
commit d07d7feb3f
3 changed files with 216 additions and 222 deletions

View File

@@ -1,122 +1,176 @@
# Copyright (c) 2013, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import frappe
from frappe import _, msgprint
from frappe import _
from frappe.query_builder import DocType, Field, Order
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.utils import QueryBuilder
from frappe.utils.data import comma_or
SALES_TRANSACTION_DOCTYPES = ["Sales Order", "Sales Invoice", "Delivery Note", "POS Invoice"]
def execute(filters=None):
if not filters:
filters = {}
columns = get_columns(filters)
data = get_entries(filters)
return columns, data
return SalesPartnerCommissionSummaryReport(filters).run()
def get_columns(filters):
if not filters.get("doctype"):
msgprint(_("Please select the document type first"), raise_exception=1)
class SalesPartnerSummaryReport:
"""
Base class to generate Sales Partner Summary related Reports.
"""
columns = [
{
"label": _(filters["doctype"]),
"options": filters["doctype"],
"fieldname": "name",
"fieldtype": "Link",
"width": 140,
},
{
"label": _("Customer"),
"options": "Customer",
"fieldname": "customer",
"fieldtype": "Link",
"width": 140,
},
{
"label": _("Currency"),
"fieldname": "currency",
"fieldtype": "Data",
"width": 80,
},
{
"label": _("Territory"),
"options": "Territory",
"fieldname": "territory",
"fieldtype": "Link",
"width": 100,
},
{"label": _("Posting Date"), "fieldname": "posting_date", "fieldtype": "Date", "width": 100},
{
"label": _("Amount"),
"fieldname": "amount",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Sales Partner"),
"options": "Sales Partner",
"fieldname": "sales_partner",
"fieldtype": "Link",
"width": 140,
},
{
"label": _("Commission Rate %"),
"fieldname": "commission_rate",
"fieldtype": "Data",
"width": 100,
},
{
"label": _("Total Commission"),
"fieldname": "total_commission",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
]
dt: DocType
date_field: str
date_label: str
columns: list
data: list
query: QueryBuilder
filters: dict
return columns
def __init__(self, filters: dict):
self.filters = filters
self.columns = []
def run(self):
self.validate_filters()
self.prepare_columns()
self.get_data()
def get_entries(filters):
date_field = "transaction_date" if filters.get("doctype") == "Sales Order" else "posting_date"
company_currency = frappe.db.get_value("Company", filters.get("company"), "default_currency")
conditions = get_conditions(filters, date_field)
entries = frappe.db.sql(
return self.columns, self.data
def validate_filters(self):
if not self.filters.get("doctype"):
frappe.throw(_("Please select the document type first."))
if self.filters.get("doctype") not in SALES_TRANSACTION_DOCTYPES:
frappe.throw(_("DocType can be one of them {0}").format(comma_or(SALES_TRANSACTION_DOCTYPES)))
if not self.filters.get("company"):
frappe.throw(_("Please select a company."))
if (
self.filters.get("from_date")
and self.filters.get("to_date")
and self.filters.get("from_date") > self.filters.get("to_date")
):
frappe.throw(_("From Date cannot be greater than To Date."))
self._set_date_field_and_label()
def _set_date_field_and_label(self):
self.date_field = (
"transaction_date" if self.filters.get("doctype") == "Sales Order" else "posting_date"
)
self.date_label = _("Order Date") if self.date_field == "transaction_date" else _("Posting Date")
def prepare_columns(self):
"""
SELECT
name, customer, territory, {} as posting_date, base_net_total as amount,
sales_partner, commission_rate, total_commission, '{}' as currency
FROM
`tab{}`
WHERE
{} and docstatus = 1 and sales_partner is not null
and sales_partner != '' order by name desc, sales_partner
""".format(date_field, company_currency, filters.get("doctype"), conditions),
filters,
as_dict=1,
)
Extend this method to add columns on the report. Use `make_column` to add more columns.
"""
raise NotImplementedError
return entries
def get_data(self):
self.build_report_query()
self.data = self.query.run(as_dict=1)
def build_report_query(self):
self._build_report_base_query()
self.extend_report_query()
self._apply_common_filters()
self.apply_filters()
def _build_report_base_query(self):
self.dt = DocType(self.filters.get("doctype"))
company_currency = frappe.get_cached_value("Company", self.filters.get("company"), "default_currency")
self.query = (
frappe.qb.from_(self.dt)
.select(
self.dt.name,
self.dt.customer,
self.dt.territory,
Field(self.date_field, "posting_date", table=self.dt),
self.dt.sales_partner,
self.dt.commission_rate,
ConstantColumn(company_currency).as_("currency"),
)
.where(
(self.dt.docstatus == 1) & (self.dt.sales_partner.notnull()) & (self.dt.sales_partner != "")
)
.orderby(self.dt.name, order=Order.desc)
.orderby(self.dt.sales_partner)
)
def extend_report_query(self):
"""
Extend this method to select more columns on the query.
"""
pass
def _apply_common_filters(self):
for field in ["company", "customer", "territory", "sales_partner"]:
if self.filters.get(field):
self.query = self.query.where(Field(field, table=self.dt) == self.filters.get(field))
if self.filters.get("from_date"):
self.query = self.query.where(
Field(self.date_field, table=self.dt) >= self.filters.get("from_date")
)
if self.filters.get("to_date"):
self.query = self.query.where(
Field(self.date_field, table=self.dt) <= self.filters.get("to_date")
)
def apply_filters(self):
"""
Extend this method to add more conditions on the query.
"""
pass
def make_column(
self, label: str, fieldname: str, fieldtype: str, width: int = 140, options: str = "", hidden: int = 0
):
self.columns.append(
dict(
label=label,
fieldname=fieldname,
fieldtype=fieldtype,
options=options,
width=width,
hidden=hidden,
)
)
def get_conditions(filters, date_field):
conditions = "1=1"
class SalesPartnerCommissionSummaryReport(SalesPartnerSummaryReport):
def prepare_columns(self):
self.make_column(_(self.filters.get("doctype")), "name", "Link", options=self.filters.get("doctype"))
for field in ["company", "customer", "territory"]:
if filters.get(field):
conditions += f" and {field} = %({field})s"
self.make_column(_("Customer"), "customer", "Link", options="Customer")
if filters.get("sales_partner"):
conditions += " and sales_partner = %(sales_partner)s"
self.make_column(_("Currency"), "currency", "Data", 80, hidden=1)
if filters.get("from_date"):
conditions += f" and {date_field} >= %(from_date)s"
self.make_column(_("Territory"), "territory", "Link", 100, "Territory")
if filters.get("to_date"):
conditions += f" and {date_field} <= %(to_date)s"
self.make_column(self.date_label, "posting_date", "Date")
return conditions
self.make_column(_("Amount"), "amount", "Currency", 120, "currency")
self.make_column(_("Sales Partner"), "sales_partner", "Link", options="Sales Partner")
self.make_column(_("Commission Rate %"), "commission_rate", "Data", 100)
self.make_column(_("Total Commission"), "total_commission", "Currency", 120, "currency")
def extend_report_query(self):
self.query = self.query.select(
self.dt.base_net_total.as_("amount"),
self.dt.total_commission,
)

View File

@@ -3,6 +3,14 @@
frappe.query_reports["Sales Partner Transaction Summary"] = {
filters: [
{
fieldname: "company",
label: __("Company"),
fieldtype: "Link",
options: "Company",
default: frappe.defaults.get_user_default("Company"),
reqd: 1,
},
{
fieldname: "sales_partner",
label: __("Sales Partner"),
@@ -28,14 +36,6 @@ frappe.query_reports["Sales Partner Transaction Summary"] = {
fieldtype: "Date",
default: frappe.datetime.get_today(),
},
{
fieldname: "company",
label: __("Company"),
fieldtype: "Link",
options: "Company",
default: frappe.defaults.get_user_default("Company"),
reqd: 1,
},
{
fieldname: "item_group",
label: __("Item Group"),

View File

@@ -3,144 +3,84 @@
import frappe
from frappe import _, msgprint
from frappe import _
from frappe.query_builder import Case
from erpnext.selling.report.sales_partner_commission_summary.sales_partner_commission_summary import (
SalesPartnerSummaryReport,
)
def execute(filters=None):
if not filters:
filters = {}
columns = get_columns(filters)
data = get_entries(filters)
return columns, data
return SalesPartnerTransactionSummaryReport(filters=filters).run()
def get_columns(filters):
if not filters.get("doctype"):
msgprint(_("Please select the document type first"), raise_exception=1)
class SalesPartnerTransactionSummaryReport(SalesPartnerSummaryReport):
def prepare_columns(self):
self.make_column(_(self.filters.get("doctype")), "name", "Link", options=self.filters.get("doctype"))
columns = [
{
"label": _(filters["doctype"]),
"options": filters["doctype"],
"fieldname": "name",
"fieldtype": "Link",
"width": 140,
},
{
"label": _("Customer"),
"options": "Customer",
"fieldname": "customer",
"fieldtype": "Link",
"width": 140,
},
{
"label": _("Territory"),
"options": "Territory",
"fieldname": "territory",
"fieldtype": "Link",
"width": 100,
},
{"label": _("Posting Date"), "fieldname": "posting_date", "fieldtype": "Date", "width": 100},
{
"label": _("Item Code"),
"fieldname": "item_code",
"fieldtype": "Link",
"options": "Item",
"width": 100,
},
{
"label": _("Item Group"),
"fieldname": "item_group",
"fieldtype": "Link",
"options": "Item Group",
"width": 100,
},
{
"label": _("Brand"),
"fieldname": "brand",
"fieldtype": "Link",
"options": "Brand",
"width": 100,
},
{"label": _("Quantity"), "fieldname": "qty", "fieldtype": "Float", "width": 120},
{"label": _("Rate"), "fieldname": "rate", "fieldtype": "Currency", "width": 120},
{"label": _("Amount"), "fieldname": "amount", "fieldtype": "Currency", "width": 120},
{
"label": _("Sales Partner"),
"options": "Sales Partner",
"fieldname": "sales_partner",
"fieldtype": "Link",
"width": 140,
},
{
"label": _("Commission Rate %"),
"fieldname": "commission_rate",
"fieldtype": "Data",
"width": 100,
},
{"label": _("Commission"), "fieldname": "commission", "fieldtype": "Currency", "width": 120},
{
"label": _("Currency"),
"fieldname": "currency",
"fieldtype": "Link",
"options": "Currency",
"width": 120,
},
]
self.make_column(_("Customer"), "customer", "Link", options="Customer")
return columns
self.make_column(_("Currency"), "currency", "Data", 80, hidden=1)
self.make_column(_("Territory"), "territory", "Link", 100, "Territory")
def get_entries(filters):
date_field = "transaction_date" if filters.get("doctype") == "Sales Order" else "posting_date"
self.make_column(self.date_label, "posting_date", "Date")
conditions = get_conditions(filters, date_field)
entries = frappe.db.sql(
"""
SELECT
dt.name, dt.customer, dt.territory, dt.{date_field} as posting_date, dt.currency,
dt_item.base_net_rate as rate, dt_item.qty, dt_item.base_net_amount as amount,
((dt_item.base_net_amount * dt.commission_rate) / 100) as commission,
dt_item.brand, dt.sales_partner, dt.commission_rate, dt_item.item_group, dt_item.item_code
FROM
`tab{doctype}` dt, `tab{doctype} Item` dt_item
WHERE
{cond} and dt.name = dt_item.parent and dt.docstatus = 1
and dt.sales_partner is not null and dt.sales_partner != ''
order by dt.name desc, dt.sales_partner
""".format(date_field=date_field, doctype=filters.get("doctype"), cond=conditions),
filters,
as_dict=1,
)
self.make_column(_("Item Code"), "item_code", "Link", 100, "Item")
return entries
self.make_column(_("Item Group"), "item_group", "Link", 100, "Item Group")
self.make_column(_("Brand"), "brand", "Link", 100, "Brand")
def get_conditions(filters, date_field):
conditions = "1=1"
self.make_column(_("Quantity"), "qty", "Float", 120)
for field in ["company", "customer", "territory", "sales_partner"]:
if filters.get(field):
conditions += f" and dt.{field} = %({field})s"
self.make_column(_("Rate"), "rate", "Currency", 120, "currency")
if filters.get("from_date"):
conditions += f" and dt.{date_field} >= %(from_date)s"
self.make_column(_("Amount"), "amount", "Currency", 120, "currency")
if filters.get("to_date"):
conditions += f" and dt.{date_field} <= %(to_date)s"
self.make_column(_("Sales Partner"), "sales_partner", "Link", options="Sales Partner")
if not filters.get("show_return_entries"):
conditions += " and dt_item.qty > 0.0"
self.make_column(_("Commission Rate %"), "commission_rate", "Data", 100)
if filters.get("brand"):
conditions += " and dt_item.brand = %(brand)s"
self.make_column(_("Commission"), "commission", "Currency", 120, "currency")
if filters.get("item_group"):
lft, rgt = frappe.get_cached_value("Item Group", filters.get("item_group"), ["lft", "rgt"])
def extend_report_query(self):
self.dt_item = frappe.qb.DocType(f"{self.filters['doctype']} Item")
conditions += f""" and dt_item.item_group in (select name from
`tabItem Group` where lft >= {lft} and rgt <= {rgt})"""
self.query = (
self.query.join(self.dt_item)
.on(self.dt.name == self.dt_item.parent)
.select(
self.dt_item.base_net_rate.as_("rate"),
self.dt_item.qty,
self.dt_item.base_net_amount.as_("amount"),
Case()
.when(
self.dt_item.grant_commission.eq(1),
((self.dt_item.base_net_amount * self.dt.commission_rate) / 100),
)
.else_(0)
.as_("commission"),
self.dt_item.brand,
self.dt_item.item_group,
self.dt_item.item_code,
)
)
return conditions
def apply_filters(self):
if not self.filters.get("show_return_entries"):
self.query = self.query.where(self.dt_item.qty > 0.0)
if self.filters.get("brand"):
self.query = self.query.where(self.dt_item.brand == self.filters.get("brand"))
if self.filters.get("item_group"):
lft, rgt = frappe.get_cached_value("Item Group", self.filters.get("item_group"), ["lft", "rgt"])
if item_groups := frappe.get_all(
"Item Group", filters=[["lft", ">=", lft], ["rgt", "<=", rgt]], pluck="name"
):
self.query = self.query.where(self.dt_item.item_group.isin(item_groups))