fix(tds-report): correct party type filtering and refactor

This commit is contained in:
ljain112
2026-02-25 16:30:30 +05:30
parent 3ae5de7b11
commit e5eb5406da

View File

@@ -19,18 +19,14 @@ def execute(filters=None):
validate_filters(filters) validate_filters(filters)
( (
tds_docs,
tds_accounts, tds_accounts,
tax_category_map, tax_category_map,
journal_entry_party_map,
net_total_map, net_total_map,
) = get_tds_docs(filters) ) = get_tds_docs(filters)
columns = get_columns(filters) columns = get_columns(filters)
res = get_result( res = get_result(filters, tds_accounts, tax_category_map, net_total_map)
filters, tds_docs, tds_accounts, tax_category_map, journal_entry_party_map, net_total_map
)
return columns, res return columns, res
@@ -41,27 +37,24 @@ def validate_filters(filters):
frappe.throw(_("From Date must be before To Date")) frappe.throw(_("From Date must be before To Date"))
def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_party_map, net_total_map): def get_result(filters, tds_accounts, tax_category_map, net_total_map):
party_map = get_party_pan_map(filters.get("party_type")) party_names = {v.party for v in net_total_map.values() if v.party}
party_map = get_party_pan_map(filters.get("party_type"), party_names)
tax_rate_map = get_tax_rate_map(filters) tax_rate_map = get_tax_rate_map(filters)
gle_map = get_gle_map(tds_docs) gle_map = get_gle_map(net_total_map)
precision = get_currency_precision() precision = get_currency_precision()
out = []
entries = {} entries = {}
for name, details in gle_map.items(): for name, details in gle_map.items():
for entry in details: for entry in details:
tax_amount, total_amount, grand_total, base_total, base_tax_withholding_net_total = 0, 0, 0, 0, 0 tax_amount, total_amount, grand_total, base_total, base_tax_withholding_net_total = 0, 0, 0, 0, 0
tax_withholding_category, rate = None, None tax_withholding_category, rate = None, None
bill_no, bill_date = "", "" bill_no, bill_date = "", ""
party = entry.party or entry.against
posting_date = entry.posting_date posting_date = entry.posting_date
voucher_type = entry.voucher_type voucher_type = entry.voucher_type
if voucher_type == "Journal Entry": values = net_total_map.get((voucher_type, name))
party_list = journal_entry_party_map.get(name) party = values.party if values else (entry.party or entry.against)
if party_list:
party = party_list[0]
if entry.account in tds_accounts.keys(): if entry.account in tds_accounts.keys():
tax_amount += entry.credit - entry.debit tax_amount += entry.credit - entry.debit
@@ -76,12 +69,13 @@ def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_
rate = get_tax_withholding_rates(tax_rate_map.get(tax_withholding_category, []), posting_date) rate = get_tax_withholding_rates(tax_rate_map.get(tax_withholding_category, []), posting_date)
values = net_total_map.get((voucher_type, name))
if values: if values:
if voucher_type == "Journal Entry" and tax_amount and rate: if voucher_type == "Journal Entry" and tax_amount and rate:
# back calculate total amount from rate and tax_amount # back calculate total amount from rate and tax_amount
base_total = min(flt(tax_amount / (rate / 100), precision=precision), values[0]) base_total = min(
flt(tax_amount / (rate / 100), precision=precision),
values.base_tax_withholding_net_total,
)
total_amount = grand_total = base_total total_amount = grand_total = base_total
base_tax_withholding_net_total = total_amount base_tax_withholding_net_total = total_amount
@@ -90,16 +84,16 @@ def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_
# back calculate total amount from rate and tax_amount # back calculate total amount from rate and tax_amount
total_amount = flt((tax_amount * 100) / rate, precision=precision) total_amount = flt((tax_amount * 100) / rate, precision=precision)
else: else:
total_amount = values[0] total_amount = values.base_tax_withholding_net_total
grand_total = values[1] grand_total = values.grand_total
base_total = values[2] base_total = values.base_total
base_tax_withholding_net_total = total_amount base_tax_withholding_net_total = total_amount
if voucher_type == "Purchase Invoice": if voucher_type == "Purchase Invoice":
base_tax_withholding_net_total = values[0] base_tax_withholding_net_total = values.base_tax_withholding_net_total
bill_no = values[3] bill_no = values.bill_no
bill_date = values[4] bill_date = values.bill_date
else: else:
total_amount += entry.credit total_amount += entry.credit
@@ -152,9 +146,12 @@ def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_
return out return out
def get_party_pan_map(party_type): def get_party_pan_map(party_type, party_names):
party_map = frappe._dict() party_map = frappe._dict()
if not party_names:
return party_map
fields = ["name", "tax_withholding_category"] fields = ["name", "tax_withholding_category"]
if party_type == "Supplier": if party_type == "Supplier":
fields += ["supplier_type", "supplier_name"] fields += ["supplier_type", "supplier_name"]
@@ -164,7 +161,7 @@ def get_party_pan_map(party_type):
if frappe.db.has_column(party_type, "pan"): if frappe.db.has_column(party_type, "pan"):
fields.append("pan") fields.append("pan")
party_details = frappe.db.get_all(party_type, fields=fields) party_details = frappe.db.get_all(party_type, filters={"name": ("in", list(party_names))}, fields=fields)
for party in party_details: for party in party_details:
party.party_type = party_type party.party_type = party_type
@@ -173,22 +170,39 @@ def get_party_pan_map(party_type):
return party_map return party_map
def get_gle_map(documents): def get_gle_map(net_total_map):
# create gle_map of the form if not net_total_map:
# {"purchase_invoice": list of dict of all gle created for this invoice} return {}
voucher_types = set()
voucher_nos = set()
for doctype, voucher_no in net_total_map.keys():
voucher_types.add(doctype)
voucher_nos.add(voucher_no)
gle = frappe.qb.DocType("GL Entry")
rows = (
frappe.qb.from_(gle)
.select(
gle.credit,
gle.debit,
gle.account,
gle.voucher_no,
gle.posting_date,
gle.voucher_type,
gle.against,
gle.party,
gle.party_type,
)
.where(gle.is_cancelled == 0)
.where(gle.voucher_type.isin(voucher_types))
.where(gle.voucher_no.isin(voucher_nos))
).run(as_dict=True)
gle_map = {} gle_map = {}
for d in rows:
gle = frappe.db.get_all( gle_map.setdefault(d.voucher_no, []).append(d)
"GL Entry",
{"voucher_no": ["in", documents], "is_cancelled": 0},
["credit", "debit", "account", "voucher_no", "posting_date", "voucher_type", "against", "party"],
)
for d in gle:
if d.voucher_no not in gle_map:
gle_map[d.voucher_no] = [d]
else:
gle_map[d.voucher_no].append(d)
return gle_map return gle_map
@@ -308,14 +322,9 @@ def get_columns(filters):
def get_tds_docs(filters): def get_tds_docs(filters):
tds_documents = [] vouchers = frappe._dict()
purchase_invoices = []
sales_invoices = []
payment_entries = []
journal_entries = []
tax_category_map = frappe._dict() tax_category_map = frappe._dict()
net_total_map = frappe._dict() net_total_map = frappe._dict()
journal_entry_party_map = frappe._dict()
bank_accounts = frappe.get_all("Account", {"is_group": 0, "account_type": "Bank"}, pluck="name") bank_accounts = frappe.get_all("Account", {"is_group": 0, "account_type": "Bank"}, pluck="name")
_tds_accounts = frappe.get_all( _tds_accounts = frappe.get_all(
@@ -334,35 +343,14 @@ def get_tds_docs(filters):
tds_docs = get_tds_docs_query(filters, bank_accounts, list(tds_accounts.keys())).run(as_dict=True) tds_docs = get_tds_docs_query(filters, bank_accounts, list(tds_accounts.keys())).run(as_dict=True)
for d in tds_docs: for d in tds_docs:
if d.voucher_type == "Purchase Invoice": vouchers.setdefault(d.voucher_type, set()).add(d.voucher_no)
purchase_invoices.append(d.voucher_no)
if d.voucher_type == "Sales Invoice":
sales_invoices.append(d.voucher_no)
elif d.voucher_type == "Payment Entry":
payment_entries.append(d.voucher_no)
elif d.voucher_type == "Journal Entry":
journal_entries.append(d.voucher_no)
tds_documents.append(d.voucher_no) for voucher_type, docs in vouchers.items():
get_doc_info(docs, voucher_type, tax_category_map, net_total_map, filters)
if purchase_invoices:
get_doc_info(purchase_invoices, "Purchase Invoice", tax_category_map, net_total_map)
if sales_invoices:
get_doc_info(sales_invoices, "Sales Invoice", tax_category_map, net_total_map)
if payment_entries:
get_doc_info(payment_entries, "Payment Entry", tax_category_map, net_total_map)
if journal_entries:
journal_entry_party_map = get_journal_entry_party_map(journal_entries)
get_doc_info(journal_entries, "Journal Entry", tax_category_map, net_total_map)
return ( return (
tds_documents,
tds_accounts, tds_accounts,
tax_category_map, tax_category_map,
journal_entry_party_map,
net_total_map, net_total_map,
) )
@@ -373,11 +361,16 @@ def get_tds_docs_query(filters, bank_accounts, tds_accounts):
_("No {0} Accounts found for this company.").format(frappe.bold(_("Tax Withholding"))), _("No {0} Accounts found for this company.").format(frappe.bold(_("Tax Withholding"))),
title=_("Accounts Missing Error"), title=_("Accounts Missing Error"),
) )
invoice_voucher = "Purchase Invoice" if filters.get("party_type") == "Supplier" else "Sales Invoice"
voucher_types = {"Payment Entry", "Journal Entry", invoice_voucher}
gle = frappe.qb.DocType("GL Entry") gle = frappe.qb.DocType("GL Entry")
query = ( query = (
frappe.qb.from_(gle) frappe.qb.from_(gle)
.select("voucher_no", "voucher_type", "against", "party") .select("voucher_no", "voucher_type", "against", "party")
.where(gle.is_cancelled == 0) .where(gle.is_cancelled == 0)
.where(gle.voucher_type.isin(voucher_types))
) )
if filters.get("from_date"): if filters.get("from_date"):
@@ -403,25 +396,27 @@ def get_tds_docs_query(filters, bank_accounts, tds_accounts):
return query return query
def get_journal_entry_party_map(journal_entries): def get_journal_entry_party_map(journal_entries, party_type):
journal_entry_party_map = {} journal_entry_party_map = {}
for d in frappe.db.get_all( for d in frappe.db.get_all(
"Journal Entry Account", "Journal Entry Account",
{ {
"parent": ("in", journal_entries), "parent": ("in", journal_entries),
"party_type": ("in", ("Supplier", "Customer")), "party_type": party_type,
"party": ("is", "set"), "party": ("is", "set"),
}, },
["parent", "party"], ["parent", "party"],
): ):
if d.parent not in journal_entry_party_map: journal_entry_party_map.setdefault(d.parent, []).append(d.party)
journal_entry_party_map[d.parent] = []
journal_entry_party_map[d.parent].append(d.party)
return journal_entry_party_map return journal_entry_party_map
def get_doc_info(vouchers, doctype, tax_category_map, net_total_map=None): def get_doc_info(vouchers, doctype, tax_category_map, net_total_map=None, filters=None):
journal_entry_party_map = {}
party_type = filters.get("party_type") if filters else None
party = filters.get("party") if filters else None
common_fields = ["name"] common_fields = ["name"]
fields_dict = { fields_dict = {
"Purchase Invoice": [ "Purchase Invoice": [
@@ -431,37 +426,78 @@ def get_doc_info(vouchers, doctype, tax_category_map, net_total_map=None):
"base_total", "base_total",
"bill_no", "bill_no",
"bill_date", "bill_date",
"supplier",
], ],
"Sales Invoice": ["base_net_total", "grand_total", "base_total"], "Sales Invoice": ["base_net_total", "grand_total", "base_total", "customer"],
"Payment Entry": [ "Payment Entry": [
"tax_withholding_category", "tax_withholding_category",
"paid_amount", "paid_amount",
"paid_amount_after_tax", "paid_amount_after_tax",
"base_paid_amount", "base_paid_amount",
"party",
"party_type",
], ],
"Journal Entry": ["tax_withholding_category", "total_debit"], "Journal Entry": ["tax_withholding_category", "total_debit"],
} }
party_field = {
"Purchase Invoice": "supplier",
"Sales Invoice": "customer",
"Payment Entry": "party",
}
entries = frappe.get_all( doc_filters = {"name": ("in", vouchers)}
doctype, filters={"name": ("in", vouchers)}, fields=common_fields + fields_dict[doctype]
) if party and party_field.get(doctype):
doc_filters[party_field[doctype]] = party
if doctype == "Payment Entry":
doc_filters["party_type"] = party_type
entries = frappe.get_all(doctype, filters=doc_filters, fields=common_fields + fields_dict[doctype])
if doctype == "Journal Entry":
journal_entry_party_map = get_journal_entry_party_map(vouchers, party_type=party_type)
for entry in entries: for entry in entries:
tax_category_map[(doctype, entry.name)] = entry.tax_withholding_category tax_category_map[(doctype, entry.name)] = entry.tax_withholding_category
value = frappe._dict(
party=None,
party_type=party_type,
base_tax_withholding_net_total=0,
grand_total=0,
base_total=0,
bill_no="",
bill_date="",
)
if doctype == "Purchase Invoice": if doctype == "Purchase Invoice":
value = [ value.party = entry.supplier
entry.base_tax_withholding_net_total, value.party_type = "Supplier"
entry.grand_total, value.base_tax_withholding_net_total = entry.base_tax_withholding_net_total
entry.base_total, value.grand_total = entry.grand_total
entry.bill_no, value.base_total = entry.base_total
entry.bill_date, value.bill_no = entry.bill_no
] value.bill_date = entry.bill_date
elif doctype == "Sales Invoice": elif doctype == "Sales Invoice":
value = [entry.base_net_total, entry.grand_total, entry.base_total] value.party = entry.customer
value.party_type = "Customer"
value.base_tax_withholding_net_total = entry.base_net_total
value.grand_total = entry.grand_total
value.base_total = entry.base_total
elif doctype == "Payment Entry": elif doctype == "Payment Entry":
value = [entry.paid_amount, entry.paid_amount_after_tax, entry.base_paid_amount] value.party = entry.party
value.party_type = entry.party_type
value.base_tax_withholding_net_total = entry.paid_amount
value.grand_total = entry.paid_amount_after_tax
value.base_total = entry.base_paid_amount
else: else:
value = [entry.total_debit] * 3 party_list = journal_entry_party_map.get(entry.name)
value.party = party_list[0] if party_list else None
value.party_type = party_type
value.base_tax_withholding_net_total = entry.total_debit
value.grand_total = entry.total_debit
value.base_total = entry.total_debit
net_total_map[(doctype, entry.name)] = value net_total_map[(doctype, entry.name)] = value