refactor(treewide): formatting and ruff fixes, + manually enabled F401

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
Akhil Narang
2024-03-27 11:37:26 +05:30
parent c28d19cf7f
commit 4d34b1ead7
618 changed files with 4188 additions and 6384 deletions

View File

@@ -12,7 +12,7 @@ from erpnext.utilities.transaction_base import TransactionBase
class AuthorizationControl(TransactionBase):
def get_appr_user_role(self, det, doctype_name, total, based_on, condition, item, company):
amt_list, appr_users, appr_roles = [], [], []
users, roles = "", ""
_users, _roles = "", ""
if det:
for x in det:
amt_list.append(flt(x[0]))
@@ -20,18 +20,20 @@ class AuthorizationControl(TransactionBase):
app_dtl = frappe.db.sql(
"""select approving_user, approving_role from `tabAuthorization Rule`
where transaction = %s and (value = %s or value > %s)
and docstatus != 2 and based_on = %s and company = %s %s"""
% ("%s", "%s", "%s", "%s", "%s", condition),
where transaction = {} and (value = {} or value > {})
and docstatus != 2 and based_on = {} and company = {} {}""".format(
"%s", "%s", "%s", "%s", "%s", condition
),
(doctype_name, flt(max_amount), total, based_on, company),
)
if not app_dtl:
app_dtl = frappe.db.sql(
"""select approving_user, approving_role from `tabAuthorization Rule`
where transaction = %s and (value = %s or value > %s) and docstatus != 2
and based_on = %s and ifnull(company,'') = '' %s"""
% ("%s", "%s", "%s", "%s", condition),
where transaction = {} and (value = {} or value > {}) and docstatus != 2
and based_on = {} and ifnull(company,'') = '' {}""".format(
"%s", "%s", "%s", "%s", condition
),
(doctype_name, flt(max_amount), total, based_on),
)
@@ -54,18 +56,20 @@ class AuthorizationControl(TransactionBase):
add_cond1 += " and master_name = " + frappe.db.escape(cstr(item))
itemwise_exists = frappe.db.sql(
"""select value from `tabAuthorization Rule`
where transaction = %s and value <= %s
and based_on = %s and company = %s and docstatus != 2 %s %s"""
% ("%s", "%s", "%s", "%s", cond, add_cond1),
where transaction = {} and value <= {}
and based_on = {} and company = {} and docstatus != 2 {} {}""".format(
"%s", "%s", "%s", "%s", cond, add_cond1
),
(doctype_name, total, based_on, company),
)
if not itemwise_exists:
itemwise_exists = frappe.db.sql(
"""select value from `tabAuthorization Rule`
where transaction = %s and value <= %s and based_on = %s
and ifnull(company,'') = '' and docstatus != 2 %s %s"""
% ("%s", "%s", "%s", cond, add_cond1),
where transaction = {} and value <= {} and based_on = {}
and ifnull(company,'') = '' and docstatus != 2 {} {}""".format(
"%s", "%s", "%s", cond, add_cond1
),
(doctype_name, total, based_on),
)
@@ -80,18 +84,18 @@ class AuthorizationControl(TransactionBase):
appr = frappe.db.sql(
"""select value from `tabAuthorization Rule`
where transaction = %s and value <= %s and based_on = %s
and company = %s and docstatus != 2 %s %s"""
% ("%s", "%s", "%s", "%s", cond, add_cond2),
where transaction = {} and value <= {} and based_on = {}
and company = {} and docstatus != 2 {} {}""".format("%s", "%s", "%s", "%s", cond, add_cond2),
(doctype_name, total, based_on, company),
)
if not appr:
appr = frappe.db.sql(
"""select value from `tabAuthorization Rule`
where transaction = %s and value <= %s and based_on = %s
and ifnull(company,'') = '' and docstatus != 2 %s %s"""
% ("%s", "%s", "%s", cond, add_cond2),
where transaction = {} and value <= {} and based_on = {}
and ifnull(company,'') = '' and docstatus != 2 {} {}""".format(
"%s", "%s", "%s", cond, add_cond2
),
(doctype_name, total, based_on),
)
@@ -116,7 +120,7 @@ class AuthorizationControl(TransactionBase):
customer = doc_obj.customer
else:
customer = doc_obj.customer_name
add_cond = " and master_name = {}".format(frappe.db.escape(customer))
add_cond = f" and master_name = {frappe.db.escape(customer)}"
if based_on == "Itemwise Discount":
if doc_obj:
for t in doc_obj.get("items"):
@@ -175,11 +179,10 @@ class AuthorizationControl(TransactionBase):
for x in frappe.db.sql(
"""select based_on
from `tabAuthorization Rule`
where transaction = %s and system_role IN (%s) and based_on IN (%s)
and (company = %s or ifnull(company,'')='')
where transaction = {} and system_role IN ({}) and based_on IN ({})
and (company = {} or ifnull(company,'')='')
and docstatus != 2
"""
% (
""".format(
"%s",
"'" + "','".join(frappe.get_roles()) + "'",
"'" + "','".join(final_based_on) + "'",

View File

@@ -48,9 +48,7 @@ class AuthorizationRule(Document):
"Customerwise Discount",
"Itemwise Discount",
]:
frappe.throw(
_("Cannot set authorization on basis of Discount for {0}").format(self.transaction)
)
frappe.throw(_("Cannot set authorization on basis of Discount for {0}").format(self.transaction))
elif self.based_on == "Average Discount" and flt(self.value) > 100.00:
frappe.throw(_("Discount must be less than 100"))
elif self.based_on == "Customerwise Discount" and not self.master_name:

View File

@@ -38,9 +38,8 @@ class Company(NestedSet):
"Supplier Quotation",
]:
if frappe.db.sql(
"""select name from `tab%s` where company=%s and docstatus=1
limit 1"""
% (doctype, "%s"),
"""select name from `tab{}` where company={} and docstatus=1
limit 1""".format(doctype, "%s"),
self.name,
):
exists = True
@@ -73,9 +72,7 @@ class Company(NestedSet):
if not self.abbr.strip():
frappe.throw(_("Abbreviation is mandatory"))
if frappe.db.sql(
"select abbr from tabCompany where name!=%s and abbr=%s", (self.name, self.abbr)
):
if frappe.db.sql("select abbr from tabCompany where name!=%s and abbr=%s", (self.name, self.abbr)):
frappe.throw(_("Abbreviation already used for another company"))
@frappe.whitelist()
@@ -99,7 +96,9 @@ class Company(NestedSet):
for_company = frappe.db.get_value("Account", self.get(account[1]), "company")
if for_company != self.name:
frappe.throw(
_("Account {0} does not belong to company: {1}").format(self.get(account[1]), self.name)
_("Account {0} does not belong to company: {1}").format(
self.get(account[1]), self.name
)
)
if get_account_currency(self.get(account[1])) != self.default_currency:
@@ -111,9 +110,7 @@ class Company(NestedSet):
def validate_currency(self):
if self.is_new():
return
self.previous_default_currency = frappe.get_cached_value(
"Company", self.name, "default_currency"
)
self.previous_default_currency = frappe.get_cached_value("Company", self.name, "default_currency")
if (
self.default_currency
and self.previous_default_currency
@@ -177,20 +174,19 @@ class Company(NestedSet):
{"warehouse_name": _("Finished Goods"), "is_group": 0},
{"warehouse_name": _("Goods In Transit"), "is_group": 0, "warehouse_type": "Transit"},
]:
if not frappe.db.exists(
"Warehouse", "{0} - {1}".format(wh_detail["warehouse_name"], self.abbr)
):
if not frappe.db.exists("Warehouse", "{} - {}".format(wh_detail["warehouse_name"], self.abbr)):
warehouse = frappe.get_doc(
{
"doctype": "Warehouse",
"warehouse_name": wh_detail["warehouse_name"],
"is_group": wh_detail["is_group"],
"company": self.name,
"parent_warehouse": "{0} - {1}".format(_("All Warehouses"), self.abbr)
"parent_warehouse": "{} - {}".format(_("All Warehouses"), self.abbr)
if not wh_detail["is_group"]
else "",
"warehouse_type": wh_detail["warehouse_type"] if "warehouse_type" in wh_detail else None,
"warehouse_type": wh_detail["warehouse_type"]
if "warehouse_type" in wh_detail
else None,
}
)
warehouse.flags.ignore_permissions = True
@@ -212,9 +208,7 @@ class Company(NestedSet):
self.db_set(
"default_payable_account",
frappe.db.get_value(
"Account", {"company": self.name, "account_type": "Payable", "is_group": 0}
),
frappe.db.get_value("Account", {"company": self.name, "account_type": "Payable", "is_group": 0}),
)
def create_default_departments(self):
@@ -341,7 +335,9 @@ class Company(NestedSet):
and not self.default_provisional_account
):
frappe.throw(
_("Set default {0} account for non stock items").format(frappe.bold("Provisional Account"))
_("Set default {0} account for non stock items").format(
frappe.bold("Provisional Account")
)
)
make_property_setter(
@@ -356,9 +352,7 @@ class Company(NestedSet):
def check_country_change(self):
frappe.flags.country_change = False
if not self.is_new() and self.country != frappe.get_cached_value(
"Company", self.name, "country"
):
if not self.is_new() and self.country != frappe.get_cached_value("Company", self.name, "country"):
frappe.flags.country_change = True
def set_chart_of_accounts(self):
@@ -519,14 +513,14 @@ class Company(NestedSet):
)
for doctype in ["Account", "Cost Center", "Budget", "Party Account"]:
frappe.db.sql("delete from `tab{0}` where company = %s".format(doctype), self.name)
frappe.db.sql(f"delete from `tab{doctype}` where company = %s", self.name)
if not frappe.db.get_value("Stock Ledger Entry", {"company": self.name}):
frappe.db.sql("""delete from `tabWarehouse` where company=%s""", self.name)
frappe.defaults.clear_default("company", value=self.name)
for doctype in ["Mode of Payment Account", "Item Default"]:
frappe.db.sql("delete from `tab{0}` where company = %s".format(doctype), self.name)
frappe.db.sql(f"delete from `tab{doctype}` where company = %s", self.name)
# clear default accounts, warehouses from item
warehouses = frappe.db.sql_list("select name from tabWarehouse where company=%s", self.name)
@@ -559,7 +553,7 @@ class Company(NestedSet):
frappe.db.sql("delete from tabBOM where company=%s", self.name)
for dt in ("BOM Operation", "BOM Item", "BOM Scrap Item", "BOM Explosion Item"):
frappe.db.sql(
"delete from `tab%s` where parent in (%s)" "" % (dt, ", ".join(["%s"] * len(boms))),
"delete from `tab{}` where parent in ({})" "".format(dt, ", ".join(["%s"] * len(boms))),
tuple(boms),
)
@@ -615,7 +609,7 @@ def update_company_current_month_sales(company):
current_month_year = formatdate(today(), "MM-yyyy")
results = frappe.db.sql(
"""
f"""
SELECT
SUM(base_grand_total) AS total,
DATE_FORMAT(`posting_date`, '%m-%Y') AS month_year
@@ -624,12 +618,10 @@ def update_company_current_month_sales(company):
WHERE
DATE_FORMAT(`posting_date`, '%m-%Y') = '{current_month_year}'
AND docstatus = 1
AND company = {company}
AND company = {frappe.db.escape(company)}
GROUP BY
month_year
""".format(
current_month_year=current_month_year, company=frappe.db.escape(company)
),
""",
as_dict=True,
)
@@ -644,9 +636,7 @@ def update_company_monthly_sales(company):
from frappe.utils.goal import get_monthly_results
filter_str = "company = {0} and status != 'Draft' and docstatus=1".format(
frappe.db.escape(company)
)
filter_str = f"company = {frappe.db.escape(company)} and status != 'Draft' and docstatus=1"
month_to_value_dict = get_monthly_results(
"Sales Invoice", "base_grand_total", "posting_date", filter_str, "sum"
)
@@ -656,9 +646,7 @@ def update_company_monthly_sales(company):
def update_transactions_annual_history(company, commit=False):
transactions_history = get_all_transactions_annual_history(company)
frappe.db.set_value(
"Company", company, "transactions_annual_history", json.dumps(transactions_history)
)
frappe.db.set_value("Company", company, "transactions_annual_history", json.dumps(transactions_history))
if commit:
frappe.db.commit()
@@ -674,21 +662,19 @@ def cache_companies_monthly_sales_history():
@frappe.whitelist()
def get_children(doctype, parent=None, company=None, is_root=False):
if parent == None or parent == "All Companies":
if parent is None or parent == "All Companies":
parent = ""
return frappe.db.sql(
"""
f"""
select
name as value,
is_group as expandable
from
`tabCompany` comp
where
ifnull(parent_company, "")={parent}
""".format(
parent=frappe.db.escape(parent)
),
ifnull(parent_company, "")={frappe.db.escape(parent)}
""",
as_dict=1,
)
@@ -764,7 +750,6 @@ def get_all_transactions_annual_history(company):
def get_timeline_data(doctype, name):
"""returns timeline data based on linked records in dashboard"""
out = {}
date_to_value_dict = {}
history = frappe.get_cached_value("Company", name, "transactions_annual_history")
@@ -789,14 +774,13 @@ def get_default_company_address(name, sort_key="is_primary_address", existing_ad
out = frappe.db.sql(
""" SELECT
addr.name, addr.%s
addr.name, addr.{}
FROM
`tabAddress` addr, `tabDynamic Link` dl
WHERE
dl.parent = addr.name and dl.link_doctype = 'Company' and
dl.link_name = %s and ifnull(addr.disabled, 0) = 0
"""
% (sort_key, "%s"),
dl.link_name = {} and ifnull(addr.disabled, 0) = 0
""".format(sort_key, "%s"),
(name),
) # nosec

View File

@@ -22,7 +22,7 @@ class CurrencyExchange(Document):
if cint(self.for_buying) == 1 and cint(self.for_selling) == 0:
purpose = "Buying"
self.name = "{0}-{1}-{2}{3}".format(
self.name = "{}-{}-{}{}".format(
formatdate(get_datetime_str(self.date), "yyyy-MM-dd"),
self.from_currency,
self.to_currency,

View File

@@ -49,6 +49,7 @@ def save_new_records(test_records):
test_exchange_values = {"2015-12-15": "66.999", "2016-01-15": "65.1"}
# Removing API call from get_exchange_rate
def patched_requests_get(*args, **kwargs):
class PatchResponse:
@@ -83,7 +84,7 @@ class TestCurrencyExchange(unittest.TestCase):
def clear_cache(self):
cache = frappe.cache()
for date in test_exchange_values.keys():
key = "currency_exchange_rate_{0}:{1}:{2}".format(date, "USD", "INR")
key = "currency_exchange_rate_{}:{}:{}".format(date, "USD", "INR")
cache.delete(key)
def tearDown(self):

View File

@@ -16,7 +16,7 @@ class CustomerGroup(NestedSet):
def on_update(self):
self.validate_name_with_customer()
super(CustomerGroup, self).on_update()
super().on_update()
self.validate_one_root()
def validate_name_with_customer(self):

View File

@@ -26,17 +26,17 @@ class Department(NestedSet):
def before_rename(self, old, new, merge=False):
# renaming consistency with abbreviation
if not frappe.get_cached_value("Company", self.company, "abbr") in new:
if frappe.get_cached_value("Company", self.company, "abbr") not in new:
new = get_abbreviated_name(new, self.company)
return new
def on_update(self):
if not (frappe.local.flags.ignore_update_nsm or frappe.flags.in_setup_wizard):
super(Department, self).on_update()
super().on_update()
def on_trash(self):
super(Department, self).on_trash()
super().on_trash()
delete_events(self.doctype, self.name)
@@ -46,7 +46,7 @@ def on_doctype_update():
def get_abbreviated_name(name, company):
abbr = frappe.get_cached_value("Company", company, "abbr")
new_name = "{0} - {1}".format(name, abbr)
new_name = f"{name} - {abbr}"
return new_name

View File

@@ -31,7 +31,7 @@ from frappe.model.document import Document
class EmailDigest(Document):
def __init__(self, *args, **kwargs):
super(EmailDigest, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.from_date, self.to_date = self.get_from_to_date()
self.set_dates()
@@ -46,9 +46,7 @@ class EmailDigest(Document):
select name, enabled from tabUser
where name not in ({})
and user_type != "Website User"
order by enabled desc, name asc""".format(
", ".join(["%s"] * len(STANDARD_USERS))
),
order by enabled desc, name asc""".format(", ".join(["%s"] * len(STANDARD_USERS))),
STANDARD_USERS,
as_dict=1,
)
@@ -151,15 +149,11 @@ class EmailDigest(Document):
context.text_color = "#36414C"
context.h1 = "margin-bottom: 30px; margin-top: 40px; font-weight: 400; font-size: 30px;"
context.h2 = "margin-bottom: 30px; margin-top: -20px; font-weight: 400; font-size: 20px;"
context.label_css = """display: inline-block; color: {text_muted};
padding: 3px 7px; margin-right: 7px;""".format(
text_muted=context.text_muted
)
context.label_css = f"""display: inline-block; color: {context.text_muted};
padding: 3px 7px; margin-right: 7px;"""
context.section_head = "margin-top: 60px; font-size: 16px;"
context.line_item = "padding: 5px 0px; margin: 0; border-bottom: 1px solid #d1d8dd;"
context.link_css = "color: {text_color}; text-decoration: none;".format(
text_color=context.text_color
)
context.link_css = f"color: {context.text_color}; text-decoration: none;"
def get_notifications(self):
"""Get notifications for user"""
@@ -182,7 +176,7 @@ class EmailDigest(Document):
events = get_events(from_date, to_date)
event_count = 0
for i, e in enumerate(events):
for _i, e in enumerate(events):
e.starts_on_label = format_time(e.starts_on)
e.ends_on_label = format_time(e.ends_on) if e.ends_on else None
e.date = formatdate(e.starts)
@@ -298,11 +292,8 @@ class EmailDigest(Document):
"new_quotations",
"pending_quotations",
):
if self.get(key):
cache_key = "email_digest:card:{0}:{1}:{2}:{3}".format(
self.company, self.frequency, key, self.from_date
)
cache_key = f"email_digest:card:{self.company}:{self.frequency}:{key}:{self.from_date}"
card = cache.get(cache_key)
if card:
@@ -421,9 +412,7 @@ class EmailDigest(Document):
return self.get_type_balance("invoiced_amount", "Receivable")
def get_expenses_booked(self):
expenses, past_expenses, count = self.get_period_amounts(
self.get_roots("expense"), "expenses_booked"
)
expenses, past_expenses, count = self.get_period_amounts(self.get_roots("expense"), "expenses_booked")
expense_account = frappe.db.get_all(
"Account",
@@ -559,7 +548,6 @@ class EmailDigest(Document):
return {"label": label, "value": value, "count": count}
def get_type_balance(self, fieldname, account_type, root_type=None):
if root_type:
accounts = [
d.name
@@ -645,57 +633,47 @@ class EmailDigest(Document):
]
def get_root_type_accounts(self, root_type):
if not root_type in self._accounts:
if root_type not in self._accounts:
self._accounts[root_type] = [
d.name
for d in frappe.db.get_all(
"Account", filters={"root_type": root_type.title(), "company": self.company, "is_group": 0}
"Account",
filters={"root_type": root_type.title(), "company": self.company, "is_group": 0},
)
]
return self._accounts[root_type]
def get_purchase_order(self):
return self.get_summary_of_doc("Purchase Order", "purchase_order")
def get_sales_order(self):
return self.get_summary_of_doc("Sales Order", "sales_order")
def get_pending_purchase_orders(self):
return self.get_summary_of_pending("Purchase Order", "pending_purchase_orders", "per_received")
def get_pending_sales_orders(self):
return self.get_summary_of_pending("Sales Order", "pending_sales_orders", "per_delivered")
def get_sales_invoice(self):
return self.get_summary_of_doc("Sales Invoice", "sales_invoice")
def get_purchase_invoice(self):
return self.get_summary_of_doc("Purchase Invoice", "purchase_invoice")
def get_new_quotations(self):
return self.get_summary_of_doc("Quotation", "new_quotations")
def get_pending_quotations(self):
return self.get_summary_of_pending_quotations("pending_quotations")
def get_summary_of_pending(self, doc_type, fieldname, getfield):
value, count, billed_value, delivered_value = frappe.db.sql(
"""select ifnull(sum(grand_total),0), count(*),
ifnull(sum(grand_total*per_billed/100),0), ifnull(sum(grand_total*{0}/100),0) from `tab{1}`
ifnull(sum(grand_total*per_billed/100),0), ifnull(sum(grand_total*{}/100),0) from `tab{}`
where (transaction_date <= %(to_date)s)
and status not in ('Closed','Cancelled', 'Completed')
and company = %(company)s """.format(
getfield, doc_type
),
and company = %(company)s """.format(getfield, doc_type),
{"to_date": self.future_to_date, "company": self.company},
)[0]
@@ -708,7 +686,6 @@ class EmailDigest(Document):
}
def get_summary_of_pending_quotations(self, fieldname):
value, count = frappe.db.sql(
"""select ifnull(sum(grand_total),0), count(*) from `tabQuotation`
where (transaction_date <= %(to_date)s)
@@ -741,19 +718,14 @@ class EmailDigest(Document):
return {"label": label, "value": value, "last_value": last_value, "count": count}
def get_summary_of_doc(self, doc_type, fieldname):
date_field = (
"posting_date" if doc_type in ["Sales Invoice", "Purchase Invoice"] else "transaction_date"
)
value = flt(
self.get_total_on(doc_type, self.future_from_date, self.future_to_date)[0].grand_total
)
value = flt(self.get_total_on(doc_type, self.future_from_date, self.future_to_date)[0].grand_total)
count = self.get_total_on(doc_type, self.future_from_date, self.future_to_date)[0].count
last_value = flt(
self.get_total_on(doc_type, self.past_from_date, self.past_to_date)[0].grand_total
)
last_value = flt(self.get_total_on(doc_type, self.past_from_date, self.past_to_date)[0].grand_total)
filters = {
date_field: [[">=", self.future_from_date], ["<=", self.future_to_date]],
@@ -772,7 +744,6 @@ class EmailDigest(Document):
return {"label": label, "value": value, "last_value": last_value, "count": count}
def get_total_on(self, doc_type, from_date, to_date):
date_field = (
"posting_date" if doc_type in ["Sales Invoice", "Purchase Invoice"] else "transaction_date"
)
@@ -853,20 +824,16 @@ class EmailDigest(Document):
"received_qty, qty - received_qty as missing_qty, rate, amount"
)
sql_po = """select {fields} from `tabPurchase Order Item`
sql_po = f"""select {fields_po} from `tabPurchase Order Item`
left join `tabPurchase Order` on `tabPurchase Order`.name = `tabPurchase Order Item`.parent
where status<>'Closed' and `tabPurchase Order Item`.docstatus=1 and CURRENT_DATE > `tabPurchase Order Item`.schedule_date
and received_qty < qty order by `tabPurchase Order Item`.parent DESC,
`tabPurchase Order Item`.schedule_date DESC""".format(
fields=fields_po
)
`tabPurchase Order Item`.schedule_date DESC"""
sql_poi = """select {fields} from `tabPurchase Order Item`
sql_poi = f"""select {fields_poi} from `tabPurchase Order Item`
left join `tabPurchase Order` on `tabPurchase Order`.name = `tabPurchase Order Item`.parent
where status<>'Closed' and `tabPurchase Order Item`.docstatus=1 and CURRENT_DATE > `tabPurchase Order Item`.schedule_date
and received_qty < qty order by `tabPurchase Order Item`.idx""".format(
fields=fields_poi
)
and received_qty < qty order by `tabPurchase Order Item`.idx"""
purchase_order_list = frappe.db.sql(sql_po, as_dict=True)
purchase_order_items_overdue_list = frappe.db.sql(sql_poi, as_dict=True)

View File

@@ -265,16 +265,12 @@ def validate_employee_role(doc, method=None, ignore_emp_check=False):
user_roles = [d.role for d in doc.get("roles")]
if "Employee" in user_roles:
frappe.msgprint(
_("User {0}: Removed Employee role as there is no mapped employee.").format(doc.name)
)
frappe.msgprint(_("User {0}: Removed Employee role as there is no mapped employee.").format(doc.name))
doc.get("roles").remove(doc.get("roles", {"role": "Employee"})[0])
if "Employee Self Service" in user_roles:
frappe.msgprint(
_("User {0}: Removed Employee Self Service role as there is no mapped employee.").format(
doc.name
)
_("User {0}: Removed Employee Self Service role as there is no mapped employee.").format(doc.name)
)
doc.get("roles").remove(doc.get("roles", {"role": "Employee Self Service"})[0])
@@ -290,17 +286,13 @@ def update_user_permissions(doc, method):
def get_employee_email(employee_doc):
return (
employee_doc.get("user_id")
or employee_doc.get("personal_email")
or employee_doc.get("company_email")
employee_doc.get("user_id") or employee_doc.get("personal_email") or employee_doc.get("company_email")
)
def get_holiday_list_for_employee(employee, raise_exception=True):
if employee:
holiday_list, company = frappe.get_cached_value(
"Employee", employee, ["holiday_list", "company"]
)
holiday_list, company = frappe.get_cached_value("Employee", employee, ["holiday_list", "company"])
else:
holiday_list = ""
company = frappe.db.get_single_value("Global Defaults", "default_company")
@@ -316,9 +308,7 @@ def get_holiday_list_for_employee(employee, raise_exception=True):
return holiday_list
def is_holiday(
employee, date=None, raise_exception=True, only_non_weekly=False, with_description=False
):
def is_holiday(employee, date=None, raise_exception=True, only_non_weekly=False, with_description=False):
"""
Returns True if given Employee has an holiday on the given date
:param employee: Employee `name`
@@ -428,7 +418,6 @@ def get_employee_emails(employee_list):
@frappe.whitelist()
def get_children(doctype, parent=None, company=None, is_root=False, is_tree=False):
filters = [["status", "=", "Active"]]
if company and company != "All Companies":
filters.append(["company", "=", company])

View File

@@ -161,9 +161,7 @@ def is_holiday(holiday_list, date=None):
if date is None:
date = today()
if holiday_list:
return bool(
frappe.db.exists("Holiday", {"parent": holiday_list, "holiday_date": date}, cache=True)
)
return bool(frappe.db.exists("Holiday", {"parent": holiday_list, "holiday_date": date}, cache=True))
else:
return False

View File

@@ -112,9 +112,7 @@ class TestHolidayList(unittest.TestCase):
frappe.local.lang = lang
def make_holiday_list(
name, from_date=getdate() - timedelta(days=10), to_date=getdate(), holiday_dates=None
):
def make_holiday_list(name, from_date=getdate() - timedelta(days=10), to_date=getdate(), holiday_dates=None):
frappe.delete_doc_if_exists("Holiday List", name, force=1)
doc = frappe.get_doc(
{

View File

@@ -14,7 +14,7 @@ def create_incoterms():
import os
from csv import DictReader
with open(os.path.join(os.path.dirname(__file__), "incoterms.csv"), "r") as f:
with open(os.path.join(os.path.dirname(__file__), "incoterms.csv")) as f:
for incoterm in DictReader(f):
if frappe.db.exists("Incoterm", incoterm["code"]):
continue

View File

@@ -25,7 +25,7 @@ class ItemGroup(NestedSet, WebsiteGenerator):
)
def validate(self):
super(ItemGroup, self).validate()
super().validate()
if not self.parent_item_group and not frappe.flags.in_test:
if frappe.db.exists("Item Group", _("All Item Groups")):
@@ -45,7 +45,7 @@ class ItemGroup(NestedSet, WebsiteGenerator):
frappe.throw(
_("{0} entered twice {1} in Item Taxes").format(
frappe.bold(d.item_tax_template),
"for tax category {0}".format(frappe.bold(d.tax_category)) if d.tax_category else "",
f"for tax category {frappe.bold(d.tax_category)}" if d.tax_category else "",
)
)
else:

View File

@@ -124,9 +124,7 @@ class TestItem(unittest.TestCase):
def print_tree(self):
import json
print(
json.dumps(frappe.db.sql("select name, lft, rgt from `tabItem Group` order by lft"), indent=1)
)
print(json.dumps(frappe.db.sql("select name, lft, rgt from `tabItem Group` order by lft"), indent=1))
def test_move_leaf_into_another_group(self):
# before move
@@ -156,17 +154,13 @@ class TestItem(unittest.TestCase):
def test_delete_leaf(self):
# for checking later
parent_item_group = frappe.db.get_value(
"Item Group", "_Test Item Group B - 3", "parent_item_group"
)
rgt = frappe.db.get_value("Item Group", parent_item_group, "rgt")
parent_item_group = frappe.db.get_value("Item Group", "_Test Item Group B - 3", "parent_item_group")
frappe.db.get_value("Item Group", parent_item_group, "rgt")
ancestors = get_ancestors_of("Item Group", "_Test Item Group B - 3")
ancestors = frappe.db.sql(
"""select name, rgt from `tabItem Group`
where name in ({})""".format(
", ".join(["%s"] * len(ancestors))
),
where name in ({})""".format(", ".join(["%s"] * len(ancestors))),
tuple(ancestors),
as_dict=True,
)
@@ -188,9 +182,7 @@ class TestItem(unittest.TestCase):
def test_delete_group(self):
# cannot delete group with child, but can delete leaf
self.assertRaises(
NestedSetChildExistsError, frappe.delete_doc, "Item Group", "_Test Item Group B"
)
self.assertRaises(NestedSetChildExistsError, frappe.delete_doc, "Item Group", "_Test Item Group B")
def test_merge_groups(self):
frappe.rename_doc("Item Group", "_Test Item Group B", "_Test Item Group C", merge=True)
@@ -207,7 +199,6 @@ class TestItem(unittest.TestCase):
"""select name from `tabItem Group`
where parent_item_group='_Test Item Group C'"""
):
doc = frappe.get_doc("Item Group", name)
doc.parent_item_group = "_Test Item Group B"
doc.save()

View File

@@ -19,10 +19,8 @@ def get_party_type(doctype, txt, searchfield, start, page_len, filters):
cond = "and account_type = '%s'" % account_type
return frappe.db.sql(
"""select name from `tabParty Type`
where `{key}` LIKE %(txt)s {cond}
order by name limit %(page_len)s offset %(start)s""".format(
key=searchfield, cond=cond
),
f"""select name from `tabParty Type`
where `{searchfield}` LIKE %(txt)s {cond}
order by name limit %(page_len)s offset %(start)s""",
{"txt": "%" + txt + "%", "start": start, "page_len": page_len},
)

View File

@@ -25,7 +25,7 @@ class SalesPartner(WebsiteGenerator):
def validate(self):
if not self.route:
self.route = "partners/" + self.scrub(self.partner_name)
super(SalesPartner, self).validate()
super().validate()
if self.partner_website and not self.partner_website.startswith("http"):
self.partner_website = "http://" + self.partner_website

View File

@@ -52,7 +52,7 @@ class SalesPerson(NestedSet):
self.set_onload("dashboard_info", info)
def on_update(self):
super(SalesPerson, self).on_update()
super().on_update()
self.validate_one_root()
def get_email_id(self):
@@ -78,7 +78,6 @@ def on_doctype_update():
def get_timeline_data(doctype, name):
out = {}
out.update(

View File

@@ -15,12 +15,7 @@ class TermsandConditions(Document):
def validate(self):
if self.terms:
validate_template(self.terms)
if (
not cint(self.buying)
and not cint(self.selling)
and not cint(self.hr)
and not cint(self.disabled)
):
if not cint(self.buying) and not cint(self.selling) and not cint(self.hr) and not cint(self.disabled):
throw(_("At least one of the Applicable Modules should be selected"))

View File

@@ -20,7 +20,7 @@ class Territory(NestedSet):
frappe.throw(_("Either target qty or target amount is mandatory"))
def on_update(self):
super(Territory, self).on_update()
super().on_update()
self.validate_one_root()

View File

@@ -25,7 +25,7 @@ class TestTransactionDeletionRecord(unittest.TestCase):
self.assertTrue(contains_company)
def test_no_of_docs_is_correct(self):
for i in range(5):
for _i in range(5):
create_task("Dunder Mifflin Paper Co")
tdr = create_transaction_deletion_request("Dunder Mifflin Paper Co")
tdr.reload()
@@ -41,9 +41,7 @@ class TestTransactionDeletionRecord(unittest.TestCase):
def create_company(company_name):
company = frappe.get_doc(
{"doctype": "Company", "company_name": company_name, "default_currency": "INR"}
)
company = frappe.get_doc({"doctype": "Company", "company_name": company_name, "default_currency": "INR"})
company.insert(ignore_if_duplicate=True)

View File

@@ -13,7 +13,7 @@ from frappe.utils.background_jobs import create_job_id, is_job_enqueued
class TransactionDeletionRecord(Document):
def __init__(self, *args, **kwargs):
super(TransactionDeletionRecord, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.batch_size = 5000
# Tasks are listed by their execution order
self.task_to_internal_method_map = OrderedDict(
@@ -127,7 +127,7 @@ class TransactionDeletionRecord(Document):
if task := getattr(self, method, None):
try:
task()
except Exception as err:
except Exception:
frappe.db.rollback()
traceback = frappe.get_traceback(with_context=True)
if traceback:
@@ -147,7 +147,7 @@ class TransactionDeletionRecord(Document):
for doctype in doctypes_to_be_ignored_list:
self.append("doctypes_to_be_ignored", {"doctype_name": doctype})
def validate_running_task_for_doc(self, job_names: list = None):
def validate_running_task_for_doc(self, job_names: list | None = None):
# at most only one task should be runnning
running_tasks = []
for x in job_names:
@@ -196,9 +196,7 @@ class TransactionDeletionRecord(Document):
if leads:
addresses = frappe.db.sql_list(
"""select parent from `tabDynamic Link` where link_name
in ({leads})""".format(
leads=",".join(leads)
)
in ({leads})""".format(leads=",".join(leads))
)
if addresses:
@@ -208,16 +206,12 @@ class TransactionDeletionRecord(Document):
"""delete from `tabAddress` where name in ({addresses}) and
name not in (select distinct dl1.parent from `tabDynamic Link` dl1
inner join `tabDynamic Link` dl2 on dl1.parent=dl2.parent
and dl1.link_doctype<>dl2.link_doctype)""".format(
addresses=",".join(addresses)
)
and dl1.link_doctype<>dl2.link_doctype)""".format(addresses=",".join(addresses))
)
frappe.db.sql(
"""delete from `tabDynamic Link` where link_doctype='Lead'
and parenttype='Address' and link_name in ({leads})""".format(
leads=",".join(leads)
)
and parenttype='Address' and link_name in ({leads})""".format(leads=",".join(leads))
)
frappe.db.sql(
@@ -259,9 +253,9 @@ class TransactionDeletionRecord(Document):
self.validate_doc_status()
if not self.delete_transactions:
doctypes_to_be_ignored_list = self.get_doctypes_to_be_ignored_list()
docfields = self.get_doctypes_with_company_field(doctypes_to_be_ignored_list)
self.get_doctypes_with_company_field(doctypes_to_be_ignored_list)
tables = self.get_all_child_doctypes()
self.get_all_child_doctypes()
for docfield in self.doctypes:
if docfield.doctype_name != self.doctype and not docfield.done:
no_of_docs = self.get_number_of_docs_linked_with_specified_company(
@@ -269,7 +263,9 @@ class TransactionDeletionRecord(Document):
)
if no_of_docs > 0:
reference_docs = frappe.get_all(
docfield.doctype_name, filters={docfield.docfield_name: self.company}, limit=self.batch_size
docfield.doctype_name,
filters={docfield.docfield_name: self.company},
limit=self.batch_size,
)
reference_doc_names = [r.name for r in reference_docs]
@@ -278,7 +274,9 @@ class TransactionDeletionRecord(Document):
self.delete_comments(docfield.doctype_name, reference_doc_names)
self.unlink_attachments(docfield.doctype_name, reference_doc_names)
self.delete_child_tables(docfield.doctype_name, reference_doc_names)
self.delete_docs_linked_with_specified_company(docfield.doctype_name, reference_doc_names)
self.delete_docs_linked_with_specified_company(
docfield.doctype_name, reference_doc_names
)
processed = int(docfield.no_of_docs) + len(reference_doc_names)
frappe.db.set_value(docfield.doctype, docfield.name, "no_of_docs", processed)
else:
@@ -355,10 +353,8 @@ class TransactionDeletionRecord(Document):
else:
prefix, hashes = naming_series.rsplit("{", 1)
last = frappe.db.sql(
"""select max(name) from `tab{0}`
where name like %s""".format(
doctype_name
),
f"""select max(name) from `tab{doctype_name}`
where name like %s""",
prefix + "%",
)
if last and last[0][0]: