Merge pull request #54449 from vorasmit/tds-reports-refactor

refactor: tax witholding report
This commit is contained in:
Smit Vora
2026-04-23 14:57:31 +05:30
committed by GitHub
3 changed files with 328 additions and 374 deletions

View File

@@ -6,301 +6,282 @@ from frappe import _
from frappe.query_builder.functions import IfNull
def execute(filters=None):
"""Generate Tax Withholding Details report"""
validate_filters(filters)
class TaxWithholdingDetailsReport:
party_types = ("Customer", "Supplier")
document_types = ("Purchase Invoice", "Sales Invoice", "Payment Entry", "Journal Entry")
# Process and format data
data = get_tax_withholding_data(filters)
columns = get_columns(filters)
def __init__(self, filters=None):
self.filters = frappe._dict(filters or {})
self.entries = []
self.doc_info = {}
self.party_details = {}
return columns, data
@classmethod
def execute(cls, filters=None):
return cls(filters).run()
def run(self):
self.validate_filters()
return self.get_columns(), self.get_data()
def validate_filters(filters):
"""Validate report filters"""
filters = frappe._dict(filters or {})
def validate_filters(self):
if not self.filters.from_date or not self.filters.to_date:
frappe.throw(_("From Date and To Date are required"))
if not filters.from_date or not filters.to_date:
frappe.throw(_("From Date and To Date are required"))
if self.filters.from_date > self.filters.to_date:
frappe.throw(_("From Date must be before To Date"))
if filters.from_date > filters.to_date:
frappe.throw(_("From Date must be before To Date"))
def get_data(self):
self.entries = self.get_entries_query().run(as_dict=True)
if not self.entries:
return []
self.doc_info = self.fetch_additional_doc_info()
self.party_details = self.fetch_party_details()
return self.build_rows()
def get_tax_withholding_data(filters):
"""Process entries into final report format"""
data = []
entries = get_tax_withholding_entries(filters)
if not entries:
return data
def build_rows(self):
rows = []
for entry in self.entries:
doc_details = (
self.doc_info.get((entry.transaction_type, entry.ref_no), {}) if entry.ref_no else {}
)
party_info = self.party_details.get((entry.party_type, entry.party), {})
rows.append({**entry, **doc_details, **party_info})
doc_info = get_additional_doc_info(entries)
party_details = get_party_details(entries)
rows.sort(
key=lambda x: (
x["tax_withholding_category"] or "",
x["transaction_date"] or "",
x["withholding_name"] or "",
)
)
return rows
for entry in entries:
doc_details = frappe._dict()
if entry.taxable_name:
doc_details = doc_info.get((entry.taxable_doctype, entry.taxable_name), {})
def get_entries_query(self):
twe = frappe.qb.DocType("Tax Withholding Entry")
query = (
frappe.qb.from_(twe)
.select(
twe.party_type,
twe.party,
IfNull(twe.tax_id, "").as_("tax_id"),
twe.tax_withholding_category,
twe.taxable_amount.as_("total_amount"),
twe.tax_rate.as_("rate"),
twe.withholding_amount.as_("tax_amount"),
IfNull(twe.taxable_doctype, "").as_("transaction_type"),
IfNull(twe.taxable_name, "").as_("ref_no"),
twe.taxable_date,
IfNull(twe.withholding_doctype, "").as_("withholding_doctype"),
IfNull(twe.withholding_name, "").as_("withholding_name"),
twe.withholding_date.as_("transaction_date"),
)
.where(twe.docstatus == 1)
.where(twe.withholding_date >= self.filters.from_date)
.where(twe.withholding_date <= self.filters.to_date)
.where(IfNull(twe.withholding_name, "") != "")
.where(twe.status != "Duplicate")
)
party_info = party_details.get((entry.party_type, entry.party), {})
if self.filters.company:
query = query.where(twe.company == self.filters.company)
if self.filters.party_type:
query = query.where(twe.party_type == self.filters.party_type)
if self.filters.party:
query = query.where(twe.party == self.filters.party)
row = {
"section_code": entry.tax_withholding_category,
"entity_type": party_info.get("entity_type"),
"rate": entry.tax_rate,
"total_amount": entry.taxable_amount,
"grand_total": doc_details.get("grand_total", 0),
"base_total": doc_details.get("base_total", 0),
"tax_amount": entry.withholding_amount,
"transaction_date": entry.withholding_date,
"transaction_type": entry.taxable_doctype,
"ref_no": entry.taxable_name,
"taxable_date": entry.taxable_date,
"supplier_invoice_no": doc_details.get("bill_no"),
"supplier_invoice_date": doc_details.get("bill_date"),
"withholding_doctype": entry.withholding_doctype,
"withholding_name": entry.withholding_name,
"party_name": party_info.get("party_name"),
"tax_id": entry.tax_id,
"party": entry.party,
"party_type": entry.party_type,
}
data.append(row)
return query
# Sort by section code, transaction date, then withholding_name for deterministic ordering
data.sort(
key=lambda x: (x["section_code"] or "", x["transaction_date"] or "", x["withholding_name"] or "")
)
return data
def fetch_party_details(self):
parties_by_type = {pt: set() for pt in self.party_types}
for entry in self.entries:
if entry.party_type in parties_by_type and entry.party:
parties_by_type[entry.party_type].add(entry.party)
party_map = {}
for party_type, party_set in parties_by_type.items():
if not party_set:
continue
def get_party_details(entries):
"""Fetch party details in batch for all entries"""
party_map = frappe._dict()
parties_by_type = {"Customer": set(), "Supplier": set()}
query = self.get_party_query(party_type, party_set)
if query is None:
continue
# Group parties by type
for entry in entries:
if entry.party_type in parties_by_type and entry.party:
parties_by_type[entry.party_type].add(entry.party)
for row in query.run(as_dict=True):
party_map[(party_type, row.pop("name"))] = row
# Batch fetch for each party type
for party_type, party_set in parties_by_type.items():
if not party_type or not party_set:
continue
return party_map
def get_party_query(self, party_type, party_set):
doctype = frappe.qb.DocType(party_type)
fields = [doctype.name]
if party_type == "Supplier":
fields.extend([doctype.supplier_type.as_("entity_type"), doctype.supplier_name.as_("party_name")])
fields.extend(
[
doctype.supplier_type.as_("party_entity_type"),
doctype.supplier_name.as_("party_name"),
]
)
elif party_type == "Customer":
fields.extend([doctype.customer_type.as_("entity_type"), doctype.customer_name.as_("party_name")])
fields.extend(
[
doctype.customer_type.as_("party_entity_type"),
doctype.customer_name.as_("party_name"),
]
)
else:
return None
query = frappe.qb.from_(doctype).select(*fields).where(doctype.name.isin(party_set))
party_details = query.run(as_dict=True)
return frappe.qb.from_(doctype).select(*fields).where(doctype.name.isin(party_set))
for party in party_details:
party_map[(party_type, party.name)] = party
def fetch_additional_doc_info(self):
docs_by_type = {dt: set() for dt in self.document_types}
for entry in self.entries:
if entry.ref_no and entry.transaction_type in docs_by_type:
docs_by_type[entry.transaction_type].add(entry.ref_no)
return party_map
doc_info = {}
for doctype_name, voucher_set in docs_by_type.items():
if not voucher_set:
continue
query = self.get_doc_info_query(doctype_name, voucher_set)
if query is None:
continue
for row in query.run(as_dict=True):
doc_info[(doctype_name, row.pop("name"))] = row
return doc_info
def get_doc_info_query(self, doctype_name, voucher_set):
if doctype_name == "Purchase Invoice":
get_doc_fields = self.get_purchase_invoice_fields
elif doctype_name == "Sales Invoice":
get_doc_fields = self.get_sales_invoice_fields
elif doctype_name == "Payment Entry":
get_doc_fields = self.get_payment_entry_fields
elif doctype_name == "Journal Entry":
get_doc_fields = self.get_journal_entry_fields
else:
return None
doctype = frappe.qb.DocType(doctype_name)
fields = [doctype.name, *get_doc_fields(doctype)]
return frappe.qb.from_(doctype).select(*fields).where(doctype.name.isin(voucher_set))
def get_purchase_invoice_fields(self, doctype):
return [
doctype.grand_total,
doctype.base_total,
doctype.bill_no.as_("supplier_invoice_no"),
doctype.bill_date.as_("supplier_invoice_date"),
]
def get_sales_invoice_fields(self, doctype):
return [doctype.grand_total, doctype.base_total]
def get_payment_entry_fields(self, doctype):
return [
doctype.paid_amount_after_tax.as_("grand_total"),
doctype.base_paid_amount.as_("base_total"),
]
def get_journal_entry_fields(self, doctype):
return [doctype.total_debit.as_("grand_total"), doctype.total_debit.as_("base_total")]
def get_columns(self):
party_type = self.filters.get("party_type", "Party")
return [
{
"label": _("Tax Withholding Category"),
"options": "Tax Withholding Category",
"fieldname": "tax_withholding_category",
"fieldtype": "Link",
"width": 90,
},
{"label": _("Tax Id"), "fieldname": "tax_id", "fieldtype": "Data", "width": 60},
{
"label": _(f"{party_type} Name"),
"fieldname": "party_name",
"fieldtype": "Data",
"width": 180,
},
{
"label": _(party_type),
"fieldname": "party",
"fieldtype": "Dynamic Link",
"options": "party_type",
"width": 180,
},
{
"label": _(f"{party_type} Type"),
"fieldname": "party_entity_type",
"fieldtype": "Data",
"width": 100,
},
{
"label": _("Supplier Invoice No"),
"fieldname": "supplier_invoice_no",
"fieldtype": "Data",
"width": 120,
},
{
"label": _("Supplier Invoice Date"),
"fieldname": "supplier_invoice_date",
"fieldtype": "Date",
"width": 120,
},
{"label": _("Tax Rate %"), "fieldname": "rate", "fieldtype": "Percent", "width": 60},
{
"label": _("Taxable Amount"),
"fieldname": "total_amount",
"fieldtype": "Currency",
"width": 120,
},
{"label": _("Tax Amount"), "fieldname": "tax_amount", "fieldtype": "Currency", "width": 120},
{
"label": _("Grand Total (Company Currency)"),
"fieldname": "base_total",
"fieldtype": "Currency",
"width": 150,
},
{
"label": _("Grand Total (Transaction Currency)"),
"fieldname": "grand_total",
"fieldtype": "Currency",
"width": 170,
},
{"label": _("Reference Date"), "fieldname": "taxable_date", "fieldtype": "Date", "width": 100},
{
"label": _("Transaction Type"),
"fieldname": "transaction_type",
"fieldtype": "Data",
"width": 130,
},
{
"label": _("Reference No."),
"fieldname": "ref_no",
"fieldtype": "Dynamic Link",
"options": "transaction_type",
"width": 180,
},
{
"label": _("Date of Transaction"),
"fieldname": "transaction_date",
"fieldtype": "Date",
"width": 100,
},
{
"label": _("Withholding Document"),
"fieldname": "withholding_name",
"fieldtype": "Dynamic Link",
"options": "withholding_doctype",
"width": 150,
},
]
def get_columns(filters):
"""Generate report columns based on filters"""
columns = [
{
"label": _("Section Code"),
"options": "Tax Withholding Category",
"fieldname": "section_code",
"fieldtype": "Link",
"width": 90,
},
{"label": _("Tax Id"), "fieldname": "tax_id", "fieldtype": "Data", "width": 60},
{
"label": _(f"{filters.get('party_type', 'Party')} Name"),
"fieldname": "party_name",
"fieldtype": "Data",
"width": 180,
},
{
"label": _(filters.get("party_type", "Party")),
"fieldname": "party",
"fieldtype": "Dynamic Link",
"options": "party_type",
"width": 180,
},
{
"label": _("Entity Type"),
"fieldname": "entity_type",
"fieldtype": "Data",
"width": 100,
},
{
"label": _("Supplier Invoice No"),
"fieldname": "supplier_invoice_no",
"fieldtype": "Data",
"width": 120,
},
{
"label": _("Supplier Invoice Date"),
"fieldname": "supplier_invoice_date",
"fieldtype": "Date",
"width": 120,
},
{
"label": _("Tax Rate %"),
"fieldname": "rate",
"fieldtype": "Percent",
"width": 60,
},
{
"label": _("Taxable Amount"),
"fieldname": "total_amount",
"fieldtype": "Currency",
"width": 120,
},
{
"label": _("Tax Amount"),
"fieldname": "tax_amount",
"fieldtype": "Currency",
"width": 120,
},
{
"label": _("Grand Total (Company Currency)"),
"fieldname": "base_total",
"fieldtype": "Currency",
"width": 150,
},
{
"label": _("Grand Total (Transaction Currency)"),
"fieldname": "grand_total",
"fieldtype": "Currency",
"width": 170,
},
{
"label": _("Reference Date"),
"fieldname": "taxable_date",
"fieldtype": "Date",
"width": 100,
},
{
"label": _("Transaction Type"),
"fieldname": "transaction_type",
"fieldtype": "Data",
"width": 130,
},
{
"label": _("Reference No."),
"fieldname": "ref_no",
"fieldtype": "Dynamic Link",
"options": "transaction_type",
"width": 180,
},
{
"label": _("Date of Transaction"),
"fieldname": "transaction_date",
"fieldtype": "Date",
"width": 100,
},
{
"label": _("Withholding Document"),
"fieldname": "withholding_name",
"fieldtype": "Dynamic Link",
"options": "withholding_doctype",
"width": 150,
},
]
return columns
def get_tax_withholding_entries(filters):
twe = frappe.qb.DocType("Tax Withholding Entry")
query = (
frappe.qb.from_(twe)
.select(
twe.company,
twe.party_type,
twe.party,
IfNull(twe.tax_id, "").as_("tax_id"),
twe.tax_withholding_category,
IfNull(twe.tax_withholding_group, "").as_("tax_withholding_group"),
twe.taxable_amount,
twe.tax_rate,
twe.withholding_amount,
IfNull(twe.taxable_doctype, "").as_("taxable_doctype"),
IfNull(twe.taxable_name, "").as_("taxable_name"),
twe.taxable_date,
IfNull(twe.under_withheld_reason, "").as_("under_withheld_reason"),
IfNull(twe.lower_deduction_certificate, "").as_("lower_deduction_certificate"),
IfNull(twe.withholding_doctype, "").as_("withholding_doctype"),
IfNull(twe.withholding_name, "").as_("withholding_name"),
twe.withholding_date,
twe.status,
)
.where(twe.docstatus == 1)
.where(twe.withholding_date >= filters.from_date)
.where(twe.withholding_date <= filters.to_date)
.where(IfNull(twe.withholding_name, "") != "")
.where(twe.status != "Duplicate")
)
if filters.get("company"):
query = query.where(twe.company == filters.get("company"))
if filters.get("party_type"):
query = query.where(twe.party_type == filters.get("party_type"))
if filters.get("party"):
query = query.where(twe.party == filters.get("party"))
return query.run(as_dict=True)
def get_additional_doc_info(entries):
"""Fetch additional document information in batch"""
doc_info = {}
docs_by_type = {
"Purchase Invoice": set(),
"Sales Invoice": set(),
"Payment Entry": set(),
"Journal Entry": set(),
}
# Group documents by type
for entry in entries:
if entry.taxable_name and entry.taxable_doctype in docs_by_type:
docs_by_type[entry.taxable_doctype].add(entry.taxable_name)
for doctype_name, voucher_set in docs_by_type.items():
if voucher_set:
_fetch_doc_info(doctype_name, voucher_set, doc_info)
return doc_info
def _fetch_doc_info(doctype_name, voucher_set, doc_info):
doctype = frappe.qb.DocType(doctype_name)
fields = [doctype.name]
# Add doctype-specific fields
if doctype_name == "Purchase Invoice":
fields.extend([doctype.grand_total, doctype.base_total, doctype.bill_no, doctype.bill_date])
elif doctype_name == "Sales Invoice":
fields.extend([doctype.grand_total, doctype.base_total])
elif doctype_name == "Payment Entry":
fields.extend(
[doctype.paid_amount_after_tax.as_("grand_total"), doctype.base_paid_amount.as_("base_total")]
)
elif doctype_name == "Journal Entry":
fields.extend([doctype.total_debit.as_("grand_total"), doctype.total_debit.as_("base_total")])
else:
return
query = frappe.qb.from_(doctype).select(*fields).where(doctype.name.isin(voucher_set))
entries = query.run(as_dict=True)
for entry in entries:
doc_info[(doctype_name, entry.name)] = entry
execute = TaxWithholdingDetailsReport.execute

View File

@@ -40,7 +40,7 @@ class TestTaxWithholdingDetails(ERPNextTestSuite, AccountsTestMixin):
expected_values = [
[jv.name, "TCS", 0.075, 1000.75, 0.75, 1000.75],
["", "TCS", 0.075, 0, 0.75, 0],
["", "TCS", 0.075, None, 0.75, None],
[si.name, "TCS", 0.075, 1000.0, 0.75, 1000.75],
]
self.check_expected_values(result, expected_values)
@@ -124,7 +124,7 @@ class TestTaxWithholdingDetails(ERPNextTestSuite, AccountsTestMixin):
voucher_expected_values = expected_values[i]
voucher_actual_values = (
voucher.ref_no,
voucher.section_code,
voucher.tax_withholding_category,
voucher.rate,
voucher.base_total,
voucher.tax_amount,

View File

@@ -2,121 +2,94 @@ import frappe
from frappe import _
from erpnext.accounts.report.tax_withholding_details.tax_withholding_details import (
get_tax_withholding_data,
TaxWithholdingDetailsReport,
)
from erpnext.accounts.utils import get_fiscal_year
def execute(filters=None):
validate_filters(filters)
class TDSComputationSummaryReport(TaxWithholdingDetailsReport):
GROUP_BY_FIELDS = ("party_type", "party", "tax_withholding_category")
CARRY_OVER_FIELDS = (
"tax_id",
"party",
"party_type",
"party_name",
"tax_withholding_category",
"party_entity_type",
"rate",
)
AGGREGATE_FIELDS = ("total_amount", "tax_amount")
data = get_tax_withholding_data(filters)
columns = get_columns(filters)
def validate_filters(self):
if self.filters.from_date > self.filters.to_date:
frappe.throw(_("From Date must be before To Date"))
final_result = group_by_party_and_category(data, filters)
from_year = get_fiscal_year(self.filters.from_date)[0]
to_year = get_fiscal_year(self.filters.to_date)[0]
if from_year != to_year:
frappe.throw(_("From Date and To Date lie in different Fiscal Year"))
return columns, final_result
self.filters.fiscal_year = from_year
def get_data(self):
return self.group_rows(super().get_data())
def validate_filters(filters):
"""Validate if dates are properly set and lie in the same fiscal year"""
if filters.from_date > filters.to_date:
frappe.throw(_("From Date must be before To Date"))
def group_rows(self, data):
grouped = {}
for row in data:
key = tuple(row.get(f) for f in self.GROUP_BY_FIELDS)
bucket = grouped.setdefault(
key,
{
**{f: row.get(f) for f in self.CARRY_OVER_FIELDS},
**{f: 0.0 for f in self.AGGREGATE_FIELDS},
},
)
from_year = get_fiscal_year(filters.from_date)[0]
to_year = get_fiscal_year(filters.to_date)[0]
if from_year != to_year:
frappe.throw(_("From Date and To Date lie in different Fiscal Year"))
for f in self.AGGREGATE_FIELDS:
bucket[f] += row.get(f) or 0.0
filters["fiscal_year"] = from_year
return list(grouped.values())
def group_by_party_and_category(data, filters):
party_category_wise_map = {}
for row in data:
party_category_wise_map.setdefault(
(row.get("party"), row.get("section_code")),
def get_columns(self):
party_type = self.filters.get("party_type", "Party")
return [
{"label": _("Tax Id"), "fieldname": "tax_id", "fieldtype": "Data", "width": 90},
{
"tax_id": row.get("tax_id"),
"party": row.get("party"),
"party_name": row.get("party_name"),
"section_code": row.get("section_code"),
"entity_type": row.get("entity_type"),
"rate": row.get("rate"),
"total_amount": 0.0,
"tax_amount": 0.0,
"label": _(party_type),
"fieldname": "party",
"fieldtype": "Dynamic Link",
"options": "party_type",
"width": 180,
},
)
party_category_wise_map.get((row.get("party"), row.get("section_code")))["total_amount"] += row.get(
"total_amount", 0.0
)
party_category_wise_map.get((row.get("party"), row.get("section_code")))["tax_amount"] += row.get(
"tax_amount", 0.0
)
final_result = get_final_result(party_category_wise_map)
return final_result
{
"label": _(f"{party_type} Name"),
"fieldname": "party_name",
"fieldtype": "Data",
"width": 180,
},
{
"label": _("Tax Withholding Category"),
"options": "Tax Withholding Category",
"fieldname": "tax_withholding_category",
"fieldtype": "Link",
"width": 180,
},
{
"label": _(f"{party_type} Type"),
"fieldname": "party_entity_type",
"fieldtype": "Data",
"width": 180,
},
{"label": _("Tax Rate %"), "fieldname": "rate", "fieldtype": "Percent", "width": 120},
{
"label": _("Total Taxable Amount"),
"fieldname": "total_amount",
"fieldtype": "Float",
"width": 120,
},
{"label": _("Tax Amount"), "fieldname": "tax_amount", "fieldtype": "Float", "width": 120},
]
def get_final_result(party_category_wise_map):
out = []
for _key, value in party_category_wise_map.items():
out.append(value)
return out
def get_columns(filters):
columns = [
{"label": _("Tax Id"), "fieldname": "tax_id", "fieldtype": "Data", "width": 90},
{
"label": _(filters.get("party_type")),
"fieldname": "party",
"fieldtype": "Dynamic Link",
"options": "party_type",
"width": 180,
},
{
"label": _(f"{filters.get('party_type', 'Party')} Name"),
"fieldname": "party_name",
"fieldtype": "Data",
"width": 180,
},
{
"label": _("Section Code"),
"options": "Tax Withholding Category",
"fieldname": "section_code",
"fieldtype": "Link",
"width": 180,
},
{
"label": _("Entity Type"),
"fieldname": "entity_type",
"fieldtype": "Data",
"width": 180,
},
{
"label": _("Tax Rate %"),
"fieldname": "rate",
"fieldtype": "Percent",
"width": 120,
},
{
"label": _("Total Taxable Amount"),
"fieldname": "total_amount",
"fieldtype": "Float",
"width": 120,
},
{
"label": _("Tax Amount"),
"fieldname": "tax_amount",
"fieldtype": "Float",
"width": 120,
},
]
return columns
execute = TDSComputationSummaryReport.execute