refactor(Payment Entry): reduce indentation (#46864)

This commit is contained in:
Raffael Meyer
2025-04-05 16:41:23 +02:00
committed by GitHub
parent 7d12e9afd4
commit e25517a3ce

View File

@@ -339,16 +339,18 @@ class PaymentEntry(AccountsController):
reference_names.add(key)
def set_bank_account_data(self):
if self.bank_account:
bank_data = get_bank_account_details(self.bank_account)
if not self.bank_account:
return
field = "paid_from" if self.payment_type == "Pay" else "paid_to"
bank_data = get_bank_account_details(self.bank_account)
self.bank = bank_data.bank
self.bank_account_no = bank_data.bank_account_no
field = "paid_from" if self.payment_type == "Pay" else "paid_to"
if not self.get(field):
self.set(field, bank_data.account)
self.bank = bank_data.bank
self.bank_account_no = bank_data.bank_account_no
if not self.get(field):
self.set(field, bank_data.account)
def validate_payment_type_with_outstanding(self):
total_outstanding = sum(d.allocated_amount for d in self.get("references"))
@@ -366,15 +368,16 @@ class PaymentEntry(AccountsController):
if self.party_type in ("Customer", "Supplier"):
self.validate_allocated_amount_with_latest_data()
else:
fail_message = _("Row #{0}: Allocated Amount cannot be greater than outstanding amount.")
for d in self.get("references"):
if (flt(d.allocated_amount)) > 0 and flt(d.allocated_amount) > flt(d.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
return
# Check for negative outstanding invoices as well
if flt(d.allocated_amount) < 0 and flt(d.allocated_amount) < flt(d.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
fail_message = _("Row #{0}: Allocated Amount cannot be greater than outstanding amount.")
for d in self.get("references"):
if (flt(d.allocated_amount)) > 0 and flt(d.allocated_amount) > flt(d.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
# Check for negative outstanding invoices as well
if flt(d.allocated_amount) < 0 and flt(d.allocated_amount) < flt(d.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
def validate_allocated_amount_as_per_payment_request(self):
"""
@@ -412,91 +415,89 @@ class PaymentEntry(AccountsController):
return False
def validate_allocated_amount_with_latest_data(self):
if self.references:
uniq_vouchers = set([(x.reference_doctype, x.reference_name) for x in self.references])
vouchers = [frappe._dict({"voucher_type": x[0], "voucher_no": x[1]}) for x in uniq_vouchers]
latest_references = get_outstanding_reference_documents(
{
"posting_date": self.posting_date,
"company": self.company,
"party_type": self.party_type,
"payment_type": self.payment_type,
"party": self.party,
"party_account": self.paid_from if self.payment_type == "Receive" else self.paid_to,
"get_outstanding_invoices": True,
"get_orders_to_be_billed": True,
"vouchers": vouchers,
"book_advance_payments_in_separate_party_account": self.book_advance_payments_in_separate_party_account,
},
validate=True,
)
if not self.references:
return
# Group latest_references by (voucher_type, voucher_no)
latest_lookup = {}
for d in latest_references:
d = frappe._dict(d)
latest_lookup.setdefault((d.voucher_type, d.voucher_no), frappe._dict())[d.payment_term] = d
uniq_vouchers = {(x.reference_doctype, x.reference_name) for x in self.references}
vouchers = [frappe._dict({"voucher_type": x[0], "voucher_no": x[1]}) for x in uniq_vouchers]
latest_references = get_outstanding_reference_documents(
{
"posting_date": self.posting_date,
"company": self.company,
"party_type": self.party_type,
"payment_type": self.payment_type,
"party": self.party,
"party_account": self.paid_from if self.payment_type == "Receive" else self.paid_to,
"get_outstanding_invoices": True,
"get_orders_to_be_billed": True,
"vouchers": vouchers,
"book_advance_payments_in_separate_party_account": self.book_advance_payments_in_separate_party_account,
},
validate=True,
)
for idx, d in enumerate(self.get("references"), start=1):
latest = latest_lookup.get((d.reference_doctype, d.reference_name)) or frappe._dict()
# Group latest_references by (voucher_type, voucher_no)
latest_lookup = {}
for d in latest_references:
d = frappe._dict(d)
latest_lookup.setdefault((d.voucher_type, d.voucher_no), frappe._dict())[d.payment_term] = d
# If term based allocation is enabled, throw
if (
d.payment_term is None or d.payment_term == ""
) and self.term_based_allocation_enabled_for_reference(d.reference_doctype, d.reference_name):
frappe.throw(
_(
"{0} has Payment Term based allocation enabled. Select a Payment Term for Row #{1} in Payment References section"
).format(frappe.bold(d.reference_name), frappe.bold(idx))
)
for idx, d in enumerate(self.get("references"), start=1):
latest = latest_lookup.get((d.reference_doctype, d.reference_name)) or frappe._dict()
# if no payment template is used by invoice and has a custom term(no `payment_term`), then invoice outstanding will be in 'None' key
latest = latest.get(d.payment_term) or latest.get(None)
# The reference has already been fully paid
if not latest:
frappe.throw(
_("{0} {1} has already been fully paid.").format(
_(d.reference_doctype), d.reference_name
)
)
# The reference has already been partly paid
elif (
latest.outstanding_amount < latest.invoice_amount
and flt(d.outstanding_amount, d.precision("outstanding_amount"))
!= flt(latest.outstanding_amount, d.precision("outstanding_amount"))
and d.payment_term == ""
):
frappe.throw(
_(
"{0} {1} has already been partly paid. Please use the 'Get Outstanding Invoice' or the 'Get Outstanding Orders' button to get the latest outstanding amounts."
).format(_(d.reference_doctype), d.reference_name)
)
# If term based allocation is enabled, throw
if (
d.payment_term is None or d.payment_term == ""
) and self.term_based_allocation_enabled_for_reference(d.reference_doctype, d.reference_name):
frappe.throw(
_(
"{0} has Payment Term based allocation enabled. Select a Payment Term for Row #{1} in Payment References section"
).format(frappe.bold(d.reference_name), frappe.bold(idx))
)
fail_message = _("Row #{0}: Allocated Amount cannot be greater than outstanding amount.")
# if no payment template is used by invoice and has a custom term(no `payment_term`), then invoice outstanding will be in 'None' key
latest = latest.get(d.payment_term) or latest.get(None)
# The reference has already been fully paid
if not latest:
frappe.throw(
_("{0} {1} has already been fully paid.").format(_(d.reference_doctype), d.reference_name)
)
# The reference has already been partly paid
elif (
latest.outstanding_amount < latest.invoice_amount
and flt(d.outstanding_amount, d.precision("outstanding_amount"))
!= flt(latest.outstanding_amount, d.precision("outstanding_amount"))
and d.payment_term == ""
):
frappe.throw(
_(
"{0} {1} has already been partly paid. Please use the 'Get Outstanding Invoice' or the 'Get Outstanding Orders' button to get the latest outstanding amounts."
).format(_(d.reference_doctype), d.reference_name)
)
if (
d.payment_term
and (
(flt(d.allocated_amount)) > 0
and latest.payment_term_outstanding
and (flt(d.allocated_amount) > flt(latest.payment_term_outstanding))
)
and self.term_based_allocation_enabled_for_reference(
d.reference_doctype, d.reference_name
)
):
frappe.throw(
_(
"Row #{0}: Allocated amount:{1} is greater than outstanding amount:{2} for Payment Term {3}"
).format(d.idx, d.allocated_amount, latest.payment_term_outstanding, d.payment_term)
)
fail_message = _("Row #{0}: Allocated Amount cannot be greater than outstanding amount.")
if (flt(d.allocated_amount)) > 0 and flt(d.allocated_amount) > flt(latest.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
if (
d.payment_term
and (
(flt(d.allocated_amount)) > 0
and latest.payment_term_outstanding
and (flt(d.allocated_amount) > flt(latest.payment_term_outstanding))
)
and self.term_based_allocation_enabled_for_reference(d.reference_doctype, d.reference_name)
):
frappe.throw(
_(
"Row #{0}: Allocated amount:{1} is greater than outstanding amount:{2} for Payment Term {3}"
).format(d.idx, d.allocated_amount, latest.payment_term_outstanding, d.payment_term)
)
# Check for negative outstanding invoices as well
if flt(d.allocated_amount) < 0 and flt(d.allocated_amount) < flt(latest.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
if (flt(d.allocated_amount)) > 0 and flt(d.allocated_amount) > flt(latest.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
# Check for negative outstanding invoices as well
if flt(d.allocated_amount) < 0 and flt(d.allocated_amount) < flt(latest.outstanding_amount):
frappe.throw(fail_message.format(d.idx))
def delink_advance_entry_references(self):
for reference in self.references:
@@ -562,51 +563,52 @@ class PaymentEntry(AccountsController):
reference_exchange_details: dict | None = None,
) -> None:
for d in self.get("references"):
if d.allocated_amount:
if (
update_ref_details_only_for
and (d.reference_doctype, d.reference_name) not in update_ref_details_only_for
):
if not d.allocated_amount:
continue
if (
update_ref_details_only_for
and (d.reference_doctype, d.reference_name) not in update_ref_details_only_for
):
continue
ref_details = get_reference_details(
d.reference_doctype,
d.reference_name,
self.party_account_currency,
self.party_type,
self.party,
)
# Only update exchange rate when the reference is Journal Entry
if (
reference_exchange_details
and d.reference_doctype == reference_exchange_details.reference_doctype
and d.reference_name == reference_exchange_details.reference_name
):
ref_details.update({"exchange_rate": reference_exchange_details.exchange_rate})
for field, value in ref_details.items():
if d.exchange_gain_loss:
# for cases where gain/loss is booked into invoice
# exchange_gain_loss is calculated from invoice & populated
# and row.exchange_rate is already set to payment entry's exchange rate
# refer -> `update_reference_in_payment_entry()` in utils.py
continue
ref_details = get_reference_details(
d.reference_doctype,
d.reference_name,
self.party_account_currency,
self.party_type,
self.party,
)
# Only update exchange rate when the reference is Journal Entry
if (
reference_exchange_details
and d.reference_doctype == reference_exchange_details.reference_doctype
and d.reference_name == reference_exchange_details.reference_name
):
ref_details.update({"exchange_rate": reference_exchange_details.exchange_rate})
for field, value in ref_details.items():
if d.exchange_gain_loss:
# for cases where gain/loss is booked into invoice
# exchange_gain_loss is calculated from invoice & populated
# and row.exchange_rate is already set to payment entry's exchange rate
# refer -> `update_reference_in_payment_entry()` in utils.py
continue
if field == "exchange_rate" or not d.get(field) or force:
if self.get("_action") in ("submit", "cancel"):
d.db_set(field, value)
else:
d.set(field, value)
if field == "exchange_rate" or not d.get(field) or force:
if self.get("_action") in ("submit", "cancel"):
d.db_set(field, value)
else:
d.set(field, value)
def validate_payment_type(self):
if self.payment_type not in ("Receive", "Pay", "Internal Transfer"):
frappe.throw(_("Payment Type must be one of Receive, Pay and Internal Transfer"))
def validate_party_details(self):
if self.party:
if not frappe.db.exists(self.party_type, self.party):
frappe.throw(_("{0} {1} does not exist").format(_(self.party_type), self.party))
if self.party and not frappe.db.exists(self.party_type, self.party):
frappe.throw(_("{0} {1} does not exist").format(_(self.party_type), self.party))
def set_exchange_rate(self, ref_doc=None):
self.set_source_exchange_rate(ref_doc)
@@ -616,12 +618,8 @@ class PaymentEntry(AccountsController):
if self.paid_from:
if self.paid_from_account_currency == self.company_currency:
self.source_exchange_rate = 1
else:
if ref_doc:
if self.paid_from_account_currency == ref_doc.currency:
self.source_exchange_rate = ref_doc.get("exchange_rate") or ref_doc.get(
"conversion_rate"
)
elif ref_doc and self.paid_from_account_currency == ref_doc.currency:
self.source_exchange_rate = ref_doc.get("exchange_rate") or ref_doc.get("conversion_rate")
if not self.source_exchange_rate:
self.source_exchange_rate = get_exchange_rate(
@@ -632,9 +630,8 @@ class PaymentEntry(AccountsController):
if self.paid_from_account_currency == self.paid_to_account_currency:
self.target_exchange_rate = self.source_exchange_rate
elif self.paid_to and not self.target_exchange_rate:
if ref_doc:
if self.paid_to_account_currency == ref_doc.currency:
self.target_exchange_rate = ref_doc.get("exchange_rate") or ref_doc.get("conversion_rate")
if ref_doc and self.paid_to_account_currency == ref_doc.currency:
self.target_exchange_rate = ref_doc.get("exchange_rate") or ref_doc.get("conversion_rate")
if not self.target_exchange_rate:
self.target_exchange_rate = get_exchange_rate(
@@ -665,63 +662,61 @@ class PaymentEntry(AccountsController):
elif d.reference_name:
if not frappe.db.exists(d.reference_doctype, d.reference_name):
frappe.throw(_("{0} {1} does not exist").format(d.reference_doctype, d.reference_name))
else:
ref_doc = frappe.get_doc(d.reference_doctype, d.reference_name)
if d.reference_doctype != "Journal Entry":
if self.party != ref_doc.get(scrub(self.party_type)):
frappe.throw(
_("{0} {1} is not associated with {2} {3}").format(
_(d.reference_doctype), d.reference_name, _(self.party_type), self.party
)
)
else:
self.validate_journal_entry()
ref_doc = frappe.get_doc(d.reference_doctype, d.reference_name)
if d.reference_doctype in frappe.get_hooks("invoice_doctypes"):
if self.party_type == "Customer":
ref_party_account = (
get_party_account_based_on_invoice_discounting(d.reference_name)
or ref_doc.debit_to
)
elif self.party_type == "Supplier":
ref_party_account = ref_doc.credit_to
elif self.party_type == "Employee":
ref_party_account = ref_doc.payable_account
if (
ref_party_account != self.party_account
and not self.book_advance_payments_in_separate_party_account
):
frappe.throw(
_("{0} {1} is associated with {2}, but Party Account is {3}").format(
_(d.reference_doctype),
d.reference_name,
ref_party_account,
self.party_account,
)
)
if ref_doc.doctype == "Purchase Invoice" and ref_doc.get("on_hold"):
frappe.throw(
_("{0} {1} is on hold").format(_(d.reference_doctype), d.reference_name),
title=_("Invalid Purchase Invoice"),
)
if ref_doc.docstatus != 1:
if d.reference_doctype != "Journal Entry":
if self.party != ref_doc.get(scrub(self.party_type)):
frappe.throw(
_("{0} {1} must be submitted").format(_(d.reference_doctype), d.reference_name)
_("{0} {1} is not associated with {2} {3}").format(
_(d.reference_doctype), d.reference_name, _(self.party_type), self.party
)
)
else:
self.validate_journal_entry()
if d.reference_doctype in frappe.get_hooks("invoice_doctypes"):
if self.party_type == "Customer":
ref_party_account = (
get_party_account_based_on_invoice_discounting(d.reference_name)
or ref_doc.debit_to
)
elif self.party_type == "Supplier":
ref_party_account = ref_doc.credit_to
elif self.party_type == "Employee":
ref_party_account = ref_doc.payable_account
if (
ref_party_account != self.party_account
and not self.book_advance_payments_in_separate_party_account
):
frappe.throw(
_("{0} {1} is associated with {2}, but Party Account is {3}").format(
_(d.reference_doctype),
d.reference_name,
ref_party_account,
self.party_account,
)
)
if ref_doc.doctype == "Purchase Invoice" and ref_doc.get("on_hold"):
frappe.throw(
_("{0} {1} is on hold").format(_(d.reference_doctype), d.reference_name),
title=_("Invalid Purchase Invoice"),
)
if ref_doc.docstatus != 1:
frappe.throw(
_("{0} {1} must be submitted").format(_(d.reference_doctype), d.reference_name)
)
def get_valid_reference_doctypes(self):
if self.party_type == "Customer":
return ("Sales Order", "Sales Invoice", "Journal Entry", "Dunning", "Payment Entry")
elif self.party_type in ["Shareholder", "Employee"]:
return ("Journal Entry",)
elif self.party_type == "Supplier":
return ("Purchase Order", "Purchase Invoice", "Journal Entry", "Payment Entry")
elif self.party_type == "Shareholder":
return ("Journal Entry",)
elif self.party_type == "Employee":
return ("Journal Entry",)
def validate_paid_invoices(self):
no_oustanding_refs = {}
@@ -787,37 +782,39 @@ class PaymentEntry(AccountsController):
invoice_paid_amount_map = {}
for ref in self.get("references"):
if ref.payment_term and ref.reference_name:
key = (ref.payment_term, ref.reference_name, ref.reference_doctype)
invoice_payment_amount_map.setdefault(key, 0.0)
invoice_payment_amount_map[key] += ref.allocated_amount
if not ref.payment_term or not ref.reference_name:
continue
if not invoice_paid_amount_map.get(key):
payment_schedule = frappe.get_all(
"Payment Schedule",
filters={"parent": ref.reference_name},
fields=[
"paid_amount",
"payment_amount",
"payment_term",
"discount",
"outstanding",
"discount_type",
],
)
for term in payment_schedule:
invoice_key = (term.payment_term, ref.reference_name, ref.reference_doctype)
invoice_paid_amount_map.setdefault(invoice_key, {})
invoice_paid_amount_map[invoice_key]["outstanding"] = term.outstanding
if not (term.discount_type and term.discount):
continue
key = (ref.payment_term, ref.reference_name, ref.reference_doctype)
invoice_payment_amount_map.setdefault(key, 0.0)
invoice_payment_amount_map[key] += ref.allocated_amount
if term.discount_type == "Percentage":
invoice_paid_amount_map[invoice_key]["discounted_amt"] = ref.total_amount * (
term.discount / 100
)
else:
invoice_paid_amount_map[invoice_key]["discounted_amt"] = term.discount
if not invoice_paid_amount_map.get(key):
payment_schedule = frappe.get_all(
"Payment Schedule",
filters={"parent": ref.reference_name},
fields=[
"paid_amount",
"payment_amount",
"payment_term",
"discount",
"outstanding",
"discount_type",
],
)
for term in payment_schedule:
invoice_key = (term.payment_term, ref.reference_name, ref.reference_doctype)
invoice_paid_amount_map.setdefault(invoice_key, {})
invoice_paid_amount_map[invoice_key]["outstanding"] = term.outstanding
if not (term.discount_type and term.discount):
continue
if term.discount_type == "Percentage":
invoice_paid_amount_map[invoice_key]["discounted_amt"] = ref.total_amount * (
term.discount / 100
)
else:
invoice_paid_amount_map[invoice_key]["discounted_amt"] = term.discount
for idx, (key, allocated_amount) in enumerate(invoice_payment_amount_map.items(), 1):
if not invoice_paid_amount_map.get(key):
@@ -1064,14 +1061,14 @@ class PaymentEntry(AccountsController):
applicable_tax = 0
base_applicable_tax = 0
for tax in self.get("taxes"):
if not tax.included_in_paid_amount:
amount = -1 * tax.tax_amount if tax.add_deduct_tax == "Deduct" else tax.tax_amount
base_amount = (
-1 * tax.base_tax_amount if tax.add_deduct_tax == "Deduct" else tax.base_tax_amount
)
if tax.included_in_paid_amount:
continue
applicable_tax += amount
base_applicable_tax += base_amount
amount = -1 * tax.tax_amount if tax.add_deduct_tax == "Deduct" else tax.tax_amount
base_amount = -1 * tax.base_tax_amount if tax.add_deduct_tax == "Deduct" else tax.base_tax_amount
applicable_tax += amount
base_applicable_tax += base_amount
self.paid_amount_after_tax = flt(
flt(self.paid_amount) + flt(applicable_tax), self.precision("paid_amount_after_tax")
@@ -1742,25 +1739,27 @@ class PaymentEntry(AccountsController):
def add_deductions_gl_entries(self, gl_entries):
for d in self.get("deductions"):
if d.amount:
account_currency = get_account_currency(d.account)
if account_currency != self.company_currency:
frappe.throw(_("Currency for {0} must be {1}").format(d.account, self.company_currency))
if not d.amount:
continue
gl_entries.append(
self.get_gl_dict(
{
"account": d.account,
"account_currency": account_currency,
"against": self.party or self.paid_from,
"debit_in_account_currency": d.amount,
"debit_in_transaction_currency": d.amount / self.transaction_exchange_rate,
"debit": d.amount,
"cost_center": d.cost_center,
},
item=d,
)
account_currency = get_account_currency(d.account)
if account_currency != self.company_currency:
frappe.throw(_("Currency for {0} must be {1}").format(d.account, self.company_currency))
gl_entries.append(
self.get_gl_dict(
{
"account": d.account,
"account_currency": account_currency,
"against": self.party or self.paid_from,
"debit_in_account_currency": d.amount,
"debit_in_transaction_currency": d.amount / self.transaction_exchange_rate,
"debit": d.amount,
"cost_center": d.cost_center,
},
item=d,
)
)
def get_party_account_for_taxes(self):
if self.payment_type == "Receive":
@@ -1777,15 +1776,17 @@ class PaymentEntry(AccountsController):
return flt(gl_dict.get(field, 0) / (conversion_rate or 1))
def update_advance_paid(self):
if self.payment_type in ("Receive", "Pay") and self.party:
advance_payment_doctypes = frappe.get_hooks(
"advance_payment_receivable_doctypes"
) + frappe.get_hooks("advance_payment_payable_doctypes")
for d in self.get("references"):
if d.allocated_amount and d.reference_doctype in advance_payment_doctypes:
frappe.get_doc(
d.reference_doctype, d.reference_name, for_update=True
).set_total_advance_paid()
if self.payment_type not in ("Receive", "Pay") or not self.party:
return
advance_payment_doctypes = frappe.get_hooks("advance_payment_receivable_doctypes") + frappe.get_hooks(
"advance_payment_payable_doctypes"
)
for d in self.get("references"):
if d.allocated_amount and d.reference_doctype in advance_payment_doctypes:
frappe.get_doc(
d.reference_doctype, d.reference_name, for_update=True
).set_total_advance_paid()
def on_recurring(self, reference_doc, auto_repeat_doc):
self.reference_no = reference_doc.name