diff --git a/erpnext/assets/doctype/asset_repair/asset_repair.js b/erpnext/assets/doctype/asset_repair/asset_repair.js index 5dc32d363d4..05a1aaa4e43 100644 --- a/erpnext/assets/doctype/asset_repair/asset_repair.js +++ b/erpnext/assets/doctype/asset_repair/asset_repair.js @@ -124,25 +124,46 @@ frappe.ui.form.on("Asset Repair", { frm.refresh_field("stock_items"); } }, +}); - purchase_invoice: function (frm) { - if (frm.doc.purchase_invoice) { - frappe.call({ - method: "frappe.client.get_value", - args: { - doctype: "Purchase Invoice", - fieldname: "base_net_total", - filters: { name: frm.doc.purchase_invoice }, - }, - callback: function (r) { - if (r.message) { - frm.set_value("repair_cost", r.message.base_net_total); - } - }, - }); - } else { - frm.set_value("repair_cost", 0); +frappe.ui.form.on("Asset Repair Purchase Invoice", { + purchase_invoice: function (frm, cdt, cdn) { + frappe.model.set_value(cdt, cdn, { + expense_account: "", + repair_cost: 0, + }); + }, + + expense_account: function (frm, cdt, cdn) { + var row = locals[cdt][cdn]; + + if (!row.purchase_invoice || !row.expense_account) { + frappe.model.set_value(cdt, cdn, "repair_cost", 0); + return; } + + frappe.call({ + method: "erpnext.assets.doctype.asset_repair.asset_repair.get_unallocated_repair_cost", + args: { + purchase_invoice: row.purchase_invoice, + expense_account: row.expense_account, + }, + callback: function (r) { + if (r.message !== undefined) { + frappe.model.set_value(cdt, cdn, "repair_cost", r.message); + + if (r.message <= 0) { + frappe.msgprint({ + title: __("No Available Amount"), + message: __( + "There is no available repair cost for this Purchase Invoice and Expense Account combination." + ), + indicator: "orange", + }); + } + } + }, + }); }, }); diff --git a/erpnext/assets/doctype/asset_repair/asset_repair.py b/erpnext/assets/doctype/asset_repair/asset_repair.py index dc74da28db4..401bff6beea 100644 --- a/erpnext/assets/doctype/asset_repair/asset_repair.py +++ b/erpnext/assets/doctype/asset_repair/asset_repair.py @@ -4,6 +4,7 @@ import frappe from frappe import _ from frappe.query_builder import DocType +from frappe.query_builder.functions import Sum from frappe.utils import cint, flt, get_link_to_form, getdate, time_diff_in_hours import erpnext @@ -83,10 +84,8 @@ class AssetRepair(AccountsController): def validate_purchase_invoices(self): for d in self.invoices: self.validate_purchase_invoice_status(d.purchase_invoice) - invoice_items = self.get_invoice_items(d.purchase_invoice) - self.validate_service_purchase_invoice(d.purchase_invoice, invoice_items) - self.validate_expense_account(d, invoice_items) - self.validate_purchase_invoice_repair_cost(d, invoice_items) + self.validate_expense_account(d) + self.validate_purchase_invoice_repair_cost(d) def validate_purchase_invoice_status(self, purchase_invoice): docstatus = frappe.db.get_value("Purchase Invoice", purchase_invoice, "docstatus") @@ -97,44 +96,37 @@ class AssetRepair(AccountsController): ) ) - def get_invoice_items(self, pi): - invoice_items = frappe.get_all( - "Purchase Invoice Item", - filters={"parent": pi}, - fields=["item_code", "expense_account", "base_net_amount"], + def validate_expense_account(self, row): + """Validate that the expense account exists in the purchase invoice for non-stock items.""" + valid_accounts = _get_expense_accounts_for_purchase_invoice(row.purchase_invoice) + if row.expense_account not in valid_accounts: + frappe.throw( + _( + "Row {0}: Expense account {1} is not valid for Purchase Invoice {2}. " + "Only expense accounts from non-stock items are allowed." + ).format( + row.idx, + frappe.bold(row.expense_account), + get_link_to_form("Purchase Invoice", row.purchase_invoice), + ) + ) + + def validate_purchase_invoice_repair_cost(self, row): + """Validate that repair cost doesn't exceed available amount.""" + available_amount = get_unallocated_repair_cost( + row.purchase_invoice, row.expense_account, exclude_asset_repair=self.name ) - return invoice_items - - def validate_service_purchase_invoice(self, purchase_invoice, invoice_items): - service_item_exists = False - for item in invoice_items: - if frappe.db.get_value("Item", item.item_code, "is_stock_item") == 0: - service_item_exists = True - break - - if not service_item_exists: + if flt(row.repair_cost) > available_amount: frappe.throw( - _("Service item not present in Purchase Invoice {0}").format( - get_link_to_form("Purchase Invoice", purchase_invoice) - ) - ) - - def validate_expense_account(self, row, invoice_items): - pi_expense_accounts = set([item.expense_account for item in invoice_items]) - if row.expense_account not in pi_expense_accounts: - frappe.throw( - _("Expense account {0} not present in Purchase Invoice {1}").format( - row.expense_account, get_link_to_form("Purchase Invoice", row.purchase_invoice) - ) - ) - - def validate_purchase_invoice_repair_cost(self, row, invoice_items): - pi_net_total = sum([flt(item.base_net_amount) for item in invoice_items]) - if flt(row.repair_cost) > pi_net_total: - frappe.throw( - _("Repair cost cannot be greater than purchase invoice base net total {0}").format( - pi_net_total + _( + "Row {0}: Repair cost {1} exceeds available amount {2} for Purchase Invoice {3} and Account {4}" + ).format( + row.idx, + frappe.bold(frappe.format_value(row.repair_cost, {"fieldtype": "Currency"})), + frappe.bold(frappe.format_value(available_amount, {"fieldtype": "Currency"})), + get_link_to_form("Purchase Invoice", row.purchase_invoice), + frappe.bold(row.expense_account), ) ) @@ -411,33 +403,158 @@ def get_downtime(failure_date, completion_date): @frappe.whitelist() +@frappe.validate_and_sanitize_search_inputs def get_purchase_invoice(doctype, txt, searchfield, start, page_len, filters): - PurchaseInvoice = DocType("Purchase Invoice") - PurchaseInvoiceItem = DocType("Purchase Invoice Item") - Item = DocType("Item") + """ + Get Purchase Invoices that have expense accounts for non-stock items. + Only returns invoices with at least one non-stock, non-fixed-asset item with an expense account. + """ + pi = DocType("Purchase Invoice") + pi_item = DocType("Purchase Invoice Item") + item = DocType("Item") - return ( - frappe.qb.from_(PurchaseInvoice) - .join(PurchaseInvoiceItem) - .on(PurchaseInvoiceItem.parent == PurchaseInvoice.name) - .join(Item) - .on(Item.name == PurchaseInvoiceItem.item_code) - .select(PurchaseInvoice.name) + query = ( + frappe.qb.from_(pi) + .join(pi_item) + .on(pi_item.parent == pi.name) + .left_join(item) + .on(item.name == pi_item.item_code) + .select(pi.name) + .distinct() .where( - (Item.is_stock_item == 0) - & (Item.is_fixed_asset == 0) - & (PurchaseInvoice.company == filters.get("company")) - & (PurchaseInvoice.docstatus == 1) + (pi.company == filters.get("company")) + & (pi.docstatus == 1) + & (pi_item.is_fixed_asset == 0) + & (pi_item.expense_account.isnotnull()) + & (pi_item.expense_account != "") + & ((pi_item.item_code.isnull()) | (item.is_stock_item == 0)) ) - ).run(as_list=1) + ) + + if txt: + query = query.where(pi.name.like(f"%{txt}%")) + + return query.run(as_list=1) @frappe.whitelist() +@frappe.validate_and_sanitize_search_inputs def get_expense_accounts(doctype, txt, searchfield, start, page_len, filters): - PurchaseInvoiceItem = DocType("Purchase Invoice Item") - return ( - frappe.qb.from_(PurchaseInvoiceItem) - .select(PurchaseInvoiceItem.expense_account) - .distinct() - .where(PurchaseInvoiceItem.parent == filters.get("purchase_invoice")) - ).run(as_list=1) + """ + Get expense accounts for non-stock (service) items from the purchase invoice. + Used as a query function for link fields. + """ + purchase_invoice = filters.get("purchase_invoice") + if not purchase_invoice: + return [] + + expense_accounts = _get_expense_accounts_for_purchase_invoice(purchase_invoice) + + # Filter by search text if provided + if txt: + txt = txt.lower() + expense_accounts = [acc for acc in expense_accounts if txt in acc.lower()] + + return [[account] for account in expense_accounts] + + +def _get_expense_accounts_for_purchase_invoice(purchase_invoice: str) -> list[str]: + """ + Internal function to get expense accounts for non-stock items from the purchase invoice. + + Args: + purchase_invoice: The Purchase Invoice name + + Returns: + List of expense account names + """ + pi_items = frappe.db.get_all( + "Purchase Invoice Item", + filters={"parent": purchase_invoice}, + fields=["item_code", "expense_account", "is_fixed_asset"], + ) + + if not pi_items: + return [] + + # Get list of stock item codes from the invoice + item_codes = {item.item_code for item in pi_items if item.item_code} + stock_items = set() + if item_codes: + stock_items = set( + frappe.db.get_all( + "Item", filters={"name": ["in", list(item_codes)], "is_stock_item": 1}, pluck="name" + ) + ) + + expense_accounts = set() + + for item in pi_items: + # Skip stock items - they use warehouse accounts + if item.item_code and item.item_code in stock_items: + continue + + # Skip fixed assets - they use asset accounts + if item.is_fixed_asset: + continue + + # Use expense account from Purchase Invoice Item + if item.expense_account: + expense_accounts.add(item.expense_account) + + return list(expense_accounts) + + +@frappe.whitelist() +def get_unallocated_repair_cost( + purchase_invoice: str, expense_account: str, exclude_asset_repair: str | None = None +) -> float: + """ + Calculate the unused repair cost for a purchase invoice and expense account. + """ + used_amount = get_allocated_repair_cost(purchase_invoice, expense_account, exclude_asset_repair) + total_amount = get_total_expense_amount(purchase_invoice, expense_account) + + return flt(total_amount - used_amount) + + +def get_allocated_repair_cost( + purchase_invoice: str, expense_account: str, exclude_asset_repair: str | None = None +) -> float: + """Get the total repair cost already allocated from submitted Asset Repairs.""" + asset_repair_pi = DocType("Asset Repair Purchase Invoice") + + query = ( + frappe.qb.from_(asset_repair_pi) + .select(Sum(asset_repair_pi.repair_cost).as_("total")) + .where( + (asset_repair_pi.purchase_invoice == purchase_invoice) + & (asset_repair_pi.expense_account == expense_account) + & (asset_repair_pi.docstatus == 1) + ) + ) + + if exclude_asset_repair: + query = query.where(asset_repair_pi.parent != exclude_asset_repair) + + result = query.run(as_dict=True) + + return flt(result[0].total) if result else 0.0 + + +def get_total_expense_amount(purchase_invoice: str, expense_account: str) -> float: + """Get the total expense amount from GL entries for a purchase invoice and account.""" + gl_entry = DocType("GL Entry") + + result = ( + frappe.qb.from_(gl_entry) + .select((Sum(gl_entry.debit) - Sum(gl_entry.credit)).as_("total")) + .where( + (gl_entry.voucher_type == "Purchase Invoice") + & (gl_entry.voucher_no == purchase_invoice) + & (gl_entry.account == expense_account) + & (gl_entry.is_cancelled == 0) + ) + ).run(as_dict=True) + + return flt(result[0].total) if result else 0.0 diff --git a/erpnext/assets/doctype/asset_repair/test_asset_repair.py b/erpnext/assets/doctype/asset_repair/test_asset_repair.py index 0e4ea84df86..b66dafc2a0f 100644 --- a/erpnext/assets/doctype/asset_repair/test_asset_repair.py +++ b/erpnext/assets/doctype/asset_repair/test_asset_repair.py @@ -173,6 +173,39 @@ class TestAssetRepair(IntegrationTestCase): ) self.assertTrue(asset_repair.invoices) + def test_repair_cost_exceeds_available_amount(self): + """Test that repair cost cannot exceed available amount from Purchase Invoice.""" + asset_repair1 = create_asset_repair( + capitalize_repair_cost=1, + item="_Test Non Stock Item", + submit=1, + ) + + pi_name = asset_repair1.invoices[0].purchase_invoice + expense_account = asset_repair1.invoices[0].expense_account + + asset_repair2 = frappe.new_doc("Asset Repair") + asset_repair2.update( + { + "asset": asset_repair1.asset, + "asset_name": asset_repair1.asset_name, + "failure_date": nowdate(), + "description": "Second Repair", + "company": asset_repair1.company, + "capitalize_repair_cost": 1, + } + ) + asset_repair2.append( + "invoices", + { + "purchase_invoice": pi_name, + "expense_account": expense_account, + "repair_cost": 10, # PI already fully used, so this should fail + }, + ) + + self.assertRaises(frappe.ValidationError, asset_repair2.save) + def test_gl_entries_with_perpetual_inventory(self): set_depreciation_settings_in_company(company="_Test Company with perpetual inventory")