style: format code with black

This commit is contained in:
Ankush Menat
2022-03-28 18:52:46 +05:30
parent 21e00da3d6
commit 494bd9ef78
1395 changed files with 91704 additions and 62532 deletions

View File

@@ -12,17 +12,17 @@ from frappe.utils.verified_command import get_signed_params
class Appointment(Document):
def find_lead_by_email(self):
lead_list = frappe.get_list(
'Lead', filters={'email_id': self.customer_email}, ignore_permissions=True)
"Lead", filters={"email_id": self.customer_email}, ignore_permissions=True
)
if lead_list:
return lead_list[0].name
return None
def find_customer_by_email(self):
customer_list = frappe.get_list(
'Customer', filters={'email_id': self.customer_email}, ignore_permissions=True
"Customer", filters={"email_id": self.customer_email}, ignore_permissions=True
)
if customer_list:
return customer_list[0].name
@@ -30,11 +30,12 @@ class Appointment(Document):
def before_insert(self):
number_of_appointments_in_same_slot = frappe.db.count(
'Appointment', filters={'scheduled_time': self.scheduled_time})
number_of_agents = frappe.db.get_single_value('Appointment Booking Settings', 'number_of_agents')
"Appointment", filters={"scheduled_time": self.scheduled_time}
)
number_of_agents = frappe.db.get_single_value("Appointment Booking Settings", "number_of_agents")
if not number_of_agents == 0:
if (number_of_appointments_in_same_slot >= number_of_agents):
frappe.throw(_('Time slot is not available'))
if number_of_appointments_in_same_slot >= number_of_agents:
frappe.throw(_("Time slot is not available"))
# Link lead
if not self.party:
lead = self.find_lead_by_email()
@@ -53,45 +54,46 @@ class Appointment(Document):
self.create_calendar_event()
else:
# Set status to unverified
self.status = 'Unverified'
self.status = "Unverified"
# Send email to confirm
self.send_confirmation_email()
def send_confirmation_email(self):
verify_url = self._get_verify_url()
template = 'confirm_appointment'
template = "confirm_appointment"
args = {
"link":verify_url,
"site_url":frappe.utils.get_url(),
"full_name":self.customer_name,
"link": verify_url,
"site_url": frappe.utils.get_url(),
"full_name": self.customer_name,
}
frappe.sendmail(recipients=[self.customer_email],
template=template,
args=args,
subject=_('Appointment Confirmation'))
frappe.sendmail(
recipients=[self.customer_email],
template=template,
args=args,
subject=_("Appointment Confirmation"),
)
if frappe.session.user == "Guest":
frappe.msgprint(_("Please check your email to confirm the appointment"))
else:
frappe.msgprint(
_('Please check your email to confirm the appointment'))
else :
frappe.msgprint(
_('Appointment was created. But no lead was found. Please check the email to confirm'))
_("Appointment was created. But no lead was found. Please check the email to confirm")
)
def on_change(self):
# Sync Calendar
if not self.calendar_event:
return
cal_event = frappe.get_doc('Event', self.calendar_event)
cal_event = frappe.get_doc("Event", self.calendar_event)
cal_event.starts_on = self.scheduled_time
cal_event.save(ignore_permissions=True)
def set_verified(self, email):
if not email == self.customer_email:
frappe.throw(_('Email verification failed.'))
frappe.throw(_("Email verification failed."))
# Create new lead
self.create_lead_and_link()
# Remove unverified status
self.status = 'Open'
self.status = "Open"
# Create calender event
self.auto_assign()
self.create_calendar_event()
@@ -102,58 +104,53 @@ class Appointment(Document):
# Return if already linked
if self.party:
return
lead = frappe.get_doc({
'doctype': 'Lead',
'lead_name': self.customer_name,
'email_id': self.customer_email,
'notes': self.customer_details,
'phone': self.customer_phone_number,
})
lead = frappe.get_doc(
{
"doctype": "Lead",
"lead_name": self.customer_name,
"email_id": self.customer_email,
"notes": self.customer_details,
"phone": self.customer_phone_number,
}
)
lead.insert(ignore_permissions=True)
# Link lead
self.party = lead.name
def auto_assign(self):
from frappe.desk.form.assign_to import add as add_assignemnt
existing_assignee = self.get_assignee_from_latest_opportunity()
if existing_assignee:
# If the latest opportunity is assigned to someone
# Assign the appointment to the same
add_assignemnt({
'doctype': self.doctype,
'name': self.name,
'assign_to': [existing_assignee]
})
add_assignemnt({"doctype": self.doctype, "name": self.name, "assign_to": [existing_assignee]})
return
if self._assign:
return
available_agents = _get_agents_sorted_by_asc_workload(
getdate(self.scheduled_time))
available_agents = _get_agents_sorted_by_asc_workload(getdate(self.scheduled_time))
for agent in available_agents:
if(_check_agent_availability(agent, self.scheduled_time)):
if _check_agent_availability(agent, self.scheduled_time):
agent = agent[0]
add_assignemnt({
'doctype': self.doctype,
'name': self.name,
'assign_to': [agent]
})
add_assignemnt({"doctype": self.doctype, "name": self.name, "assign_to": [agent]})
break
def get_assignee_from_latest_opportunity(self):
if not self.party:
return None
if not frappe.db.exists('Lead', self.party):
if not frappe.db.exists("Lead", self.party):
return None
opporutnities = frappe.get_list(
'Opportunity',
"Opportunity",
filters={
'party_name': self.party,
"party_name": self.party,
},
ignore_permissions=True,
order_by='creation desc')
order_by="creation desc",
)
if not opporutnities:
return None
latest_opportunity = frappe.get_doc('Opportunity', opporutnities[0].name )
latest_opportunity = frappe.get_doc("Opportunity", opporutnities[0].name)
assignee = latest_opportunity._assign
if not assignee:
return None
@@ -163,35 +160,36 @@ class Appointment(Document):
def create_calendar_event(self):
if self.calendar_event:
return
appointment_event = frappe.get_doc({
'doctype': 'Event',
'subject': ' '.join(['Appointment with', self.customer_name]),
'starts_on': self.scheduled_time,
'status': 'Open',
'type': 'Public',
'send_reminder': frappe.db.get_single_value('Appointment Booking Settings', 'email_reminders'),
'event_participants': [dict(reference_doctype=self.appointment_with, reference_docname=self.party)]
})
appointment_event = frappe.get_doc(
{
"doctype": "Event",
"subject": " ".join(["Appointment with", self.customer_name]),
"starts_on": self.scheduled_time,
"status": "Open",
"type": "Public",
"send_reminder": frappe.db.get_single_value("Appointment Booking Settings", "email_reminders"),
"event_participants": [
dict(reference_doctype=self.appointment_with, reference_docname=self.party)
],
}
)
employee = _get_employee_from_user(self._assign)
if employee:
appointment_event.append('event_participants', dict(
reference_doctype='Employee',
reference_docname=employee.name))
appointment_event.append(
"event_participants", dict(reference_doctype="Employee", reference_docname=employee.name)
)
appointment_event.insert(ignore_permissions=True)
self.calendar_event = appointment_event.name
self.save(ignore_permissions=True)
def _get_verify_url(self):
verify_route = '/book_appointment/verify'
params = {
'email': self.customer_email,
'appointment': self.name
}
return get_url(verify_route + '?' + get_signed_params(params))
verify_route = "/book_appointment/verify"
params = {"email": self.customer_email, "appointment": self.name}
return get_url(verify_route + "?" + get_signed_params(params))
def _get_agents_sorted_by_asc_workload(date):
appointments = frappe.db.get_list('Appointment', fields='*')
appointments = frappe.db.get_list("Appointment", fields="*")
agent_list = _get_agent_list_as_strings()
if not appointments:
return agent_list
@@ -209,7 +207,7 @@ def _get_agents_sorted_by_asc_workload(date):
def _get_agent_list_as_strings():
agent_list_as_strings = []
agent_list = frappe.get_doc('Appointment Booking Settings').agent_list
agent_list = frappe.get_doc("Appointment Booking Settings").agent_list
for agent in agent_list:
agent_list_as_strings.append(agent.user)
return agent_list_as_strings
@@ -217,7 +215,8 @@ def _get_agent_list_as_strings():
def _check_agent_availability(agent_email, scheduled_time):
appointemnts_at_scheduled_time = frappe.get_list(
'Appointment', filters={'scheduled_time': scheduled_time})
"Appointment", filters={"scheduled_time": scheduled_time}
)
for appointment in appointemnts_at_scheduled_time:
if appointment._assign == agent_email:
return False
@@ -225,7 +224,7 @@ def _check_agent_availability(agent_email, scheduled_time):
def _get_employee_from_user(user):
employee_docname = frappe.db.get_value('Employee', {'user_id': user})
employee_docname = frappe.db.get_value("Employee", {"user_id": user})
if employee_docname:
return frappe.get_doc('Employee', employee_docname)
return frappe.get_doc("Employee", employee_docname)
return None

View File

@@ -10,8 +10,8 @@ from frappe.model.document import Document
class AppointmentBookingSettings(Document):
agent_list = [] #Hack
min_date = '01/01/1970 '
agent_list = [] # Hack
min_date = "01/01/1970 "
format_string = "%d/%m/%Y %H:%M:%S"
def validate(self):
@@ -23,21 +23,22 @@ class AppointmentBookingSettings(Document):
def validate_availability_of_slots(self):
for record in self.availability_of_slots:
from_time = datetime.datetime.strptime(
self.min_date+record.from_time, self.format_string)
to_time = datetime.datetime.strptime(
self.min_date+record.to_time, self.format_string)
timedelta = to_time-from_time
from_time = datetime.datetime.strptime(self.min_date + record.from_time, self.format_string)
to_time = datetime.datetime.strptime(self.min_date + record.to_time, self.format_string)
timedelta = to_time - from_time
self.validate_from_and_to_time(from_time, to_time, record)
self.duration_is_divisible(from_time, to_time)
def validate_from_and_to_time(self, from_time, to_time, record):
if from_time > to_time:
err_msg = _('<b>From Time</b> cannot be later than <b>To Time</b> for {0}').format(record.day_of_week)
err_msg = _("<b>From Time</b> cannot be later than <b>To Time</b> for {0}").format(
record.day_of_week
)
frappe.throw(_(err_msg))
def duration_is_divisible(self, from_time, to_time):
timedelta = to_time - from_time
if timedelta.total_seconds() % (self.appointment_duration * 60):
frappe.throw(
_('The difference between from time and To Time must be a multiple of Appointment'))
_("The difference between from time and To Time must be a multiple of Appointment")
)

View File

@@ -8,7 +8,7 @@ from frappe.model.naming import set_name_by_naming_series
class Campaign(Document):
def autoname(self):
if frappe.defaults.get_global_default('campaign_naming_by') != 'Naming Series':
if frappe.defaults.get_global_default("campaign_naming_by") != "Naming Series":
self.name = self.campaign_name
else:
set_name_by_naming_series(self)

View File

@@ -75,11 +75,11 @@ def get_status(start_date, end_date):
Get a Contract's status based on the start, current and end dates
Args:
start_date (str): The start date of the contract
end_date (str): The end date of the contract
start_date (str): The start date of the contract
end_date (str): The end date of the contract
Returns:
str: 'Active' if within range, otherwise 'Inactive'
str: 'Active' if within range, otherwise 'Inactive'
"""
if not end_date:
@@ -98,13 +98,13 @@ def update_status_for_contracts():
and submitted Contracts
"""
contracts = frappe.get_all("Contract",
filters={"is_signed": True,
"docstatus": 1},
fields=["name", "start_date", "end_date"])
contracts = frappe.get_all(
"Contract",
filters={"is_signed": True, "docstatus": 1},
fields=["name", "start_date", "end_date"],
)
for contract in contracts:
status = get_status(contract.get("start_date"),
contract.get("end_date"))
status = get_status(contract.get("start_date"), contract.get("end_date"))
frappe.db.set_value("Contract", contract.get("name"), "status", status)

View File

@@ -8,7 +8,6 @@ from frappe.utils import add_days, nowdate
class TestContract(unittest.TestCase):
def setUp(self):
frappe.db.sql("delete from `tabContract`")
self.contract_doc = get_contract()
@@ -65,10 +64,7 @@ class TestContract(unittest.TestCase):
# Mark all the terms as fulfilled
self.contract_doc.requires_fulfilment = 1
fulfilment_terms = []
fulfilment_terms.append({
"requirement": "This is a test requirement.",
"fulfilled": 0
})
fulfilment_terms.append({"requirement": "This is a test requirement.", "fulfilled": 0})
self.contract_doc.set("fulfilment_terms", fulfilment_terms)
for term in self.contract_doc.fulfilment_terms:
@@ -85,14 +81,8 @@ class TestContract(unittest.TestCase):
# Mark only the first term as fulfilled
self.contract_doc.save()
fulfilment_terms = []
fulfilment_terms.append({
"requirement": "This is a test requirement.",
"fulfilled": 0
})
fulfilment_terms.append({
"requirement": "This is another test requirement.",
"fulfilled": 0
})
fulfilment_terms.append({"requirement": "This is a test requirement.", "fulfilled": 0})
fulfilment_terms.append({"requirement": "This is another test requirement.", "fulfilled": 0})
self.contract_doc.set("fulfilment_terms", fulfilment_terms)
self.contract_doc.fulfilment_terms[0].fulfilled = 1
@@ -110,6 +100,7 @@ class TestContract(unittest.TestCase):
self.assertEqual(self.contract_doc.fulfilment_status, "Lapsed")
def get_contract():
doc = frappe.new_doc("Contract")
doc.party_type = "Customer"

View File

@@ -14,6 +14,7 @@ class ContractTemplate(Document):
if self.contract_terms:
validate_template(self.contract_terms)
@frappe.whitelist()
def get_contract_template(template_name, doc):
if isinstance(doc, str):
@@ -25,7 +26,4 @@ def get_contract_template(template_name, doc):
if contract_template.contract_terms:
contract_terms = frappe.render_template(contract_template.contract_terms, doc)
return {
'contract_template': contract_template,
'contract_terms': contract_terms
}
return {"contract_template": contract_template, "contract_terms": contract_terms}

View File

@@ -12,7 +12,7 @@ from frappe.utils import add_days, getdate, today
class EmailCampaign(Document):
def validate(self):
self.set_date()
#checking if email is set for lead. Not checking for contact as email is a mandatory field for contact.
# checking if email is set for lead. Not checking for contact as email is a mandatory field for contact.
if self.email_campaign_for == "Lead":
self.validate_lead()
self.validate_email_campaign_already_exists()
@@ -21,7 +21,7 @@ class EmailCampaign(Document):
def set_date(self):
if getdate(self.start_date) < getdate(today()):
frappe.throw(_("Start Date cannot be before the current date"))
#set the end date as start date + max(send after days) in campaign schedule
# set the end date as start date + max(send after days) in campaign schedule
send_after_days = []
campaign = frappe.get_doc("Campaign", self.campaign_name)
for entry in campaign.get("campaign_schedules"):
@@ -29,23 +29,32 @@ class EmailCampaign(Document):
try:
self.end_date = add_days(getdate(self.start_date), max(send_after_days))
except ValueError:
frappe.throw(_("Please set up the Campaign Schedule in the Campaign {0}").format(self.campaign_name))
frappe.throw(
_("Please set up the Campaign Schedule in the Campaign {0}").format(self.campaign_name)
)
def validate_lead(self):
lead_email_id = frappe.db.get_value("Lead", self.recipient, 'email_id')
lead_email_id = frappe.db.get_value("Lead", self.recipient, "email_id")
if not lead_email_id:
lead_name = frappe.db.get_value("Lead", self.recipient, 'lead_name')
lead_name = frappe.db.get_value("Lead", self.recipient, "lead_name")
frappe.throw(_("Please set an email id for the Lead {0}").format(lead_name))
def validate_email_campaign_already_exists(self):
email_campaign_exists = frappe.db.exists("Email Campaign", {
"campaign_name": self.campaign_name,
"recipient": self.recipient,
"status": ("in", ["In Progress", "Scheduled"]),
"name": ("!=", self.name)
})
email_campaign_exists = frappe.db.exists(
"Email Campaign",
{
"campaign_name": self.campaign_name,
"recipient": self.recipient,
"status": ("in", ["In Progress", "Scheduled"]),
"name": ("!=", self.name),
},
)
if email_campaign_exists:
frappe.throw(_("The Campaign '{0}' already exists for the {1} '{2}'").format(self.campaign_name, self.email_campaign_for, self.recipient))
frappe.throw(
_("The Campaign '{0}' already exists for the {1} '{2}'").format(
self.campaign_name, self.email_campaign_for, self.recipient
)
)
def update_status(self):
start_date = getdate(self.start_date)
@@ -58,51 +67,63 @@ class EmailCampaign(Document):
elif end_date < today_date:
self.status = "Completed"
#called through hooks to send campaign mails to leads
# called through hooks to send campaign mails to leads
def send_email_to_leads_or_contacts():
email_campaigns = frappe.get_all("Email Campaign", filters = { 'status': ('not in', ['Unsubscribed', 'Completed', 'Scheduled']) })
email_campaigns = frappe.get_all(
"Email Campaign", filters={"status": ("not in", ["Unsubscribed", "Completed", "Scheduled"])}
)
for camp in email_campaigns:
email_campaign = frappe.get_doc("Email Campaign", camp.name)
campaign = frappe.get_cached_doc("Campaign", email_campaign.campaign_name)
for entry in campaign.get("campaign_schedules"):
scheduled_date = add_days(email_campaign.get('start_date'), entry.get('send_after_days'))
scheduled_date = add_days(email_campaign.get("start_date"), entry.get("send_after_days"))
if scheduled_date == getdate(today()):
send_mail(entry, email_campaign)
def send_mail(entry, email_campaign):
recipient_list = []
if email_campaign.email_campaign_for == "Email Group":
for member in frappe.db.get_list("Email Group Member", filters={"email_group": email_campaign.get("recipient")}, fields=["email"]):
recipient_list.append(member['email'])
for member in frappe.db.get_list(
"Email Group Member", filters={"email_group": email_campaign.get("recipient")}, fields=["email"]
):
recipient_list.append(member["email"])
else:
recipient_list.append(frappe.db.get_value(email_campaign.email_campaign_for, email_campaign.get("recipient"), "email_id"))
recipient_list.append(
frappe.db.get_value(
email_campaign.email_campaign_for, email_campaign.get("recipient"), "email_id"
)
)
email_template = frappe.get_doc("Email Template", entry.get("email_template"))
sender = frappe.db.get_value("User", email_campaign.get("sender"), "email")
context = {"doc": frappe.get_doc(email_campaign.email_campaign_for, email_campaign.recipient)}
# send mail and link communication to document
comm = make(
doctype = "Email Campaign",
name = email_campaign.name,
subject = frappe.render_template(email_template.get("subject"), context),
content = frappe.render_template(email_template.get("response"), context),
sender = sender,
recipients = recipient_list,
communication_medium = "Email",
sent_or_received = "Sent",
send_email = True,
email_template = email_template.name
doctype="Email Campaign",
name=email_campaign.name,
subject=frappe.render_template(email_template.get("subject"), context),
content=frappe.render_template(email_template.get("response"), context),
sender=sender,
recipients=recipient_list,
communication_medium="Email",
sent_or_received="Sent",
send_email=True,
email_template=email_template.name,
)
return comm
#called from hooks on doc_event Email Unsubscribe
# called from hooks on doc_event Email Unsubscribe
def unsubscribe_recipient(unsubscribe, method):
if unsubscribe.reference_doctype == 'Email Campaign':
if unsubscribe.reference_doctype == "Email Campaign":
frappe.db.set_value("Email Campaign", unsubscribe.reference_name, "status", "Unsubscribed")
#called through hooks to update email campaign status daily
# called through hooks to update email campaign status daily
def set_email_campaign_status():
email_campaigns = frappe.get_all("Email Campaign", filters = { 'status': ('!=', 'Unsubscribed')})
email_campaigns = frappe.get_all("Email Campaign", filters={"status": ("!=", "Unsubscribed")})
for entry in email_campaigns:
email_campaign = frappe.get_doc("Email Campaign", entry.name)
email_campaign.update_status()

View File

@@ -23,7 +23,7 @@ from erpnext.controllers.selling_controller import SellingController
class Lead(SellingController):
def get_feed(self):
return '{0}: {1}'.format(_(self.status), self.lead_name)
return "{0}: {1}".format(_(self.status), self.lead_name)
def onload(self):
customer = frappe.db.get_value("Customer", {"lead_name": self.name})
@@ -62,8 +62,7 @@ class Lead(SellingController):
if self.contact_date and getdate(self.contact_date) < getdate(nowdate()):
frappe.throw(_("Next Contact Date cannot be in the past"))
if (self.ends_on and self.contact_date and
(getdate(self.ends_on) < getdate(self.contact_date))):
if self.ends_on and self.contact_date and (getdate(self.ends_on) < getdate(self.contact_date)):
frappe.throw(_("Ends On date cannot be before Next Contact Date."))
def on_update(self):
@@ -72,13 +71,11 @@ class Lead(SellingController):
def set_prev(self):
if self.is_new():
self._prev = frappe._dict({
"contact_date": None,
"ends_on": None,
"contact_by": None
})
self._prev = frappe._dict({"contact_date": None, "ends_on": None, "contact_by": None})
else:
self._prev = frappe.db.get_value("Lead", self.name, ["contact_date", "ends_on", "contact_by"], as_dict=1)
self._prev = frappe.db.get_value(
"Lead", self.name, ["contact_date", "ends_on", "contact_by"], as_dict=1
)
def before_insert(self):
self.contact_doc = self.create_contact()
@@ -89,39 +86,47 @@ class Lead(SellingController):
def update_links(self):
# update contact links
if self.contact_doc:
self.contact_doc.append("links", {
"link_doctype": "Lead",
"link_name": self.name,
"link_title": self.lead_name
})
self.contact_doc.append(
"links", {"link_doctype": "Lead", "link_name": self.name, "link_title": self.lead_name}
)
self.contact_doc.save()
def add_calendar_event(self, opts=None, force=False):
if frappe.db.get_single_value('CRM Settings', 'create_event_on_next_contact_date'):
super(Lead, self).add_calendar_event({
"owner": self.lead_owner,
"starts_on": self.contact_date,
"ends_on": self.ends_on or "",
"subject": ('Contact ' + cstr(self.lead_name)),
"description": ('Contact ' + cstr(self.lead_name)) + (self.contact_by and ('. By : ' + cstr(self.contact_by)) or '')
}, force)
if frappe.db.get_single_value("CRM Settings", "create_event_on_next_contact_date"):
super(Lead, self).add_calendar_event(
{
"owner": self.lead_owner,
"starts_on": self.contact_date,
"ends_on": self.ends_on or "",
"subject": ("Contact " + cstr(self.lead_name)),
"description": ("Contact " + cstr(self.lead_name))
+ (self.contact_by and (". By : " + cstr(self.contact_by)) or ""),
},
force,
)
def update_prospects(self):
prospects = frappe.get_all('Prospect Lead', filters={'lead': self.name}, fields=['parent'])
prospects = frappe.get_all("Prospect Lead", filters={"lead": self.name}, fields=["parent"])
for row in prospects:
prospect = frappe.get_doc('Prospect', row.parent)
prospect = frappe.get_doc("Prospect", row.parent)
prospect.save(ignore_permissions=True)
def check_email_id_is_unique(self):
if self.email_id:
# validate email is unique
if not frappe.db.get_single_value('CRM Settings', 'allow_lead_duplication_based_on_emails'):
duplicate_leads = frappe.get_all("Lead", filters={"email_id": self.email_id, "name": ["!=", self.name]})
duplicate_leads = [frappe.bold(get_link_to_form('Lead', lead.name)) for lead in duplicate_leads]
if not frappe.db.get_single_value("CRM Settings", "allow_lead_duplication_based_on_emails"):
duplicate_leads = frappe.get_all(
"Lead", filters={"email_id": self.email_id, "name": ["!=", self.name]}
)
duplicate_leads = [
frappe.bold(get_link_to_form("Lead", lead.name)) for lead in duplicate_leads
]
if duplicate_leads:
frappe.throw(_("Email Address must be unique, already exists for {0}")
.format(comma_and(duplicate_leads)), frappe.DuplicateEntryError)
frappe.throw(
_("Email Address must be unique, already exists for {0}").format(comma_and(duplicate_leads)),
frappe.DuplicateEntryError,
)
def on_trash(self):
frappe.db.sql("""update `tabIssue` set lead='' where lead=%s""", self.name)
@@ -130,16 +135,20 @@ class Lead(SellingController):
self.delete_events()
def unlink_dynamic_links(self):
links = frappe.get_all('Dynamic Link', filters={'link_doctype': self.doctype, 'link_name': self.name}, fields=['parent', 'parenttype'])
links = frappe.get_all(
"Dynamic Link",
filters={"link_doctype": self.doctype, "link_name": self.name},
fields=["parent", "parenttype"],
)
for link in links:
linked_doc = frappe.get_doc(link['parenttype'], link['parent'])
linked_doc = frappe.get_doc(link["parenttype"], link["parent"])
if len(linked_doc.get('links')) == 1:
if len(linked_doc.get("links")) == 1:
linked_doc.delete(ignore_permissions=True)
else:
to_remove = None
for d in linked_doc.get('links'):
for d in linked_doc.get("links"):
if d.link_doctype == self.doctype and d.link_name == self.name:
to_remove = d
if to_remove:
@@ -153,18 +162,14 @@ class Lead(SellingController):
return frappe.db.get_value("Opportunity", {"party_name": self.name, "status": ["!=", "Lost"]})
def has_quotation(self):
return frappe.db.get_value("Quotation", {
"party_name": self.name,
"docstatus": 1,
"status": ["!=", "Lost"]
})
return frappe.db.get_value(
"Quotation", {"party_name": self.name, "docstatus": 1, "status": ["!=", "Lost"]}
)
def has_lost_quotation(self):
return frappe.db.get_value("Quotation", {
"party_name": self.name,
"docstatus": 1,
"status": "Lost"
})
return frappe.db.get_value(
"Quotation", {"party_name": self.name, "docstatus": 1, "status": "Lost"}
)
def set_lead_name(self):
if not self.lead_name:
@@ -180,44 +185,38 @@ class Lead(SellingController):
self.title = self.company_name or self.lead_name
def create_contact(self):
if frappe.db.get_single_value('CRM Settings', 'auto_creation_of_contact'):
if frappe.db.get_single_value("CRM Settings", "auto_creation_of_contact"):
if not self.lead_name:
self.set_full_name()
self.set_lead_name()
contact = frappe.new_doc("Contact")
contact.update({
"first_name": self.first_name or self.lead_name,
"last_name": self.last_name,
"salutation": self.salutation,
"gender": self.gender,
"designation": self.designation,
"company_name": self.company_name,
})
contact.update(
{
"first_name": self.first_name or self.lead_name,
"last_name": self.last_name,
"salutation": self.salutation,
"gender": self.gender,
"designation": self.designation,
"company_name": self.company_name,
}
)
if self.email_id:
contact.append("email_ids", {
"email_id": self.email_id,
"is_primary": 1
})
contact.append("email_ids", {"email_id": self.email_id, "is_primary": 1})
if self.phone:
contact.append("phone_nos", {
"phone": self.phone,
"is_primary_phone": 1
})
contact.append("phone_nos", {"phone": self.phone, "is_primary_phone": 1})
if self.mobile_no:
contact.append("phone_nos", {
"phone": self.mobile_no,
"is_primary_mobile_no":1
})
contact.append("phone_nos", {"phone": self.mobile_no, "is_primary_mobile_no": 1})
contact.insert(ignore_permissions=True)
contact.reload() # load changes by hooks on contact
contact.reload() # load changes by hooks on contact
return contact
@frappe.whitelist()
def make_customer(source_name, target_doc=None):
return _make_customer(source_name, target_doc)
@@ -234,16 +233,24 @@ def _make_customer(source_name, target_doc=None, ignore_permissions=False):
target.customer_group = frappe.db.get_default("Customer Group")
doclist = get_mapped_doc("Lead", source_name,
{"Lead": {
"doctype": "Customer",
"field_map": {
"name": "lead_name",
"company_name": "customer_name",
"contact_no": "phone_1",
"fax": "fax_1"
doclist = get_mapped_doc(
"Lead",
source_name,
{
"Lead": {
"doctype": "Customer",
"field_map": {
"name": "lead_name",
"company_name": "customer_name",
"contact_no": "phone_1",
"fax": "fax_1",
},
}
}}, target_doc, set_missing_values, ignore_permissions=ignore_permissions)
},
target_doc,
set_missing_values,
ignore_permissions=ignore_permissions,
)
return doclist
@@ -253,19 +260,26 @@ def make_opportunity(source_name, target_doc=None):
def set_missing_values(source, target):
_set_missing_values(source, target)
target_doc = get_mapped_doc("Lead", source_name,
{"Lead": {
"doctype": "Opportunity",
"field_map": {
"campaign_name": "campaign",
"doctype": "opportunity_from",
"name": "party_name",
"lead_name": "contact_display",
"company_name": "customer_name",
"email_id": "contact_email",
"mobile_no": "contact_mobile"
target_doc = get_mapped_doc(
"Lead",
source_name,
{
"Lead": {
"doctype": "Opportunity",
"field_map": {
"campaign_name": "campaign",
"doctype": "opportunity_from",
"name": "party_name",
"lead_name": "contact_display",
"company_name": "customer_name",
"email_id": "contact_email",
"mobile_no": "contact_mobile",
},
}
}}, target_doc, set_missing_values)
},
target_doc,
set_missing_values,
)
return target_doc
@@ -275,13 +289,13 @@ def make_quotation(source_name, target_doc=None):
def set_missing_values(source, target):
_set_missing_values(source, target)
target_doc = get_mapped_doc("Lead", source_name,
{"Lead": {
"doctype": "Quotation",
"field_map": {
"name": "party_name"
}
}}, target_doc, set_missing_values)
target_doc = get_mapped_doc(
"Lead",
source_name,
{"Lead": {"doctype": "Quotation", "field_map": {"name": "party_name"}}},
target_doc,
set_missing_values,
)
target_doc.quotation_to = "Lead"
target_doc.run_method("set_missing_values")
@@ -290,18 +304,29 @@ def make_quotation(source_name, target_doc=None):
return target_doc
def _set_missing_values(source, target):
address = frappe.get_all('Dynamic Link', {
'link_doctype': source.doctype,
'link_name': source.name,
'parenttype': 'Address',
}, ['parent'], limit=1)
contact = frappe.get_all('Dynamic Link', {
'link_doctype': source.doctype,
'link_name': source.name,
'parenttype': 'Contact',
}, ['parent'], limit=1)
def _set_missing_values(source, target):
address = frappe.get_all(
"Dynamic Link",
{
"link_doctype": source.doctype,
"link_name": source.name,
"parenttype": "Address",
},
["parent"],
limit=1,
)
contact = frappe.get_all(
"Dynamic Link",
{
"link_doctype": source.doctype,
"link_name": source.name,
"parenttype": "Contact",
},
["parent"],
limit=1,
)
if address:
target.customer_address = address[0].parent
@@ -309,39 +334,49 @@ def _set_missing_values(source, target):
if contact:
target.contact_person = contact[0].parent
@frappe.whitelist()
def get_lead_details(lead, posting_date=None, company=None):
if not lead:
return {}
from erpnext.accounts.party import set_address_details
out = frappe._dict()
lead_doc = frappe.get_doc("Lead", lead)
lead = lead_doc
out.update({
"territory": lead.territory,
"customer_name": lead.company_name or lead.lead_name,
"contact_display": " ".join(filter(None, [lead.salutation, lead.lead_name])),
"contact_email": lead.email_id,
"contact_mobile": lead.mobile_no,
"contact_phone": lead.phone,
})
out.update(
{
"territory": lead.territory,
"customer_name": lead.company_name or lead.lead_name,
"contact_display": " ".join(filter(None, [lead.salutation, lead.lead_name])),
"contact_email": lead.email_id,
"contact_mobile": lead.mobile_no,
"contact_phone": lead.phone,
}
)
set_address_details(out, lead, "Lead")
taxes_and_charges = set_taxes(None, 'Lead', posting_date, company,
billing_address=out.get('customer_address'), shipping_address=out.get('shipping_address_name'))
taxes_and_charges = set_taxes(
None,
"Lead",
posting_date,
company,
billing_address=out.get("customer_address"),
shipping_address=out.get("shipping_address_name"),
)
if taxes_and_charges:
out['taxes_and_charges'] = taxes_and_charges
out["taxes_and_charges"] = taxes_and_charges
return out
@frappe.whitelist()
def make_lead_from_communication(communication, ignore_communication_links=False):
""" raise a issue from email """
"""raise a issue from email"""
doc = frappe.get_doc("Communication", communication)
lead_name = None
@@ -350,12 +385,14 @@ def make_lead_from_communication(communication, ignore_communication_links=False
if not lead_name and doc.phone_no:
lead_name = frappe.db.get_value("Lead", {"mobile_no": doc.phone_no})
if not lead_name:
lead = frappe.get_doc({
"doctype": "Lead",
"lead_name": doc.sender_full_name,
"email_id": doc.sender,
"mobile_no": doc.phone_no
})
lead = frappe.get_doc(
{
"doctype": "Lead",
"lead_name": doc.sender_full_name,
"email_id": doc.sender,
"mobile_no": doc.phone_no,
}
)
lead.flags.ignore_mandatory = True
lead.flags.ignore_permissions = True
lead.insert()
@@ -365,29 +402,41 @@ def make_lead_from_communication(communication, ignore_communication_links=False
link_communication_to_document(doc, "Lead", lead_name, ignore_communication_links)
return lead_name
def get_lead_with_phone_number(number):
if not number: return
leads = frappe.get_all('Lead', or_filters={
'phone': ['like', '%{}'.format(number)],
'mobile_no': ['like', '%{}'.format(number)]
}, limit=1, order_by="creation DESC")
def get_lead_with_phone_number(number):
if not number:
return
leads = frappe.get_all(
"Lead",
or_filters={
"phone": ["like", "%{}".format(number)],
"mobile_no": ["like", "%{}".format(number)],
},
limit=1,
order_by="creation DESC",
)
lead = leads[0].name if leads else None
return lead
def daily_open_lead():
leads = frappe.get_all("Lead", filters = [["contact_date", "Between", [nowdate(), nowdate()]]])
leads = frappe.get_all("Lead", filters=[["contact_date", "Between", [nowdate(), nowdate()]]])
for lead in leads:
frappe.db.set_value("Lead", lead.name, "status", "Open")
@frappe.whitelist()
def add_lead_to_prospect(lead, prospect):
prospect = frappe.get_doc('Prospect', prospect)
prospect.append('prospect_lead', {
'lead': lead
})
prospect = frappe.get_doc("Prospect", prospect)
prospect.append("prospect_lead", {"lead": lead})
prospect.save(ignore_permissions=True)
frappe.msgprint(_('Lead {0} has been added to prospect {1}.').format(frappe.bold(lead), frappe.bold(prospect.name)),
title=_('Lead Added'), indicator='green')
frappe.msgprint(
_("Lead {0} has been added to prospect {1}.").format(
frappe.bold(lead), frappe.bold(prospect.name)
),
title=_("Lead Added"),
indicator="green",
)

View File

@@ -1,16 +1,9 @@
def get_data():
return {
'fieldname': 'lead',
'non_standard_fieldnames': {
'Quotation': 'party_name',
'Opportunity': 'party_name'
},
'dynamic_links': {
'party_name': ['Lead', 'quotation_to']
},
'transactions': [
{
'items': ['Opportunity', 'Quotation', 'Prospect']
},
]
"fieldname": "lead",
"non_standard_fieldnames": {"Quotation": "party_name", "Opportunity": "party_name"},
"dynamic_links": {"party_name": ["Lead", "quotation_to"]},
"transactions": [
{"items": ["Opportunity", "Quotation", "Prospect"]},
],
}

View File

@@ -7,7 +7,8 @@ import unittest
import frappe
from frappe.utils import random_string
test_records = frappe.get_test_records('Lead')
test_records = frappe.get_test_records("Lead")
class TestLead(unittest.TestCase):
def test_make_customer(self):
@@ -23,12 +24,16 @@ class TestLead(unittest.TestCase):
customer.customer_group = "_Test Customer Group"
customer.insert()
#check whether lead contact is carried forward to the customer.
contact = frappe.db.get_value('Dynamic Link', {
"parenttype": "Contact",
"link_doctype": "Lead",
"link_name": customer.lead_name,
}, "parent")
# check whether lead contact is carried forward to the customer.
contact = frappe.db.get_value(
"Dynamic Link",
{
"parenttype": "Contact",
"link_doctype": "Lead",
"link_name": customer.lead_name,
},
"parent",
)
if contact:
contact_doc = frappe.get_doc("Contact", contact)
@@ -46,51 +51,49 @@ class TestLead(unittest.TestCase):
customer.insert()
def test_create_lead_and_unlinking_dynamic_links(self):
lead_doc = make_lead(first_name = "Lorem", last_name="Ipsum", email_id="lorem_ipsum@example.com")
lead_doc = make_lead(first_name="Lorem", last_name="Ipsum", email_id="lorem_ipsum@example.com")
lead_doc_1 = make_lead()
frappe.get_doc({
"doctype": "Address",
"address_type": "Billing",
"city": "Mumbai",
"address_line1": "Vidya Vihar West",
"country": "India",
"links": [{
"link_doctype": "Lead",
"link_name": lead_doc.name
}]
}).insert()
frappe.get_doc(
{
"doctype": "Address",
"address_type": "Billing",
"city": "Mumbai",
"address_line1": "Vidya Vihar West",
"country": "India",
"links": [{"link_doctype": "Lead", "link_name": lead_doc.name}],
}
).insert()
address_1 = frappe.get_doc({
"doctype": "Address",
"address_type": "Billing",
"address_line1": "Baner",
"city": "Pune",
"country": "India",
"links": [
{
"link_doctype": "Lead",
"link_name": lead_doc.name
},
{
"link_doctype": "Lead",
"link_name": lead_doc_1.name
}
]
}).insert()
address_1 = frappe.get_doc(
{
"doctype": "Address",
"address_type": "Billing",
"address_line1": "Baner",
"city": "Pune",
"country": "India",
"links": [
{"link_doctype": "Lead", "link_name": lead_doc.name},
{"link_doctype": "Lead", "link_name": lead_doc_1.name},
],
}
).insert()
lead_doc.delete()
address_1.reload()
self.assertEqual(frappe.db.exists("Lead",lead_doc.name), None)
self.assertEqual(len(address_1.get('links')), 1)
self.assertEqual(frappe.db.exists("Lead", lead_doc.name), None)
self.assertEqual(len(address_1.get("links")), 1)
def make_lead(**args):
args = frappe._dict(args)
lead_doc = frappe.get_doc({
"doctype": "Lead",
"first_name": args.first_name or "_Test",
"last_name": args.last_name or "Lead",
"email_id": args.email_id or "new_lead_{}@example.com".format(random_string(5)),
}).insert()
lead_doc = frappe.get_doc(
{
"doctype": "Lead",
"first_name": args.first_name or "_Test",
"last_name": args.last_name or "Lead",
"email_id": args.email_id or "new_lead_{}@example.com".format(random_string(5)),
}
).insert()
return lead_doc

View File

@@ -15,12 +15,16 @@ from frappe.utils.file_manager import get_file_path
class LinkedInSettings(Document):
@frappe.whitelist()
def get_authorization_url(self):
params = urlencode({
"response_type":"code",
"client_id": self.consumer_key,
"redirect_uri": "{0}/api/method/erpnext.crm.doctype.linkedin_settings.linkedin_settings.callback?".format(frappe.utils.get_url()),
"scope": "r_emailaddress w_organization_social r_basicprofile r_liteprofile r_organization_social rw_organization_admin w_member_social"
})
params = urlencode(
{
"response_type": "code",
"client_id": self.consumer_key,
"redirect_uri": "{0}/api/method/erpnext.crm.doctype.linkedin_settings.linkedin_settings.callback?".format(
frappe.utils.get_url()
),
"scope": "r_emailaddress w_organization_social r_basicprofile r_liteprofile r_organization_social rw_organization_admin w_member_social",
}
)
url = "https://www.linkedin.com/oauth/v2/authorization?{}".format(params)
@@ -33,11 +37,11 @@ class LinkedInSettings(Document):
"code": code,
"client_id": self.consumer_key,
"client_secret": self.get_password(fieldname="consumer_secret"),
"redirect_uri": "{0}/api/method/erpnext.crm.doctype.linkedin_settings.linkedin_settings.callback?".format(frappe.utils.get_url()),
}
headers = {
"Content-Type": "application/x-www-form-urlencoded"
"redirect_uri": "{0}/api/method/erpnext.crm.doctype.linkedin_settings.linkedin_settings.callback?".format(
frappe.utils.get_url()
),
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = self.http_post(url=url, data=body, headers=headers)
response = frappe.parse_json(response.content.decode())
@@ -47,11 +51,15 @@ class LinkedInSettings(Document):
response = requests.get(url="https://api.linkedin.com/v2/me", headers=self.get_headers())
response = frappe.parse_json(response.content.decode())
frappe.db.set_value(self.doctype, self.name, {
"person_urn": response["id"],
"account_name": response["vanityName"],
"session_status": "Active"
})
frappe.db.set_value(
self.doctype,
self.name,
{
"person_urn": response["id"],
"account_name": response["vanityName"],
"session_status": "Active",
},
)
frappe.local.response["type"] = "redirect"
frappe.local.response["location"] = get_url_to_form("LinkedIn Settings", "LinkedIn Settings")
@@ -64,8 +72,7 @@ class LinkedInSettings(Document):
if media_id:
return self.post_text(text, title, media_id=media_id)
else:
frappe.log_error("Failed to upload media.","LinkedIn Upload Error")
frappe.log_error("Failed to upload media.", "LinkedIn Upload Error")
def upload_image(self, media):
media = get_file_path(media)
@@ -74,10 +81,9 @@ class LinkedInSettings(Document):
"registerUploadRequest": {
"recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
"owner": "urn:li:organization:{0}".format(self.company_id),
"serviceRelationships": [{
"relationshipType": "OWNER",
"identifier": "urn:li:userGeneratedContent"
}]
"serviceRelationships": [
{"relationshipType": "OWNER", "identifier": "urn:li:userGeneratedContent"}
],
}
}
headers = self.get_headers()
@@ -86,11 +92,16 @@ class LinkedInSettings(Document):
if response.status_code == 200:
response = response.json()
asset = response["value"]["asset"]
upload_url = response["value"]["uploadMechanism"]["com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"]["uploadUrl"]
headers['Content-Type']='image/jpeg'
response = self.http_post(upload_url, headers=headers, data=open(media,"rb"))
upload_url = response["value"]["uploadMechanism"][
"com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"
]["uploadUrl"]
headers["Content-Type"] = "image/jpeg"
response = self.http_post(upload_url, headers=headers, data=open(media, "rb"))
if response.status_code < 200 and response.status_code > 299:
frappe.throw(_("Error While Uploading Image"), title="{0} {1}".format(response.status_code, response.reason))
frappe.throw(
_("Error While Uploading Image"),
title="{0} {1}".format(response.status_code, response.reason),
)
return None
return asset
@@ -103,46 +114,26 @@ class LinkedInSettings(Document):
headers["Content-Type"] = "application/json; charset=UTF-8"
body = {
"distribution": {
"linkedInDistributionTarget": {}
},
"owner":"urn:li:organization:{0}".format(self.company_id),
"distribution": {"linkedInDistributionTarget": {}},
"owner": "urn:li:organization:{0}".format(self.company_id),
"subject": title,
"text": {
"text": text
}
"text": {"text": text},
}
reference_url = self.get_reference_url(text)
if reference_url:
body["content"] = {
"contentEntities": [
{
"entityLocation": reference_url
}
]
}
body["content"] = {"contentEntities": [{"entityLocation": reference_url}]}
if media_id:
body["content"]= {
"contentEntities": [{
"entity": media_id
}],
"shareMediaCategory": "IMAGE"
}
body["content"] = {"contentEntities": [{"entity": media_id}], "shareMediaCategory": "IMAGE"}
response = self.http_post(url=url, headers=headers, body=body)
return response
def http_post(self, url, headers=None, body=None, data=None):
try:
response = requests.post(
url = url,
json = body,
data = data,
headers = headers
)
if response.status_code not in [201,200]:
response = requests.post(url=url, json=body, data=data, headers=headers)
if response.status_code not in [201, 200]:
raise
except Exception as e:
@@ -151,12 +142,11 @@ class LinkedInSettings(Document):
return response
def get_headers(self):
return {
"Authorization": "Bearer {}".format(self.access_token)
}
return {"Authorization": "Bearer {}".format(self.access_token)}
def get_reference_url(self, text):
import re
regex_url = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
urls = re.findall(regex_url, text)
if urls:
@@ -164,18 +154,23 @@ class LinkedInSettings(Document):
def delete_post(self, post_id):
try:
response = requests.delete(url="https://api.linkedin.com/v2/shares/urn:li:share:{0}".format(post_id), headers=self.get_headers())
if response.status_code !=200:
response = requests.delete(
url="https://api.linkedin.com/v2/shares/urn:li:share:{0}".format(post_id),
headers=self.get_headers(),
)
if response.status_code != 200:
raise
except Exception:
self.api_error(response)
def get_post(self, post_id):
url = "https://api.linkedin.com/v2/organizationalEntityShareStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:{0}&shares[0]=urn:li:share:{1}".format(self.company_id, post_id)
url = "https://api.linkedin.com/v2/organizationalEntityShareStatistics?q=organizationalEntity&organizationalEntity=urn:li:organization:{0}&shares[0]=urn:li:share:{1}".format(
self.company_id, post_id
)
try:
response = requests.get(url=url, headers=self.get_headers())
if response.status_code !=200:
if response.status_code != 200:
raise
except Exception:
@@ -200,6 +195,7 @@ class LinkedInSettings(Document):
else:
frappe.throw(response.reason, title=response.status_code)
@frappe.whitelist(allow_guest=True)
def callback(code=None, error=None, error_description=None):
if not error:
@@ -209,4 +205,4 @@ def callback(code=None, error=None, error_description=None):
frappe.db.commit()
else:
frappe.local.response["type"] = "redirect"
frappe.local.response["location"] = get_url_to_form("LinkedIn Settings","LinkedIn Settings")
frappe.local.response["location"] = get_url_to_form("LinkedIn Settings", "LinkedIn Settings")

View File

@@ -27,12 +27,16 @@ class Opportunity(TransactionBase):
add_link_in_communication(self.opportunity_from, self.party_name, self)
def validate(self):
self._prev = frappe._dict({
"contact_date": frappe.db.get_value("Opportunity", self.name, "contact_date") if \
(not cint(self.get("__islocal"))) else None,
"contact_by": frappe.db.get_value("Opportunity", self.name, "contact_by") if \
(not cint(self.get("__islocal"))) else None,
})
self._prev = frappe._dict(
{
"contact_date": frappe.db.get_value("Opportunity", self.name, "contact_date")
if (not cint(self.get("__islocal")))
else None,
"contact_by": frappe.db.get_value("Opportunity", self.name, "contact_by")
if (not cint(self.get("__islocal")))
else None,
}
)
self.make_new_lead_if_required()
self.validate_item_details()
@@ -60,7 +64,7 @@ class Opportunity(TransactionBase):
def calculate_totals(self):
total = base_total = 0
for item in self.get('items'):
for item in self.get("items"):
item.amount = flt(item.rate) * flt(item.qty)
item.base_rate = flt(self.conversion_rate * item.rate)
item.base_amount = flt(self.conversion_rate * item.amount)
@@ -75,17 +79,18 @@ class Opportunity(TransactionBase):
if (not self.get("party_name")) and self.contact_email:
# check if customer is already created agains the self.contact_email
dynamic_link, contact = DocType("Dynamic Link"), DocType("Contact")
customer = frappe.qb.from_(
dynamic_link
).join(
contact
).on(
(contact.name == dynamic_link.parent)
& (dynamic_link.link_doctype == "Customer")
& (contact.email_id == self.contact_email)
).select(
dynamic_link.link_name
).distinct().run(as_dict=True)
customer = (
frappe.qb.from_(dynamic_link)
.join(contact)
.on(
(contact.name == dynamic_link.parent)
& (dynamic_link.link_doctype == "Customer")
& (contact.email_id == self.contact_email)
)
.select(dynamic_link.link_name)
.distinct()
.run(as_dict=True)
)
if customer and customer[0].link_name:
self.party_name = customer[0].link_name
@@ -98,19 +103,17 @@ class Opportunity(TransactionBase):
if sender_name == self.contact_email:
sender_name = None
if not sender_name and ('@' in self.contact_email):
email_name = self.contact_email.split('@')[0]
if not sender_name and ("@" in self.contact_email):
email_name = self.contact_email.split("@")[0]
email_split = email_name.split('.')
sender_name = ''
email_split = email_name.split(".")
sender_name = ""
for s in email_split:
sender_name += s.capitalize() + ' '
sender_name += s.capitalize() + " "
lead = frappe.get_doc({
"doctype": "Lead",
"email_id": self.contact_email,
"lead_name": sender_name or 'Unknown'
})
lead = frappe.get_doc(
{"doctype": "Lead", "email_id": self.contact_email, "lead_name": sender_name or "Unknown"}
)
lead.flags.ignore_email_validation = True
lead.insert(ignore_permissions=True)
@@ -122,17 +125,17 @@ class Opportunity(TransactionBase):
@frappe.whitelist()
def declare_enquiry_lost(self, lost_reasons_list, competitors, detailed_reason=None):
if not self.has_active_quotation():
self.status = 'Lost'
self.status = "Lost"
self.lost_reasons = self.competitors = []
if detailed_reason:
self.order_lost_reason = detailed_reason
for reason in lost_reasons_list:
self.append('lost_reasons', reason)
self.append("lost_reasons", reason)
for competitor in competitors:
self.append('competitors', competitor)
self.append("competitors", competitor)
self.save()
@@ -144,85 +147,92 @@ class Opportunity(TransactionBase):
def has_active_quotation(self):
if not self.with_items:
return frappe.get_all('Quotation',
{
'opportunity': self.name,
'status': ("not in", ['Lost', 'Closed']),
'docstatus': 1
}, 'name')
return frappe.get_all(
"Quotation",
{"opportunity": self.name, "status": ("not in", ["Lost", "Closed"]), "docstatus": 1},
"name",
)
else:
return frappe.db.sql("""
return frappe.db.sql(
"""
select q.name
from `tabQuotation` q, `tabQuotation Item` qi
where q.name = qi.parent and q.docstatus=1 and qi.prevdoc_docname =%s
and q.status not in ('Lost', 'Closed')""", self.name)
and q.status not in ('Lost', 'Closed')""",
self.name,
)
def has_ordered_quotation(self):
if not self.with_items:
return frappe.get_all('Quotation',
{
'opportunity': self.name,
'status': 'Ordered',
'docstatus': 1
}, 'name')
return frappe.get_all(
"Quotation", {"opportunity": self.name, "status": "Ordered", "docstatus": 1}, "name"
)
else:
return frappe.db.sql("""
return frappe.db.sql(
"""
select q.name
from `tabQuotation` q, `tabQuotation Item` qi
where q.name = qi.parent and q.docstatus=1 and qi.prevdoc_docname =%s
and q.status = 'Ordered'""", self.name)
and q.status = 'Ordered'""",
self.name,
)
def has_lost_quotation(self):
lost_quotation = frappe.db.sql("""
lost_quotation = frappe.db.sql(
"""
select name
from `tabQuotation`
where docstatus=1
and opportunity =%s and status = 'Lost'
""", self.name)
""",
self.name,
)
if lost_quotation:
if self.has_active_quotation():
return False
return True
def validate_cust_name(self):
if self.party_name and self.opportunity_from == 'Customer':
if self.party_name and self.opportunity_from == "Customer":
self.customer_name = frappe.db.get_value("Customer", self.party_name, "customer_name")
elif self.party_name and self.opportunity_from == 'Lead':
lead_name, company_name = frappe.db.get_value("Lead", self.party_name, ["lead_name", "company_name"])
elif self.party_name and self.opportunity_from == "Lead":
lead_name, company_name = frappe.db.get_value(
"Lead", self.party_name, ["lead_name", "company_name"]
)
self.customer_name = company_name or lead_name
def on_update(self):
self.add_calendar_event()
def add_calendar_event(self, opts=None, force=False):
if frappe.db.get_single_value('CRM Settings', 'create_event_on_next_contact_date_opportunity'):
if frappe.db.get_single_value("CRM Settings", "create_event_on_next_contact_date_opportunity"):
if not opts:
opts = frappe._dict()
opts.description = ""
opts.contact_date = self.contact_date
if self.party_name and self.opportunity_from == 'Customer':
if self.party_name and self.opportunity_from == "Customer":
if self.contact_person:
opts.description = 'Contact '+cstr(self.contact_person)
opts.description = "Contact " + cstr(self.contact_person)
else:
opts.description = 'Contact customer '+cstr(self.party_name)
elif self.party_name and self.opportunity_from == 'Lead':
opts.description = "Contact customer " + cstr(self.party_name)
elif self.party_name and self.opportunity_from == "Lead":
if self.contact_display:
opts.description = 'Contact '+cstr(self.contact_display)
opts.description = "Contact " + cstr(self.contact_display)
else:
opts.description = 'Contact lead '+cstr(self.party_name)
opts.description = "Contact lead " + cstr(self.party_name)
opts.subject = opts.description
opts.description += '. By : ' + cstr(self.contact_by)
opts.description += ". By : " + cstr(self.contact_by)
if self.to_discuss:
opts.description += ' To Discuss : ' + cstr(self.to_discuss)
opts.description += " To Discuss : " + cstr(self.to_discuss)
super(Opportunity, self).add_calendar_event(opts, force)
def validate_item_details(self):
if not self.get('items'):
if not self.get("items"):
return
# set missing values
@@ -234,41 +244,51 @@ class Opportunity(TransactionBase):
item = frappe.db.get_value("Item", d.item_code, item_fields, as_dict=True)
for key in item_fields:
if not d.get(key): d.set(key, item.get(key))
if not d.get(key):
d.set(key, item.get(key))
@frappe.whitelist()
def get_item_details(item_code):
item = frappe.db.sql("""select item_name, stock_uom, image, description, item_group, brand
from `tabItem` where name = %s""", item_code, as_dict=1)
item = frappe.db.sql(
"""select item_name, stock_uom, image, description, item_group, brand
from `tabItem` where name = %s""",
item_code,
as_dict=1,
)
return {
'item_name': item and item[0]['item_name'] or '',
'uom': item and item[0]['stock_uom'] or '',
'description': item and item[0]['description'] or '',
'image': item and item[0]['image'] or '',
'item_group': item and item[0]['item_group'] or '',
'brand': item and item[0]['brand'] or ''
"item_name": item and item[0]["item_name"] or "",
"uom": item and item[0]["stock_uom"] or "",
"description": item and item[0]["description"] or "",
"image": item and item[0]["image"] or "",
"item_group": item and item[0]["item_group"] or "",
"brand": item and item[0]["brand"] or "",
}
@frappe.whitelist()
def make_quotation(source_name, target_doc=None):
def set_missing_values(source, target):
from erpnext.controllers.accounts_controller import get_default_taxes_and_charges
quotation = frappe.get_doc(target)
company_currency = frappe.get_cached_value('Company', quotation.company, "default_currency")
company_currency = frappe.get_cached_value("Company", quotation.company, "default_currency")
if company_currency == quotation.currency:
exchange_rate = 1
else:
exchange_rate = get_exchange_rate(quotation.currency, company_currency,
quotation.transaction_date, args="for_selling")
exchange_rate = get_exchange_rate(
quotation.currency, company_currency, quotation.transaction_date, args="for_selling"
)
quotation.conversion_rate = exchange_rate
# get default taxes
taxes = get_default_taxes_and_charges("Sales Taxes and Charges Template", company=quotation.company)
if taxes.get('taxes'):
taxes = get_default_taxes_and_charges(
"Sales Taxes and Charges Template", company=quotation.company
)
if taxes.get("taxes"):
quotation.update(taxes)
quotation.run_method("set_missing_values")
@@ -276,49 +296,53 @@ def make_quotation(source_name, target_doc=None):
if not source.with_items:
quotation.opportunity = source.name
doclist = get_mapped_doc("Opportunity", source_name, {
"Opportunity": {
"doctype": "Quotation",
"field_map": {
"opportunity_from": "quotation_to",
"name": "enq_no"
}
},
"Opportunity Item": {
"doctype": "Quotation Item",
"field_map": {
"parent": "prevdoc_docname",
"parenttype": "prevdoc_doctype",
"uom": "stock_uom"
doclist = get_mapped_doc(
"Opportunity",
source_name,
{
"Opportunity": {
"doctype": "Quotation",
"field_map": {"opportunity_from": "quotation_to", "name": "enq_no"},
},
"add_if_empty": True
}
}, target_doc, set_missing_values)
"Opportunity Item": {
"doctype": "Quotation Item",
"field_map": {
"parent": "prevdoc_docname",
"parenttype": "prevdoc_doctype",
"uom": "stock_uom",
},
"add_if_empty": True,
},
},
target_doc,
set_missing_values,
)
return doclist
@frappe.whitelist()
def make_request_for_quotation(source_name, target_doc=None):
def update_item(obj, target, source_parent):
target.conversion_factor = 1.0
doclist = get_mapped_doc("Opportunity", source_name, {
"Opportunity": {
"doctype": "Request for Quotation"
doclist = get_mapped_doc(
"Opportunity",
source_name,
{
"Opportunity": {"doctype": "Request for Quotation"},
"Opportunity Item": {
"doctype": "Request for Quotation Item",
"field_map": [["name", "opportunity_item"], ["parent", "opportunity"], ["uom", "uom"]],
"postprocess": update_item,
},
},
"Opportunity Item": {
"doctype": "Request for Quotation Item",
"field_map": [
["name", "opportunity_item"],
["parent", "opportunity"],
["uom", "uom"]
],
"postprocess": update_item
}
}, target_doc)
target_doc,
)
return doclist
@frappe.whitelist()
def make_customer(source_name, target_doc=None):
def set_missing_values(source, target):
@@ -327,37 +351,37 @@ def make_customer(source_name, target_doc=None):
if source.opportunity_from == "Lead":
target.lead_name = source.party_name
doclist = get_mapped_doc("Opportunity", source_name, {
"Opportunity": {
"doctype": "Customer",
"field_map": {
"currency": "default_currency",
"customer_name": "customer_name"
doclist = get_mapped_doc(
"Opportunity",
source_name,
{
"Opportunity": {
"doctype": "Customer",
"field_map": {"currency": "default_currency", "customer_name": "customer_name"},
}
}
}, target_doc, set_missing_values)
},
target_doc,
set_missing_values,
)
return doclist
@frappe.whitelist()
def make_supplier_quotation(source_name, target_doc=None):
doclist = get_mapped_doc("Opportunity", source_name, {
"Opportunity": {
"doctype": "Supplier Quotation",
"field_map": {
"name": "opportunity"
}
doclist = get_mapped_doc(
"Opportunity",
source_name,
{
"Opportunity": {"doctype": "Supplier Quotation", "field_map": {"name": "opportunity"}},
"Opportunity Item": {"doctype": "Supplier Quotation Item", "field_map": {"uom": "stock_uom"}},
},
"Opportunity Item": {
"doctype": "Supplier Quotation Item",
"field_map": {
"uom": "stock_uom"
}
}
}, target_doc)
target_doc,
)
return doclist
@frappe.whitelist()
def set_multiple_status(names, status):
names = json.loads(names)
@@ -366,12 +390,19 @@ def set_multiple_status(names, status):
opp.status = status
opp.save()
def auto_close_opportunity():
""" auto close the `Replied` Opportunities after 7 days """
auto_close_after_days = frappe.db.get_single_value("CRM Settings", "close_opportunity_after_days") or 15
opportunities = frappe.db.sql(""" select name from tabOpportunity where status='Replied' and
modified<DATE_SUB(CURDATE(), INTERVAL %s DAY) """, (auto_close_after_days), as_dict=True)
def auto_close_opportunity():
"""auto close the `Replied` Opportunities after 7 days"""
auto_close_after_days = (
frappe.db.get_single_value("CRM Settings", "close_opportunity_after_days") or 15
)
opportunities = frappe.db.sql(
""" select name from tabOpportunity where status='Replied' and
modified<DATE_SUB(CURDATE(), INTERVAL %s DAY) """,
(auto_close_after_days),
as_dict=True,
)
for opportunity in opportunities:
doc = frappe.get_doc("Opportunity", opportunity.get("name"))
@@ -380,9 +411,11 @@ def auto_close_opportunity():
doc.flags.ignore_mandatory = True
doc.save()
@frappe.whitelist()
def make_opportunity_from_communication(communication, company, ignore_communication_links=False):
from erpnext.crm.doctype.lead.lead import make_lead_from_communication
doc = frappe.get_doc("Communication", communication)
lead = doc.reference_name if doc.reference_doctype == "Lead" else None
@@ -391,16 +424,20 @@ def make_opportunity_from_communication(communication, company, ignore_communica
opportunity_from = "Lead"
opportunity = frappe.get_doc({
"doctype": "Opportunity",
"company": company,
"opportunity_from": opportunity_from,
"party_name": lead
}).insert(ignore_permissions=True)
opportunity = frappe.get_doc(
{
"doctype": "Opportunity",
"company": company,
"opportunity_from": opportunity_from,
"party_name": lead,
}
).insert(ignore_permissions=True)
link_communication_to_document(doc, "Opportunity", opportunity.name, ignore_communication_links)
return opportunity.name
@frappe.whitelist()
def get_events(start, end, filters=None):
"""Returns events for Gantt / Calendar view rendering.
@@ -409,9 +446,11 @@ def get_events(start, end, filters=None):
:param filters: Filters (JSON).
"""
from frappe.desk.calendar import get_event_conditions
conditions = get_event_conditions("Opportunity", filters)
data = frappe.db.sql("""
data = frappe.db.sql(
"""
select
distinct `tabOpportunity`.name, `tabOpportunity`.customer_name, `tabOpportunity`.opportunity_amount,
`tabOpportunity`.title, `tabOpportunity`.contact_date
@@ -420,8 +459,11 @@ def get_events(start, end, filters=None):
where
(`tabOpportunity`.contact_date between %(start)s and %(end)s)
{conditions}
""".format(conditions=conditions), {
"start": start,
"end": end
}, as_dict=True, update={"allDay": 0})
""".format(
conditions=conditions
),
{"start": start, "end": end},
as_dict=True,
update={"allDay": 0},
)
return data

View File

@@ -1,9 +1,7 @@
def get_data():
return {
'fieldname': 'opportunity',
'transactions': [
{
'items': ['Quotation', 'Supplier Quotation']
},
]
"fieldname": "opportunity",
"transactions": [
{"items": ["Quotation", "Supplier Quotation"]},
],
}

View File

@@ -11,22 +11,20 @@ from erpnext.crm.doctype.lead.test_lead import make_lead
from erpnext.crm.doctype.opportunity.opportunity import make_quotation
from erpnext.crm.utils import get_linked_communication_list
test_records = frappe.get_test_records('Opportunity')
test_records = frappe.get_test_records("Opportunity")
class TestOpportunity(unittest.TestCase):
def test_opportunity_status(self):
doc = make_opportunity(with_items=0)
quotation = make_quotation(doc.name)
quotation.append('items', {
"item_code": "_Test Item",
"qty": 1
})
quotation.append("items", {"item_code": "_Test Item", "qty": 1})
quotation.run_method("set_missing_values")
quotation.run_method("calculate_taxes_and_totals")
quotation.submit()
doc = frappe.get_doc('Opportunity', doc.name)
doc = frappe.get_doc("Opportunity", doc.name)
self.assertEqual(doc.status, "Quotation")
def test_make_new_lead_if_required(self):
@@ -34,18 +32,19 @@ class TestOpportunity(unittest.TestCase):
self.assertTrue(opp_doc.party_name)
self.assertEqual(opp_doc.opportunity_from, "Lead")
self.assertEqual(frappe.db.get_value("Lead", opp_doc.party_name, "email_id"), opp_doc.contact_email)
self.assertEqual(
frappe.db.get_value("Lead", opp_doc.party_name, "email_id"), opp_doc.contact_email
)
# create new customer and create new contact against 'new.opportunity@example.com'
customer = make_customer(opp_doc.party_name).insert(ignore_permissions=True)
contact = frappe.get_doc({
"doctype": "Contact",
"first_name": "_Test Opportunity Customer",
"links": [{
"link_doctype": "Customer",
"link_name": customer.name
}]
})
contact = frappe.get_doc(
{
"doctype": "Contact",
"first_name": "_Test Opportunity Customer",
"links": [{"link_doctype": "Customer", "link_name": customer.name}],
}
)
contact.add_email(opp_doc.contact_email, is_primary=True)
contact.insert(ignore_permissions=True)
@@ -54,38 +53,51 @@ class TestOpportunity(unittest.TestCase):
self.assertEqual(opportunity_doc.total, 2200)
def test_carry_forward_of_email_and_comments(self):
frappe.db.set_value("CRM Settings", "CRM Settings", "carry_forward_communication_and_comments", 1)
frappe.db.set_value(
"CRM Settings", "CRM Settings", "carry_forward_communication_and_comments", 1
)
lead_doc = make_lead()
lead_doc.add_comment('Comment', text='Test Comment 1')
lead_doc.add_comment('Comment', text='Test Comment 2')
lead_doc.add_comment("Comment", text="Test Comment 1")
lead_doc.add_comment("Comment", text="Test Comment 2")
create_communication(lead_doc.doctype, lead_doc.name, lead_doc.email_id)
create_communication(lead_doc.doctype, lead_doc.name, lead_doc.email_id)
opp_doc = make_opportunity(opportunity_from="Lead", lead=lead_doc.name)
opportunity_comment_count = frappe.db.count("Comment", {"reference_doctype": opp_doc.doctype, "reference_name": opp_doc.name})
opportunity_communication_count = len(get_linked_communication_list(opp_doc.doctype, opp_doc.name))
opportunity_comment_count = frappe.db.count(
"Comment", {"reference_doctype": opp_doc.doctype, "reference_name": opp_doc.name}
)
opportunity_communication_count = len(
get_linked_communication_list(opp_doc.doctype, opp_doc.name)
)
self.assertEqual(opportunity_comment_count, 2)
self.assertEqual(opportunity_communication_count, 2)
opp_doc.add_comment('Comment', text='Test Comment 3')
opp_doc.add_comment('Comment', text='Test Comment 4')
opp_doc.add_comment("Comment", text="Test Comment 3")
opp_doc.add_comment("Comment", text="Test Comment 4")
create_communication(opp_doc.doctype, opp_doc.name, opp_doc.contact_email)
create_communication(opp_doc.doctype, opp_doc.name, opp_doc.contact_email)
quotation_doc = make_quotation(opp_doc.name)
quotation_doc.append('items', {
"item_code": "_Test Item",
"qty": 1
})
quotation_doc.append("items", {"item_code": "_Test Item", "qty": 1})
quotation_doc.run_method("set_missing_values")
quotation_doc.run_method("calculate_taxes_and_totals")
quotation_doc.save()
quotation_comment_count = frappe.db.count("Comment", {"reference_doctype": quotation_doc.doctype, "reference_name": quotation_doc.name, "comment_type": "Comment"})
quotation_communication_count = len(get_linked_communication_list(quotation_doc.doctype, quotation_doc.name))
quotation_comment_count = frappe.db.count(
"Comment",
{
"reference_doctype": quotation_doc.doctype,
"reference_name": quotation_doc.name,
"comment_type": "Comment",
},
)
quotation_communication_count = len(
get_linked_communication_list(quotation_doc.doctype, quotation_doc.name)
)
self.assertEqual(quotation_comment_count, 4)
self.assertEqual(quotation_communication_count, 4)
def make_opportunity_from_lead():
new_lead_email_id = "new{}@example.com".format(random_string(5))
args = {
@@ -93,56 +105,67 @@ def make_opportunity_from_lead():
"contact_email": new_lead_email_id,
"opportunity_type": "Sales",
"with_items": 0,
"transaction_date": today()
"transaction_date": today(),
}
# new lead should be created against the new.opportunity@example.com
opp_doc = frappe.get_doc(args).insert(ignore_permissions=True)
return opp_doc
def make_opportunity(**args):
args = frappe._dict(args)
opp_doc = frappe.get_doc({
"doctype": "Opportunity",
"company": args.company or "_Test Company",
"opportunity_from": args.opportunity_from or "Customer",
"opportunity_type": "Sales",
"conversion_rate": 1.0,
"with_items": args.with_items or 0,
"transaction_date": today()
})
opp_doc = frappe.get_doc(
{
"doctype": "Opportunity",
"company": args.company or "_Test Company",
"opportunity_from": args.opportunity_from or "Customer",
"opportunity_type": "Sales",
"conversion_rate": 1.0,
"with_items": args.with_items or 0,
"transaction_date": today(),
}
)
if opp_doc.opportunity_from == 'Customer':
opp_doc.party_name= args.customer or "_Test Customer"
if opp_doc.opportunity_from == "Customer":
opp_doc.party_name = args.customer or "_Test Customer"
if opp_doc.opportunity_from == 'Lead':
if opp_doc.opportunity_from == "Lead":
opp_doc.party_name = args.lead or "_T-Lead-00001"
if args.with_items:
opp_doc.append('items', {
"item_code": args.item_code or "_Test Item",
"qty": args.qty or 1,
"rate": args.rate or 1000,
"uom": "_Test UOM"
})
opp_doc.append(
"items",
{
"item_code": args.item_code or "_Test Item",
"qty": args.qty or 1,
"rate": args.rate or 1000,
"uom": "_Test UOM",
},
)
opp_doc.insert()
return opp_doc
def create_communication(reference_doctype, reference_name, sender, sent_or_received=None, creation=None):
communication = frappe.get_doc({
"doctype": "Communication",
"communication_type": "Communication",
"communication_medium": "Email",
"sent_or_received": sent_or_received or "Sent",
"email_status": "Open",
"subject": "Test Subject",
"sender": sender,
"content": "Test",
"status": "Linked",
"reference_doctype": reference_doctype,
"creation": creation or now_datetime(),
"reference_name": reference_name
})
communication.save()
def create_communication(
reference_doctype, reference_name, sender, sent_or_received=None, creation=None
):
communication = frappe.get_doc(
{
"doctype": "Communication",
"communication_type": "Communication",
"communication_medium": "Email",
"sent_or_received": sent_or_received or "Sent",
"email_status": "Open",
"subject": "Test Subject",
"sender": sender,
"content": "Test",
"status": "Linked",
"reference_doctype": reference_doctype,
"creation": creation or now_datetime(),
"reference_name": reference_name,
}
)
communication.save()

View File

@@ -24,13 +24,15 @@ class Prospect(Document):
def after_insert(self):
if frappe.db.get_single_value("CRM Settings", "carry_forward_communication_and_comments"):
for row in self.get('prospect_lead'):
for row in self.get("prospect_lead"):
copy_comments("Lead", row.lead, self)
add_link_in_communication("Lead", row.lead, self)
def update_lead_details(self):
for row in self.get('prospect_lead'):
lead = frappe.get_value('Lead', row.lead, ['lead_name', 'status', 'email_id', 'mobile_no'], as_dict=True)
for row in self.get("prospect_lead"):
lead = frappe.get_value(
"Lead", row.lead, ["lead_name", "status", "email_id", "mobile_no"], as_dict=True
)
row.lead_name = lead.lead_name
row.status = lead.status
row.email = lead.email_id
@@ -38,39 +40,45 @@ class Prospect(Document):
def link_with_lead_contact_and_address(self):
for row in self.prospect_lead:
links = frappe.get_all('Dynamic Link', filters={'link_doctype': 'Lead', 'link_name': row.lead}, fields=['parent', 'parenttype'])
links = frappe.get_all(
"Dynamic Link",
filters={"link_doctype": "Lead", "link_name": row.lead},
fields=["parent", "parenttype"],
)
for link in links:
linked_doc = frappe.get_doc(link['parenttype'], link['parent'])
linked_doc = frappe.get_doc(link["parenttype"], link["parent"])
exists = False
for d in linked_doc.get('links'):
for d in linked_doc.get("links"):
if d.link_doctype == self.doctype and d.link_name == self.name:
exists = True
if not exists:
linked_doc.append('links', {
'link_doctype': self.doctype,
'link_name': self.name
})
linked_doc.append("links", {"link_doctype": self.doctype, "link_name": self.name})
linked_doc.save(ignore_permissions=True)
def unlink_dynamic_links(self):
links = frappe.get_all('Dynamic Link', filters={'link_doctype': self.doctype, 'link_name': self.name}, fields=['parent', 'parenttype'])
links = frappe.get_all(
"Dynamic Link",
filters={"link_doctype": self.doctype, "link_name": self.name},
fields=["parent", "parenttype"],
)
for link in links:
linked_doc = frappe.get_doc(link['parenttype'], link['parent'])
linked_doc = frappe.get_doc(link["parenttype"], link["parent"])
if len(linked_doc.get('links')) == 1:
if len(linked_doc.get("links")) == 1:
linked_doc.delete(ignore_permissions=True)
else:
to_remove = None
for d in linked_doc.get('links'):
for d in linked_doc.get("links"):
if d.link_doctype == self.doctype and d.link_name == self.name:
to_remove = d
if to_remove:
linked_doc.remove(to_remove)
linked_doc.save(ignore_permissions=True)
@frappe.whitelist()
def make_customer(source_name, target_doc=None):
def set_missing_values(source, target):
@@ -78,18 +86,23 @@ def make_customer(source_name, target_doc=None):
target.company_name = source.name
target.customer_group = source.customer_group or frappe.db.get_default("Customer Group")
doclist = get_mapped_doc("Prospect", source_name,
{"Prospect": {
"doctype": "Customer",
"field_map": {
"company_name": "customer_name",
"currency": "default_currency",
"fax": "fax"
doclist = get_mapped_doc(
"Prospect",
source_name,
{
"Prospect": {
"doctype": "Customer",
"field_map": {"company_name": "customer_name", "currency": "default_currency", "fax": "fax"},
}
}}, target_doc, set_missing_values, ignore_permissions=False)
},
target_doc,
set_missing_values,
ignore_permissions=False,
)
return doclist
@frappe.whitelist()
def make_opportunity(source_name, target_doc=None):
def set_missing_values(source, target):
@@ -97,12 +110,20 @@ def make_opportunity(source_name, target_doc=None):
target.customer_name = source.company_name
target.customer_group = source.customer_group or frappe.db.get_default("Customer Group")
doclist = get_mapped_doc("Prospect", source_name,
{"Prospect": {
"doctype": "Opportunity",
"field_map": {
"name": "party_name",
doclist = get_mapped_doc(
"Prospect",
source_name,
{
"Prospect": {
"doctype": "Opportunity",
"field_map": {
"name": "party_name",
},
}
}}, target_doc, set_missing_values, ignore_permissions=False)
},
target_doc,
set_missing_values,
ignore_permissions=False,
)
return doclist

View File

@@ -14,43 +14,45 @@ class TestProspect(unittest.TestCase):
def test_add_lead_to_prospect_and_address_linking(self):
lead_doc = make_lead()
address_doc = make_address(address_title=lead_doc.name)
address_doc.append('links', {
"link_doctype": lead_doc.doctype,
"link_name": lead_doc.name
})
address_doc.append("links", {"link_doctype": lead_doc.doctype, "link_name": lead_doc.name})
address_doc.save()
prospect_doc = make_prospect()
add_lead_to_prospect(lead_doc.name, prospect_doc.name)
prospect_doc.reload()
lead_exists_in_prosoect = False
for rec in prospect_doc.get('prospect_lead'):
for rec in prospect_doc.get("prospect_lead"):
if rec.lead == lead_doc.name:
lead_exists_in_prosoect = True
self.assertEqual(lead_exists_in_prosoect, True)
address_doc.reload()
self.assertEqual(address_doc.has_link('Prospect', prospect_doc.name), True)
self.assertEqual(address_doc.has_link("Prospect", prospect_doc.name), True)
def make_prospect(**args):
args = frappe._dict(args)
prospect_doc = frappe.get_doc({
"doctype": "Prospect",
"company_name": args.company_name or "_Test Company {}".format(random_string(3)),
}).insert()
prospect_doc = frappe.get_doc(
{
"doctype": "Prospect",
"company_name": args.company_name or "_Test Company {}".format(random_string(3)),
}
).insert()
return prospect_doc
def make_address(**args):
args = frappe._dict(args)
address_doc = frappe.get_doc({
"doctype": "Address",
"address_title": args.address_title or "Address Title",
"address_type": args.address_type or "Billing",
"city": args.city or "Mumbai",
"address_line1": args.address_line1 or "Vidya Vihar West",
"country": args.country or "India"
}).insert()
address_doc = frappe.get_doc(
{
"doctype": "Address",
"address_title": args.address_title or "Address Title",
"address_type": args.address_type or "Billing",
"city": args.city or "Mumbai",
"address_line1": args.address_line1 or "Vidya Vihar West",
"country": args.country or "India",
}
).insert()
return address_doc

View File

@@ -11,7 +11,7 @@ from frappe.model.document import Document
class SocialMediaPost(Document):
def validate(self):
if (not self.twitter and not self.linkedin):
if not self.twitter and not self.linkedin:
frappe.throw(_("Select atleast one Social Media Platform to Share on."))
if self.scheduled_time:
@@ -29,7 +29,7 @@ class SocialMediaPost(Document):
super(SocialMediaPost, self).submit()
def on_cancel(self):
self.db_set('post_status', 'Cancelled')
self.db_set("post_status", "Cancelled")
@frappe.whitelist()
def delete_post(self):
@@ -41,17 +41,17 @@ class SocialMediaPost(Document):
linkedin = frappe.get_doc("LinkedIn Settings")
linkedin.delete_post(self.linkedin_post_id)
self.db_set('post_status', 'Deleted')
self.db_set("post_status", "Deleted")
@frappe.whitelist()
def get_post(self):
response = {}
if self.linkedin and self.linkedin_post_id:
linkedin = frappe.get_doc("LinkedIn Settings")
response['linkedin'] = linkedin.get_post(self.linkedin_post_id)
response["linkedin"] = linkedin.get_post(self.linkedin_post_id)
if self.twitter and self.twitter_post_id:
twitter = frappe.get_doc("Twitter Settings")
response['twitter'] = twitter.get_tweet(self.twitter_post_id)
response["twitter"] = twitter.get_tweet(self.twitter_post_id)
return response
@@ -65,7 +65,7 @@ class SocialMediaPost(Document):
if self.linkedin and not self.linkedin_post_id:
linkedin = frappe.get_doc("LinkedIn Settings")
linkedin_post = linkedin.post(self.linkedin_post, self.title, self.image)
self.db_set("linkedin_post_id", linkedin_post.headers['X-RestLi-Id'])
self.db_set("linkedin_post_id", linkedin_post.headers["X-RestLi-Id"])
self.db_set("post_status", "Posted")
except Exception:
@@ -73,13 +73,18 @@ class SocialMediaPost(Document):
title = _("Error while POSTING {0}").format(self.name)
frappe.log_error(message=frappe.get_traceback(), title=title)
def process_scheduled_social_media_posts():
posts = frappe.get_list("Social Media Post", filters={"post_status": "Scheduled", "docstatus":1}, fields= ["name", "scheduled_time"])
posts = frappe.get_list(
"Social Media Post",
filters={"post_status": "Scheduled", "docstatus": 1},
fields=["name", "scheduled_time"],
)
start = frappe.utils.now_datetime()
end = start + datetime.timedelta(minutes=10)
for post in posts:
if post.scheduled_time:
post_time = frappe.utils.get_datetime(post.scheduled_time)
if post_time > start and post_time <= end:
sm_post = frappe.get_doc('Social Media Post', post.name)
sm_post = frappe.get_doc("Social Media Post", post.name)
sm_post.post()

View File

@@ -16,22 +16,26 @@ from tweepy.error import TweepError
class TwitterSettings(Document):
@frappe.whitelist()
def get_authorize_url(self):
callback_url = "{0}/api/method/erpnext.crm.doctype.twitter_settings.twitter_settings.callback?".format(frappe.utils.get_url())
auth = tweepy.OAuthHandler(self.consumer_key, self.get_password(fieldname="consumer_secret"), callback_url)
callback_url = (
"{0}/api/method/erpnext.crm.doctype.twitter_settings.twitter_settings.callback?".format(
frappe.utils.get_url()
)
)
auth = tweepy.OAuthHandler(
self.consumer_key, self.get_password(fieldname="consumer_secret"), callback_url
)
try:
redirect_url = auth.get_authorization_url()
return redirect_url
except tweepy.TweepError as e:
frappe.msgprint(_("Error! Failed to get request token."))
frappe.throw(_('Invalid {0} or {1}').format(frappe.bold("Consumer Key"), frappe.bold("Consumer Secret Key")))
frappe.throw(
_("Invalid {0} or {1}").format(frappe.bold("Consumer Key"), frappe.bold("Consumer Secret Key"))
)
def get_access_token(self, oauth_token, oauth_verifier):
auth = tweepy.OAuthHandler(self.consumer_key, self.get_password(fieldname="consumer_secret"))
auth.request_token = {
'oauth_token' : oauth_token,
'oauth_token_secret' : oauth_verifier
}
auth.request_token = {"oauth_token": oauth_token, "oauth_token_secret": oauth_verifier}
try:
auth.get_access_token(oauth_verifier)
@@ -39,21 +43,25 @@ class TwitterSettings(Document):
self.access_token_secret = auth.access_token_secret
api = self.get_api()
user = api.me()
profile_pic = (user._json["profile_image_url"]).replace("_normal","")
profile_pic = (user._json["profile_image_url"]).replace("_normal", "")
frappe.db.set_value(self.doctype, self.name, {
"access_token" : auth.access_token,
"access_token_secret" : auth.access_token_secret,
"account_name" : user._json["screen_name"],
"profile_pic" : profile_pic,
"session_status" : "Active"
})
frappe.db.set_value(
self.doctype,
self.name,
{
"access_token": auth.access_token,
"access_token_secret": auth.access_token_secret,
"account_name": user._json["screen_name"],
"profile_pic": profile_pic,
"session_status": "Active",
},
)
frappe.local.response["type"] = "redirect"
frappe.local.response["location"] = get_url_to_form("Twitter Settings","Twitter Settings")
frappe.local.response["location"] = get_url_to_form("Twitter Settings", "Twitter Settings")
except TweepError as e:
frappe.msgprint(_("Error! Failed to get access token."))
frappe.throw(_('Invalid Consumer Key or Consumer Secret Key'))
frappe.throw(_("Invalid Consumer Key or Consumer Secret Key"))
def get_api(self):
# authentication of consumer key and secret
@@ -82,9 +90,9 @@ class TwitterSettings(Document):
api = self.get_api()
try:
if media_id:
response = api.update_status(status = text, media_ids = [media_id])
response = api.update_status(status=text, media_ids=[media_id])
else:
response = api.update_status(status = text)
response = api.update_status(status=text)
return response
@@ -113,15 +121,18 @@ class TwitterSettings(Document):
if e.response.status_code == 401:
self.db_set("session_status", "Expired")
frappe.db.commit()
frappe.throw(content["message"],title=_("Twitter Error {0} : {1}").format(e.response.status_code, e.response.reason))
frappe.throw(
content["message"],
title=_("Twitter Error {0} : {1}").format(e.response.status_code, e.response.reason),
)
@frappe.whitelist(allow_guest=True)
def callback(oauth_token = None, oauth_verifier = None):
def callback(oauth_token=None, oauth_verifier=None):
if oauth_token and oauth_verifier:
twitter_settings = frappe.get_single("Twitter Settings")
twitter_settings.get_access_token(oauth_token,oauth_verifier)
twitter_settings.get_access_token(oauth_token, oauth_verifier)
frappe.db.commit()
else:
frappe.local.response["type"] = "redirect"
frappe.local.response["location"] = get_url_to_form("Twitter Settings","Twitter Settings")
frappe.local.response["location"] = get_url_to_form("Twitter Settings", "Twitter Settings")

View File

@@ -4,16 +4,17 @@ import frappe
@frappe.whitelist()
def get_last_interaction(contact=None, lead=None):
if not contact and not lead: return
if not contact and not lead:
return
last_communication = None
last_issue = None
if contact:
query_condition = ''
query_condition = ""
values = []
contact = frappe.get_doc('Contact', contact)
contact = frappe.get_doc("Contact", contact)
for link in contact.links:
if link.link_doctype == 'Customer':
if link.link_doctype == "Customer":
last_issue = get_last_issue_from_customer(link.link_name)
query_condition += "(`reference_doctype`=%s AND `reference_name`=%s) OR"
values += [link.link_doctype, link.link_name]
@@ -21,65 +22,82 @@ def get_last_interaction(contact=None, lead=None):
if query_condition:
# remove extra appended 'OR'
query_condition = query_condition[:-2]
last_communication = frappe.db.sql("""
last_communication = frappe.db.sql(
"""
SELECT `name`, `content`
FROM `tabCommunication`
WHERE `sent_or_received`='Received'
AND ({})
ORDER BY `modified`
LIMIT 1
""".format(query_condition), values, as_dict=1) # nosec
""".format(
query_condition
),
values,
as_dict=1,
) # nosec
if lead:
last_communication = frappe.get_all('Communication', filters={
'reference_doctype': 'Lead',
'reference_name': lead,
'sent_or_received': 'Received'
}, fields=['name', 'content'], order_by='`creation` DESC', limit=1)
last_communication = frappe.get_all(
"Communication",
filters={"reference_doctype": "Lead", "reference_name": lead, "sent_or_received": "Received"},
fields=["name", "content"],
order_by="`creation` DESC",
limit=1,
)
last_communication = last_communication[0] if last_communication else None
return {
'last_communication': last_communication,
'last_issue': last_issue
}
return {"last_communication": last_communication, "last_issue": last_issue}
def get_last_issue_from_customer(customer_name):
issues = frappe.get_all('Issue', {
'customer': customer_name
}, ['name', 'subject', 'customer'], order_by='`creation` DESC', limit=1)
issues = frappe.get_all(
"Issue",
{"customer": customer_name},
["name", "subject", "customer"],
order_by="`creation` DESC",
limit=1,
)
return issues[0] if issues else None
def get_scheduled_employees_for_popup(communication_medium):
if not communication_medium: return []
if not communication_medium:
return []
now_time = frappe.utils.nowtime()
weekday = frappe.utils.get_weekday()
available_employee_groups = frappe.get_all("Communication Medium Timeslot", filters={
'day_of_week': weekday,
'parent': communication_medium,
'from_time': ['<=', now_time],
'to_time': ['>=', now_time],
}, fields=['employee_group'])
available_employee_groups = frappe.get_all(
"Communication Medium Timeslot",
filters={
"day_of_week": weekday,
"parent": communication_medium,
"from_time": ["<=", now_time],
"to_time": [">=", now_time],
},
fields=["employee_group"],
)
available_employee_groups = tuple([emp.employee_group for emp in available_employee_groups])
employees = frappe.get_all('Employee Group Table', filters={
'parent': ['in', available_employee_groups]
}, fields=['user_id'])
employees = frappe.get_all(
"Employee Group Table", filters={"parent": ["in", available_employee_groups]}, fields=["user_id"]
)
employee_emails = set([employee.user_id for employee in employees])
return employee_emails
def strip_number(number):
if not number: return
if not number:
return
# strip + and 0 from the start of the number for proper number comparisions
# eg. +7888383332 should match with 7888383332
# eg. 07888383332 should match with 7888383332
number = number.lstrip('+')
number = number.lstrip('0')
number = number.lstrip("+")
number = number.lstrip("0")
return number

View File

@@ -9,77 +9,40 @@ from frappe.utils import flt
def execute(filters=None):
columns, data = [], []
columns=get_columns("Campaign Name")
data=get_lead_data(filters or {}, "Campaign Name")
columns = get_columns("Campaign Name")
data = get_lead_data(filters or {}, "Campaign Name")
return columns, data
def get_columns(based_on):
return [
{
"fieldname": frappe.scrub(based_on),
"label": _(based_on),
"fieldtype": "Data",
"width": 150
},
{
"fieldname": "lead_count",
"label": _("Lead Count"),
"fieldtype": "Int",
"width": 80
},
{
"fieldname": "opp_count",
"label": _("Opp Count"),
"fieldtype": "Int",
"width": 80
},
{
"fieldname": "quot_count",
"label": _("Quot Count"),
"fieldtype": "Int",
"width": 80
},
{
"fieldname": "order_count",
"label": _("Order Count"),
"fieldtype": "Int",
"width": 100
},
{
"fieldname": "order_value",
"label": _("Order Value"),
"fieldtype": "Float",
"width": 100
},
{
"fieldname": "opp_lead",
"label": _("Opp/Lead %"),
"fieldtype": "Float",
"width": 100
},
{
"fieldname": "quot_lead",
"label": _("Quot/Lead %"),
"fieldtype": "Float",
"width": 100
},
{
"fieldname": "order_quot",
"label": _("Order/Quot %"),
"fieldtype": "Float",
"width": 100
}
{"fieldname": frappe.scrub(based_on), "label": _(based_on), "fieldtype": "Data", "width": 150},
{"fieldname": "lead_count", "label": _("Lead Count"), "fieldtype": "Int", "width": 80},
{"fieldname": "opp_count", "label": _("Opp Count"), "fieldtype": "Int", "width": 80},
{"fieldname": "quot_count", "label": _("Quot Count"), "fieldtype": "Int", "width": 80},
{"fieldname": "order_count", "label": _("Order Count"), "fieldtype": "Int", "width": 100},
{"fieldname": "order_value", "label": _("Order Value"), "fieldtype": "Float", "width": 100},
{"fieldname": "opp_lead", "label": _("Opp/Lead %"), "fieldtype": "Float", "width": 100},
{"fieldname": "quot_lead", "label": _("Quot/Lead %"), "fieldtype": "Float", "width": 100},
{"fieldname": "order_quot", "label": _("Order/Quot %"), "fieldtype": "Float", "width": 100},
]
def get_lead_data(filters, based_on):
based_on_field = frappe.scrub(based_on)
conditions = get_filter_conditions(filters)
lead_details = frappe.db.sql("""
lead_details = frappe.db.sql(
"""
select {based_on_field}, name
from `tabLead`
where {based_on_field} is not null and {based_on_field} != '' {conditions}
""".format(based_on_field=based_on_field, conditions=conditions), filters, as_dict=1)
""".format(
based_on_field=based_on_field, conditions=conditions
),
filters,
as_dict=1,
)
lead_map = frappe._dict()
for d in lead_details:
@@ -87,11 +50,8 @@ def get_lead_data(filters, based_on):
data = []
for based_on_value, leads in lead_map.items():
row = {
based_on_field: based_on_value,
"lead_count": len(leads)
}
row["quot_count"]= get_lead_quotation_count(leads)
row = {based_on_field: based_on_value, "lead_count": len(leads)}
row["quot_count"] = get_lead_quotation_count(leads)
row["opp_count"] = get_lead_opp_count(leads)
row["order_count"] = get_quotation_ordered_count(leads)
row["order_value"] = get_order_amount(leads) or 0
@@ -105,8 +65,9 @@ def get_lead_data(filters, based_on):
return data
def get_filter_conditions(filters):
conditions=""
conditions = ""
if filters.from_date:
conditions += " and date(creation) >= %(from_date)s"
if filters.to_date:
@@ -114,23 +75,45 @@ def get_filter_conditions(filters):
return conditions
def get_lead_quotation_count(leads):
return frappe.db.sql("""select count(name) from `tabQuotation`
where quotation_to = 'Lead' and party_name in (%s)""" % ', '.join(["%s"]*len(leads)), tuple(leads))[0][0] #nosec
return frappe.db.sql(
"""select count(name) from `tabQuotation`
where quotation_to = 'Lead' and party_name in (%s)"""
% ", ".join(["%s"] * len(leads)),
tuple(leads),
)[0][
0
] # nosec
def get_lead_opp_count(leads):
return frappe.db.sql("""select count(name) from `tabOpportunity`
where opportunity_from = 'Lead' and party_name in (%s)""" % ', '.join(["%s"]*len(leads)), tuple(leads))[0][0]
return frappe.db.sql(
"""select count(name) from `tabOpportunity`
where opportunity_from = 'Lead' and party_name in (%s)"""
% ", ".join(["%s"] * len(leads)),
tuple(leads),
)[0][0]
def get_quotation_ordered_count(leads):
return frappe.db.sql("""select count(name)
return frappe.db.sql(
"""select count(name)
from `tabQuotation` where status = 'Ordered' and quotation_to = 'Lead'
and party_name in (%s)""" % ', '.join(["%s"]*len(leads)), tuple(leads))[0][0]
and party_name in (%s)"""
% ", ".join(["%s"] * len(leads)),
tuple(leads),
)[0][0]
def get_order_amount(leads):
return frappe.db.sql("""select sum(base_net_amount)
return frappe.db.sql(
"""select sum(base_net_amount)
from `tabSales Order Item`
where prevdoc_docname in (
select name from `tabQuotation` where status = 'Ordered'
and quotation_to = 'Lead' and party_name in (%s)
)""" % ', '.join(["%s"]*len(leads)), tuple(leads))[0][0]
)"""
% ", ".join(["%s"] * len(leads)),
tuple(leads),
)[0][0]

View File

@@ -7,21 +7,17 @@ import frappe
def execute(filters=None):
columns = [
{"fieldname": "creation_date", "label": "Date", "fieldtype": "Date", "width": 300},
{
'fieldname': 'creation_date',
'label': 'Date',
'fieldtype': 'Date',
'width': 300
},
{
'fieldname': 'first_response_time',
'fieldtype': 'Duration',
'label': 'First Response Time',
'width': 300
"fieldname": "first_response_time",
"fieldtype": "Duration",
"label": "First Response Time",
"width": 300,
},
]
data = frappe.db.sql('''
data = frappe.db.sql(
"""
SELECT
date(creation) as creation_date,
avg(first_response_time) as avg_response_time
@@ -31,6 +27,8 @@ def execute(filters=None):
and first_response_time > 0
GROUP BY creation_date
ORDER BY creation_date desc
''', (filters.from_date, filters.to_date))
""",
(filters.from_date, filters.to_date),
)
return columns, data

View File

@@ -8,7 +8,8 @@ from frappe.utils import date_diff, flt
def execute(filters=None):
if not filters: filters = {}
if not filters:
filters = {}
communication_list = get_communication_details(filters)
columns = get_columns()
@@ -19,8 +20,12 @@ def execute(filters=None):
data = []
for communication in communication_list:
row = [communication.get('customer'), communication.get('interactions'),\
communication.get('duration'), communication.get('support_tickets')]
row = [
communication.get("customer"),
communication.get("interactions"),
communication.get("duration"),
communication.get("support_tickets"),
]
data.append(row)
# add the average row
@@ -32,9 +37,17 @@ def execute(filters=None):
total_interactions += row[1]
total_duration += row[2]
total_tickets += row[3]
data.append(['Average', total_interactions/len(data), total_duration/len(data), total_tickets/len(data)])
data.append(
[
"Average",
total_interactions / len(data),
total_duration / len(data),
total_tickets / len(data),
]
)
return columns, data
def get_columns():
return [
{
@@ -42,36 +55,37 @@ def get_columns():
"fieldname": "customer",
"fieldtype": "Link",
"options": "Customer",
"width": 120
"width": 120,
},
{
"label": _("No of Interactions"),
"fieldname": "interactions",
"fieldtype": "Float",
"width": 120
},
{
"label": _("Duration in Days"),
"fieldname": "duration",
"fieldtype": "Float",
"width": 120
"width": 120,
},
{"label": _("Duration in Days"), "fieldname": "duration", "fieldtype": "Float", "width": 120},
{
"label": _("Support Tickets"),
"fieldname": "support_tickets",
"fieldtype": "Float",
"width": 120
}
"width": 120,
},
]
def get_communication_details(filters):
communication_count = None
communication_list = []
opportunities = frappe.db.get_values('Opportunity', {'opportunity_from': 'Lead'},\
['name', 'customer_name', 'contact_email'], as_dict=1)
opportunities = frappe.db.get_values(
"Opportunity",
{"opportunity_from": "Lead"},
["name", "customer_name", "contact_email"],
as_dict=1,
)
for d in opportunities:
invoice = frappe.db.sql('''
invoice = frappe.db.sql(
"""
SELECT
date(creation)
FROM
@@ -81,22 +95,30 @@ def get_communication_details(filters):
ORDER BY
creation
LIMIT 1
''', (d.contact_email, filters.from_date, filters.to_date))
""",
(d.contact_email, filters.from_date, filters.to_date),
)
if not invoice: continue
if not invoice:
continue
communication_count = frappe.db.sql('''
communication_count = frappe.db.sql(
"""
SELECT
count(*)
FROM
`tabCommunication`
WHERE
sender = %s AND date(communication_date) <= %s
''', (d.contact_email, invoice))[0][0]
""",
(d.contact_email, invoice),
)[0][0]
if not communication_count: continue
if not communication_count:
continue
first_contact = frappe.db.sql('''
first_contact = frappe.db.sql(
"""
SELECT
date(communication_date)
FROM
@@ -106,10 +128,19 @@ def get_communication_details(filters):
ORDER BY
communication_date
LIMIT 1
''', (d.contact_email))[0][0]
""",
(d.contact_email),
)[0][0]
duration = flt(date_diff(invoice[0][0], first_contact))
support_tickets = len(frappe.db.get_all('Issue', {'raised_by': d.contact_email}))
communication_list.append({'customer': d.customer_name, 'interactions': communication_count, 'duration': duration, 'support_tickets': support_tickets})
support_tickets = len(frappe.db.get_all("Issue", {"raised_by": d.contact_email}))
communication_list.append(
{
"customer": d.customer_name,
"interactions": communication_count,
"duration": duration,
"support_tickets": support_tickets,
}
)
return communication_list

View File

@@ -10,6 +10,7 @@ def execute(filters=None):
columns, data = get_columns(), get_data(filters)
return columns, data
def get_columns():
columns = [
{
@@ -19,101 +20,57 @@ def get_columns():
"options": "Lead",
"width": 150,
},
{"label": _("Lead Name"), "fieldname": "lead_name", "fieldtype": "Data", "width": 120},
{"fieldname": "status", "label": _("Status"), "fieldtype": "Data", "width": 100},
{
"label": _("Lead Name"),
"fieldname": "lead_name",
"fieldtype": "Data",
"width": 120
},
{
"fieldname":"status",
"label": _("Status"),
"fieldtype": "Data",
"width": 100
},
{
"fieldname":"lead_owner",
"fieldname": "lead_owner",
"label": _("Lead Owner"),
"fieldtype": "Link",
"options": "User",
"width": 100
"width": 100,
},
{
"label": _("Territory"),
"fieldname": "territory",
"fieldtype": "Link",
"options": "Territory",
"width": 100
},
{
"label": _("Source"),
"fieldname": "source",
"fieldtype": "Data",
"width": 120
},
{
"label": _("Email"),
"fieldname": "email_id",
"fieldtype": "Data",
"width": 120
},
{
"label": _("Mobile"),
"fieldname": "mobile_no",
"fieldtype": "Data",
"width": 120
},
{
"label": _("Phone"),
"fieldname": "phone",
"fieldtype": "Data",
"width": 120
"width": 100,
},
{"label": _("Source"), "fieldname": "source", "fieldtype": "Data", "width": 120},
{"label": _("Email"), "fieldname": "email_id", "fieldtype": "Data", "width": 120},
{"label": _("Mobile"), "fieldname": "mobile_no", "fieldtype": "Data", "width": 120},
{"label": _("Phone"), "fieldname": "phone", "fieldtype": "Data", "width": 120},
{
"label": _("Owner"),
"fieldname": "owner",
"fieldtype": "Link",
"options": "user",
"width": 120
"width": 120,
},
{
"label": _("Company"),
"fieldname": "company",
"fieldtype": "Link",
"options": "Company",
"width": 120
"width": 120,
},
{"fieldname": "address", "label": _("Address"), "fieldtype": "Data", "width": 130},
{"fieldname": "state", "label": _("State"), "fieldtype": "Data", "width": 100},
{"fieldname": "pincode", "label": _("Postal Code"), "fieldtype": "Data", "width": 90},
{
"fieldname":"address",
"label": _("Address"),
"fieldtype": "Data",
"width": 130
},
{
"fieldname":"state",
"label": _("State"),
"fieldtype": "Data",
"width": 100
},
{
"fieldname":"pincode",
"label": _("Postal Code"),
"fieldtype": "Data",
"width": 90
},
{
"fieldname":"country",
"fieldname": "country",
"label": _("Country"),
"fieldtype": "Link",
"options": "Country",
"width": 100
"width": 100,
},
]
return columns
def get_data(filters):
return frappe.db.sql("""
return frappe.db.sql(
"""
SELECT
`tabLead`.name,
`tabLead`.lead_name,
@@ -144,9 +101,15 @@ def get_data(filters):
AND `tabLead`.creation BETWEEN %(from_date)s AND %(to_date)s
{conditions}
ORDER BY
`tabLead`.creation asc """.format(conditions=get_conditions(filters)), filters, as_dict=1)
`tabLead`.creation asc """.format(
conditions=get_conditions(filters)
),
filters,
as_dict=1,
)
def get_conditions(filters) :
def get_conditions(filters):
conditions = []
if filters.get("territory"):

View File

@@ -9,10 +9,11 @@ from erpnext.crm.report.campaign_efficiency.campaign_efficiency import get_lead_
def execute(filters=None):
columns, data = [], []
columns=get_columns()
data=get_lead_data(filters, "Lead Owner")
columns = get_columns()
data = get_lead_data(filters, "Lead Owner")
return columns, data
def get_columns():
return [
{
@@ -20,54 +21,14 @@ def get_columns():
"label": _("Lead Owner"),
"fieldtype": "Link",
"options": "User",
"width": "130"
"width": "130",
},
{
"fieldname": "lead_count",
"label": _("Lead Count"),
"fieldtype": "Int",
"width": "80"
},
{
"fieldname": "opp_count",
"label": _("Opp Count"),
"fieldtype": "Int",
"width": "80"
},
{
"fieldname": "quot_count",
"label": _("Quot Count"),
"fieldtype": "Int",
"width": "80"
},
{
"fieldname": "order_count",
"label": _("Order Count"),
"fieldtype": "Int",
"width": "100"
},
{
"fieldname": "order_value",
"label": _("Order Value"),
"fieldtype": "Float",
"width": "100"
},
{
"fieldname": "opp_lead",
"label": _("Opp/Lead %"),
"fieldtype": "Float",
"width": "100"
},
{
"fieldname": "quot_lead",
"label": _("Quot/Lead %"),
"fieldtype": "Float",
"width": "100"
},
{
"fieldname": "order_quot",
"label": _("Order/Quot %"),
"fieldtype": "Float",
"width": "100"
}
{"fieldname": "lead_count", "label": _("Lead Count"), "fieldtype": "Int", "width": "80"},
{"fieldname": "opp_count", "label": _("Opp Count"), "fieldtype": "Int", "width": "80"},
{"fieldname": "quot_count", "label": _("Quot Count"), "fieldtype": "Int", "width": "80"},
{"fieldname": "order_count", "label": _("Order Count"), "fieldtype": "Int", "width": "100"},
{"fieldname": "order_value", "label": _("Order Value"), "fieldtype": "Float", "width": "100"},
{"fieldname": "opp_lead", "label": _("Opp/Lead %"), "fieldtype": "Float", "width": "100"},
{"fieldname": "quot_lead", "label": _("Quot/Lead %"), "fieldtype": "Float", "width": "100"},
{"fieldname": "order_quot", "label": _("Order/Quot %"), "fieldtype": "Float", "width": "100"},
]

View File

@@ -10,6 +10,7 @@ def execute(filters=None):
columns, data = get_columns(), get_data(filters)
return columns, data
def get_columns():
columns = [
{
@@ -24,59 +25,56 @@ def get_columns():
"fieldname": "opportunity_from",
"fieldtype": "Link",
"options": "DocType",
"width": 130
"width": 130,
},
{
"label": _("Party"),
"fieldname":"party_name",
"fieldname": "party_name",
"fieldtype": "Dynamic Link",
"options": "opportunity_from",
"width": 160
"width": 160,
},
{
"label": _("Customer/Lead Name"),
"fieldname":"customer_name",
"fieldname": "customer_name",
"fieldtype": "Data",
"width": 150
"width": 150,
},
{
"label": _("Opportunity Type"),
"fieldname": "opportunity_type",
"fieldtype": "Data",
"width": 130
},
{
"label": _("Lost Reasons"),
"fieldname": "lost_reason",
"fieldtype": "Data",
"width": 220
"width": 130,
},
{"label": _("Lost Reasons"), "fieldname": "lost_reason", "fieldtype": "Data", "width": 220},
{
"label": _("Sales Stage"),
"fieldname": "sales_stage",
"fieldtype": "Link",
"options": "Sales Stage",
"width": 150
"width": 150,
},
{
"label": _("Territory"),
"fieldname": "territory",
"fieldtype": "Link",
"options": "Territory",
"width": 150
"width": 150,
},
{
"label": _("Next Contact By"),
"fieldname": "contact_by",
"fieldtype": "Link",
"options": "User",
"width": 150
}
"width": 150,
},
]
return columns
def get_data(filters):
return frappe.db.sql("""
return frappe.db.sql(
"""
SELECT
`tabOpportunity`.name,
`tabOpportunity`.opportunity_from,
@@ -97,7 +95,12 @@ def get_data(filters):
GROUP BY
`tabOpportunity`.name
ORDER BY
`tabOpportunity`.creation asc """.format(conditions=get_conditions(filters), join=get_join(filters)), filters, as_dict=1)
`tabOpportunity`.creation asc """.format(
conditions=get_conditions(filters), join=get_join(filters)
),
filters,
as_dict=1,
)
def get_conditions(filters):
@@ -117,6 +120,7 @@ def get_conditions(filters):
return " ".join(conditions) if conditions else ""
def get_join(filters):
join = """LEFT JOIN `tabOpportunity Lost Reason Detail`
ON `tabOpportunity Lost Reason Detail`.parenttype = 'Opportunity' and
@@ -127,6 +131,8 @@ def get_join(filters):
ON `tabOpportunity Lost Reason Detail`.parenttype = 'Opportunity' and
`tabOpportunity Lost Reason Detail`.parent = `tabOpportunity`.name and
`tabOpportunity Lost Reason Detail`.lost_reason = '{0}'
""".format(filters.get("lost_reason"))
""".format(
filters.get("lost_reason")
)
return join

View File

@@ -13,8 +13,9 @@ from erpnext.setup.utils import get_exchange_rate
def execute(filters=None):
return OpportunitySummaryBySalesStage(filters).run()
class OpportunitySummaryBySalesStage(object):
def __init__(self,filters=None):
def __init__(self, filters=None):
self.filters = frappe._dict(filters or {})
def run(self):
@@ -26,98 +27,94 @@ class OpportunitySummaryBySalesStage(object):
def get_columns(self):
self.columns = []
if self.filters.get('based_on') == 'Opportunity Owner':
self.columns.append({
'label': _('Opportunity Owner'),
'fieldname': 'opportunity_owner',
'width': 200
})
if self.filters.get("based_on") == "Opportunity Owner":
self.columns.append(
{"label": _("Opportunity Owner"), "fieldname": "opportunity_owner", "width": 200}
)
if self.filters.get('based_on') == 'Source':
self.columns.append({
'label': _('Source'),
'fieldname': 'source',
'fieldtype': 'Link',
'options': 'Lead Source',
'width': 200
})
if self.filters.get("based_on") == "Source":
self.columns.append(
{
"label": _("Source"),
"fieldname": "source",
"fieldtype": "Link",
"options": "Lead Source",
"width": 200,
}
)
if self.filters.get('based_on') == 'Opportunity Type':
self.columns.append({
'label': _('Opportunity Type'),
'fieldname': 'opportunity_type',
'width': 200
})
if self.filters.get("based_on") == "Opportunity Type":
self.columns.append(
{"label": _("Opportunity Type"), "fieldname": "opportunity_type", "width": 200}
)
self.set_sales_stage_columns()
def set_sales_stage_columns(self):
self.sales_stage_list = frappe.db.get_list('Sales Stage', pluck='name')
self.sales_stage_list = frappe.db.get_list("Sales Stage", pluck="name")
for sales_stage in self.sales_stage_list:
if self.filters.get('data_based_on') == 'Number':
self.columns.append({
'label': _(sales_stage),
'fieldname': sales_stage,
'fieldtype': 'Int',
'width': 150
})
if self.filters.get("data_based_on") == "Number":
self.columns.append(
{"label": _(sales_stage), "fieldname": sales_stage, "fieldtype": "Int", "width": 150}
)
elif self.filters.get('data_based_on') == 'Amount':
self.columns.append({
'label': _(sales_stage),
'fieldname': sales_stage,
'fieldtype': 'Currency',
'width': 150
})
elif self.filters.get("data_based_on") == "Amount":
self.columns.append(
{"label": _(sales_stage), "fieldname": sales_stage, "fieldtype": "Currency", "width": 150}
)
def get_data(self):
self.data = []
based_on = {
'Opportunity Owner': '_assign',
'Source': 'source',
'Opportunity Type': 'opportunity_type'
}[self.filters.get('based_on')]
"Opportunity Owner": "_assign",
"Source": "source",
"Opportunity Type": "opportunity_type",
}[self.filters.get("based_on")]
data_based_on = {
'Number': 'count(name) as count',
'Amount': 'opportunity_amount as amount',
}[self.filters.get('data_based_on')]
"Number": "count(name) as count",
"Amount": "opportunity_amount as amount",
}[self.filters.get("data_based_on")]
self.get_data_query(based_on, data_based_on)
self.get_rows()
def get_data_query(self, based_on, data_based_on):
if self.filters.get('data_based_on') == 'Number':
group_by = '{},{}'.format('sales_stage', based_on)
self.query_result = frappe.db.get_list('Opportunity',
if self.filters.get("data_based_on") == "Number":
group_by = "{},{}".format("sales_stage", based_on)
self.query_result = frappe.db.get_list(
"Opportunity",
filters=self.get_conditions(),
fields=['sales_stage', data_based_on, based_on],
group_by=group_by
fields=["sales_stage", data_based_on, based_on],
group_by=group_by,
)
elif self.filters.get('data_based_on') == 'Amount':
self.query_result = frappe.db.get_list('Opportunity',
elif self.filters.get("data_based_on") == "Amount":
self.query_result = frappe.db.get_list(
"Opportunity",
filters=self.get_conditions(),
fields=['sales_stage', based_on, data_based_on, 'currency']
fields=["sales_stage", based_on, data_based_on, "currency"],
)
self.convert_to_base_currency()
dataframe = pandas.DataFrame.from_records(self.query_result)
dataframe.replace(to_replace=[None], value='Not Assigned', inplace=True)
result = dataframe.groupby(['sales_stage', based_on], as_index=False)['amount'].sum()
dataframe.replace(to_replace=[None], value="Not Assigned", inplace=True)
result = dataframe.groupby(["sales_stage", based_on], as_index=False)["amount"].sum()
self.grouped_data = []
for i in range(len(result['amount'])):
self.grouped_data.append({
'sales_stage': result['sales_stage'][i],
based_on : result[based_on][i],
'amount': result['amount'][i]
})
for i in range(len(result["amount"])):
self.grouped_data.append(
{
"sales_stage": result["sales_stage"][i],
based_on: result[based_on][i],
"amount": result["amount"][i],
}
)
self.query_result = self.grouped_data
@@ -125,17 +122,17 @@ class OpportunitySummaryBySalesStage(object):
self.data = []
self.get_formatted_data()
for based_on,data in self.formatted_data.items():
row_based_on={
'Opportunity Owner': 'opportunity_owner',
'Source': 'source',
'Opportunity Type': 'opportunity_type'
}[self.filters.get('based_on')]
for based_on, data in self.formatted_data.items():
row_based_on = {
"Opportunity Owner": "opportunity_owner",
"Source": "source",
"Opportunity Type": "opportunity_type",
}[self.filters.get("based_on")]
row = {row_based_on: based_on}
for d in self.query_result:
sales_stage = d.get('sales_stage')
sales_stage = d.get("sales_stage")
row[sales_stage] = data.get(sales_stage)
self.data.append(row)
@@ -144,24 +141,21 @@ class OpportunitySummaryBySalesStage(object):
self.formatted_data = frappe._dict()
for d in self.query_result:
data_based_on ={
'Number': 'count',
'Amount': 'amount'
}[self.filters.get('data_based_on')]
data_based_on = {"Number": "count", "Amount": "amount"}[self.filters.get("data_based_on")]
based_on ={
'Opportunity Owner': '_assign',
'Source': 'source',
'Opportunity Type': 'opportunity_type'
}[self.filters.get('based_on')]
based_on = {
"Opportunity Owner": "_assign",
"Source": "source",
"Opportunity Type": "opportunity_type",
}[self.filters.get("based_on")]
if self.filters.get('based_on') == 'Opportunity Owner':
if d.get(based_on) == '[]' or d.get(based_on) is None or d.get(based_on) == 'Not Assigned':
assignments = ['Not Assigned']
if self.filters.get("based_on") == "Opportunity Owner":
if d.get(based_on) == "[]" or d.get(based_on) is None or d.get(based_on) == "Not Assigned":
assignments = ["Not Assigned"]
else:
assignments = json.loads(d.get(based_on))
sales_stage = d.get('sales_stage')
sales_stage = d.get("sales_stage")
count = d.get(data_based_on)
if assignments:
@@ -173,7 +167,7 @@ class OpportunitySummaryBySalesStage(object):
self.set_formatted_data_based_on_sales_stage(assigned_to, sales_stage, count)
else:
value = d.get(based_on)
sales_stage = d.get('sales_stage')
sales_stage = d.get("sales_stage")
count = d.get(data_based_on)
self.set_formatted_data_based_on_sales_stage(value, sales_stage, count)
@@ -184,20 +178,22 @@ class OpportunitySummaryBySalesStage(object):
def get_conditions(self):
filters = []
if self.filters.get('company'):
filters.append({'company': self.filters.get('company')})
if self.filters.get("company"):
filters.append({"company": self.filters.get("company")})
if self.filters.get('opportunity_type'):
filters.append({'opportunity_type': self.filters.get('opportunity_type')})
if self.filters.get("opportunity_type"):
filters.append({"opportunity_type": self.filters.get("opportunity_type")})
if self.filters.get('opportunity_source'):
filters.append({'source': self.filters.get('opportunity_source')})
if self.filters.get("opportunity_source"):
filters.append({"source": self.filters.get("opportunity_source")})
if self.filters.get('status'):
filters.append({'status': ('in',self.filters.get('status'))})
if self.filters.get("status"):
filters.append({"status": ("in", self.filters.get("status"))})
if self.filters.get('from_date') and self.filters.get('to_date'):
filters.append(['transaction_date', 'between', [self.filters.get('from_date'), self.filters.get('to_date')]])
if self.filters.get("from_date") and self.filters.get("to_date"):
filters.append(
["transaction_date", "between", [self.filters.get("from_date"), self.filters.get("to_date")]]
)
return filters
@@ -209,45 +205,36 @@ class OpportunitySummaryBySalesStage(object):
for sales_stage in self.sales_stage_list:
labels.append(sales_stage)
options = {
'Number': 'count',
'Amount': 'amount'
}[self.filters.get('data_based_on')]
options = {"Number": "count", "Amount": "amount"}[self.filters.get("data_based_on")]
for data in self.query_result:
for count in range(len(values)):
if data['sales_stage'] == labels[count]:
if data["sales_stage"] == labels[count]:
values[count] = values[count] + data[options]
datasets.append({'name':options, 'values':values})
datasets.append({"name": options, "values": values})
self.chart = {
'data':{
'labels': labels,
'datasets': datasets
},
'type':'line'
}
self.chart = {"data": {"labels": labels, "datasets": datasets}, "type": "line"}
def currency_conversion(self,from_currency,to_currency):
def currency_conversion(self, from_currency, to_currency):
cacheobj = frappe.cache()
if cacheobj.get(from_currency):
return flt(str(cacheobj.get(from_currency),'UTF-8'))
return flt(str(cacheobj.get(from_currency), "UTF-8"))
else:
value = get_exchange_rate(from_currency,to_currency)
cacheobj.set(from_currency,value)
return flt(str(cacheobj.get(from_currency),'UTF-8'))
value = get_exchange_rate(from_currency, to_currency)
cacheobj.set(from_currency, value)
return flt(str(cacheobj.get(from_currency), "UTF-8"))
def get_default_currency(self):
company = self.filters.get('company')
return frappe.db.get_value('Company', company, 'default_currency')
company = self.filters.get("company")
return frappe.db.get_value("Company", company, "default_currency")
def convert_to_base_currency(self):
default_currency = self.get_default_currency()
for data in self.query_result:
if data.get('currency') != default_currency:
opportunity_currency = data.get('currency')
value = self.currency_conversion(opportunity_currency,default_currency)
data['amount'] = data['amount'] * value
if data.get("currency") != default_currency:
opportunity_currency = data.get("currency")
value = self.currency_conversion(opportunity_currency, default_currency)
data["amount"] = data["amount"] * value

View File

@@ -27,68 +27,44 @@ class TestOpportunitySummaryBySalesStage(unittest.TestCase):
self.check_all_filters()
def check_for_opportunity_owner(self):
filters = {
'based_on': "Opportunity Owner",
'data_based_on': "Number",
'company': "Best Test"
}
filters = {"based_on": "Opportunity Owner", "data_based_on": "Number", "company": "Best Test"}
report = execute(filters)
expected_data = [{
'opportunity_owner': "Not Assigned",
'Prospecting': 1
}]
expected_data = [{"opportunity_owner": "Not Assigned", "Prospecting": 1}]
self.assertEqual(expected_data, report[1])
def check_for_source(self):
filters = {
'based_on': "Source",
'data_based_on': "Number",
'company': "Best Test"
}
filters = {"based_on": "Source", "data_based_on": "Number", "company": "Best Test"}
report = execute(filters)
expected_data = [{
'source': 'Cold Calling',
'Prospecting': 1
}]
expected_data = [{"source": "Cold Calling", "Prospecting": 1}]
self.assertEqual(expected_data, report[1])
def check_for_opportunity_type(self):
filters = {
'based_on': "Opportunity Type",
'data_based_on': "Number",
'company': "Best Test"
}
filters = {"based_on": "Opportunity Type", "data_based_on": "Number", "company": "Best Test"}
report = execute(filters)
expected_data = [{
'opportunity_type': 'Sales',
'Prospecting': 1
}]
expected_data = [{"opportunity_type": "Sales", "Prospecting": 1}]
self.assertEqual(expected_data, report[1])
def check_all_filters(self):
filters = {
'based_on': "Opportunity Type",
'data_based_on': "Number",
'company': "Best Test",
'opportunity_source': "Cold Calling",
'opportunity_type': "Sales",
'status': ["Open"]
"based_on": "Opportunity Type",
"data_based_on": "Number",
"company": "Best Test",
"opportunity_source": "Cold Calling",
"opportunity_type": "Sales",
"status": ["Open"],
}
report = execute(filters)
expected_data = [{
'opportunity_type': 'Sales',
'Prospecting': 1
}]
expected_data = [{"opportunity_type": "Sales", "Prospecting": 1}]
self.assertEqual(expected_data, report[1])
self.assertEqual(expected_data, report[1])

View File

@@ -15,62 +15,58 @@ def execute(filters=None):
return columns, data
def set_defaut_value_for_filters(filters):
if not filters.get('no_of_interaction'): filters["no_of_interaction"] = 1
if not filters.get('lead_age'): filters["lead_age"] = 60
if not filters.get("no_of_interaction"):
filters["no_of_interaction"] = 1
if not filters.get("lead_age"):
filters["lead_age"] = 60
def get_columns():
columns = [{
"label": _("Lead"),
"fieldname": "lead",
"fieldtype": "Link",
"options": "Lead",
"width": 130
},
{
"label": _("Name"),
"fieldname": "name",
"width": 120
},
{
"label": _("Organization"),
"fieldname": "organization",
"width": 120
},
columns = [
{"label": _("Lead"), "fieldname": "lead", "fieldtype": "Link", "options": "Lead", "width": 130},
{"label": _("Name"), "fieldname": "name", "width": 120},
{"label": _("Organization"), "fieldname": "organization", "width": 120},
{
"label": _("Reference Document Type"),
"fieldname": "reference_document_type",
"fieldtype": "Link",
"options": "Doctype",
"width": 100
"width": 100,
},
{
"label": _("Reference Name"),
"fieldname": "reference_name",
"fieldtype": "Dynamic Link",
"options": "reference_document_type",
"width": 140
"width": 140,
},
{
"label": _("Last Communication"),
"fieldname": "last_communication",
"fieldtype": "Data",
"width": 200
"width": 200,
},
{
"label": _("Last Communication Date"),
"fieldname": "last_communication_date",
"fieldtype": "Date",
"width": 100
}]
"width": 100,
},
]
return columns
def get_data(filters):
lead_details = []
lead_filters = get_lead_filters(filters)
for lead in frappe.get_all('Lead', fields = ['name', 'lead_name', 'company_name'], filters=lead_filters):
data = frappe.db.sql("""
for lead in frappe.get_all(
"Lead", fields=["name", "lead_name", "company_name"], filters=lead_filters
):
data = frappe.db.sql(
"""
select
`tabCommunication`.reference_doctype, `tabCommunication`.reference_name,
`tabCommunication`.content, `tabCommunication`.communication_date
@@ -90,7 +86,8 @@ def get_data(filters):
`tabCommunication`.sent_or_received = 'Received'
order by
ref_document.lead, `tabCommunication`.creation desc limit %(limit)s""",
{'lead': lead.name, 'limit': filters.get('no_of_interaction')})
{"lead": lead.name, "limit": filters.get("no_of_interaction")},
)
for lead_info in data:
lead_data = [lead.name, lead.lead_name, lead.company_name] + list(lead_info)
@@ -98,13 +95,15 @@ def get_data(filters):
return lead_details
def get_lead_filters(filters):
lead_creation_date = get_creation_date_based_on_lead_age(filters)
lead_filters = [["status", "!=", "Converted"], ["creation", ">", lead_creation_date]]
if filters.get('lead'):
lead_filters.append(["name", "=", filters.get('lead')])
if filters.get("lead"):
lead_filters.append(["name", "=", filters.get("lead")])
return lead_filters
def get_creation_date_based_on_lead_age(filters):
return add_days(now(), (filters.get('lead_age') * -1))
return add_days(now(), (filters.get("lead_age") * -1))

View File

@@ -16,6 +16,7 @@ from erpnext.setup.utils import get_exchange_rate
def execute(filters=None):
return SalesPipelineAnalytics(filters).run()
class SalesPipelineAnalytics(object):
def __init__(self, filters=None):
self.filters = frappe._dict(filters or {})
@@ -34,113 +35,94 @@ class SalesPipelineAnalytics(object):
self.set_pipeline_based_on_column()
def set_range_columns(self):
based_on = {
'Number': 'Int',
'Amount': 'Currency'
}[self.filters.get('based_on')]
based_on = {"Number": "Int", "Amount": "Currency"}[self.filters.get("based_on")]
if self.filters.get('range') == 'Monthly':
if self.filters.get("range") == "Monthly":
month_list = self.get_month_list()
for month in month_list:
self.columns.append({
'fieldname': month,
'fieldtype': based_on,
'label': month,
'width': 200
})
self.columns.append({"fieldname": month, "fieldtype": based_on, "label": month, "width": 200})
elif self.filters.get('range') == 'Quarterly':
elif self.filters.get("range") == "Quarterly":
for quarter in range(1, 5):
self.columns.append({
'fieldname': f'Q{quarter}',
'fieldtype': based_on,
'label': f'Q{quarter}',
'width': 200
})
self.columns.append(
{"fieldname": f"Q{quarter}", "fieldtype": based_on, "label": f"Q{quarter}", "width": 200}
)
def set_pipeline_based_on_column(self):
if self.filters.get('pipeline_by') == 'Owner':
self.columns.insert(0, {
'fieldname': 'opportunity_owner',
'label': _('Opportunity Owner'),
'width': 200
})
if self.filters.get("pipeline_by") == "Owner":
self.columns.insert(
0, {"fieldname": "opportunity_owner", "label": _("Opportunity Owner"), "width": 200}
)
elif self.filters.get('pipeline_by') == 'Sales Stage':
self.columns.insert(0, {
'fieldname': 'sales_stage',
'label': _('Sales Stage'),
'width': 200
})
elif self.filters.get("pipeline_by") == "Sales Stage":
self.columns.insert(0, {"fieldname": "sales_stage", "label": _("Sales Stage"), "width": 200})
def get_fields(self):
self.based_on ={
'Owner': '_assign as opportunity_owner',
'Sales Stage': 'sales_stage'
}[self.filters.get('pipeline_by')]
self.based_on = {"Owner": "_assign as opportunity_owner", "Sales Stage": "sales_stage"}[
self.filters.get("pipeline_by")
]
self.data_based_on ={
'Number': 'count(name) as count',
'Amount': 'opportunity_amount as amount'
}[self.filters.get('based_on')]
self.data_based_on = {
"Number": "count(name) as count",
"Amount": "opportunity_amount as amount",
}[self.filters.get("based_on")]
self.group_by_based_on = {
'Owner': '_assign',
'Sales Stage': 'sales_stage'
}[self.filters.get('pipeline_by')]
self.group_by_based_on = {"Owner": "_assign", "Sales Stage": "sales_stage"}[
self.filters.get("pipeline_by")
]
self.group_by_period = {
'Monthly': 'month(expected_closing)',
'Quarterly': 'QUARTER(expected_closing)'
}[self.filters.get('range')]
"Monthly": "month(expected_closing)",
"Quarterly": "QUARTER(expected_closing)",
}[self.filters.get("range")]
self.pipeline_by = {
'Owner': 'opportunity_owner',
'Sales Stage': 'sales_stage'
}[self.filters.get('pipeline_by')]
self.pipeline_by = {"Owner": "opportunity_owner", "Sales Stage": "sales_stage"}[
self.filters.get("pipeline_by")
]
self.duration = {
'Monthly': 'monthname(expected_closing) as month',
'Quarterly': 'QUARTER(expected_closing) as quarter'
}[self.filters.get('range')]
"Monthly": "monthname(expected_closing) as month",
"Quarterly": "QUARTER(expected_closing) as quarter",
}[self.filters.get("range")]
self.period_by = {
'Monthly': 'month',
'Quarterly': 'quarter'
}[self.filters.get('range')]
self.period_by = {"Monthly": "month", "Quarterly": "quarter"}[self.filters.get("range")]
def get_data(self):
self.get_fields()
if self.filters.get('based_on') == 'Number':
self.query_result = frappe.db.get_list('Opportunity',
if self.filters.get("based_on") == "Number":
self.query_result = frappe.db.get_list(
"Opportunity",
filters=self.get_conditions(),
fields=[self.based_on, self.data_based_on, self.duration],
group_by='{},{}'.format(self.group_by_based_on, self.group_by_period),
order_by=self.group_by_period
group_by="{},{}".format(self.group_by_based_on, self.group_by_period),
order_by=self.group_by_period,
)
if self.filters.get('based_on') == 'Amount':
self.query_result = frappe.db.get_list('Opportunity',
if self.filters.get("based_on") == "Amount":
self.query_result = frappe.db.get_list(
"Opportunity",
filters=self.get_conditions(),
fields=[self.based_on, self.data_based_on, self.duration, 'currency']
fields=[self.based_on, self.data_based_on, self.duration, "currency"],
)
self.convert_to_base_currency()
dataframe = pandas.DataFrame.from_records(self.query_result)
dataframe.replace(to_replace=[None], value='Not Assigned', inplace=True)
result = dataframe.groupby([self.pipeline_by, self.period_by], as_index=False)['amount'].sum()
dataframe.replace(to_replace=[None], value="Not Assigned", inplace=True)
result = dataframe.groupby([self.pipeline_by, self.period_by], as_index=False)["amount"].sum()
self.grouped_data = []
for i in range(len(result['amount'])):
self.grouped_data.append({
self.pipeline_by : result[self.pipeline_by][i],
self.period_by : result[self.period_by][i],
'amount': result['amount'][i]
})
for i in range(len(result["amount"])):
self.grouped_data.append(
{
self.pipeline_by: result[self.pipeline_by][i],
self.period_by: result[self.period_by][i],
"amount": result["amount"][i],
}
)
self.query_result = self.grouped_data
@@ -150,21 +132,22 @@ class SalesPipelineAnalytics(object):
def get_conditions(self):
conditions = []
if self.filters.get('opportunity_source'):
conditions.append({'source': self.filters.get('opportunity_source')})
if self.filters.get("opportunity_source"):
conditions.append({"source": self.filters.get("opportunity_source")})
if self.filters.get('opportunity_type'):
conditions.append({'opportunity_type': self.filters.get('opportunity_type')})
if self.filters.get("opportunity_type"):
conditions.append({"opportunity_type": self.filters.get("opportunity_type")})
if self.filters.get('status'):
conditions.append({'status': self.filters.get('status')})
if self.filters.get("status"):
conditions.append({"status": self.filters.get("status")})
if self.filters.get('company'):
conditions.append({'company': self.filters.get('company')})
if self.filters.get("company"):
conditions.append({"company": self.filters.get("company")})
if self.filters.get('from_date') and self.filters.get('to_date'):
conditions.append(['expected_closing', 'between',
[self.filters.get('from_date'), self.filters.get('to_date')]])
if self.filters.get("from_date") and self.filters.get("to_date"):
conditions.append(
["expected_closing", "between", [self.filters.get("from_date"), self.filters.get("to_date")]]
)
return conditions
@@ -175,49 +158,36 @@ class SalesPipelineAnalytics(object):
self.append_to_dataset(datasets)
for column in self.columns:
if column['fieldname'] != 'opportunity_owner' and column['fieldname'] != 'sales_stage':
labels.append(column['fieldname'])
if column["fieldname"] != "opportunity_owner" and column["fieldname"] != "sales_stage":
labels.append(column["fieldname"])
self.chart = {
'data':{
'labels': labels,
'datasets': datasets
},
'type':'line'
}
self.chart = {"data": {"labels": labels, "datasets": datasets}, "type": "line"}
return self.chart
def get_periodic_data(self):
self.periodic_data = frappe._dict()
based_on = {
'Number': 'count',
'Amount': 'amount'
}[self.filters.get('based_on')]
based_on = {"Number": "count", "Amount": "amount"}[self.filters.get("based_on")]
pipeline_by = {
'Owner': 'opportunity_owner',
'Sales Stage': 'sales_stage'
}[self.filters.get('pipeline_by')]
pipeline_by = {"Owner": "opportunity_owner", "Sales Stage": "sales_stage"}[
self.filters.get("pipeline_by")
]
frequency = {
'Monthly': 'month',
'Quarterly': 'quarter'
}[self.filters.get('range')]
frequency = {"Monthly": "month", "Quarterly": "quarter"}[self.filters.get("range")]
for info in self.query_result:
if self.filters.get('range') == 'Monthly':
if self.filters.get("range") == "Monthly":
period = info.get(frequency)
if self.filters.get('range') == 'Quarterly':
if self.filters.get("range") == "Quarterly":
period = f'Q{cint(info.get("quarter"))}'
value = info.get(pipeline_by)
count_or_amount = info.get(based_on)
if self.filters.get('pipeline_by') == 'Owner':
if value == 'Not Assigned' or value == '[]' or value is None:
assigned_to = ['Not Assigned']
if self.filters.get("pipeline_by") == "Owner":
if value == "Not Assigned" or value == "[]" or value is None:
assigned_to = ["Not Assigned"]
else:
assigned_to = json.loads(value)
self.check_for_assigned_to(period, value, count_or_amount, assigned_to, info)
@@ -228,9 +198,9 @@ class SalesPipelineAnalytics(object):
def set_formatted_data(self, period, value, count_or_amount, assigned_to):
if assigned_to:
if len(assigned_to) > 1:
if self.filters.get('assigned_to'):
if self.filters.get("assigned_to"):
for user in assigned_to:
if self.filters.get('assigned_to') == user:
if self.filters.get("assigned_to") == user:
value = user
self.periodic_data.setdefault(value, frappe._dict()).setdefault(period, 0)
self.periodic_data[value][period] += count_or_amount
@@ -249,40 +219,34 @@ class SalesPipelineAnalytics(object):
self.periodic_data[value][period] += count_or_amount
def check_for_assigned_to(self, period, value, count_or_amount, assigned_to, info):
if self.filters.get('assigned_to'):
for data in json.loads(info.get('opportunity_owner')):
if data == self.filters.get('assigned_to'):
if self.filters.get("assigned_to"):
for data in json.loads(info.get("opportunity_owner")):
if data == self.filters.get("assigned_to"):
self.set_formatted_data(period, data, count_or_amount, assigned_to)
else:
self.set_formatted_data(period, value, count_or_amount, assigned_to)
def get_month_list(self):
month_list= []
month_list = []
current_date = date.today()
month_number = date.today().month
for month in range(month_number,13):
month_list.append(current_date.strftime('%B'))
for month in range(month_number, 13):
month_list.append(current_date.strftime("%B"))
current_date = current_date + relativedelta(months=1)
return month_list
def append_to_dataset(self, datasets):
range_by = {
'Monthly': 'month',
'Quarterly': 'quarter'
}[self.filters.get('range')]
range_by = {"Monthly": "month", "Quarterly": "quarter"}[self.filters.get("range")]
based_on = {
'Amount': 'amount',
'Number': 'count'
}[self.filters.get('based_on')]
based_on = {"Amount": "amount", "Number": "count"}[self.filters.get("based_on")]
if self.filters.get('range') == 'Quarterly':
frequency_list = [1,2,3,4]
if self.filters.get("range") == "Quarterly":
frequency_list = [1, 2, 3, 4]
count = [0] * 4
if self.filters.get('range') == 'Monthly':
if self.filters.get("range") == "Monthly":
frequency_list = self.get_month_list()
count = [0] * 12
@@ -290,43 +254,43 @@ class SalesPipelineAnalytics(object):
for i in range(len(frequency_list)):
if info[range_by] == frequency_list[i]:
count[i] = count[i] + info[based_on]
datasets.append({'name': based_on, 'values': count})
datasets.append({"name": based_on, "values": count})
def append_data(self, pipeline_by, period_by):
self.data = []
for pipeline,period_data in self.periodic_data.items():
row = {pipeline_by : pipeline}
for pipeline, period_data in self.periodic_data.items():
row = {pipeline_by: pipeline}
for info in self.query_result:
if self.filters.get('range') == 'Monthly':
if self.filters.get("range") == "Monthly":
period = info.get(period_by)
if self.filters.get('range') == 'Quarterly':
period = f'Q{cint(info.get(period_by))}'
if self.filters.get("range") == "Quarterly":
period = f"Q{cint(info.get(period_by))}"
count = period_data.get(period,0.0)
count = period_data.get(period, 0.0)
row[period] = count
self.data.append(row)
def get_default_currency(self):
company = self.filters.get('company')
return frappe.db.get_value('Company',company,['default_currency'])
company = self.filters.get("company")
return frappe.db.get_value("Company", company, ["default_currency"])
def get_currency_rate(self, from_currency, to_currency):
cacheobj = frappe.cache()
if cacheobj.get(from_currency):
return flt(str(cacheobj.get(from_currency),'UTF-8'))
return flt(str(cacheobj.get(from_currency), "UTF-8"))
else:
value = get_exchange_rate(from_currency, to_currency)
cacheobj.set(from_currency, value)
return flt(str(cacheobj.get(from_currency),'UTF-8'))
return flt(str(cacheobj.get(from_currency), "UTF-8"))
def convert_to_base_currency(self):
default_currency = self.get_default_currency()
for data in self.query_result:
if data.get('currency') != default_currency:
opportunity_currency = data.get('currency')
value = self.get_currency_rate(opportunity_currency,default_currency)
data['amount'] = data['amount'] * value
if data.get("currency") != default_currency:
opportunity_currency = data.get("currency")
value = self.get_currency_rate(opportunity_currency, default_currency)
data["amount"] = data["amount"] * value

View File

@@ -22,217 +22,175 @@ class TestSalesPipelineAnalytics(unittest.TestCase):
def check_for_monthly_and_number(self):
filters = {
'pipeline_by':"Owner",
'range':"Monthly",
'based_on':"Number",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Owner",
"range": "Monthly",
"based_on": "Number",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'opportunity_owner':'Not Assigned',
'August':1
}
]
expected_data = [{"opportunity_owner": "Not Assigned", "August": 1}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
filters = {
'pipeline_by':"Sales Stage",
'range':"Monthly",
'based_on':"Number",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Sales Stage",
"range": "Monthly",
"based_on": "Number",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'sales_stage':'Prospecting',
'August':1
}
]
expected_data = [{"sales_stage": "Prospecting", "August": 1}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
def check_for_monthly_and_amount(self):
filters = {
'pipeline_by':"Owner",
'range':"Monthly",
'based_on':"Amount",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Owner",
"range": "Monthly",
"based_on": "Amount",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'opportunity_owner':'Not Assigned',
'August':150000
}
]
expected_data = [{"opportunity_owner": "Not Assigned", "August": 150000}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
filters = {
'pipeline_by':"Sales Stage",
'range':"Monthly",
'based_on':"Amount",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Sales Stage",
"range": "Monthly",
"based_on": "Amount",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'sales_stage':'Prospecting',
'August':150000
}
]
expected_data = [{"sales_stage": "Prospecting", "August": 150000}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
def check_for_quarterly_and_number(self):
filters = {
'pipeline_by':"Owner",
'range':"Quarterly",
'based_on':"Number",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Owner",
"range": "Quarterly",
"based_on": "Number",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'opportunity_owner':'Not Assigned',
'Q3':1
}
]
expected_data = [{"opportunity_owner": "Not Assigned", "Q3": 1}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
filters = {
'pipeline_by':"Sales Stage",
'range':"Quarterly",
'based_on':"Number",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Sales Stage",
"range": "Quarterly",
"based_on": "Number",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'sales_stage':'Prospecting',
'Q3':1
}
]
expected_data = [{"sales_stage": "Prospecting", "Q3": 1}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
def check_for_quarterly_and_amount(self):
filters = {
'pipeline_by':"Owner",
'range':"Quarterly",
'based_on':"Amount",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Owner",
"range": "Quarterly",
"based_on": "Amount",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'opportunity_owner':'Not Assigned',
'Q3':150000
}
]
expected_data = [{"opportunity_owner": "Not Assigned", "Q3": 150000}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
filters = {
'pipeline_by':"Sales Stage",
'range':"Quarterly",
'based_on':"Amount",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test"
"pipeline_by": "Sales Stage",
"range": "Quarterly",
"based_on": "Amount",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
}
report = execute(filters)
expected_data = [
{
'sales_stage':'Prospecting',
'Q3':150000
}
]
expected_data = [{"sales_stage": "Prospecting", "Q3": 150000}]
self.assertEqual(expected_data,report[1])
self.assertEqual(expected_data, report[1])
def check_for_all_filters(self):
filters = {
'pipeline_by':"Owner",
'range':"Monthly",
'based_on':"Number",
'status':"Open",
'opportunity_type':"Sales",
'company':"Best Test",
'opportunity_source':'Cold Calling',
'from_date': '2021-08-01',
'to_date':'2021-08-31'
"pipeline_by": "Owner",
"range": "Monthly",
"based_on": "Number",
"status": "Open",
"opportunity_type": "Sales",
"company": "Best Test",
"opportunity_source": "Cold Calling",
"from_date": "2021-08-01",
"to_date": "2021-08-31",
}
report = execute(filters)
expected_data = [
{
'opportunity_owner':'Not Assigned',
'August': 1
}
]
expected_data = [{"opportunity_owner": "Not Assigned", "August": 1}]
self.assertEqual(expected_data, report[1])
self.assertEqual(expected_data,report[1])
def create_company():
doc = frappe.db.exists('Company','Best Test')
doc = frappe.db.exists("Company", "Best Test")
if not doc:
doc = frappe.new_doc('Company')
doc.company_name = 'Best Test'
doc = frappe.new_doc("Company")
doc.company_name = "Best Test"
doc.default_currency = "INR"
doc.insert()
def create_customer():
doc = frappe.db.exists("Customer","_Test NC")
doc = frappe.db.exists("Customer", "_Test NC")
if not doc:
doc = frappe.new_doc("Customer")
doc.customer_name = '_Test NC'
doc.customer_name = "_Test NC"
doc.insert()
def create_opportunity():
doc = frappe.db.exists({"doctype":"Opportunity","party_name":"_Test NC"})
doc = frappe.db.exists({"doctype": "Opportunity", "party_name": "_Test NC"})
if not doc:
doc = frappe.new_doc("Opportunity")
doc.opportunity_from = "Customer"
customer_name = frappe.db.get_value("Customer",{"customer_name":'_Test NC'},['customer_name'])
customer_name = frappe.db.get_value("Customer", {"customer_name": "_Test NC"}, ["customer_name"])
doc.party_name = customer_name
doc.opportunity_amount = 150000
doc.source = "Cold Calling"
doc.currency = "INR"
doc.expected_closing = "2021-08-31"
doc.company = 'Best Test'
doc.insert()
doc.company = "Best Test"
doc.insert()

View File

@@ -9,12 +9,16 @@ def update_lead_phone_numbers(contact, method):
if len(contact.phone_nos) > 1:
# get the default phone number
primary_phones = [phone_doc.phone for phone_doc in contact.phone_nos if phone_doc.is_primary_phone]
primary_phones = [
phone_doc.phone for phone_doc in contact.phone_nos if phone_doc.is_primary_phone
]
if primary_phones:
phone = primary_phones[0]
# get the default mobile number
primary_mobile_nos = [phone_doc.phone for phone_doc in contact.phone_nos if phone_doc.is_primary_mobile_no]
primary_mobile_nos = [
phone_doc.phone for phone_doc in contact.phone_nos if phone_doc.is_primary_mobile_no
]
if primary_mobile_nos:
mobile_no = primary_mobile_nos[0]
@@ -22,15 +26,21 @@ def update_lead_phone_numbers(contact, method):
lead.db_set("phone", phone)
lead.db_set("mobile_no", mobile_no)
def copy_comments(doctype, docname, doc):
comments = frappe.db.get_values("Comment", filters={"reference_doctype": doctype, "reference_name": docname, "comment_type": "Comment"}, fieldname="*")
comments = frappe.db.get_values(
"Comment",
filters={"reference_doctype": doctype, "reference_name": docname, "comment_type": "Comment"},
fieldname="*",
)
for comment in comments:
comment = frappe.get_doc(comment.update({"doctype":"Comment"}))
comment = frappe.get_doc(comment.update({"doctype": "Comment"}))
comment.name = None
comment.reference_doctype = doc.doctype
comment.reference_name = doc.name
comment.insert()
def add_link_in_communication(doctype, docname, doc):
communication_list = get_linked_communication_list(doctype, docname)
@@ -38,13 +48,15 @@ def add_link_in_communication(doctype, docname, doc):
communication_doc = frappe.get_doc("Communication", communication)
communication_doc.add_link(doc.doctype, doc.name, autosave=True)
def get_linked_communication_list(doctype, docname):
communications = frappe.get_all("Communication", filters={"reference_doctype": doctype, "reference_name": docname}, pluck='name')
communication_links = frappe.get_all('Communication Link',
{
"link_doctype": doctype,
"link_name": docname,
"parent": ("not in", communications)
}, pluck="parent")
communications = frappe.get_all(
"Communication", filters={"reference_doctype": doctype, "reference_name": docname}, pluck="name"
)
communication_links = frappe.get_all(
"Communication Link",
{"link_doctype": doctype, "link_name": docname, "parent": ("not in", communications)},
pluck="parent",
)
return communications + communication_links