# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # License: GNU General Public License v3. See license.txt from collections import defaultdict from json import loads from typing import TYPE_CHECKING, Optional import frappe import frappe.defaults from frappe import _, qb, throw from frappe.desk.reportview import build_match_conditions from frappe.model.meta import get_field_precision from frappe.model.naming import determine_consecutive_week_number from frappe.query_builder import AliasedQuery, Case, Criterion, Field, Table from frappe.query_builder.functions import Count, IfNull, Max, Round, Sum from frappe.query_builder.utils import DocType from frappe.utils import ( add_days, cint, create_batch, cstr, flt, formatdate, get_datetime, get_number_format_info, getdate, now, now_datetime, nowdate, ) from frappe.utils.caching import site_cache from pypika import Order from pypika.functions import Coalesce from pypika.terms import ExistsCriterion import erpnext # imported to enable erpnext.accounts.utils.get_account_currency from erpnext.accounts.doctype.account.account import get_account_currency from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import get_dimensions from erpnext.stock import get_warehouse_account_map from erpnext.stock.utils import get_stock_value_on if TYPE_CHECKING: from erpnext.stock.doctype.repost_item_valuation.repost_item_valuation import RepostItemValuation class FiscalYearError(frappe.ValidationError): pass class PaymentEntryUnlinkError(frappe.ValidationError): pass GL_REPOSTING_CHUNK = 100 OUTSTANDING_DOCTYPES = frozenset(["Sales Invoice", "Purchase Invoice", "Fees"]) @frappe.whitelist() def get_fiscal_year( date=None, fiscal_year=None, label="Date", verbose=1, company=None, as_dict=False, boolean=None, raise_on_missing=True, truncate=False, ): if isinstance(raise_on_missing, str): raise_on_missing = loads(raise_on_missing) # backwards compat if isinstance(boolean, str): boolean = loads(boolean) if boolean is not None: raise_on_missing = not boolean fiscal_years = get_fiscal_years( date, fiscal_year, label, verbose, company, as_dict=as_dict, raise_on_missing=raise_on_missing ) if fiscal_years: fiscal_year = fiscal_years[0] if truncate: return ("-".join(y[-2:] for y in fiscal_year[0].split("-")), fiscal_year[1], fiscal_year[2]) return fiscal_year return False def get_fiscal_years( transaction_date=None, fiscal_year=None, label="Date", verbose=1, company=None, as_dict=False, boolean=None, raise_on_missing=True, ): if transaction_date: transaction_date = getdate(transaction_date) # backwards compat if boolean is not None: raise_on_missing = not boolean all_fiscal_years = _get_fiscal_years(company=company) # No restricting selectors if not transaction_date and not fiscal_year: return all_fiscal_years for fy in all_fiscal_years: if (fiscal_year and fy.name == fiscal_year) or ( transaction_date and getdate(fy.year_start_date) <= transaction_date and getdate(fy.year_end_date) >= transaction_date ): if as_dict: return (fy,) else: return ((fy.name, fy.year_start_date, fy.year_end_date),) # No match for restricting selectors if raise_on_missing: error_msg = _("""{0} {1} is not in any active Fiscal Year""").format( _(label), formatdate(transaction_date) ) if company: error_msg = _("""{0} for {1}""").format(error_msg, frappe.bold(company)) if verbose == 1: frappe.msgprint(error_msg) raise FiscalYearError(error_msg) return [] def _get_fiscal_years(company=None): fiscal_years = frappe.cache().hget("fiscal_years", company) or [] if not fiscal_years: # if year start date is 2012-04-01, year end date should be 2013-03-31 (hence subdate) FY = DocType("Fiscal Year") query = ( frappe.qb.from_(FY).select(FY.name, FY.year_start_date, FY.year_end_date).where(FY.disabled == 0) ) if company: FYC = DocType("Fiscal Year Company") query = query.where( ExistsCriterion(frappe.qb.from_(FYC).select(FYC.name).where(FYC.parent == FY.name)).negate() | ExistsCriterion( frappe.qb.from_(FYC) .select(FYC.company) .where(FYC.parent == FY.name) .where(FYC.company == company) ) ) query = query.orderby(FY.year_start_date, order=Order.desc) fiscal_years = query.run(as_dict=True) frappe.cache().hset("fiscal_years", company, fiscal_years) return fiscal_years @frappe.whitelist() def get_fiscal_year_filter_field(company=None): field = {"fieldtype": "Select", "options": [], "operator": "Between", "query_value": True} fiscal_years = get_fiscal_years(company=company) for fiscal_year in fiscal_years: field["options"].append( { "label": fiscal_year.name, "value": fiscal_year.name, "query_value": [ fiscal_year.year_start_date.strftime("%Y-%m-%d"), fiscal_year.year_end_date.strftime("%Y-%m-%d"), ], } ) return field def validate_fiscal_year(date, fiscal_year, company, label="Date", doc=None): years = [f[0] for f in get_fiscal_years(date, label=_(label), company=company)] if fiscal_year not in years: if doc: doc.fiscal_year = years[0] else: throw(_("{0} '{1}' not in Fiscal Year {2}").format(_(label), formatdate(date), fiscal_year)) @frappe.whitelist() def get_balance_on( account=None, date=None, party_type=None, party=None, company=None, in_account_currency=True, cost_center=None, ignore_account_permission=False, account_type=None, start_date=None, finance_book=None, include_default_fb_balances=False, ): if not account and frappe.form_dict.get("account"): account = frappe.form_dict.get("account") if not date and frappe.form_dict.get("date"): date = frappe.form_dict.get("date") if not party_type and frappe.form_dict.get("party_type"): party_type = frappe.form_dict.get("party_type") if not party and frappe.form_dict.get("party"): party = frappe.form_dict.get("party") if not cost_center and frappe.form_dict.get("cost_center"): cost_center = frappe.form_dict.get("cost_center") cond = ["is_cancelled=0"] if start_date: cond.append("posting_date >= %s" % frappe.db.escape(cstr(start_date))) if date: cond.append("posting_date <= %s" % frappe.db.escape(cstr(date))) else: # get balance of all entries that exist date = nowdate() if account: acc = frappe.get_doc("Account", account) try: get_fiscal_year(date, company=company, verbose=0)[1] except FiscalYearError: if getdate(date) > getdate(nowdate()): # if fiscal year not found and the date is greater than today # get fiscal year for today's date and its corresponding year start date get_fiscal_year(nowdate(), verbose=1)[1] else: # this indicates that it is a date older than any existing fiscal year. # hence, assuming balance as 0.0 return 0.0 if account: report_type = acc.report_type else: report_type = "" if cost_center and report_type == "Profit and Loss": cc = frappe.get_lazy_doc("Cost Center", cost_center) if cc.is_group: cond.append( f""" exists ( select 1 from `tabCost Center` cc where cc.name = gle.cost_center and cc.lft >= {cc.lft} and cc.rgt <= {cc.rgt} )""" ) else: cond.append(f"""gle.cost_center = {frappe.db.escape(cost_center)} """) if account: if not (frappe.flags.ignore_account_permission or ignore_account_permission): acc.check_permission("read") # different filter for group and ledger - improved performance if acc.is_group: cond.append( f"""exists ( select name from `tabAccount` ac where ac.name = gle.account and ac.lft >= {acc.lft} and ac.rgt <= {acc.rgt} )""" ) # If group and currency same as company, # always return balance based on debit and credit in company currency if acc.account_currency == frappe.get_cached_value("Company", acc.company, "default_currency"): in_account_currency = False else: cond.append(f"""gle.account = {frappe.db.escape(account)} """) if account_type: accounts = frappe.db.get_all( "Account", filters={"company": company, "account_type": account_type, "is_group": 0}, pluck="name", order_by="lft", ) cond.append( """ gle.account in (%s) """ % (", ".join([frappe.db.escape(account) for account in accounts])) ) if party_type and party: cond.append( f"""gle.party_type = {frappe.db.escape(party_type)} and gle.party = {frappe.db.escape(party)} """ ) default_finance_book = None if company: cond.append("""gle.company = %s """ % (frappe.db.escape(company))) default_finance_book = frappe.get_cached_value("Company", company, "default_finance_book") if finance_book: if default_finance_book and include_default_fb_balances: cond.append( f"""(gle.finance_book IN ( {frappe.db.escape(finance_book)}, {frappe.db.escape(default_finance_book)} ) OR gle.finance_book IS NULL )""" ) else: cond.append(f"(gle.finance_book = {frappe.db.escape(finance_book)} OR gle.finance_book IS NULL)") elif default_finance_book and include_default_fb_balances: # No finance book passed → fall back to default cond.append( f"""( gle.finance_book = {frappe.db.escape(default_finance_book)} OR gle.finance_book IS NULL )""" ) if account or (party_type and party) or account_type: precision = get_currency_precision() if in_account_currency: select_field = ( "sum(round(debit_in_account_currency, %s)) - sum(round(credit_in_account_currency, %s))" ) else: select_field = "sum(round(debit, %s)) - sum(round(credit, %s))" bal = frappe.db.sql( """ SELECT {} FROM `tabGL Entry` gle WHERE {}""".format(select_field, " and ".join(cond)), (precision, precision), )[0][0] # if bal is None, return 0 return flt(bal) def get_count_on(account, fieldname, date): cond = ["is_cancelled=0"] if date: cond.append("posting_date <= %s" % frappe.db.escape(cstr(date))) else: # get balance of all entries that exist date = nowdate() try: year_start_date = get_fiscal_year(date, verbose=0)[1] except FiscalYearError: if getdate(date) > getdate(nowdate()): # if fiscal year not found and the date is greater than today # get fiscal year for today's date and its corresponding year start date year_start_date = get_fiscal_year(nowdate(), verbose=1)[1] else: # this indicates that it is a date older than any existing fiscal year. # hence, assuming balance as 0.0 return 0.0 if account: acc = frappe.get_doc("Account", account) if not frappe.flags.ignore_account_permission: acc.check_permission("read") # for pl accounts, get balance within a fiscal year if acc.report_type == "Profit and Loss": cond.append("posting_date >= '%s' and voucher_type != 'Period Closing Voucher'" % year_start_date) # different filter for group and ledger - improved performance if acc.is_group: cond.append( f"""exists ( select name from `tabAccount` ac where ac.name = gle.account and ac.lft >= {acc.lft} and ac.rgt <= {acc.rgt} )""" ) else: cond.append(f"""gle.account = {frappe.db.escape(account)} """) entries = frappe.db.sql( """ SELECT name, posting_date, account, party_type, party,debit,credit, voucher_type, voucher_no, against_voucher_type, against_voucher FROM `tabGL Entry` gle WHERE {}""".format(" and ".join(cond)), as_dict=True, ) count = 0 for gle in entries: if fieldname not in ("invoiced_amount", "payables"): count += 1 else: dr_or_cr = "debit" if fieldname == "invoiced_amount" else "credit" cr_or_dr = "credit" if fieldname == "invoiced_amount" else "debit" select_fields = ( "ifnull(sum(credit-debit),0)" if fieldname == "invoiced_amount" else "ifnull(sum(debit-credit),0)" ) if ( (not gle.against_voucher) or (gle.against_voucher_type in ["Sales Order", "Purchase Order"]) or (gle.against_voucher == gle.voucher_no and gle.get(dr_or_cr) > 0) ): payment_amount = frappe.db.sql( f""" SELECT {select_fields} FROM `tabGL Entry` gle WHERE docstatus < 2 and posting_date <= %(date)s and against_voucher = %(voucher_no)s and party = %(party)s and name != %(name)s""", {"date": date, "voucher_no": gle.voucher_no, "party": gle.party, "name": gle.name}, )[0][0] outstanding_amount = flt(gle.get(dr_or_cr)) - flt(gle.get(cr_or_dr)) - payment_amount currency_precision = get_currency_precision() or 2 if abs(flt(outstanding_amount)) > 0.1 / 10**currency_precision: count += 1 return count @frappe.whitelist() def add_ac(args=None): from frappe.desk.treeview import make_tree_args if not args: args = frappe.local.form_dict args.doctype = "Account" args = make_tree_args(**args) ac = frappe.new_doc("Account") if args.get("ignore_permissions"): ac.flags.ignore_permissions = True args.pop("ignore_permissions") ac.update(args) if not ac.parent_account: ac.parent_account = args.get("parent") ac.old_parent = "" ac.freeze_account = "No" if cint(ac.get("is_root")): ac.parent_account = None ac.flags.ignore_mandatory = True ac.insert() return ac.name @frappe.whitelist() def add_cc(args=None): from frappe.desk.treeview import make_tree_args if not args: args = frappe.local.form_dict args.doctype = "Cost Center" args = make_tree_args(**args) if args.parent_cost_center == args.company: args.parent_cost_center = "{} - {}".format( args.parent_cost_center, frappe.get_cached_value("Company", args.company, "abbr") ) cc = frappe.new_doc("Cost Center") cc.update(args) if not cc.parent_cost_center: cc.parent_cost_center = args.get("parent") cc.old_parent = "" cc.insert() return cc.name def _build_dimensions_dict_for_exc_gain_loss( entry: dict | object = None, active_dimensions: list | None = None ): dimensions_dict = frappe._dict() if entry and active_dimensions: for dim in active_dimensions: if entry_dimension := entry.get(dim.fieldname): dimensions_dict[dim.fieldname] = entry_dimension return dimensions_dict def reconcile_against_document( args, skip_ref_details_update_for_pe=False, active_dimensions=None ): # nosemgrep """ Cancel PE or JV, Update against document, split if required and resubmit """ # To optimize making GL Entry for PE or JV with multiple references reconciled_entries = {} for row in args: if not reconciled_entries.get((row.voucher_type, row.voucher_no)): reconciled_entries[(row.voucher_type, row.voucher_no)] = [] reconciled_entries[(row.voucher_type, row.voucher_no)].append(row) for key, entries in reconciled_entries.items(): voucher_type, voucher_no = key doc = frappe.get_doc(voucher_type, voucher_no) frappe.flags.ignore_party_validation = True reposting_rows = [] for entry in entries: check_if_advance_entry_modified(entry) validate_allocated_amount(entry) dimensions_dict = _build_dimensions_dict_for_exc_gain_loss(entry, active_dimensions) if voucher_type == "Journal Entry": referenced_row = update_reference_in_journal_entry(entry, doc, do_not_save=False) # advance section in sales/purchase invoice and reconciliation tool,both pass on exchange gain/loss # amount and account in args # referenced_row is used to deduplicate gain/loss journal entry.update({"referenced_row": referenced_row.name}) doc.make_exchange_gain_loss_journal([entry], dimensions_dict) else: referenced_row = update_reference_in_payment_entry( entry, doc, do_not_save=True, skip_ref_details_update_for_pe=skip_ref_details_update_for_pe, dimensions_dict=dimensions_dict, ) if referenced_row.get("outstanding_amount") and entry.get("outstanding_amount") is None: referenced_row.outstanding_amount -= flt(entry.allocated_amount) reposting_rows.append(referenced_row) doc.save(ignore_permissions=True) if voucher_type == "Payment Entry" and doc.book_advance_payments_in_separate_party_account: for row in reposting_rows: doc.make_advance_gl_entries(entry=row) else: _delete_pl_entries(voucher_type, voucher_no) _delete_adv_pl_entries(voucher_type, voucher_no) gl_map = doc.build_gl_map() # Make sure there is no overallocation from erpnext.accounts.general_ledger import process_debit_credit_difference process_debit_credit_difference(gl_map) create_payment_ledger_entry(gl_map, update_outstanding="No", cancel=0, adv_adj=1) # Only update outstanding for newly linked vouchers for entry in entries: update_voucher_outstanding( entry.against_voucher_type, entry.against_voucher, entry.account, entry.party_type, entry.party, ) frappe.flags.ignore_party_validation = False def check_if_advance_entry_modified(args): """ check if there is already a voucher reference check if amount is same check if jv is submitted """ if not args.get("unreconciled_amount"): args.update({"unreconciled_amount": args.get("unadjusted_amount")}) ret = None if args.voucher_type == "Journal Entry": journal_entry = frappe.qb.DocType("Journal Entry") journal_acc = frappe.qb.DocType("Journal Entry Account") q = ( frappe.qb.from_(journal_entry) .inner_join(journal_acc) .on(journal_entry.name == journal_acc.parent) .select(journal_acc[args.get("dr_or_cr")]) .where( (journal_acc.account == args.get("account")) & (journal_acc.party_type == args.get("party_type")) & (journal_acc.party == args.get("party")) & ( (journal_acc.reference_type.isnull()) | (journal_acc.reference_type.isin(["", "Sales Order", "Purchase Order"])) ) & (journal_entry.name == args.get("voucher_no")) & (journal_acc.name == args.get("voucher_detail_no")) & (journal_entry.docstatus == 1) ) ) else: precision = frappe.get_precision("Payment Entry", "unallocated_amount") payment_entry = frappe.qb.DocType("Payment Entry") payment_ref = frappe.qb.DocType("Payment Entry Reference") q = ( frappe.qb.from_(payment_entry) .select(payment_entry.name) .where(payment_entry.name == args.get("voucher_no")) .where(payment_entry.docstatus == 1) .where(payment_entry.party_type == args.get("party_type")) .where(payment_entry.party == args.get("party")) ) if args.voucher_detail_no: q = ( q.inner_join(payment_ref) .on(payment_entry.name == payment_ref.parent) .where(payment_ref.name == args.get("voucher_detail_no")) .where( payment_ref.reference_doctype.isin( ("", "Sales Order", "Purchase Order", "Employee Advance") ) ) .where(payment_ref.allocated_amount == args.get("unreconciled_amount")) ) else: q = q.where( Round(payment_entry.unallocated_amount, precision) == Round(args.get("unreconciled_amount"), precision) ) ret = q.run(as_dict=True) if not ret: throw(_("""Payment Entry has been modified after you pulled it. Please pull it again.""")) def validate_allocated_amount(args): precision = args.get("precision") or frappe.db.get_single_value("System Settings", "currency_precision") if args.get("allocated_amount") < 0: throw(_("Allocated amount cannot be negative")) elif flt(args.get("allocated_amount"), precision) > flt(args.get("unadjusted_amount"), precision): throw(_("Allocated amount cannot be greater than unadjusted amount")) def update_reference_in_journal_entry(d, journal_entry, do_not_save=False): """ Updates against document, if partial amount splits into rows """ jv_detail = journal_entry.get("accounts", {"name": d["voucher_detail_no"]})[0] rev_dr_or_cr = ( "debit_in_account_currency" if d["dr_or_cr"] == "credit_in_account_currency" else "credit_in_account_currency" ) if jv_detail.get(rev_dr_or_cr): d["dr_or_cr"] = rev_dr_or_cr d["allocated_amount"] = d["allocated_amount"] * -1 d["unadjusted_amount"] = d["unadjusted_amount"] * -1 if flt(d["unadjusted_amount"]) - flt(d["allocated_amount"]) != 0: # adjust the unreconciled balance amount_in_account_currency = flt(d["unadjusted_amount"]) - flt(d["allocated_amount"]) amount_in_company_currency = amount_in_account_currency * flt(jv_detail.exchange_rate) jv_detail.set(d["dr_or_cr"], amount_in_account_currency) jv_detail.set( "debit" if d["dr_or_cr"] == "debit_in_account_currency" else "credit", amount_in_company_currency, ) else: journal_entry.remove(jv_detail) # new row with references new_row = journal_entry.append("accounts") # Copy field values into new row [ new_row.set(field, jv_detail.get(field)) for field in frappe.get_meta("Journal Entry Account").get_fieldnames_with_value() ] new_row.set(d["dr_or_cr"], d["allocated_amount"]) new_row.set( "debit" if d["dr_or_cr"] == "debit_in_account_currency" else "credit", d["allocated_amount"] * flt(jv_detail.exchange_rate), ) new_row.set( "credit_in_account_currency" if d["dr_or_cr"] == "debit_in_account_currency" else "debit_in_account_currency", 0, ) new_row.set("credit" if d["dr_or_cr"] == "debit_in_account_currency" else "debit", 0) new_row.set("reference_type", d["against_voucher_type"]) new_row.set("reference_name", d["against_voucher"]) new_row.against_account = cstr(jv_detail.against_account) new_row.is_advance = cstr(jv_detail.is_advance) new_row.docstatus = 1 if jv_detail.get("reference_type") in get_advance_payment_doctypes(): new_row.advance_voucher_type = jv_detail.get("reference_type") new_row.advance_voucher_no = jv_detail.get("reference_name") # will work as update after submit journal_entry.flags.ignore_validate_update_after_submit = True # Ledgers will be reposted by Reconciliation tool journal_entry.flags.ignore_reposting_on_reconciliation = True if not do_not_save: journal_entry.save(ignore_permissions=True) return new_row def update_reference_in_payment_entry( d, payment_entry, do_not_save=False, skip_ref_details_update_for_pe=False, dimensions_dict=None ): reference_details = { "reference_doctype": d.against_voucher_type, "reference_name": d.against_voucher, "total_amount": d.grand_total, "outstanding_amount": d.outstanding_amount, "allocated_amount": d.allocated_amount, "exchange_rate": d.exchange_rate if d.difference_amount is not None else payment_entry.get_exchange_rate(), "exchange_gain_loss": d.difference_amount, "account": d.account, "dimensions": d.dimensions, } advance_payment_doctypes = get_advance_payment_doctypes() # Update Reconciliation effect date in reference if payment_entry.book_advance_payments_in_separate_party_account: reconcile_on = get_reconciliation_effect_date( d.against_voucher_type, d.against_voucher, payment_entry.company, payment_entry.posting_date ) reference_details.update({"reconcile_effect_on": reconcile_on}) if d.voucher_detail_no: existing_row = payment_entry.get("references", {"name": d["voucher_detail_no"]})[0] if d.allocated_amount <= existing_row.allocated_amount: existing_row.allocated_amount -= d.allocated_amount new_row = payment_entry.append("references") new_row.docstatus = 1 for field in list(reference_details): new_row.set(field, reference_details[field]) if existing_row.reference_doctype in advance_payment_doctypes: new_row.advance_voucher_type = existing_row.reference_doctype new_row.advance_voucher_no = existing_row.reference_name row = new_row else: new_row = payment_entry.append("references") new_row.docstatus = 1 new_row.update(reference_details) row = new_row payment_entry.flags.ignore_validate_update_after_submit = True payment_entry.clear_unallocated_reference_document_rows() payment_entry.setup_party_account_field() payment_entry.set_missing_values() if not skip_ref_details_update_for_pe: reference_exchange_details = frappe._dict() if d.against_voucher_type == "Journal Entry" and d.exchange_rate: reference_exchange_details.update( { "reference_doctype": d.against_voucher_type, "reference_name": d.against_voucher, "exchange_rate": d.exchange_rate, } ) payment_entry.set_missing_ref_details( update_ref_details_only_for=[(d.against_voucher_type, d.against_voucher)], reference_exchange_details=reference_exchange_details, ) payment_entry.set_amounts() payment_entry.make_exchange_gain_loss_journal( frappe._dict({"difference_posting_date": d.difference_posting_date}), dimensions_dict ) # Ledgers will be reposted by Reconciliation tool payment_entry.flags.ignore_reposting_on_reconciliation = True if not do_not_save: payment_entry.save(ignore_permissions=True) return row def get_reconciliation_effect_date(against_voucher_type, against_voucher, company, posting_date): reconciliation_takes_effect_on = frappe.get_cached_value( "Company", company, "reconciliation_takes_effect_on" ) # default reconcile_on = posting_date if reconciliation_takes_effect_on == "Advance Payment Date": reconcile_on = posting_date elif reconciliation_takes_effect_on == "Oldest Of Invoice Or Advance": date_field = "posting_date" if against_voucher_type in ["Sales Order", "Purchase Order"]: date_field = "transaction_date" reconcile_on = frappe.db.get_value(against_voucher_type, against_voucher, date_field) if getdate(reconcile_on) < getdate(posting_date): reconcile_on = posting_date elif reconciliation_takes_effect_on == "Reconciliation Date": reconcile_on = nowdate() return reconcile_on def cancel_exchange_gain_loss_journal( parent_doc: dict | object, referenced_dt: str | None = None, referenced_dn: str | None = None ) -> None: """ Cancel Exchange Gain/Loss for Sales/Purchase Invoice, if they have any. """ if parent_doc.doctype in ["Sales Invoice", "Purchase Invoice", "Payment Entry", "Journal Entry"]: gain_loss_journals = get_linked_exchange_gain_loss_journal( referenced_dt=parent_doc.doctype, referenced_dn=parent_doc.name, je_docstatus=1 ) for doc in gain_loss_journals: gain_loss_je = frappe.get_doc("Journal Entry", doc) if referenced_dt and referenced_dn: references = [(x.reference_type, x.reference_name) for x in gain_loss_je.accounts] if ( len(references) == 2 and (referenced_dt, referenced_dn) in references and (parent_doc.doctype, parent_doc.name) in references ): # only cancel JE generated against parent_doc and referenced_dn gain_loss_je.cancel() else: gain_loss_je.cancel() def delete_exchange_gain_loss_journal( parent_doc: dict | object, referenced_dt: str | None = None, referenced_dn: str | None = None ) -> None: """ Delete Exchange Gain/Loss for Sales/Purchase Invoice, if they have any. """ if parent_doc.doctype in ["Sales Invoice", "Purchase Invoice", "Payment Entry", "Journal Entry"]: gain_loss_journals = get_linked_exchange_gain_loss_journal( referenced_dt=parent_doc.doctype, referenced_dn=parent_doc.name, je_docstatus=2 ) for doc in gain_loss_journals: gain_loss_je = frappe.get_doc("Journal Entry", doc) if referenced_dt and referenced_dn: references = [(x.reference_type, x.reference_name) for x in gain_loss_je.accounts] if ( len(references) == 2 and (referenced_dt, referenced_dn) in references and (parent_doc.doctype, parent_doc.name) in references ): # only delete JE generated against parent_doc and referenced_dn gain_loss_je.delete() else: gain_loss_je.delete() def get_linked_exchange_gain_loss_journal(referenced_dt: str, referenced_dn: str, je_docstatus: int) -> list: """ Get all the linked exchange gain/loss journal entries for a given document. """ gain_loss_journals = [] if journals := frappe.db.get_all( "Journal Entry Account", { "reference_type": referenced_dt, "reference_name": referenced_dn, "docstatus": je_docstatus, }, pluck="parent", ): gain_loss_journals = frappe.db.get_all( "Journal Entry", { "name": ["in", journals], "voucher_type": "Exchange Gain Or Loss", "is_system_generated": 1, "docstatus": je_docstatus, }, pluck="name", ) return gain_loss_journals def cancel_common_party_journal(self): if self.doctype not in ["Sales Invoice", "Purchase Invoice"]: return if not frappe.get_single_value("Accounts Settings", "enable_common_party_accounting"): return party_link = self.get_common_party_link() if not party_link: return journal_entry = frappe.db.get_value( "Journal Entry Account", filters={ "reference_type": self.doctype, "reference_name": self.name, "docstatus": 1, }, fieldname="parent", ) if not journal_entry: return common_party_journal = frappe.db.get_value( "Journal Entry", filters={ "name": journal_entry, "is_system_generated": True, "docstatus": 1, }, ) if not common_party_journal: return common_party_je = frappe.get_doc("Journal Entry", common_party_journal) common_party_je.cancel() def update_accounting_ledgers_after_reference_removal( ref_type: str | None = None, ref_no: str | None = None, payment_name: str | None = None ): # General Ledger gle = qb.DocType("GL Entry") gle_update_query = ( qb.update(gle) .set(gle.against_voucher_type, None) .set(gle.against_voucher, None) .set(gle.modified, now()) .set(gle.modified_by, frappe.session.user) .where((gle.against_voucher_type == ref_type) & (gle.against_voucher == ref_no)) ) if payment_name: gle_update_query = gle_update_query.where(gle.voucher_no == payment_name) gle_update_query.run() # Payment Ledger ple = qb.DocType("Payment Ledger Entry") ple_update_query = ( qb.update(ple) .set(ple.against_voucher_type, ple.voucher_type) .set(ple.against_voucher_no, ple.voucher_no) .set(ple.modified, now()) .set(ple.modified_by, frappe.session.user) .where( (ple.against_voucher_type == ref_type) & (ple.against_voucher_no == ref_no) & (ple.delinked == 0) ) ) if payment_name: ple_update_query = ple_update_query.where(ple.voucher_no == payment_name) ple_update_query.run() # Advance Payment adv = qb.DocType("Advance Payment Ledger Entry") adv_ple = ( qb.update(adv) .set(adv.delinked, 1) .set(adv.modified, now()) .set(adv.modified_by, frappe.session.user) .where(adv.delinked == 0) .where( ((adv.against_voucher_type == ref_type) & (adv.against_voucher_no == ref_no)) | ((adv.voucher_type == ref_type) & (adv.voucher_no == ref_no)) ) ) if payment_name: adv_ple = adv_ple.where(adv.voucher_no == payment_name) adv_ple.run() def remove_ref_from_advance_section(ref_doc: object = None, payment_name: str | None = None): # TODO: this might need some testing if ref_doc.doctype in ("Sales Invoice", "Purchase Invoice"): row_names = [] for adv in ref_doc.get("advances") or []: if adv.get("reference_name", None) == payment_name: row_names.append(adv.name) if not row_names: return child_table = ( "Sales Invoice Advance" if ref_doc.doctype == "Sales Invoice" else "Purchase Invoice Advance" ) frappe.db.delete(child_table, {"name": ("in", row_names)}) def unlink_ref_doc_from_payment_entries(ref_doc: object = None, payment_name: str | None = None): remove_ref_doc_link_from_jv(ref_doc.doctype, ref_doc.name, payment_name) remove_ref_doc_link_from_pe(ref_doc.doctype, ref_doc.name, payment_name) update_accounting_ledgers_after_reference_removal(ref_doc.doctype, ref_doc.name, payment_name) remove_ref_from_advance_section(ref_doc, payment_name) def remove_ref_doc_link_from_jv( ref_type: str | None = None, ref_no: str | None = None, payment_name: str | None = None ): jea = qb.DocType("Journal Entry Account") linked_jv = ( qb.from_(jea) .select(jea.parent) .where((jea.reference_type == ref_type) & (jea.reference_name == ref_no) & (jea.docstatus.lt(2))) .run(as_list=1) ) linked_jv = convert_to_list(linked_jv) # remove reference only from specified payment linked_jv = [x for x in linked_jv if x == payment_name] if payment_name else linked_jv if linked_jv: update_query = ( qb.update(jea) .set(jea.reference_type, None) .set(jea.reference_name, None) .set(jea.advance_voucher_type, None) .set(jea.advance_voucher_no, None) .set(jea.modified, now()) .set(jea.modified_by, frappe.session.user) .where((jea.reference_type == ref_type) & (jea.reference_name == ref_no)) ) if payment_name: update_query = update_query.where(jea.parent == payment_name) update_query.run() frappe.msgprint(_("Journal Entries {0} are un-linked").format("\n".join(linked_jv))) def convert_to_list(result): """ Convert tuple to list """ return [x[0] for x in result] def remove_ref_doc_link_from_pe( ref_type: str | None = None, ref_no: str | None = None, payment_name: str | None = None ): per = qb.DocType("Payment Entry Reference") pay = qb.DocType("Payment Entry") query = ( qb.from_(per) .select("*") .where( (per.reference_doctype == ref_type) & (per.reference_name == ref_no) & (per.docstatus.lt(2)) & (per.parenttype == "Payment Entry") ) ) # update reference only from specified payment if payment_name: query = query.where(per.parent == payment_name) reference_rows = query.run(as_dict=True) if not reference_rows: return linked_pe = set() row_names = set() for row in reference_rows: linked_pe.add(row.parent) row_names.add(row.name) from erpnext.accounts.doctype.payment_request.payment_request import ( update_payment_requests_as_per_pe_references, ) # Update payment request amount update_payment_requests_as_per_pe_references(reference_rows, cancel=True) # Update allocated amounts and modified fields in one go ( qb.update(per) .set(per.allocated_amount, 0) .set(per.modified, now()) .set(per.modified_by, frappe.session.user) .where(per.name.isin(row_names)) .where(per.parenttype == "Payment Entry") .run() ) for pe in linked_pe: try: pe_doc = frappe.get_doc("Payment Entry", pe) pe_doc.set_amounts() # Call cancel on only removed reference references = [x for x in pe_doc.references if x.name in row_names] [pe_doc.make_advance_gl_entries(x, cancel=1) for x in references] pe_doc.clear_unallocated_reference_document_rows() pe_doc.validate_payment_type_with_outstanding() except Exception: msg = _("There were issues unlinking payment entry {0}.").format(pe_doc.name) msg += "
" msg += _("Please cancel payment entry manually first") frappe.throw(msg, exc=PaymentEntryUnlinkError, title=_("Payment Unlink Error")) ( qb.update(pay) .set(pay.total_allocated_amount, pe_doc.total_allocated_amount) .set(pay.base_total_allocated_amount, pe_doc.base_total_allocated_amount) .set(pay.unallocated_amount, pe_doc.unallocated_amount) .set(pay.modified, now()) .set(pay.modified_by, frappe.session.user) .where(pay.name == pe) .run() ) frappe.msgprint(_("Payment Entries {0} are un-linked").format("\n".join(linked_pe))) @frappe.whitelist() def get_company_default(company, fieldname, ignore_validation=False): value = frappe.get_cached_value("Company", company, fieldname) if not ignore_validation and not value: throw( _("Please set default {0} in Company {1}").format( _(frappe.get_meta("Company").get_label(fieldname)), company ) ) return value def fix_total_debit_credit(): vouchers = frappe.db.sql( """select voucher_type, voucher_no, sum(debit) - sum(credit) as diff from `tabGL Entry` group by voucher_type, voucher_no having sum(debit) != sum(credit)""", as_dict=1, ) for d in vouchers: if abs(d.diff) > 0: dr_or_cr = d.voucher_type == "Sales Invoice" and "credit" or "debit" frappe.db.sql( """update `tabGL Entry` set {} = {} + {} where voucher_type = {} and voucher_no = {} and {} > 0 limit 1""".format( dr_or_cr, dr_or_cr, "%s", "%s", "%s", dr_or_cr ), (d.diff, d.voucher_type, d.voucher_no), ) def get_currency_precision(): precision = cint(frappe.db.get_default("currency_precision")) if not precision: number_format = frappe.db.get_default("number_format") or "#,###.##" precision = get_number_format_info(number_format)[2] return precision def get_fraction_units(currency: str) -> int: """Returns the number of fraction units for a currency.""" fraction_units = frappe.db.get_value("Currency", currency, "fraction_units") if fraction_units is None: fraction_units = 100 return fraction_units @site_cache() def get_zero_cutoff(currency: str) -> float: """Returns the zero cutoff for a currency. For example, if the Fraction Units for a currency are set to 100, then the zero cutoff is 0.005. We don't want to display values less than the zero cutoff. This value was chosen for compatibility with the previous hard-coded value of 0.005. """ fraction_units = get_fraction_units(currency) return 0.5 / (fraction_units or 1) def get_held_invoices(party_type, party): """ Returns a list of names Purchase Invoices for the given party that are on hold """ held_invoices = None if party_type == "Supplier": held_invoices = frappe.db.sql( "select name from `tabPurchase Invoice` where on_hold = 1 and release_date IS NOT NULL and release_date > CURDATE()", as_dict=1, ) held_invoices = set(d["name"] for d in held_invoices) return held_invoices def get_outstanding_invoices( party_type, party, account, common_filter=None, posting_date=None, min_outstanding=None, max_outstanding=None, accounting_dimensions=None, vouchers=None, # list of dicts [{'voucher_type': '', 'voucher_no': ''}] for filtering limit=None, # passed by reconciliation tool voucher_no=None, # filter passed by reconciliation tool ): ple = qb.DocType("Payment Ledger Entry") outstanding_invoices = [] precision = frappe.get_precision("Sales Invoice", "outstanding_amount") or 2 if account: root_type, account_type = frappe.get_cached_value( "Account", account[0], ["root_type", "account_type"] ) party_account_type = "Receivable" if root_type == "Asset" else "Payable" party_account_type = account_type or party_account_type else: party_account_type = erpnext.get_party_account_type(party_type) held_invoices = get_held_invoices(party_type, party) common_filter = common_filter or [] common_filter.append(ple.account_type == party_account_type) common_filter.append(ple.account.isin(account)) common_filter.append(ple.party_type == party_type) common_filter.append(ple.party == party) ple_query = QueryPaymentLedger() invoice_list = ple_query.get_voucher_outstandings( vouchers=vouchers, common_filter=common_filter, posting_date=posting_date, min_outstanding=min_outstanding, max_outstanding=max_outstanding, get_invoices=True, accounting_dimensions=accounting_dimensions or [], limit=limit, voucher_no=voucher_no, ) for d in invoice_list: payment_amount = d.invoice_amount_in_account_currency - d.outstanding_in_account_currency outstanding_amount = d.outstanding_in_account_currency if outstanding_amount > 0.5 / (10**precision): if ( min_outstanding and max_outstanding and (outstanding_amount < min_outstanding or outstanding_amount > max_outstanding) ): continue if d.voucher_type != "Purchase Invoice" or d.voucher_no not in held_invoices: outstanding_invoices.append( frappe._dict( { "voucher_no": d.voucher_no, "voucher_type": d.voucher_type, "posting_date": d.posting_date, "invoice_amount": flt(d.invoice_amount_in_account_currency), "payment_amount": payment_amount, "outstanding_amount": outstanding_amount, "due_date": d.due_date, "currency": d.currency, "account": d.account, } ) ) outstanding_invoices = sorted(outstanding_invoices, key=lambda k: k["due_date"] or getdate(nowdate())) return outstanding_invoices def get_account_name(account_type=None, root_type=None, is_group=None, account_currency=None, company=None): """return account based on matching conditions""" return frappe.db.get_value( "Account", { "account_type": account_type or "", "root_type": root_type or "", "is_group": is_group or 0, "account_currency": account_currency or frappe.defaults.get_defaults().currency, "company": company or frappe.defaults.get_defaults().company, }, "name", ) @frappe.whitelist() def get_companies(): """get a list of companies based on permission""" return [d.name for d in frappe.get_list("Company", fields=["name"], order_by="name")] @frappe.whitelist() def get_children(doctype, parent, company, is_root=False, include_disabled=False): if isinstance(include_disabled, str): include_disabled = loads(include_disabled) from erpnext.accounts.report.financial_statements import sort_accounts parent_fieldname = "parent_" + doctype.lower().replace(" ", "_") fields = ["name as value", "is_group as expandable"] filters = [["docstatus", "<", 2]] if frappe.db.has_column(doctype, "disabled") and not include_disabled: filters.append(["disabled", "=", False]) if is_root: filters.append(IfNull(Field(parent_fieldname), "") == "") else: filters.append([parent_fieldname, "=", parent]) if is_root: fields += ["root_type", "report_type", "account_currency"] if doctype == "Account" else [] filters.append(["company", "=", company]) else: fields += ["root_type", "account_currency"] if doctype == "Account" else [] fields += [parent_fieldname + " as parent"] acc = frappe.get_list(doctype, fields=fields, filters=filters) if doctype == "Account": sort_accounts(acc, is_root, key="value") return acc @frappe.whitelist() def get_account_balances(accounts, company, finance_book=None, include_default_fb_balances=False): if isinstance(accounts, str): accounts = loads(accounts) if not accounts: return [] company_currency = frappe.get_cached_value("Company", company, "default_currency") for account in accounts: account["company_currency"] = company_currency account["balance"] = flt( get_balance_on( account=account["value"], in_account_currency=False, company=company, finance_book=finance_book, include_default_fb_balances=include_default_fb_balances, ) ) if account["account_currency"] and account["account_currency"] != company_currency: account["balance_in_account_currency"] = flt( get_balance_on( account=account["value"], company=company, finance_book=finance_book, include_default_fb_balances=include_default_fb_balances, ) ) return accounts @frappe.whitelist() def get_account_balances_coa(company: str, include_default_fb_balances: bool = False): company_currency = frappe.get_cached_value("Company", company, "default_currency") Account = DocType("Account") account_list = ( frappe.qb.from_(Account) .select(Account.name, Account.parent_account, Account.account_currency) .where(Account.company == company) .orderby(Account.lft) .run(as_dict=True) ) account_balances_cc = {account.get("name"): 0 for account in account_list} account_balances_ac = {account.get("name"): 0 for account in account_list} GLEntry = DocType("GL Entry") precision = get_currency_precision() get_ledger_balances_query = ( frappe.qb.from_(GLEntry) .select( GLEntry.account, (Sum(Round(GLEntry.debit, precision)) - Sum(Round(GLEntry.credit, precision))).as_("balance"), ( Sum(Round(GLEntry.debit_in_account_currency, precision)) - Sum(Round(GLEntry.credit_in_account_currency, precision)) ).as_("balance_in_account_currency"), ) .groupby(GLEntry.account) ) condition_list = [GLEntry.company == company, GLEntry.is_cancelled == 0] default_finance_book = None if include_default_fb_balances: default_finance_book = frappe.get_cached_value("Company", company, "default_finance_book") if default_finance_book: condition_list.append( (GLEntry.finance_book == default_finance_book) | (GLEntry.finance_book.isnull()) ) for condition in condition_list: get_ledger_balances_query = get_ledger_balances_query.where(condition) ledger_balances = get_ledger_balances_query.run(as_dict=True) for ledger_entry in ledger_balances: account_balances_cc[ledger_entry.get("account")] = ledger_entry.get("balance") account_balances_ac[ledger_entry.get("account")] = ledger_entry.get("balance_in_account_currency") for account in reversed(account_list): parent = account.get("parent_account") if parent: account_balances_cc[parent] += account_balances_cc.get(account.get("name")) accounts_data = [ { "value": account.get("name"), "company_currency": company_currency, "balance": account_balances_cc.get(account.get("name")), "account_currency": account.get("account_currency"), "balance_in_account_currency": account_balances_ac.get(account.get("name")), } for account in account_list ] return accounts_data def create_payment_gateway_account(gateway, payment_channel="Email", company=None): from erpnext.setup.setup_wizard.operations.install_fixtures import create_bank_account if not company: company = frappe.get_cached_value("Global Defaults", "Global Defaults", "default_company") if not company: return # NOTE: we translate Payment Gateway account name because that is going to be used by the end user bank_account = frappe.db.get_value( "Account", {"account_name": _(gateway), "company": company}, ["name", "account_currency"], as_dict=1, ) if not bank_account: # check for untranslated one bank_account = frappe.db.get_value( "Account", {"account_name": gateway, "company": company}, ["name", "account_currency"], as_dict=1, ) if not bank_account: # try creating one bank_account = create_bank_account({"company_name": company, "bank_account": _(gateway)}) if not bank_account: frappe.msgprint(_("Payment Gateway Account not created, please create one manually.")) return # if payment gateway account exists, return if frappe.db.exists( "Payment Gateway Account", {"payment_gateway": gateway, "currency": bank_account.account_currency}, ): return try: frappe.get_doc( { "doctype": "Payment Gateway Account", "is_default": 1, "payment_gateway": gateway, "payment_account": bank_account.name, "currency": bank_account.account_currency, "payment_channel": payment_channel, "company": company, } ).insert(ignore_permissions=True, ignore_if_duplicate=True) except frappe.DuplicateEntryError: # already exists, due to a reinstall? pass @frappe.whitelist() def update_cost_center(docname, cost_center_name, cost_center_number, company, merge): """ Renames the document by adding the number as a prefix to the current name and updates all transaction where it was present. """ validate_field_number("Cost Center", docname, cost_center_number, company, "cost_center_number") if cost_center_number: frappe.db.set_value("Cost Center", docname, "cost_center_number", cost_center_number.strip()) else: frappe.db.set_value("Cost Center", docname, "cost_center_number", "") frappe.db.set_value("Cost Center", docname, "cost_center_name", cost_center_name.strip()) new_name = get_autoname_with_number(cost_center_number, cost_center_name, company) if docname != new_name: frappe.rename_doc("Cost Center", docname, new_name, force=1, merge=merge) return new_name def validate_field_number(doctype_name, docname, number_value, company, field_name): """Validate if the number entered isn't already assigned to some other document.""" if number_value: filters = {field_name: number_value, "name": ["!=", docname]} if company: filters["company"] = company doctype_with_same_number = frappe.db.get_value(doctype_name, filters) if doctype_with_same_number: frappe.throw( _("{0} Number {1} is already used in {2} {3}").format( doctype_name, number_value, doctype_name.lower(), doctype_with_same_number ) ) def get_autoname_with_number(number_value, doc_title, company): """append title with prefix as number and suffix as company's abbreviation separated by '-'""" company_abbr = frappe.get_cached_value("Company", company, "abbr") parts = [doc_title.strip(), company_abbr] if cstr(number_value).strip(): parts.insert(0, cstr(number_value).strip()) return " - ".join(parts) def parse_naming_series_variable(doc, variable): if variable in ["FY", "TFY"]: if doc: date = doc.get("posting_date") or doc.get("transaction_date") or getdate() company = doc.get("company") else: date = getdate() company = None return get_fiscal_year(date=date, company=company, truncate=variable == "TFY")[0] elif variable == "ABBR": if doc: company = doc.get("company") or frappe.db.get_default("company") else: company = frappe.db.get_default("company") return frappe.db.get_value("Company", company, "abbr") if company else "" else: data = {"YY": "%y", "YYYY": "%Y", "MM": "%m", "DD": "%d", "JJJ": "%j"} if doc and doc.doctype in ["Batch", "Serial No"] and doc.reference_doctype and doc.reference_name: doc = frappe.get_doc(doc.reference_doctype, doc.reference_name) date = ( ( getdate(doc.get("posting_date") or doc.get("transaction_date") or doc.get("posting_datetime")) or now_datetime() ) if doc and frappe.get_single_value("Global Defaults", "use_posting_datetime_for_naming_documents") else now_datetime() ) return date.strftime(data[variable]) if variable in data else determine_consecutive_week_number(date) @frappe.whitelist() def get_coa(doctype, parent, is_root=None, chart=None): from erpnext.accounts.doctype.account.chart_of_accounts.chart_of_accounts import ( build_tree_from_json, ) # add chart to flags to retrieve when called from expand all function chart = chart if chart else frappe.flags.chart frappe.flags.chart = chart parent = None if parent == _("All Accounts") else parent accounts = build_tree_from_json(chart) # returns alist of dict in a tree render-able form # filter out to show data for the selected node only accounts = [d for d in accounts if d["parent_account"] == parent] return accounts def update_gl_entries_after( posting_date, posting_time, for_warehouses=None, for_items=None, warehouse_account=None, company=None, ): stock_vouchers = get_future_stock_vouchers(posting_date, posting_time, for_warehouses, for_items, company) repost_gle_for_stock_vouchers(stock_vouchers, posting_date, company, warehouse_account) def repost_gle_for_stock_vouchers( stock_vouchers: list[tuple[str, str]], posting_date: str, company: str | None = None, warehouse_account=None, repost_doc: Optional["RepostItemValuation"] = None, ): from erpnext.accounts.general_ledger import toggle_debit_credit_if_negative if not stock_vouchers: return if not warehouse_account: warehouse_account = get_warehouse_account_map(company) stock_vouchers = sort_stock_vouchers_by_posting_date(stock_vouchers, company=company) if repost_doc and repost_doc.gl_reposting_index: # Restore progress stock_vouchers = stock_vouchers[cint(repost_doc.gl_reposting_index) :] precision = get_field_precision(frappe.get_meta("GL Entry").get_field("debit")) or 2 for stock_vouchers_chunk in create_batch(stock_vouchers, GL_REPOSTING_CHUNK): gle = get_voucherwise_gl_entries(stock_vouchers_chunk, posting_date) for voucher_type, voucher_no in stock_vouchers_chunk: existing_gle = gle.get((voucher_type, voucher_no), []) voucher_obj = frappe.get_lazy_doc(voucher_type, voucher_no) # Some transactions post credit as negative debit, this is handled while posting GLE # but while comparing we need to make sure it's flipped so comparisons are accurate inventory_account_map = voucher_obj.get_inventory_account_map() expected_gle = toggle_debit_credit_if_negative(voucher_obj.get_gl_entries(inventory_account_map)) if expected_gle: if not existing_gle or not compare_existing_and_expected_gle( existing_gle, expected_gle, precision ): _delete_accounting_ledger_entries(voucher_type, voucher_no) voucher_obj.make_gl_entries(gl_entries=expected_gle, from_repost=True) else: _delete_accounting_ledger_entries(voucher_type, voucher_no) if not frappe.in_test: frappe.db.commit() if repost_doc: repost_doc.db_set( "gl_reposting_index", cint(repost_doc.gl_reposting_index) + len(stock_vouchers_chunk), ) def _delete_pl_entries(voucher_type, voucher_no): ple = qb.DocType("Payment Ledger Entry") qb.from_(ple).delete().where((ple.voucher_type == voucher_type) & (ple.voucher_no == voucher_no)).run() def _delete_adv_pl_entries(voucher_type, voucher_no): adv = qb.DocType("Advance Payment Ledger Entry") qb.from_(adv).delete().where((adv.voucher_type == voucher_type) & (adv.voucher_no == voucher_no)).run() def _delete_gl_entries(voucher_type, voucher_no): gle = qb.DocType("GL Entry") qb.from_(gle).delete().where((gle.voucher_type == voucher_type) & (gle.voucher_no == voucher_no)).run() def _delete_accounting_ledger_entries(voucher_type, voucher_no): """ Remove entries from both General and Payment Ledger for specified Voucher """ _delete_gl_entries(voucher_type, voucher_no) _delete_pl_entries(voucher_type, voucher_no) def sort_stock_vouchers_by_posting_date( stock_vouchers: list[tuple[str, str]], company=None ) -> list[tuple[str, str]]: sle = frappe.qb.DocType("Stock Ledger Entry") voucher_nos = [v[1] for v in stock_vouchers] sles = ( frappe.qb.from_(sle) .select(sle.voucher_type, sle.voucher_no, sle.posting_date, sle.posting_time, sle.creation) .where((sle.is_cancelled == 0) & (sle.voucher_no.isin(voucher_nos))) .groupby(sle.voucher_type, sle.voucher_no) .orderby(sle.posting_datetime) .orderby(sle.creation) ) if company: sles = sles.where(sle.company == company) sles = sles.run(as_dict=True) sorted_vouchers = [(sle.voucher_type, sle.voucher_no) for sle in sles] unknown_vouchers = set(stock_vouchers) - set(sorted_vouchers) if unknown_vouchers: sorted_vouchers.extend(unknown_vouchers) return sorted_vouchers def get_future_stock_vouchers(posting_date, posting_time, for_warehouses=None, for_items=None, company=None): values = [] condition = "" if for_items: condition += " and item_code in ({})".format(", ".join(["%s"] * len(for_items))) values += for_items if for_warehouses: condition += " and warehouse in ({})".format(", ".join(["%s"] * len(for_warehouses))) values += for_warehouses if company: condition += " and company = %s" values.append(company) future_stock_vouchers = frappe.db.sql( f"""select distinct sle.voucher_type, sle.voucher_no from `tabStock Ledger Entry` sle where timestamp(sle.posting_date, sle.posting_time) >= timestamp(%s, %s) and is_cancelled = 0 {condition} order by timestamp(sle.posting_date, sle.posting_time) asc, creation asc for update""", tuple([posting_date, posting_time, *values]), as_dict=True, ) return [(d.voucher_type, d.voucher_no) for d in future_stock_vouchers] def get_voucherwise_gl_entries(future_stock_vouchers, posting_date): """Get voucherwise list of GL entries. Only fetches GLE fields required for comparing with new GLE. Check compare_existing_and_expected_gle function below. returns: Dict[Tuple[voucher_type, voucher_no], List[GL Entries]] """ gl_entries = {} if not future_stock_vouchers: return gl_entries voucher_nos = [d[1] for d in future_stock_vouchers] gles = frappe.db.sql( """ select name, account, credit, debit, cost_center, project, voucher_type, voucher_no from `tabGL Entry` where posting_date >= {} and voucher_no in ({})""".format("%s", ", ".join(["%s"] * len(voucher_nos))), tuple([posting_date, *voucher_nos]), as_dict=1, ) for d in gles: gl_entries.setdefault((d.voucher_type, d.voucher_no), []).append(d) return gl_entries def compare_existing_and_expected_gle(existing_gle, expected_gle, precision): if len(existing_gle) != len(expected_gle): return False matched = True for entry in expected_gle: account_existed = False for e in existing_gle: if entry.account == e.account: account_existed = True if ( entry.account == e.account and (not entry.cost_center or not e.cost_center or entry.cost_center == e.cost_center) and ( flt(entry.debit, precision) != flt(e.debit, precision) or flt(entry.credit, precision) != flt(e.credit, precision) ) ): matched = False break if not account_existed: matched = False break return matched def get_stock_accounts(company, voucher_type=None, voucher_no=None, accounts=None): stock_accounts = [ d.name for d in frappe.db.get_all("Account", {"account_type": "Stock", "company": company, "is_group": 0}) ] if accounts: stock_accounts = [row.account for row in accounts if row.account in stock_accounts] elif voucher_type and voucher_no: if voucher_type == "Journal Entry": stock_accounts = [ d.account for d in frappe.db.get_all( "Journal Entry Account", {"parent": voucher_no, "account": ["in", stock_accounts]}, "account", ) ] else: stock_accounts = [ d.account for d in frappe.db.get_all( "GL Entry", { "voucher_type": voucher_type, "voucher_no": voucher_no, "account": ["in", stock_accounts], }, "account", ) ] return list(set(stock_accounts)) def get_stock_and_account_balance(account=None, posting_date=None, company=None): if not posting_date: posting_date = nowdate() account_balance = get_balance_on( account, posting_date, in_account_currency=False, ignore_account_permission=True ) account_table = frappe.qb.DocType("Account") query = ( frappe.qb.from_(account_table) .select(Count(account_table.name)) .where( (account_table.account_type == "Stock") & (account_table.company == company) & (account_table.is_group == 0) ) ) no_of_stock_accounts = cint(query.run()[0][0]) related_warehouses = [] if no_of_stock_accounts > 1: warehouse_account = get_warehouse_account_map(company) related_warehouses = [ wh for wh, wh_details in warehouse_account.items() if wh_details.account == account and not wh_details.is_group ] total_stock_value = get_stock_value_on(related_warehouses, posting_date, company=company) precision = frappe.get_precision("Journal Entry Account", "debit_in_account_currency") return flt(account_balance, precision), flt(total_stock_value, precision), related_warehouses def get_journal_entry(account, stock_adjustment_account, amount): db_or_cr_warehouse_account = "credit_in_account_currency" if amount < 0 else "debit_in_account_currency" db_or_cr_stock_adjustment_account = ( "debit_in_account_currency" if amount < 0 else "credit_in_account_currency" ) return { "accounts": [ {"account": account, db_or_cr_warehouse_account: abs(amount)}, {"account": stock_adjustment_account, db_or_cr_stock_adjustment_account: abs(amount)}, ] } def check_and_delete_linked_reports(report): """Check if reports are referenced in Desktop Icon""" icons = frappe.get_all("Desktop Icon", fields=["name"], filters={"_report": report}) if icons: for icon in icons: frappe.delete_doc("Desktop Icon", icon) def create_err_and_its_journals(company: dict) -> None: err = frappe.new_doc("Exchange Rate Revaluation") err.company = company.name err.posting_date = nowdate() err.rounding_loss_allowance = 0.0 err.fetch_and_calculate_accounts_data() if err.accounts: err.save().submit() response = err.make_jv_entries() if company.submit_err_jv: jv = response.get("revaluation_jv", None) jv and frappe.get_doc("Journal Entry", jv).submit() jv = response.get("zero_balance_jv", None) jv and frappe.get_doc("Journal Entry", jv).submit() def _auto_create_exchange_rate_revaluation_for(frequency: str) -> None: """ Internal helper to avoid code duplication and typos. Fetches companies by frequency and triggers ERR. """ companies = frappe.db.get_all( "Company", filters={"auto_exchange_rate_revaluation": 1, "auto_err_frequency": frequency}, fields=["name", "submit_err_jv"], ) if companies: for company in companies: frappe.enqueue( "erpnext.accounts.utils.create_err_and_its_journals", company=company, queue="long", ) def auto_create_exchange_rate_revaluation_daily() -> None: """ Executed by background job """ _auto_create_exchange_rate_revaluation_for("Daily") def auto_create_exchange_rate_revaluation_weekly() -> None: """ Executed by background job """ _auto_create_exchange_rate_revaluation_for("Weekly") def auto_create_exchange_rate_revaluation_monthly() -> None: """ Executed by background job """ _auto_create_exchange_rate_revaluation_for("Monthly") def get_payment_ledger_entries(gl_entries, cancel=0): ple_map = [] if gl_entries: ple = None # companies account = qb.DocType("Account") companies = list(set([x.company for x in gl_entries])) # receivable/payable account accounts_with_types = ( qb.from_(account) .select(account.name, account.account_type) .where(account.account_type.isin(["Receivable", "Payable"]) & (account.company.isin(companies))) .run(as_dict=True) ) receivable_or_payable_accounts = [y.name for y in accounts_with_types] def get_account_type(account): for entry in accounts_with_types: if entry.name == account: return entry.account_type dr_or_cr = 0 account_type = None for gle in gl_entries: if gle.account in receivable_or_payable_accounts: account_type = get_account_type(gle.account) if account_type == "Receivable": dr_or_cr = gle.debit - gle.credit dr_or_cr_account_currency = gle.debit_in_account_currency - gle.credit_in_account_currency elif account_type == "Payable": dr_or_cr = gle.credit - gle.debit dr_or_cr_account_currency = gle.credit_in_account_currency - gle.debit_in_account_currency if cancel: dr_or_cr *= -1 dr_or_cr_account_currency *= -1 against_voucher_type = ( gle.against_voucher_type if gle.against_voucher_type else gle.voucher_type ) against_voucher_no = gle.against_voucher if gle.against_voucher else gle.voucher_no ple = frappe._dict( doctype="Payment Ledger Entry", posting_date=gle.posting_date, company=gle.company, account_type=account_type, account=gle.account, party_type=gle.party_type, party=gle.party, project=gle.project, cost_center=gle.cost_center, finance_book=gle.finance_book, due_date=gle.due_date, voucher_type=gle.voucher_type, voucher_no=gle.voucher_no, voucher_detail_no=gle.voucher_detail_no, against_voucher_type=against_voucher_type, against_voucher_no=against_voucher_no, account_currency=gle.account_currency, amount=dr_or_cr, amount_in_account_currency=dr_or_cr_account_currency, delinked=cancel, remarks=gle.remarks, ) dimensions_and_defaults = get_dimensions() if dimensions_and_defaults: for dimension in dimensions_and_defaults[0]: ple[dimension.fieldname] = gle.get(dimension.fieldname) if gle.advance_voucher_no: # create advance entry base_amount, exchange_rate = ( (dr_or_cr, gle.transaction_exchange_rate) if gle.advance_voucher_type == "Employee Advance" else (None, None) ) adv = get_advance_ledger_entry( gle, against_voucher_type, against_voucher_no, dr_or_cr_account_currency, cancel, base_amount, exchange_rate, ) ple_map.append(adv) ple_map.append(ple) return ple_map def get_advance_ledger_entry( gle, against_voucher_type, against_voucher_no, amount, cancel, base_amount=None, exchange_rate=None ): event = ( "Submit" if (against_voucher_type == gle.voucher_type and against_voucher_no == gle.voucher_no) else "Adjustment" ) aple = frappe._dict( doctype="Advance Payment Ledger Entry", company=gle.company, voucher_type=gle.voucher_type, voucher_no=gle.voucher_no, voucher_detail_no=gle.voucher_detail_no, against_voucher_type=gle.advance_voucher_type, against_voucher_no=gle.advance_voucher_no, amount=amount, currency=gle.account_currency, event=event, delinked=cancel, ) if base_amount is not None: aple.base_amount = base_amount if exchange_rate is not None: aple.exchange_rate = exchange_rate return aple def create_payment_ledger_entry( gl_entries, cancel=0, adv_adj=0, update_outstanding="Yes", from_repost=0, partial_cancel=False ): if gl_entries: ple_map = get_payment_ledger_entries(gl_entries, cancel=cancel) for entry in ple_map: ple = frappe.get_doc(entry) if cancel: delink_original_entry(ple, partial_cancel=partial_cancel) if is_immutable_ledger_enabled(): ple.delinked = 0 ple.posting_date = frappe.form_dict.get("posting_date") or getdate() ple.flags.ignore_permissions = 1 ple.flags.adv_adj = adv_adj ple.flags.from_repost = from_repost ple.flags.update_outstanding = update_outstanding ple.submit() def update_voucher_outstanding(voucher_type, voucher_no, account, party_type, party): from erpnext.accounts.doctype.dunning.dunning import update_linked_dunnings if not voucher_type or not voucher_no: return if voucher_type in get_advance_payment_doctypes(): ref_doc = frappe.get_lazy_doc(voucher_type, voucher_no) ref_doc.set_total_advance_paid() return if not (voucher_type in OUTSTANDING_DOCTYPES and party_type and party): return ple = frappe.qb.DocType("Payment Ledger Entry") vouchers = [frappe._dict({"voucher_type": voucher_type, "voucher_no": voucher_no})] common_filter = [] common_filter.append(ple.party_type == party_type) common_filter.append(ple.party == party) if account: common_filter.append(ple.account == account) ple_query = QueryPaymentLedger() # on cancellation outstanding can be an empty list voucher_outstanding = ple_query.get_voucher_outstandings(vouchers, common_filter=common_filter) if not voucher_outstanding: return outstanding = voucher_outstanding[0] ref_doc = frappe.get_lazy_doc(voucher_type, voucher_no) previous_outstanding_amount = ref_doc.outstanding_amount outstanding_amount = flt( outstanding["outstanding_in_account_currency"], ref_doc.precision("outstanding_amount") ) # Didn't use db_set for optimisation purpose ref_doc.outstanding_amount = outstanding_amount frappe.db.set_value( voucher_type, voucher_no, "outstanding_amount", outstanding_amount, ) update_linked_dunnings(ref_doc, previous_outstanding_amount) ref_doc.set_status(update=True) ref_doc.notify_update() def delink_original_entry(pl_entry, partial_cancel=False): if not pl_entry: return if pl_entry.doctype == "Advance Payment Ledger Entry": adv = qb.DocType("Advance Payment Ledger Entry") ( qb.update(adv) .set(adv.delinked, 1) .set(adv.event, "Cancel") .set(adv.modified, now()) .set(adv.modified_by, frappe.session.user) .where(adv.voucher_type == pl_entry.voucher_type) .where(adv.voucher_no == pl_entry.voucher_no) .where(adv.against_voucher_type == pl_entry.against_voucher_type) .where(adv.against_voucher_no == pl_entry.against_voucher_no) .where(adv.event == pl_entry.event) .run() ) else: ple = qb.DocType("Payment Ledger Entry") query = ( qb.update(ple) .set(ple.modified, now()) .set(ple.modified_by, frappe.session.user) .where( (ple.company == pl_entry.company) & (ple.account_type == pl_entry.account_type) & (ple.account == pl_entry.account) & (ple.party_type == pl_entry.party_type) & (ple.party == pl_entry.party) & (ple.voucher_type == pl_entry.voucher_type) & (ple.voucher_no == pl_entry.voucher_no) & (ple.against_voucher_type == pl_entry.against_voucher_type) & (ple.against_voucher_no == pl_entry.against_voucher_no) ) ) if partial_cancel: query = query.where(ple.voucher_detail_no == pl_entry.voucher_detail_no) if not is_immutable_ledger_enabled(): query = query.set(ple.delinked, True) query.run() class QueryPaymentLedger: """ Helper Class for Querying Payment Ledger Entry """ def __init__(self): self.ple = qb.DocType("Payment Ledger Entry") # query result self.voucher_outstandings = [] # query filters self.vouchers = [] self.common_filter = [] self.voucher_posting_date = [] self.min_outstanding = None self.max_outstanding = None self.limit = self.voucher_no = None def reset(self): # clear filters self.vouchers.clear() self.common_filter.clear() self.min_outstanding = self.max_outstanding = self.limit = None # clear result self.voucher_outstandings.clear() def query_for_outstanding(self): """ Database query to fetch voucher amount and voucher outstanding using Common Table Expression """ ple = self.ple filter_on_voucher_no = [] filter_on_against_voucher_no = [] if self.vouchers: voucher_types = set([x.voucher_type for x in self.vouchers]) voucher_nos = set([x.voucher_no for x in self.vouchers]) filter_on_voucher_no.append(ple.voucher_type.isin(voucher_types)) filter_on_voucher_no.append(ple.voucher_no.isin(voucher_nos)) filter_on_against_voucher_no.append(ple.against_voucher_type.isin(voucher_types)) filter_on_against_voucher_no.append(ple.against_voucher_no.isin(voucher_nos)) if self.voucher_no: filter_on_voucher_no.append(ple.voucher_no.like(f"%{self.voucher_no}%")) filter_on_against_voucher_no.append(ple.against_voucher_no.like(f"%{self.voucher_no}%")) # build outstanding amount filter filter_on_outstanding_amount = [] if self.min_outstanding: if self.min_outstanding > 0: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency >= self.min_outstanding ) else: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency <= self.min_outstanding ) if self.max_outstanding: if self.max_outstanding > 0: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency <= self.max_outstanding ) else: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency >= self.max_outstanding ) if self.limit and self.get_invoices: outstanding_vouchers = ( qb.from_(ple) .select( ple.against_voucher_no.as_("voucher_no"), Sum(ple.amount_in_account_currency).as_("amount_in_account_currency"), Max( Case().when( ( (ple.voucher_no == ple.against_voucher_no) & (ple.voucher_type == ple.against_voucher_type) ), (ple.posting_date), ) ).as_("invoice_date"), ) .where(ple.delinked == 0) .where(Criterion.all(filter_on_against_voucher_no)) .where(Criterion.all(self.common_filter)) .where(Criterion.all(self.dimensions_filter)) .where(Criterion.all(self.voucher_posting_date)) .groupby(ple.against_voucher_type, ple.against_voucher_no, ple.party_type, ple.party) .orderby(ple.invoice_date, ple.voucher_no) .having(qb.Field("amount_in_account_currency") > 0) .limit(self.limit) .run() ) if outstanding_vouchers: filter_on_voucher_no.append(ple.voucher_no.isin([x[0] for x in outstanding_vouchers])) filter_on_against_voucher_no.append( ple.against_voucher_no.isin([x[0] for x in outstanding_vouchers]) ) # build query for voucher amount query_voucher_amount = ( qb.from_(ple) .select( ple.account, ple.voucher_type, ple.voucher_no, ple.party_type, ple.party, ple.posting_date, ple.due_date, ple.account_currency.as_("currency"), ple.cost_center.as_("cost_center"), Sum(ple.amount).as_("amount"), Sum(ple.amount_in_account_currency).as_("amount_in_account_currency"), ple.remarks, ) .where(ple.delinked == 0) .where(Criterion.all(filter_on_voucher_no)) .where(Criterion.all(self.common_filter)) .where(Criterion.all(self.dimensions_filter)) .where(Criterion.all(self.voucher_posting_date)) .groupby(ple.voucher_type, ple.voucher_no, ple.party_type, ple.party) ) # build query for voucher outstanding query_voucher_outstanding = ( qb.from_(ple) .select( ple.account, ple.against_voucher_type.as_("voucher_type"), ple.against_voucher_no.as_("voucher_no"), ple.party_type, ple.party, ple.posting_date, ple.due_date, ple.account_currency.as_("currency"), Sum(ple.amount).as_("amount"), Sum(ple.amount_in_account_currency).as_("amount_in_account_currency"), ) .where(ple.delinked == 0) .where(Criterion.all(filter_on_against_voucher_no)) .where(Criterion.all(self.common_filter)) .groupby(ple.against_voucher_type, ple.against_voucher_no, ple.party_type, ple.party) ) # build CTE for combining voucher amount and outstanding self.cte_query_voucher_amount_and_outstanding = ( qb.with_(query_voucher_amount, "vouchers") .with_(query_voucher_outstanding, "outstanding") .from_(AliasedQuery("vouchers")) .left_join(AliasedQuery("outstanding")) .on( (AliasedQuery("vouchers").account == AliasedQuery("outstanding").account) & (AliasedQuery("vouchers").voucher_type == AliasedQuery("outstanding").voucher_type) & (AliasedQuery("vouchers").voucher_no == AliasedQuery("outstanding").voucher_no) & (AliasedQuery("vouchers").party_type == AliasedQuery("outstanding").party_type) & (AliasedQuery("vouchers").party == AliasedQuery("outstanding").party) ) .select( Table("vouchers").account, Table("vouchers").voucher_type, Table("vouchers").voucher_no, Table("vouchers").party_type, Table("vouchers").party, Table("vouchers").posting_date, Table("vouchers").amount.as_("invoice_amount"), Table("vouchers").amount_in_account_currency.as_("invoice_amount_in_account_currency"), Table("outstanding").amount.as_("outstanding"), Table("outstanding").amount_in_account_currency.as_("outstanding_in_account_currency"), (Table("vouchers").amount - Table("outstanding").amount).as_("paid_amount"), ( Table("vouchers").amount_in_account_currency - Table("outstanding").amount_in_account_currency ).as_("paid_amount_in_account_currency"), Table("vouchers").due_date, Table("vouchers").currency, Table("vouchers").cost_center.as_("cost_center"), Table("vouchers").remarks, ) .where(Criterion.all(filter_on_outstanding_amount)) ) # build CTE filter # only fetch invoices if self.get_invoices: self.cte_query_voucher_amount_and_outstanding = ( self.cte_query_voucher_amount_and_outstanding.having( qb.Field("outstanding_in_account_currency") > 0 ) ) # only fetch payments elif self.get_payments: self.cte_query_voucher_amount_and_outstanding = ( self.cte_query_voucher_amount_and_outstanding.having( qb.Field("outstanding_in_account_currency") < 0 ) ) if self.limit: self.cte_query_voucher_amount_and_outstanding = ( self.cte_query_voucher_amount_and_outstanding.limit(self.limit) ) # execute SQL self.voucher_outstandings = self.cte_query_voucher_amount_and_outstanding.run(as_dict=True) def get_voucher_outstandings( self, vouchers=None, common_filter=None, posting_date=None, min_outstanding=None, max_outstanding=None, get_payments=False, get_invoices=False, accounting_dimensions=None, limit=None, voucher_no=None, ): """ Fetch voucher amount and outstanding amount from Payment Ledger using Database CTE vouchers - dict of vouchers to get common_filter - array of criterions min_outstanding - filter on minimum total outstanding amount max_outstanding - filter on maximum total outstanding amount get_invoices - only fetch vouchers(ledger entries with +ve outstanding) get_payments - only fetch payments(ledger entries with -ve outstanding) """ self.reset() self.vouchers = vouchers self.common_filter = common_filter or [] self.dimensions_filter = accounting_dimensions or [] self.voucher_posting_date = posting_date or [] self.min_outstanding = min_outstanding self.max_outstanding = max_outstanding self.get_payments = get_payments self.get_invoices = get_invoices self.limit = limit self.voucher_no = voucher_no self.query_for_outstanding() return self.voucher_outstandings def create_gain_loss_journal( company, posting_date, party_type, party, party_account, gain_loss_account, exc_gain_loss, dr_or_cr, reverse_dr_or_cr, ref1_dt, ref1_dn, ref1_detail_no, ref2_dt, ref2_dn, ref2_detail_no, cost_center, dimensions, project=None, ) -> str: journal_entry = frappe.new_doc("Journal Entry") journal_entry.voucher_type = "Exchange Gain Or Loss" journal_entry.company = company journal_entry.posting_date = posting_date or nowdate() journal_entry.multi_currency = 1 journal_entry.is_system_generated = True party_account_currency = frappe.get_cached_value("Account", party_account, "account_currency") if not gain_loss_account: frappe.throw(_("Please set default Exchange Gain/Loss Account in Company {}").format(company)) gain_loss_account_currency = get_account_currency(gain_loss_account) company_currency = frappe.get_cached_value("Company", company, "default_currency") if gain_loss_account_currency != company_currency: frappe.throw(_("Currency for {0} must be {1}").format(gain_loss_account, company_currency)) journal_account = frappe._dict( { "account": party_account, "party_type": party_type, "party": party, "account_currency": party_account_currency, "exchange_rate": 0, "cost_center": cost_center or erpnext.get_default_cost_center(company), "project": project, "reference_type": ref1_dt, "reference_name": ref1_dn, "reference_detail_no": ref1_detail_no, dr_or_cr: abs(exc_gain_loss), dr_or_cr + "_in_account_currency": 0, } ) if dimensions: journal_account.update(dimensions) journal_entry.append("accounts", journal_account) journal_account = frappe._dict( { "account": gain_loss_account, "account_currency": gain_loss_account_currency, "exchange_rate": 1, "cost_center": cost_center or erpnext.get_default_cost_center(company), "project": project, "reference_type": ref2_dt, "reference_name": ref2_dn, "reference_detail_no": ref2_detail_no, reverse_dr_or_cr + "_in_account_currency": 0, reverse_dr_or_cr: abs(exc_gain_loss), } ) if dimensions: journal_account.update(dimensions) journal_entry.append("accounts", journal_account) journal_entry.save() journal_entry.submit() return journal_entry.name def get_party_types_from_account_type(account_type): return frappe.db.get_all("Party Type", {"account_type": account_type}, pluck="name") def get_advance_payment_doctypes(payment_type=None): """ Get list of advance payment doctypes based on type. :param type: Optional, can be "receivable" or "payable". If not provided, returns both. """ if payment_type: return frappe.get_hooks(f"advance_payment_{payment_type}_doctypes") or [] return (frappe.get_hooks("advance_payment_receivable_doctypes") or []) + ( frappe.get_hooks("advance_payment_payable_doctypes") or [] ) def run_ledger_health_checks(): health_monitor_settings = frappe.get_doc("Ledger Health Monitor") if health_monitor_settings.enable_health_monitor: period_end = getdate() period_start = add_days(period_end, -abs(health_monitor_settings.monitor_for_last_x_days)) run_date = get_datetime() # Debit-Credit mismatch report if health_monitor_settings.debit_credit_mismatch: for x in health_monitor_settings.companies: filters = {"company": x.company, "from_date": period_start, "to_date": period_end} voucher_wise = frappe.get_doc("Report", "Voucher-wise Balance") res = voucher_wise.execute_script_report(filters=filters) for x in res[1]: doc = frappe.new_doc("Ledger Health") doc.voucher_type = x.voucher_type doc.voucher_no = x.voucher_no doc.debit_credit_mismatch = True doc.checked_on = run_date doc.save() # General Ledger and Payment Ledger discrepancy if health_monitor_settings.general_and_payment_ledger_mismatch: for x in health_monitor_settings.companies: filters = { "company": x.company, "period_start_date": period_start, "period_end_date": period_end, } gl_pl_comparison = frappe.get_doc("Report", "General and Payment Ledger Comparison") res = gl_pl_comparison.execute_script_report(filters=filters) for x in res[1]: doc = frappe.new_doc("Ledger Health") doc.voucher_type = x.voucher_type doc.voucher_no = x.voucher_no doc.general_and_payment_ledger_mismatch = True doc.checked_on = run_date doc.save() def sync_auto_reconcile_config(auto_reconciliation_job_trigger: int = 15): auto_reconciliation_job_trigger = auto_reconciliation_job_trigger or frappe.get_single_value( "Accounts Settings", "auto_reconciliation_job_trigger" ) method = "erpnext.accounts.doctype.process_payment_reconciliation.process_payment_reconciliation.trigger_reconciliation_for_queued_docs" sch_event = frappe.get_doc( "Scheduler Event", {"scheduled_against": "Process Payment Reconciliation", "method": method} ) if frappe.db.get_value("Scheduled Job Type", {"method": method}): frappe.get_doc( "Scheduled Job Type", { "method": method, }, ).update( { "cron_format": f"0/{auto_reconciliation_job_trigger} * * * *", "scheduler_event": sch_event.name, } ).save() else: frappe.get_doc( { "doctype": "Scheduled Job Type", "method": method, "scheduler_event": sch_event.name, "cron_format": f"0/{auto_reconciliation_job_trigger} * * * *", "create_log": True, "stopped": False, "frequency": "Cron", } ).save() def get_link_fields_grouped_by_option(doctype): meta = frappe.get_meta(doctype) link_fields_map = defaultdict(list) for df in meta.fields: if df.fieldtype == "Link" and df.options and not df.ignore_user_permissions: link_fields_map[df.options].append(df.fieldname) return link_fields_map def build_qb_match_conditions(doctype, user=None) -> list: match_filters = build_match_conditions(doctype, user, False) link_fields_map = get_link_fields_grouped_by_option(doctype) criterion = [] apply_strict_user_permissions = frappe.get_system_settings("apply_strict_user_permissions") if match_filters: _dt = qb.DocType(doctype) for filter in match_filters: for link_option, allowed_values in filter.items(): fieldnames = link_fields_map.get(link_option, []) cond = None if link_option == doctype: cond = _dt["name"].isin(allowed_values) for fieldname in fieldnames: field = _dt[fieldname] cond = field.isin(allowed_values) if not apply_strict_user_permissions: cond = (Coalesce(field, "") == "") | cond if cond: criterion.append(cond) return criterion def is_immutable_ledger_enabled(): return frappe.get_single_value("Accounts Settings", "enable_immutable_ledger")