From 8ed63383366728ae7650f6858feaba7ee1f832cd Mon Sep 17 00:00:00 2001 From: Saqib Date: Thu, 26 Aug 2021 13:27:16 +0530 Subject: [PATCH] refactor!: remove e_invoice utils (#27167) --- erpnext/regional/india/e_invoice/utils.py | 1131 --------------------- 1 file changed, 1131 deletions(-) delete mode 100644 erpnext/regional/india/e_invoice/utils.py diff --git a/erpnext/regional/india/e_invoice/utils.py b/erpnext/regional/india/e_invoice/utils.py deleted file mode 100644 index 765b51f435f..00000000000 --- a/erpnext/regional/india/e_invoice/utils.py +++ /dev/null @@ -1,1131 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors -# For license information, please see license.txt - -from __future__ import unicode_literals -import os -import re -import jwt -import sys -import json -import base64 -import frappe -import six -import traceback -import io -from frappe import _, bold -from pyqrcode import create as qrcreate -from frappe.utils.background_jobs import enqueue -from frappe.utils.scheduler import is_scheduler_inactive -from frappe.core.page.background_jobs.background_jobs import get_info -from frappe.integrations.utils import make_post_request, make_get_request -from erpnext.regional.india.utils import get_gst_accounts, get_place_of_supply -from frappe.utils.data import cstr, cint, format_date, flt, time_diff_in_seconds, now_datetime, add_to_date, get_link_to_form, getdate, time_diff_in_hours - -@frappe.whitelist() -def validate_eligibility(doc): - if isinstance(doc, six.string_types): - doc = json.loads(doc) - - invalid_doctype = doc.get('doctype') != 'Sales Invoice' - if invalid_doctype: - return False - - einvoicing_enabled = cint(frappe.db.get_single_value('E Invoice Settings', 'enable')) - if not einvoicing_enabled: - return False - - einvoicing_eligible_from = frappe.db.get_single_value('E Invoice Settings', 'applicable_from') or '2021-04-01' - if getdate(doc.get('posting_date')) < getdate(einvoicing_eligible_from): - return False - - invalid_company = not frappe.db.get_value('E Invoice User', { 'company': doc.get('company') }) - invalid_supply_type = doc.get('gst_category') not in ['Registered Regular', 'SEZ', 'Overseas', 'Deemed Export'] - company_transaction = doc.get('billing_address_gstin') == doc.get('company_gstin') - - # if export invoice, then taxes can be empty - # invoice can only be ineligible if no taxes applied and is not an export invoice - no_taxes_applied = not doc.get('taxes') and not doc.get('gst_category') == 'Overseas' - has_non_gst_item = any(d for d in doc.get('items', []) if d.get('is_non_gst')) - - if invalid_company or invalid_supply_type or company_transaction or no_taxes_applied or has_non_gst_item: - return False - - return True - -def validate_einvoice_fields(doc): - invoice_eligible = validate_eligibility(doc) - - if not invoice_eligible: - return - - if doc.docstatus == 0 and doc._action == 'save': - if doc.irn: - frappe.throw(_('You cannot edit the invoice after generating IRN'), title=_('Edit Not Allowed')) - if len(doc.name) > 16: - raise_document_name_too_long_error() - - doc.einvoice_status = 'Pending' - - elif doc.docstatus == 1 and doc._action == 'submit' and not doc.irn: - frappe.throw(_('You must generate IRN before submitting the document.'), title=_('Missing IRN')) - - elif doc.irn and doc.docstatus == 2 and doc._action == 'cancel' and not doc.irn_cancelled: - frappe.throw(_('You must cancel IRN before cancelling the document.'), title=_('Cancel Not Allowed')) - -def raise_document_name_too_long_error(): - title = _('Document ID Too Long') - msg = _('As you have E-Invoicing enabled, to be able to generate IRN for this invoice') - msg += ', ' - msg += _('document id {} exceed 16 letters.').format(bold(_('should not'))) - msg += '

' - msg += _('You must {} your {} in order to have document id of {} length 16.').format( - bold(_('modify')), bold(_('naming series')), bold(_('maximum')) - ) - msg += _('Please account for ammended documents too.') - frappe.throw(msg, title=title) - -def read_json(name): - file_path = os.path.join(os.path.dirname(__file__), '{name}.json'.format(name=name)) - with open(file_path, 'r') as f: - return cstr(f.read()) - -def get_transaction_details(invoice): - supply_type = '' - if invoice.gst_category == 'Registered Regular': supply_type = 'B2B' - elif invoice.gst_category == 'SEZ': supply_type = 'SEZWOP' - elif invoice.gst_category == 'Overseas': supply_type = 'EXPWOP' - elif invoice.gst_category == 'Deemed Export': supply_type = 'DEXP' - - if not supply_type: - rr, sez, overseas, export = bold('Registered Regular'), bold('SEZ'), bold('Overseas'), bold('Deemed Export') - frappe.throw(_('GST category should be one of {}, {}, {}, {}').format(rr, sez, overseas, export), - title=_('Invalid Supply Type')) - - return frappe._dict(dict( - tax_scheme='GST', - supply_type=supply_type, - reverse_charge=invoice.reverse_charge - )) - -def get_doc_details(invoice): - if getdate(invoice.posting_date) < getdate('2021-01-01'): - frappe.throw(_('IRN generation is not allowed for invoices dated before 1st Jan 2021'), title=_('Not Allowed')) - - invoice_type = 'CRN' if invoice.is_return else 'INV' - - invoice_name = invoice.name - invoice_date = format_date(invoice.posting_date, 'dd/mm/yyyy') - - return frappe._dict(dict( - invoice_type=invoice_type, - invoice_name=invoice_name, - invoice_date=invoice_date - )) - -def validate_address_fields(address, is_shipping_address): - if ((not address.gstin and not is_shipping_address) - or not address.city - or not address.pincode - or not address.address_title - or not address.address_line1 - or not address.gst_state_number): - - frappe.throw( - msg=_('Address Lines, City, Pincode, GSTIN are mandatory for address {}. Please set them and try again.').format(address.name), - title=_('Missing Address Fields') - ) - -def get_party_details(address_name, is_shipping_address=False): - addr = frappe.get_doc('Address', address_name) - - validate_address_fields(addr, is_shipping_address) - - if addr.gst_state_number == 97: - # according to einvoice standard - addr.pincode = 999999 - - party_address_details = frappe._dict(dict( - legal_name=sanitize_for_json(addr.address_title), - location=sanitize_for_json(addr.city), - pincode=addr.pincode, gstin=addr.gstin, - state_code=addr.gst_state_number, - address_line1=sanitize_for_json(addr.address_line1), - address_line2=sanitize_for_json(addr.address_line2) - )) - - return party_address_details - -def get_overseas_address_details(address_name): - address_title, address_line1, address_line2, city = frappe.db.get_value( - 'Address', address_name, ['address_title', 'address_line1', 'address_line2', 'city'] - ) - - if not address_title or not address_line1 or not city: - frappe.throw( - msg=_('Address lines and city is mandatory for address {}. Please set them and try again.').format( - get_link_to_form('Address', address_name) - ), - title=_('Missing Address Fields') - ) - - return frappe._dict(dict( - gstin='URP', - legal_name=sanitize_for_json(address_title), - location=city, - address_line1=sanitize_for_json(address_line1), - address_line2=sanitize_for_json(address_line2), - pincode=999999, state_code=96, place_of_supply=96 - )) - -def get_item_list(invoice): - item_list = [] - - for d in invoice.items: - einvoice_item_schema = read_json('einv_item_template') - item = frappe._dict({}) - item.update(d.as_dict()) - - item.sr_no = d.idx - item.description = sanitize_for_json(d.item_name) - - item.qty = abs(item.qty) - if flt(item.qty) != 0.0: - item.unit_rate = abs(item.taxable_value / item.qty) - else: - item.unit_rate = abs(item.taxable_value) - item.gross_amount = abs(item.taxable_value) - item.taxable_value = abs(item.taxable_value) - item.discount_amount = 0 - - item.batch_expiry_date = frappe.db.get_value('Batch', d.batch_no, 'expiry_date') if d.batch_no else None - item.batch_expiry_date = format_date(item.batch_expiry_date, 'dd/mm/yyyy') if item.batch_expiry_date else None - item.is_service_item = 'Y' if item.gst_hsn_code and item.gst_hsn_code[:2] == "99" else 'N' - item.serial_no = "" - - item = update_item_taxes(invoice, item) - - item.total_value = abs( - item.taxable_value + item.igst_amount + item.sgst_amount + - item.cgst_amount + item.cess_amount + item.cess_nadv_amount + item.other_charges - ) - einv_item = einvoice_item_schema.format(item=item) - item_list.append(einv_item) - - return ', '.join(item_list) - -def update_item_taxes(invoice, item): - gst_accounts = get_gst_accounts(invoice.company) - gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d] - - for attr in [ - 'tax_rate', 'cess_rate', 'cess_nadv_amount', - 'cgst_amount', 'sgst_amount', 'igst_amount', - 'cess_amount', 'cess_nadv_amount', 'other_charges' - ]: - item[attr] = 0 - - for t in invoice.taxes: - is_applicable = t.tax_amount and t.account_head in gst_accounts_list - if is_applicable: - # this contains item wise tax rate & tax amount (incl. discount) - item_tax_detail = json.loads(t.item_wise_tax_detail).get(item.item_code or item.item_name) - - item_tax_rate = item_tax_detail[0] - # item tax amount excluding discount amount - item_tax_amount = (item_tax_rate / 100) * item.taxable_value - - if t.account_head in gst_accounts.cess_account: - item_tax_amount_after_discount = item_tax_detail[1] - if t.charge_type == 'On Item Quantity': - item.cess_nadv_amount += abs(item_tax_amount_after_discount) - else: - item.cess_rate += item_tax_rate - item.cess_amount += abs(item_tax_amount_after_discount) - - for tax_type in ['igst', 'cgst', 'sgst']: - if t.account_head in gst_accounts[f'{tax_type}_account']: - item.tax_rate += item_tax_rate - item[f'{tax_type}_amount'] += abs(item_tax_amount) - else: - # TODO: other charges per item - pass - - return item - -def get_invoice_value_details(invoice): - invoice_value_details = frappe._dict(dict()) - invoice_value_details.base_total = abs(sum([i.taxable_value for i in invoice.get('items')])) - invoice_value_details.invoice_discount_amt = 0 - - invoice_value_details.round_off = invoice.base_rounding_adjustment - invoice_value_details.base_grand_total = abs(invoice.base_rounded_total) or abs(invoice.base_grand_total) - invoice_value_details.grand_total = abs(invoice.rounded_total) or abs(invoice.grand_total) - - invoice_value_details = update_invoice_taxes(invoice, invoice_value_details) - - return invoice_value_details - -def update_invoice_taxes(invoice, invoice_value_details): - gst_accounts = get_gst_accounts(invoice.company) - gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d] - - invoice_value_details.total_cgst_amt = 0 - invoice_value_details.total_sgst_amt = 0 - invoice_value_details.total_igst_amt = 0 - invoice_value_details.total_cess_amt = 0 - invoice_value_details.total_other_charges = 0 - considered_rows = [] - - for t in invoice.taxes: - tax_amount = t.base_tax_amount_after_discount_amount - if t.account_head in gst_accounts_list: - if t.account_head in gst_accounts.cess_account: - # using after discount amt since item also uses after discount amt for cess calc - invoice_value_details.total_cess_amt += abs(t.base_tax_amount_after_discount_amount) - - for tax_type in ['igst', 'cgst', 'sgst']: - if t.account_head in gst_accounts[f'{tax_type}_account']: - - invoice_value_details[f'total_{tax_type}_amt'] += abs(tax_amount) - update_other_charges(t, invoice_value_details, gst_accounts_list, invoice, considered_rows) - else: - invoice_value_details.total_other_charges += abs(tax_amount) - - return invoice_value_details - -def update_other_charges(tax_row, invoice_value_details, gst_accounts_list, invoice, considered_rows): - prev_row_id = cint(tax_row.row_id) - 1 - if tax_row.account_head in gst_accounts_list and prev_row_id not in considered_rows: - if tax_row.charge_type == 'On Previous Row Amount': - amount = invoice.get('taxes')[prev_row_id].tax_amount_after_discount_amount - invoice_value_details.total_other_charges -= abs(amount) - considered_rows.append(prev_row_id) - if tax_row.charge_type == 'On Previous Row Total': - amount = invoice.get('taxes')[prev_row_id].base_total - invoice.base_net_total - invoice_value_details.total_other_charges -= abs(amount) - considered_rows.append(prev_row_id) - -def get_payment_details(invoice): - payee_name = invoice.company - mode_of_payment = ', '.join([d.mode_of_payment for d in invoice.payments]) - paid_amount = invoice.base_paid_amount - outstanding_amount = invoice.outstanding_amount - - return frappe._dict(dict( - payee_name=payee_name, mode_of_payment=mode_of_payment, - paid_amount=paid_amount, outstanding_amount=outstanding_amount - )) - -def get_return_doc_reference(invoice): - invoice_date = frappe.db.get_value('Sales Invoice', invoice.return_against, 'posting_date') - return frappe._dict(dict( - invoice_name=invoice.return_against, invoice_date=format_date(invoice_date, 'dd/mm/yyyy') - )) - -def get_eway_bill_details(invoice): - if invoice.is_return: - frappe.throw(_('E-Way Bill cannot be generated for Credit Notes & Debit Notes. Please clear fields in the Transporter Section of the invoice.'), - title=_('Invalid Fields')) - - - mode_of_transport = { '': '', 'Road': '1', 'Air': '2', 'Rail': '3', 'Ship': '4' } - vehicle_type = { 'Regular': 'R', 'Over Dimensional Cargo (ODC)': 'O' } - - return frappe._dict(dict( - gstin=invoice.gst_transporter_id, - name=invoice.transporter_name, - mode_of_transport=mode_of_transport[invoice.mode_of_transport], - distance=invoice.distance or 0, - document_name=invoice.lr_no, - document_date=format_date(invoice.lr_date, 'dd/mm/yyyy'), - vehicle_no=invoice.vehicle_no, - vehicle_type=vehicle_type[invoice.gst_vehicle_type] - )) - -def validate_mandatory_fields(invoice): - if not invoice.company_address: - frappe.throw( - _('Company Address is mandatory to fetch company GSTIN details. Please set Company Address and try again.'), - title=_('Missing Fields') - ) - if not invoice.customer_address: - frappe.throw( - _('Customer Address is mandatory to fetch customer GSTIN details. Please set Company Address and try again.'), - title=_('Missing Fields') - ) - if not frappe.db.get_value('Address', invoice.company_address, 'gstin'): - frappe.throw( - _('GSTIN is mandatory to fetch company GSTIN details. Please enter GSTIN in selected company address.'), - title=_('Missing Fields') - ) - if invoice.gst_category != 'Overseas' and not frappe.db.get_value('Address', invoice.customer_address, 'gstin'): - frappe.throw( - _('GSTIN is mandatory to fetch customer GSTIN details. Please enter GSTIN in selected customer address.'), - title=_('Missing Fields') - ) - -def validate_totals(einvoice): - item_list = einvoice['ItemList'] - value_details = einvoice['ValDtls'] - - total_item_ass_value = 0 - total_item_cgst_value = 0 - total_item_sgst_value = 0 - total_item_igst_value = 0 - total_item_value = 0 - for item in item_list: - total_item_ass_value += flt(item['AssAmt']) - total_item_cgst_value += flt(item['CgstAmt']) - total_item_sgst_value += flt(item['SgstAmt']) - total_item_igst_value += flt(item['IgstAmt']) - total_item_value += flt(item['TotItemVal']) - - if abs(flt(item['AssAmt']) * flt(item['GstRt']) / 100) - (flt(item['CgstAmt']) + flt(item['SgstAmt']) + flt(item['IgstAmt'])) > 1: - frappe.throw(_('Row #{}: GST rate is invalid. Please remove tax rows with zero tax amount from taxes table.').format(item.idx)) - - if abs(flt(value_details['AssVal']) - total_item_ass_value) > 1: - frappe.throw(_('Total Taxable Value of the items is not equal to the Invoice Net Total. Please check item taxes / discounts for any correction.')) - - if abs( - flt(value_details['TotInvVal']) + flt(value_details['Discount']) - - flt(value_details['OthChrg']) - flt(value_details['RndOffAmt']) - - total_item_value) > 1: - frappe.throw(_('Total Value of the items is not equal to the Invoice Grand Total. Please check item taxes / discounts for any correction.')) - - calculated_invoice_value = \ - flt(value_details['AssVal']) + flt(value_details['CgstVal']) \ - + flt(value_details['SgstVal']) + flt(value_details['IgstVal']) \ - + flt(value_details['OthChrg']) + flt(value_details['RndOffAmt']) - flt(value_details['Discount']) - - if abs(flt(value_details['TotInvVal']) - calculated_invoice_value) > 1: - frappe.throw(_('Total Item Value + Taxes - Discount is not equal to the Invoice Grand Total. Please check taxes / discounts for any correction.')) - -def make_einvoice(invoice): - validate_mandatory_fields(invoice) - - schema = read_json('einv_template') - - transaction_details = get_transaction_details(invoice) - item_list = get_item_list(invoice) - doc_details = get_doc_details(invoice) - invoice_value_details = get_invoice_value_details(invoice) - seller_details = get_party_details(invoice.company_address) - - if invoice.gst_category == 'Overseas': - buyer_details = get_overseas_address_details(invoice.customer_address) - else: - buyer_details = get_party_details(invoice.customer_address) - place_of_supply = get_place_of_supply(invoice, invoice.doctype) - if place_of_supply: - place_of_supply = place_of_supply.split('-')[0] - else: - place_of_supply = sanitize_for_json(invoice.billing_address_gstin)[:2] - buyer_details.update(dict(place_of_supply=place_of_supply)) - - seller_details.update(dict(legal_name=invoice.company)) - buyer_details.update(dict(legal_name=invoice.customer_name or invoice.customer)) - - shipping_details = payment_details = prev_doc_details = eway_bill_details = frappe._dict({}) - if invoice.shipping_address_name and invoice.customer_address != invoice.shipping_address_name: - if invoice.gst_category == 'Overseas': - shipping_details = get_overseas_address_details(invoice.shipping_address_name) - else: - shipping_details = get_party_details(invoice.shipping_address_name, is_shipping_address=True) - - if invoice.is_pos and invoice.base_paid_amount: - payment_details = get_payment_details(invoice) - - if invoice.is_return and invoice.return_against: - prev_doc_details = get_return_doc_reference(invoice) - - if invoice.transporter and not invoice.is_return: - eway_bill_details = get_eway_bill_details(invoice) - - # not yet implemented - dispatch_details = period_details = export_details = frappe._dict({}) - - einvoice = schema.format( - transaction_details=transaction_details, doc_details=doc_details, dispatch_details=dispatch_details, - seller_details=seller_details, buyer_details=buyer_details, shipping_details=shipping_details, - item_list=item_list, invoice_value_details=invoice_value_details, payment_details=payment_details, - period_details=period_details, prev_doc_details=prev_doc_details, - export_details=export_details, eway_bill_details=eway_bill_details - ) - - try: - einvoice = safe_json_load(einvoice) - einvoice = santize_einvoice_fields(einvoice) - except Exception: - show_link_to_error_log(invoice, einvoice) - - validate_totals(einvoice) - - return einvoice - -def show_link_to_error_log(invoice, einvoice): - err_log = log_error(einvoice) - link_to_error_log = get_link_to_form('Error Log', err_log.name, 'Error Log') - frappe.throw( - _('An error occurred while creating e-invoice for {}. Please check {} for more information.').format( - invoice.name, link_to_error_log), - title=_('E Invoice Creation Failed') - ) - -def log_error(data=None): - if isinstance(data, six.string_types): - data = json.loads(data) - - seperator = "--" * 50 - err_tb = traceback.format_exc() - err_msg = str(sys.exc_info()[1]) - data = json.dumps(data, indent=4) - - message = "\n".join([ - "Error", err_msg, seperator, - "Data:", data, seperator, - "Exception:", err_tb - ]) - frappe.log_error(title=_('E Invoice Request Failed'), message=message) - -def santize_einvoice_fields(einvoice): - int_fields = ["Pin","Distance","CrDay"] - float_fields = ["Qty","FreeQty","UnitPrice","TotAmt","Discount","PreTaxVal","AssAmt","GstRt","IgstAmt","CgstAmt","SgstAmt","CesRt","CesAmt","CesNonAdvlAmt","StateCesRt","StateCesAmt","StateCesNonAdvlAmt","OthChrg","TotItemVal","AssVal","CgstVal","SgstVal","IgstVal","CesVal","StCesVal","Discount","OthChrg","RndOffAmt","TotInvVal","TotInvValFc","PaidAmt","PaymtDue","ExpDuty",] - copy = einvoice.copy() - for key, value in copy.items(): - if isinstance(value, list): - for idx, d in enumerate(value): - santized_dict = santize_einvoice_fields(d) - if santized_dict: - einvoice[key][idx] = santized_dict - else: - einvoice[key].pop(idx) - - if not einvoice[key]: - einvoice.pop(key, None) - - elif isinstance(value, dict): - santized_dict = santize_einvoice_fields(value) - if santized_dict: - einvoice[key] = santized_dict - else: - einvoice.pop(key, None) - - elif not value or value == "None": - einvoice.pop(key, None) - - elif key in float_fields: - einvoice[key] = flt(value, 2) - - elif key in int_fields: - einvoice[key] = cint(value) - - return einvoice - -def safe_json_load(json_string): - try: - return json.loads(json_string) - except json.JSONDecodeError as e: - # print a snippet of 40 characters around the location where error occured - pos = e.pos - start, end = max(0, pos-20), min(len(json_string)-1, pos+20) - snippet = json_string[start:end] - frappe.throw(_("Error in input data. Please check for any special characters near following input:
{}").format(snippet)) - -class RequestFailed(Exception): - pass -class CancellationNotAllowed(Exception): - pass - -class GSPConnector(): - def __init__(self, doctype=None, docname=None): - self.doctype = doctype - self.docname = docname - - self.set_invoice() - self.set_credentials() - - # authenticate url is same for sandbox & live - self.authenticate_url = 'https://gsp.adaequare.com/gsp/authenticate?grant_type=token' - self.base_url = 'https://gsp.adaequare.com' if not self.e_invoice_settings.sandbox_mode else 'https://gsp.adaequare.com/test' - - self.cancel_irn_url = self.base_url + '/enriched/ei/api/invoice/cancel' - self.irn_details_url = self.base_url + '/enriched/ei/api/invoice/irn' - self.generate_irn_url = self.base_url + '/enriched/ei/api/invoice' - self.gstin_details_url = self.base_url + '/enriched/ei/api/master/gstin' - self.cancel_ewaybill_url = self.base_url + '/enriched/ewb/ewayapi?action=CANEWB' - self.generate_ewaybill_url = self.base_url + '/enriched/ei/api/ewaybill' - - def set_invoice(self): - self.invoice = None - if self.doctype and self.docname: - self.invoice = frappe.get_cached_doc(self.doctype, self.docname) - - def set_credentials(self): - self.e_invoice_settings = frappe.get_cached_doc('E Invoice Settings') - - if not self.e_invoice_settings.enable: - frappe.throw(_("E-Invoicing is disabled. Please enable it from {} to generate e-invoices.").format(get_link_to_form("E Invoice Settings", "E Invoice Settings"))) - - if self.invoice: - gstin = self.get_seller_gstin() - credentials_for_gstin = [d for d in self.e_invoice_settings.credentials if d.gstin == gstin] - if credentials_for_gstin: - self.credentials = credentials_for_gstin[0] - else: - frappe.throw(_('Cannot find e-invoicing credentials for selected Company GSTIN. Please check E-Invoice Settings')) - else: - self.credentials = self.e_invoice_settings.credentials[0] if self.e_invoice_settings.credentials else None - - def get_seller_gstin(self): - gstin = frappe.db.get_value('Address', self.invoice.company_address, 'gstin') - if not gstin: - frappe.throw(_('Cannot retrieve Company GSTIN. Please select company address with valid GSTIN.')) - return gstin - - def get_auth_token(self): - if time_diff_in_seconds(self.e_invoice_settings.token_expiry, now_datetime()) < 150.0: - self.fetch_auth_token() - - return self.e_invoice_settings.auth_token - - def make_request(self, request_type, url, headers=None, data=None): - if request_type == 'post': - res = make_post_request(url, headers=headers, data=data) - else: - res = make_get_request(url, headers=headers, data=data) - - self.log_request(url, headers, data, res) - return res - - def log_request(self, url, headers, data, res): - headers.update({ 'password': self.credentials.password }) - request_log = frappe.get_doc({ - "doctype": "E Invoice Request Log", - "user": frappe.session.user, - "reference_invoice": self.invoice.name if self.invoice else None, - "url": url, - "headers": json.dumps(headers, indent=4) if headers else None, - "data": json.dumps(data, indent=4) if isinstance(data, dict) else data, - "response": json.dumps(res, indent=4) if res else None - }) - request_log.save(ignore_permissions=True) - frappe.db.commit() - - def fetch_auth_token(self): - headers = { - 'gspappid': frappe.conf.einvoice_client_id, - 'gspappsecret': frappe.conf.einvoice_client_secret - } - res = {} - try: - res = self.make_request('post', self.authenticate_url, headers) - self.e_invoice_settings.auth_token = "{} {}".format(res.get('token_type'), res.get('access_token')) - self.e_invoice_settings.token_expiry = add_to_date(None, seconds=res.get('expires_in')) - self.e_invoice_settings.save(ignore_permissions=True) - self.e_invoice_settings.reload() - - except Exception: - log_error(res) - self.raise_error(True) - - def get_headers(self): - return { - 'content-type': 'application/json', - 'user_name': self.credentials.username, - 'password': self.credentials.get_password(), - 'gstin': self.credentials.gstin, - 'authorization': self.get_auth_token(), - 'requestid': str(base64.b64encode(os.urandom(18))), - } - - def fetch_gstin_details(self, gstin): - headers = self.get_headers() - - try: - params = '?gstin={gstin}'.format(gstin=gstin) - res = self.make_request('get', self.gstin_details_url + params, headers) - if res.get('success'): - return res.get('result') - else: - log_error(res) - raise RequestFailed - - except RequestFailed: - self.raise_error() - - except Exception: - log_error() - self.raise_error(True) - @staticmethod - def get_gstin_details(gstin): - '''fetch and cache GSTIN details''' - if not hasattr(frappe.local, 'gstin_cache'): - frappe.local.gstin_cache = {} - - key = gstin - gsp_connector = GSPConnector() - details = gsp_connector.fetch_gstin_details(gstin) - - frappe.local.gstin_cache[key] = details - frappe.cache().hset('gstin_cache', key, details) - return details - - def generate_irn(self): - data = {} - try: - headers = self.get_headers() - einvoice = make_einvoice(self.invoice) - data = json.dumps(einvoice, indent=4) - res = self.make_request('post', self.generate_irn_url, headers, data) - - if res.get('success'): - self.set_einvoice_data(res.get('result')) - - elif '2150' in res.get('message'): - # IRN already generated but not updated in invoice - # Extract the IRN from the response description and fetch irn details - irn = res.get('result')[0].get('Desc').get('Irn') - irn_details = self.get_irn_details(irn) - if irn_details: - self.set_einvoice_data(irn_details) - else: - raise RequestFailed('IRN has already been generated for the invoice but cannot fetch details for the it. \ - Contact ERPNext support to resolve the issue.') - - else: - raise RequestFailed - - except RequestFailed: - errors = self.sanitize_error_message(res.get('message')) - self.set_failed_status(errors=errors) - self.raise_error(errors=errors) - - except Exception as e: - self.set_failed_status(errors=str(e)) - log_error(data) - self.raise_error(True) - - @staticmethod - def bulk_generate_irn(invoices): - gsp_connector = GSPConnector() - gsp_connector.doctype = 'Sales Invoice' - - failed = [] - - for invoice in invoices: - try: - gsp_connector.docname = invoice - gsp_connector.set_invoice() - gsp_connector.set_credentials() - gsp_connector.generate_irn() - - except Exception as e: - failed.append({ - 'docname': invoice, - 'message': str(e) - }) - - return failed - - def get_irn_details(self, irn): - headers = self.get_headers() - - try: - params = '?irn={irn}'.format(irn=irn) - res = self.make_request('get', self.irn_details_url + params, headers) - if res.get('success'): - return res.get('result') - else: - raise RequestFailed - - except RequestFailed: - errors = self.sanitize_error_message(res.get('message')) - self.raise_error(errors=errors) - - except Exception: - log_error() - self.raise_error(True) - - def cancel_irn(self, irn, reason, remark): - data, res = {}, {} - try: - # validate cancellation - if time_diff_in_hours(now_datetime(), self.invoice.ack_date) > 24: - frappe.throw(_('E-Invoice cannot be cancelled after 24 hours of IRN generation.'), title=_('Not Allowed'), exc=CancellationNotAllowed) - if not irn: - frappe.throw(_('IRN not found. You must generate IRN before cancelling.'), title=_('Not Allowed'), exc=CancellationNotAllowed) - - headers = self.get_headers() - data = json.dumps({ - 'Irn': irn, - 'Cnlrsn': reason, - 'Cnlrem': remark - }, indent=4) - - res = self.make_request('post', self.cancel_irn_url, headers, data) - if res.get('success') or '9999' in res.get('message'): - self.invoice.irn_cancelled = 1 - self.invoice.irn_cancel_date = res.get('result')['CancelDate'] if res.get('result') else "" - self.invoice.einvoice_status = 'Cancelled' - self.invoice.flags.updater_reference = { - 'doctype': self.invoice.doctype, - 'docname': self.invoice.name, - 'label': _('IRN Cancelled - {}').format(remark) - } - self.update_invoice() - - else: - raise RequestFailed - - except RequestFailed: - errors = self.sanitize_error_message(res.get('message')) - self.set_failed_status(errors=errors) - self.raise_error(errors=errors) - - except CancellationNotAllowed as e: - self.set_failed_status(errors=str(e)) - self.raise_error(errors=str(e)) - - except Exception as e: - self.set_failed_status(errors=str(e)) - log_error(data) - self.raise_error(True) - - @staticmethod - def bulk_cancel_irn(invoices, reason, remark): - gsp_connector = GSPConnector() - gsp_connector.doctype = 'Sales Invoice' - - failed = [] - - for invoice in invoices: - try: - gsp_connector.docname = invoice - gsp_connector.set_invoice() - gsp_connector.set_credentials() - irn = gsp_connector.invoice.irn - gsp_connector.cancel_irn(irn, reason, remark) - - except Exception as e: - failed.append({ - 'docname': invoice, - 'message': str(e) - }) - - return failed - - def generate_eway_bill(self, **kwargs): - args = frappe._dict(kwargs) - - headers = self.get_headers() - eway_bill_details = get_eway_bill_details(args) - data = json.dumps({ - 'Irn': args.irn, - 'Distance': cint(eway_bill_details.distance), - 'TransMode': eway_bill_details.mode_of_transport, - 'TransId': eway_bill_details.gstin, - 'TransName': eway_bill_details.transporter, - 'TrnDocDt': eway_bill_details.document_date, - 'TrnDocNo': eway_bill_details.document_name, - 'VehNo': eway_bill_details.vehicle_no, - 'VehType': eway_bill_details.vehicle_type - }, indent=4) - - try: - res = self.make_request('post', self.generate_ewaybill_url, headers, data) - if res.get('success'): - self.invoice.ewaybill = res.get('result').get('EwbNo') - self.invoice.eway_bill_validity = res.get('result').get('EwbValidTill') - self.invoice.eway_bill_cancelled = 0 - self.invoice.update(args) - self.invoice.flags.updater_reference = { - 'doctype': self.invoice.doctype, - 'docname': self.invoice.name, - 'label': _('E-Way Bill Generated') - } - self.update_invoice() - - else: - raise RequestFailed - - except RequestFailed: - errors = self.sanitize_error_message(res.get('message')) - self.raise_error(errors=errors) - - except Exception: - log_error(data) - self.raise_error(True) - - def cancel_eway_bill(self, eway_bill, reason, remark): - headers = self.get_headers() - data = json.dumps({ - 'ewbNo': eway_bill, - 'cancelRsnCode': reason, - 'cancelRmrk': remark - }, indent=4) - headers["username"] = headers["user_name"] - del headers["user_name"] - try: - res = self.make_request('post', self.cancel_ewaybill_url, headers, data) - if res.get('success'): - self.invoice.ewaybill = '' - self.invoice.eway_bill_cancelled = 1 - self.invoice.flags.updater_reference = { - 'doctype': self.invoice.doctype, - 'docname': self.invoice.name, - 'label': _('E-Way Bill Cancelled - {}').format(remark) - } - self.update_invoice() - - else: - raise RequestFailed - - except RequestFailed: - errors = self.sanitize_error_message(res.get('message')) - self.raise_error(errors=errors) - - except Exception: - log_error(data) - self.raise_error(True) - - def sanitize_error_message(self, message): - ''' - On validation errors, response message looks something like this: - message = '2174 : For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, - 3095 : Supplier GSTIN is inactive' - we search for string between ':' to extract the error messages - errors = [ - ': For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 ', - ': Test' - ] - then we trim down the message by looping over errors - ''' - if not message: - return [] - - errors = re.findall(': [^:]+', message) - for idx, e in enumerate(errors): - # remove colons - errors[idx] = errors[idx].replace(':', '').strip() - # if not last - if idx != len(errors) - 1: - # remove last 7 chars eg: ', 3095 ' - errors[idx] = errors[idx][:-6] - - return errors - - def raise_error(self, raise_exception=False, errors=[]): - title = _('E Invoice Request Failed') - if errors: - frappe.throw(errors, title=title, as_list=1) - else: - link_to_error_list = 'Error Log' - frappe.msgprint( - _('An error occurred while making e-invoicing request. Please check {} for more information.').format(link_to_error_list), - title=title, - raise_exception=raise_exception, - indicator='red' - ) - - def set_einvoice_data(self, res): - enc_signed_invoice = res.get('SignedInvoice') - dec_signed_invoice = jwt.decode(enc_signed_invoice, options={"verify_signature": False})['data'] - - self.invoice.irn = res.get('Irn') - self.invoice.ewaybill = res.get('EwbNo') - self.invoice.eway_bill_validity = res.get('EwbValidTill') - self.invoice.ack_no = res.get('AckNo') - self.invoice.ack_date = res.get('AckDt') - self.invoice.signed_einvoice = dec_signed_invoice - self.invoice.ack_no = res.get('AckNo') - self.invoice.ack_date = res.get('AckDt') - self.invoice.signed_qr_code = res.get('SignedQRCode') - self.invoice.einvoice_status = 'Generated' - - self.attach_qrcode_image() - - self.invoice.flags.updater_reference = { - 'doctype': self.invoice.doctype, - 'docname': self.invoice.name, - 'label': _('IRN Generated') - } - self.update_invoice() - - def attach_qrcode_image(self): - qrcode = self.invoice.signed_qr_code - doctype = self.invoice.doctype - docname = self.invoice.name - filename = 'QRCode_{}.png'.format(docname).replace(os.path.sep, "__") - - qr_image = io.BytesIO() - url = qrcreate(qrcode, error='L') - url.png(qr_image, scale=2, quiet_zone=1) - _file = frappe.get_doc({ - "doctype": "File", - "file_name": filename, - "attached_to_doctype": doctype, - "attached_to_name": docname, - "attached_to_field": "qrcode_image", - "is_private": 0, - "content": qr_image.getvalue()}) - _file.save() - frappe.db.commit() - self.invoice.qrcode_image = _file.file_url - - def update_invoice(self): - self.invoice.flags.ignore_validate_update_after_submit = True - self.invoice.flags.ignore_validate = True - self.invoice.save() - - def set_failed_status(self, errors=None): - frappe.db.rollback() - self.invoice.einvoice_status = 'Failed' - self.invoice.failure_description = self.get_failure_message(errors) if errors else "" - self.update_invoice() - frappe.db.commit() - - def get_failure_message(self, errors): - if isinstance(errors, list): - errors = ', '.join(errors) - return errors - -def sanitize_for_json(string): - """Escape JSON specific characters from a string.""" - - # json.dumps adds double-quotes to the string. Indexing to remove them. - return json.dumps(string)[1:-1] - -@frappe.whitelist() -def get_einvoice(doctype, docname): - invoice = frappe.get_doc(doctype, docname) - return make_einvoice(invoice) - -@frappe.whitelist() -def generate_irn(doctype, docname): - gsp_connector = GSPConnector(doctype, docname) - gsp_connector.generate_irn() - -@frappe.whitelist() -def cancel_irn(doctype, docname, irn, reason, remark): - gsp_connector = GSPConnector(doctype, docname) - gsp_connector.cancel_irn(irn, reason, remark) - -@frappe.whitelist() -def generate_eway_bill(doctype, docname, **kwargs): - gsp_connector = GSPConnector(doctype, docname) - gsp_connector.generate_eway_bill(**kwargs) - -@frappe.whitelist() -def cancel_eway_bill(doctype, docname): - # TODO: uncomment when eway_bill api from Adequare is enabled - # gsp_connector = GSPConnector(doctype, docname) - # gsp_connector.cancel_eway_bill(eway_bill, reason, remark) - - frappe.db.set_value(doctype, docname, 'ewaybill', '') - frappe.db.set_value(doctype, docname, 'eway_bill_cancelled', 1) - -@frappe.whitelist() -def generate_einvoices(docnames): - docnames = json.loads(docnames) or [] - - if len(docnames) < 10: - failures = GSPConnector.bulk_generate_irn(docnames) - frappe.local.message_log = [] - - if failures: - show_bulk_action_failure_message(failures) - - success = len(docnames) - len(failures) - frappe.msgprint( - _('{} e-invoices generated successfully').format(success), - title=_('Bulk E-Invoice Generation Complete') - ) - - else: - enqueue_bulk_action(schedule_bulk_generate_irn, docnames=docnames) - -def schedule_bulk_generate_irn(docnames): - failures = GSPConnector.bulk_generate_irn(docnames) - frappe.local.message_log = [] - - frappe.publish_realtime("bulk_einvoice_generation_complete", { - "user": frappe.session.user, - "failures": failures, - "invoices": docnames - }) - -def show_bulk_action_failure_message(failures): - for doc in failures: - docname = '{0}'.format(doc.get('docname')) - message = doc.get('message').replace("'", '"') - if message[0] == '[': - errors = json.loads(message) - error_list = ''.join(['
  • {}
  • '.format(err) for err in errors]) - message = '''{} has following errors:
    - '''.format(docname, error_list) - else: - message = '{} - {}'.format(docname, message) - - frappe.msgprint( - message, - title=_('Bulk E-Invoice Generation Complete'), - indicator='red' - ) - -@frappe.whitelist() -def cancel_irns(docnames, reason, remark): - docnames = json.loads(docnames) or [] - - if len(docnames) < 10: - failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark) - frappe.local.message_log = [] - - if failures: - show_bulk_action_failure_message(failures) - - success = len(docnames) - len(failures) - frappe.msgprint( - _('{} e-invoices cancelled successfully').format(success), - title=_('Bulk E-Invoice Cancellation Complete') - ) - else: - enqueue_bulk_action(schedule_bulk_cancel_irn, docnames=docnames, reason=reason, remark=remark) - -def schedule_bulk_cancel_irn(docnames, reason, remark): - failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark) - frappe.local.message_log = [] - - frappe.publish_realtime("bulk_einvoice_cancellation_complete", { - "user": frappe.session.user, - "failures": failures, - "invoices": docnames - }) - -def enqueue_bulk_action(job, **kwargs): - check_scheduler_status() - - enqueue( - job, - **kwargs, - queue="long", - timeout=10000, - event="processing_bulk_einvoice_action", - now=frappe.conf.developer_mode or frappe.flags.in_test, - ) - - if job == schedule_bulk_generate_irn: - msg = _('E-Invoices will be generated in a background process.') - else: - msg = _('E-Invoices will be cancelled in a background process.') - - frappe.msgprint(msg, alert=1) - -def check_scheduler_status(): - if is_scheduler_inactive() and not frappe.flags.in_test: - frappe.throw(_("Scheduler is inactive. Cannot enqueue job."), title=_("Scheduler Inactive")) - -def job_already_enqueued(job_name): - enqueued_jobs = [d.get("job_name") for d in get_info()] - if job_name in enqueued_jobs: - return True