refactor: use flags to decide on current stage

This commit is contained in:
ruthra kumar
2024-02-03 20:21:30 +05:30
parent 8944ab8b6a
commit 6a77d86a53
5 changed files with 141 additions and 98 deletions

View File

@@ -16,9 +16,15 @@ frappe.ui.form.on("Transaction Deletion Record", {
});
}
frm.get_field("doctypes_to_be_ignored").grid.cannot_add_rows = true;
frm.fields_dict["doctypes_to_be_ignored"].grid.set_column_disp("no_of_docs", false);
frm.refresh_field("doctypes_to_be_ignored");
frm.get_field('doctypes_to_be_ignored').grid.cannot_add_rows = true;
frm.fields_dict['doctypes_to_be_ignored'].grid.set_column_disp('no_of_docs', false);
frm.fields_dict['doctypes_to_be_ignored'].grid.set_column_disp('done', false);
frm.refresh_field('doctypes_to_be_ignored');
frm.get_field('doctypes').grid.cannot_add_rows = true;
frm.fields_dict['doctypes'].grid.set_column_disp('no_of_docs', true);
frm.refresh_field('doctypes');
},
refresh: function (frm) {

View File

@@ -15,6 +15,7 @@
"reset_company_default_values",
"clear_notifications",
"delete_transactions",
"initialize_doctypes_table",
"section_break_tbej",
"doctypes",
"doctypes_to_be_ignored",
@@ -110,12 +111,19 @@
"label": "Delete Transactions",
"no_copy": 1,
"read_only": 1
},
{
"default": "0",
"fieldname": "initialize_doctypes_table",
"fieldtype": "Check",
"label": "Initialize Summary Table",
"read_only": 1
}
],
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
"modified": "2024-02-03 14:40:40.207482",
"modified": "2024-02-03 20:48:34.107577",
"modified_by": "Administrator",
"module": "Setup",
"name": "Transaction Deletion Record",

View File

@@ -30,6 +30,7 @@ class TransactionDeletionRecord(Document):
delete_transactions: DF.Check
doctypes: DF.Table[TransactionDeletionRecordItem]
doctypes_to_be_ignored: DF.Table[TransactionDeletionRecordItem]
initialize_doctypes_table: DF.Check
reset_company_default_values: DF.Check
status: DF.Literal["Queued", "Running", "Failed", "Completed", "Cancelled"]
# end: auto-generated types
@@ -79,8 +80,10 @@ class TransactionDeletionRecord(Document):
self.delete_bins()
self.delete_lead_addresses()
self.reset_company_values()
clear_notifications()
self.db_set("clear_notifications", 1)
if not self.clear_notifications:
clear_notifications()
self.db_set("clear_notifications", 1)
self.initialize_doctypes_to_be_deleted_table()
self.delete_company_transactions()
def populate_doctypes_to_be_ignored_table(self):
@@ -89,92 +92,108 @@ class TransactionDeletionRecord(Document):
self.append("doctypes_to_be_ignored", {"doctype_name": doctype})
def delete_bins(self):
frappe.db.sql(
"""delete from `tabBin` where warehouse in
(select name from tabWarehouse where company=%s)""",
self.company,
)
self.db_set("delete_bin_data", 1)
if not self.delete_bin_data:
frappe.db.sql(
"""delete from `tabBin` where warehouse in
(select name from tabWarehouse where company=%s)""",
self.company,
)
self.db_set("delete_bin_data", 1)
def delete_lead_addresses(self):
"""Delete addresses to which leads are linked"""
leads = frappe.get_all("Lead", filters={"company": self.company})
leads = ["'%s'" % row.get("name") for row in leads]
addresses = []
if leads:
addresses = frappe.db.sql_list(
"""select parent from `tabDynamic Link` where link_name
in ({leads})""".format(
leads=",".join(leads)
)
)
if addresses:
addresses = ["%s" % frappe.db.escape(addr) for addr in addresses]
frappe.db.sql(
"""delete from `tabAddress` where name in ({addresses}) and
name not in (select distinct dl1.parent from `tabDynamic Link` dl1
inner join `tabDynamic Link` dl2 on dl1.parent=dl2.parent
and dl1.link_doctype<>dl2.link_doctype)""".format(
addresses=",".join(addresses)
)
)
frappe.db.sql(
"""delete from `tabDynamic Link` where link_doctype='Lead'
and parenttype='Address' and link_name in ({leads})""".format(
if not self.delete_leads_and_addresses:
leads = frappe.get_all("Lead", filters={"company": self.company})
leads = ["'%s'" % row.get("name") for row in leads]
addresses = []
if leads:
addresses = frappe.db.sql_list(
"""select parent from `tabDynamic Link` where link_name
in ({leads})""".format(
leads=",".join(leads)
)
)
frappe.db.sql(
"""update `tabCustomer` set lead_name=NULL where lead_name in ({leads})""".format(
leads=",".join(leads)
if addresses:
addresses = ["%s" % frappe.db.escape(addr) for addr in addresses]
frappe.db.sql(
"""delete from `tabAddress` where name in ({addresses}) and
name not in (select distinct dl1.parent from `tabDynamic Link` dl1
inner join `tabDynamic Link` dl2 on dl1.parent=dl2.parent
and dl1.link_doctype<>dl2.link_doctype)""".format(
addresses=",".join(addresses)
)
)
frappe.db.sql(
"""delete from `tabDynamic Link` where link_doctype='Lead'
and parenttype='Address' and link_name in ({leads})""".format(
leads=",".join(leads)
)
)
frappe.db.sql(
"""update `tabCustomer` set lead_name=NULL where lead_name in ({leads})""".format(
leads=",".join(leads)
)
)
)
self.db_set("delete_leads_and_addresses", 1)
self.db_set("delete_leads_and_addresses", 1)
def reset_company_values(self):
company_obj = frappe.get_doc("Company", self.company)
company_obj.total_monthly_sales = 0
company_obj.sales_monthly_history = None
company_obj.save()
self.db_set("reset_company_default_values", 1)
if not self.reset_company_default_values:
company_obj = frappe.get_doc("Company", self.company)
company_obj.total_monthly_sales = 0
company_obj.sales_monthly_history = None
company_obj.save()
self.db_set("reset_company_default_values", 1)
def initialize_doctypes_to_be_deleted_table(self):
if not self.initialize_doctypes_table:
doctypes_to_be_ignored_list = self.get_doctypes_to_be_ignored_list()
docfields = self.get_doctypes_with_company_field(doctypes_to_be_ignored_list)
tables = self.get_all_child_doctypes()
for docfield in docfields:
if docfield["parent"] != self.doctype:
no_of_docs = self.get_number_of_docs_linked_with_specified_company(
docfield["parent"], docfield["fieldname"]
)
if no_of_docs > 0:
# Initialize
self.populate_doctypes_table(tables, docfield["parent"], docfield["fieldname"], 0)
self.db_set("initialize_doctypes_table", 1)
def delete_company_transactions(self):
doctypes_to_be_ignored_list = self.get_doctypes_to_be_ignored_list()
docfields = self.get_doctypes_with_company_field(doctypes_to_be_ignored_list)
if not self.delete_transactions:
doctypes_to_be_ignored_list = self.get_doctypes_to_be_ignored_list()
docfields = self.get_doctypes_with_company_field(doctypes_to_be_ignored_list)
tables = self.get_all_child_doctypes()
for docfield in docfields:
if docfield["parent"] != self.doctype:
no_of_docs = self.get_number_of_docs_linked_with_specified_company(
docfield["parent"], docfield["fieldname"]
)
if no_of_docs > 0:
self.delete_version_log(docfield["parent"], docfield["fieldname"])
reference_docs = frappe.get_all(
docfield["parent"], filters={docfield["fieldname"]: self.company}
tables = self.get_all_child_doctypes()
for docfield in self.doctypes:
if docfield.doctype_name != self.doctype:
no_of_docs = self.get_number_of_docs_linked_with_specified_company(
docfield.doctype_name, docfield.docfield_name
)
reference_doc_names = [r.name for r in reference_docs]
if no_of_docs > 0:
reference_docs = frappe.get_all(
docfield.doctype_name, filters={docfield.docfield_name: self.company}, limit=self.batch_size
)
reference_doc_names = [r.name for r in reference_docs]
self.delete_communications(docfield["parent"], reference_doc_names)
self.delete_comments(docfield["parent"], reference_doc_names)
self.unlink_attachments(docfield["parent"], reference_doc_names)
self.delete_version_log(docfield.doctype_name, reference_doc_names)
self.delete_communications(docfield.doctype_name, reference_doc_names)
self.delete_comments(docfield.doctype_name, reference_doc_names)
self.unlink_attachments(docfield.doctype_name, reference_doc_names)
self.populate_doctypes_table(tables, docfield["parent"], no_of_docs)
self.delete_child_tables(docfield.doctype_name, reference_doc_names)
self.delete_docs_linked_with_specified_company(docfield.doctype_name, docfield.docfield_name)
self.delete_child_tables(docfield["parent"], docfield["fieldname"])
self.delete_docs_linked_with_specified_company(docfield["parent"], docfield["fieldname"])
naming_series = frappe.db.get_value("DocType", docfield["parent"], "autoname")
if naming_series:
if "#" in naming_series:
self.update_naming_series(naming_series, docfield["parent"])
self.db_set("delete_transactions", 1)
naming_series = frappe.db.get_value("DocType", docfield.doctype_name, "autoname")
# TODO: do this at the end of each doctype
if naming_series:
if "#" in naming_series:
self.update_naming_series(naming_series, docfield.doctype_name)
self.db_set("delete_transactions", 1)
def get_doctypes_to_be_ignored_list(self):
singles = frappe.get_all("DocType", filters={"issingle": 1}, pluck="name")
@@ -203,22 +222,21 @@ class TransactionDeletionRecord(Document):
def get_number_of_docs_linked_with_specified_company(self, doctype, company_fieldname):
return frappe.db.count(doctype, {company_fieldname: self.company})
def populate_doctypes_table(self, tables, doctype, no_of_docs):
def populate_doctypes_table(self, tables, doctype, fieldname, no_of_docs):
self.flags.ignore_validate_update_after_submit = True
if doctype not in tables:
self.append("doctypes", {"doctype_name": doctype, "no_of_docs": no_of_docs})
def delete_child_tables(self, doctype, company_fieldname):
parent_docs_to_be_deleted = frappe.get_all(
doctype, {company_fieldname: self.company}, pluck="name"
)
self.append(
"doctypes", {"doctype_name": doctype, "docfield_name": fieldname, "no_of_docs": no_of_docs}
)
self.save(ignore_permissions=True)
def delete_child_tables(self, doctype, reference_doc_names):
child_tables = frappe.get_all(
"DocField", filters={"fieldtype": "Table", "parent": doctype}, pluck="options"
)
for batch in create_batch(parent_docs_to_be_deleted, self.batch_size):
for table in child_tables:
frappe.db.delete(table, {"parent": ["in", batch]})
for table in child_tables:
frappe.db.delete(table, {"parent": ["in", reference_doc_names]})
def delete_docs_linked_with_specified_company(self, doctype, company_fieldname):
frappe.db.delete(doctype, {company_fieldname: self.company})
@@ -242,17 +260,11 @@ class TransactionDeletionRecord(Document):
frappe.db.sql("""update `tabSeries` set current = %s where name=%s""", (last, prefix))
def delete_version_log(self, doctype, company_fieldname):
dt = qb.DocType(doctype)
names = qb.from_(dt).select(dt.name).where(dt[company_fieldname] == self.company).run(as_list=1)
names = [x[0] for x in names]
if names:
versions = qb.DocType("Version")
for batch in create_batch(names, self.batch_size):
qb.from_(versions).delete().where(
(versions.ref_doctype == doctype) & (versions.docname.isin(batch))
).run()
def delete_version_log(self, doctype, docnames):
versions = qb.DocType("Version")
qb.from_(versions).delete().where(
(versions.ref_doctype == doctype) & (versions.docname.isin(docnames))
).run()
def delete_communications(self, doctype, reference_doc_names):
communications = frappe.get_all(

View File

@@ -6,7 +6,9 @@
"engine": "InnoDB",
"field_order": [
"doctype_name",
"no_of_docs"
"docfield_name",
"no_of_docs",
"done"
],
"fields": [
{
@@ -22,12 +24,24 @@
"fieldtype": "Data",
"in_list_view": 1,
"label": "Number of Docs"
},
{
"default": "0",
"fieldname": "done",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Done"
},
{
"fieldname": "docfield_name",
"fieldtype": "Data",
"label": "DocField Name"
}
],
"index_web_pages_for_search": 1,
"istable": 1,
"links": [],
"modified": "2021-05-08 23:10:46.166744",
"modified": "2024-02-03 21:06:32.274445",
"modified_by": "Administrator",
"module": "Setup",
"name": "Transaction Deletion Record Item",
@@ -35,5 +49,6 @@
"permissions": [],
"sort_field": "modified",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}

View File

@@ -15,7 +15,9 @@ class TransactionDeletionRecordItem(Document):
if TYPE_CHECKING:
from frappe.types import DF
docfield_name: DF.Data | None
doctype_name: DF.Link
done: DF.Check
no_of_docs: DF.Data | None
parent: DF.Data
parentfield: DF.Data