diff --git a/erpnext/edi/doctype/code_list/code_list_import.js b/erpnext/edi/doctype/code_list/code_list_import.js
index 4a33f3e2fe6..917e815fc97 100644
--- a/erpnext/edi/doctype/code_list/code_list_import.js
+++ b/erpnext/edi/doctype/code_list/code_list_import.js
@@ -10,6 +10,7 @@ erpnext.edi.import_genericode = function (listview_or_form) {
method: "erpnext.edi.doctype.code_list.code_list_import.import_genericode",
doctype: doctype,
docname: docname,
+ allow_web_link: false,
allow_toggle_private: false,
allow_take_photo: false,
on_success: function (_file_doc, r) {
diff --git a/erpnext/edi/doctype/code_list/code_list_import.py b/erpnext/edi/doctype/code_list/code_list_import.py
index 71cb7d0f82d..582f1d6b864 100644
--- a/erpnext/edi/doctype/code_list/code_list_import.py
+++ b/erpnext/edi/doctype/code_list/code_list_import.py
@@ -1,48 +1,106 @@
import json
+from urllib.parse import urlsplit
import frappe
import requests
from frappe import _
from frappe.utils import escape_html
+from frappe.utils.file_manager import save_file
from lxml import etree
-URL_PREFIXES = ("http://", "https://")
+GENERICODE_FETCH_TIMEOUT = 15
+LOCAL_FILE_PREFIXES = ("/files/", "/private/files/")
+
+
+class RemoteGenericodeUrlNotAllowedError(Exception):
+ pass
+
+
+class CodeListSelectionMismatchError(Exception):
+ pass
@frappe.whitelist()
def import_genericode():
- doctype = frappe.form_dict.doctype
- docname = frappe.form_dict.docname
- content = frappe.local.uploaded_file
-
- # recover the content, if it's a link
- if (file_url := frappe.local.uploaded_file_url) and file_url.startswith(URL_PREFIXES):
- try:
- # If it's a URL, fetch the content and make it a local file (for durable audit)
- response = requests.get(frappe.local.uploaded_file_url)
- response.raise_for_status()
- frappe.local.uploaded_file = content = response.content
- frappe.local.uploaded_filename = frappe.local.uploaded_file_url.split("/")[-1]
- frappe.local.uploaded_file_url = None
- except Exception as e:
- frappe.throw(f"
{e!s}", title=_("Fetching Error"))
-
- if file_url := frappe.local.uploaded_file_url:
- file_path = frappe.utils.file_manager.get_file_path(file_url)
- with open(file_path.encode(), mode="rb") as f:
- content = f.read()
-
- # Parse the xml content
- parser = etree.XMLParser(
- remove_blank_text=True,
- resolve_entities=False,
- load_dtd=False,
- no_network=True,
- )
try:
- root = etree.fromstring(content, parser=parser)
- except Exception as e:
- frappe.throw(f"{e!s}", title=_("Parsing Error"))
+ content, file_name = get_uploaded_genericode_file()
+
+ return import_genericode_content(
+ doctype=frappe.form_dict.doctype,
+ docname=frappe.form_dict.docname,
+ content=content,
+ file_name=file_name,
+ )
+ except RemoteGenericodeUrlNotAllowedError:
+ frappe.throw(
+ _("Importing Code Lists from remote URLs is not allowed."),
+ title=_("Invalid Upload"),
+ )
+ except CodeListSelectionMismatchError:
+ frappe.throw(_("The uploaded file does not match the selected Code List."))
+ except etree.XMLSyntaxError:
+ frappe.throw(
+ _("The uploaded file could not be parsed as a genericode XML document."),
+ title=_("Parsing Error"),
+ )
+
+
+def import_genericode_from_url(
+ url: str,
+ doctype: str = "Code List",
+ docname: str | None = None,
+):
+ """Import a Code List from a trusted backend URL."""
+ content = fetch_genericode_from_url(url)
+ file_name = urlsplit(url).path.rsplit("/", 1)[-1] or "genericode.xml"
+
+ return import_genericode_content(
+ doctype=doctype,
+ docname=docname,
+ content=content,
+ file_name=file_name,
+ )
+
+
+def get_uploaded_genericode_file() -> tuple[bytes, str | None]:
+ uploaded_data = frappe.local.uploaded_file
+ file_name = frappe.local.uploaded_filename
+ if uploaded_data and file_name:
+ return uploaded_data, file_name
+
+ file_url = frappe.local.uploaded_file_url
+ if not file_url:
+ raise frappe.ValidationError(_("No file uploaded or URL provided."))
+
+ if not is_local_file_url(file_url):
+ raise RemoteGenericodeUrlNotAllowedError
+
+ file_doc = frappe.get_doc("File", {"file_url": file_url})
+ file_doc.check_permission("read")
+ return file_doc.get_content(encodings=()), file_name
+
+
+def is_local_file_url(file_url: str | None) -> bool:
+ if not file_url:
+ return False
+
+ parsed = urlsplit(file_url.strip())
+ return not parsed.scheme and not parsed.netloc and parsed.path.startswith(LOCAL_FILE_PREFIXES)
+
+
+def fetch_genericode_from_url(url: str) -> bytes:
+ response = requests.get(url, timeout=GENERICODE_FETCH_TIMEOUT)
+ response.raise_for_status()
+ return response.content
+
+
+def import_genericode_content(
+ doctype: str,
+ docname: str | None,
+ content: bytes,
+ file_name: str | None,
+):
+ root = parse_genericode_content(content)
# Extract the name (CanonicalVersionUri) from the parsed XML
name = root.find(".//CanonicalVersionUri").text
@@ -51,7 +109,7 @@ def import_genericode():
if frappe.db.exists(doctype, docname):
code_list = frappe.get_doc(doctype, docname)
if code_list.name != name:
- frappe.throw(_("The uploaded file does not match the selected Code List."))
+ raise CodeListSelectionMismatchError
else:
# Create a new Code List document with the extracted name
code_list = frappe.new_doc(doctype)
@@ -60,19 +118,13 @@ def import_genericode():
code_list.from_genericode(root)
code_list.save()
- # Attach the file and provide a recoverable identifier
- file_doc = frappe.get_doc(
- {
- "doctype": "File",
- "attached_to_doctype": "Code List",
- "attached_to_name": code_list.name,
- "folder": frappe.db.get_value("File", {"is_attachments_folder": 1}),
- "file_name": frappe.local.uploaded_filename,
- "file_url": frappe.local.uploaded_file_url,
- "is_private": 1,
- "content": content,
- }
- ).save()
+ file_doc = save_file(
+ fname=file_name,
+ content=content,
+ dt=doctype,
+ dn=code_list.name,
+ is_private=1,
+ )
# Get available columns and example values
columns, example_values, filterable_columns = get_genericode_columns_and_examples(root)
@@ -87,6 +139,16 @@ def import_genericode():
}
+def parse_genericode_content(content: bytes):
+ parser = etree.XMLParser(
+ remove_blank_text=True,
+ resolve_entities=False,
+ load_dtd=False,
+ no_network=True,
+ )
+ return etree.fromstring(content, parser=parser)
+
+
@frappe.whitelist()
def process_genericode_import(
code_list_name: str,
diff --git a/erpnext/edi/doctype/code_list/test_code_list_import.py b/erpnext/edi/doctype/code_list/test_code_list_import.py
new file mode 100644
index 00000000000..d03dd1edac9
--- /dev/null
+++ b/erpnext/edi/doctype/code_list/test_code_list_import.py
@@ -0,0 +1,200 @@
+# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors
+# See license.txt
+
+from unittest.mock import Mock, patch
+
+import frappe
+import requests
+
+from erpnext.edi.doctype.code_list import code_list_import
+from erpnext.tests.utils import ERPNextTestSuite
+
+SAMPLE_GENERICODE = b"""
+
+
+ Test Code List
+ 1.0
+ test-code-list
+ Code list for tests
+
+ Test Agency
+ TEST
+
+ https://example.com/codelists/test.xml
+
+ test-code-list-v1
+
+
+
+
+
+
+
+ A
+ Alpha
+ Group 1
+
+
+ B
+ Beta
+ Group 2
+
+
+ C
+ Gamma
+ Group 1
+
+
+
+"""
+
+
+class TestCodeListImport(ERPNextTestSuite):
+ def test_import_genericode_rejects_remote_file_url(self):
+ self.set_upload_context(
+ file_name="trusted.xml",
+ file_url="https://example.com/codelists/trusted.xml",
+ )
+
+ with patch("erpnext.edi.doctype.code_list.code_list_import.requests.get") as mock_get:
+ with self.assertRaisesRegex(
+ frappe.ValidationError, "Importing Code Lists from remote URLs is not allowed."
+ ):
+ code_list_import.import_genericode()
+
+ mock_get.assert_not_called()
+
+ def test_import_genericode_rejects_file_scheme_url(self):
+ self.set_upload_context(
+ file_name="trusted.xml",
+ file_url="file:///tmp/trusted.xml",
+ )
+
+ with patch("erpnext.edi.doctype.code_list.code_list_import.requests.get") as mock_get:
+ with self.assertRaisesRegex(
+ frappe.ValidationError, "Importing Code Lists from remote URLs is not allowed."
+ ):
+ code_list_import.import_genericode()
+
+ mock_get.assert_not_called()
+
+ def test_import_genericode_from_trusted_url(self):
+ response = Mock()
+ response.content = SAMPLE_GENERICODE
+ response.raise_for_status.return_value = None
+
+ with patch(
+ "erpnext.edi.doctype.code_list.code_list_import.requests.get",
+ return_value=response,
+ ) as mock_get:
+ import_result = code_list_import.import_genericode_from_url(
+ "https://example.com/codelists/trusted.xml"
+ )
+
+ self.assert_import_response(import_result)
+ mock_get.assert_called_once_with(
+ "https://example.com/codelists/trusted.xml",
+ timeout=code_list_import.GENERICODE_FETCH_TIMEOUT,
+ )
+
+ file_doc = frappe.get_doc("File", import_result["file"])
+ self.assertEqual(file_doc.file_name, "trusted.xml")
+ self.assertFalse(file_doc.file_url.startswith("https://"))
+
+ def test_import_genericode_from_trusted_url_propagates_fetch_errors(self):
+ with patch(
+ "erpnext.edi.doctype.code_list.code_list_import.requests.get",
+ side_effect=requests.Timeout,
+ ):
+ with self.assertRaises(requests.Timeout):
+ code_list_import.import_genericode_from_url("https://example.com/codelists/trusted.xml")
+
+ def test_import_genericode_from_uploaded_file_returns_metadata(self):
+ self.set_upload_context(content=SAMPLE_GENERICODE, file_name="uploaded_genericode.xml")
+
+ import_result = code_list_import.import_genericode()
+
+ self.assert_import_response(import_result)
+
+ file_doc = frappe.get_doc("File", import_result["file"])
+ self.assertEqual(file_doc.file_name, "uploaded_genericode.xml")
+
+ def test_process_genericode_import_reads_file_doc_content(self):
+ self.set_upload_context(content=SAMPLE_GENERICODE, file_name="uploaded_genericode.xml")
+
+ import_result = code_list_import.import_genericode()
+ count = code_list_import.process_genericode_import(
+ code_list_name=import_result["code_list"],
+ file_name=import_result["file"],
+ code_column="code",
+ title_column="name",
+ )
+
+ self.assertEqual(count, 3)
+ self.assertEqual(frappe.db.count("Common Code", {"code_list": import_result["code_list"]}), 3)
+ self.assertEqual(
+ frappe.db.get_value(
+ "Common Code",
+ {"code_list": import_result["code_list"], "common_code": "A"},
+ "title",
+ ),
+ "Alpha",
+ )
+
+ def test_import_genericode_from_local_file_url(self):
+ source_file = frappe.get_doc(
+ {
+ "doctype": "File",
+ "file_name": "library_genericode.xml",
+ "content": SAMPLE_GENERICODE,
+ "is_private": 1,
+ }
+ ).insert()
+ self.set_upload_context(file_name=source_file.file_name, file_url=source_file.file_url)
+
+ import_result = code_list_import.import_genericode()
+
+ self.assert_import_response(import_result)
+
+ def set_upload_context(
+ self,
+ content: bytes | None = None,
+ file_name: str = "genericode.xml",
+ file_url: str | None = None,
+ docname: str | None = None,
+ ):
+ attrs = ("form_dict", "uploaded_file", "uploaded_file_url", "uploaded_filename")
+ originals = {attr: getattr(frappe.local, attr, None) for attr in attrs}
+
+ frappe.local.form_dict = frappe._dict(doctype="Code List", docname=docname)
+ frappe.local.uploaded_file = content
+ frappe.local.uploaded_file_url = file_url
+ frappe.local.uploaded_filename = file_name
+
+ def restore():
+ for attr, value in originals.items():
+ setattr(frappe.local, attr, value)
+
+ self.addCleanup(restore)
+
+ def assert_import_response(self, import_result):
+ self.assertEqual(
+ set(import_result),
+ {
+ "code_list",
+ "code_list_title",
+ "file",
+ "columns",
+ "example_values",
+ "filterable_columns",
+ },
+ )
+ self.assertEqual(import_result["code_list"], "test-code-list-v1")
+ self.assertEqual(import_result["code_list_title"], "Test Code List")
+ self.assertEqual(import_result["columns"], ["code", "name", "category"])
+ self.assertEqual(import_result["example_values"]["code"], ["A", "B", "C"])
+ self.assertEqual(import_result["example_values"]["name"], ["Alpha", "Beta", "Gamma"])
+ self.assertEqual(import_result["example_values"]["category"], ["Group 1", "Group 2", "Group 1"])
+ self.assertCountEqual(import_result["filterable_columns"]["category"], ["Group 1", "Group 2"])
+ self.assertTrue(frappe.db.exists("Code List", import_result["code_list"]))
+ self.assertTrue(frappe.db.exists("File", import_result["file"]))
diff --git a/erpnext/edi/doctype/common_code/common_code.py b/erpnext/edi/doctype/common_code/common_code.py
index d1fd88350be..57e5e32f33a 100644
--- a/erpnext/edi/doctype/common_code/common_code.py
+++ b/erpnext/edi/doctype/common_code/common_code.py
@@ -9,6 +9,8 @@ from frappe.model.document import Document
from frappe.utils.data import get_link_to_form
from lxml import etree
+from erpnext.edi.doctype.code_list.code_list_import import parse_genericode_content
+
class CommonCode(Document):
# begin: auto-generated types
@@ -86,15 +88,15 @@ def simple_hash(input_string, length=6):
def import_genericode(code_list: str, file_name: str, column_map: dict, filters: dict | None = None):
"""Import genericode file and create Common Code entries"""
- file_path = frappe.utils.file_manager.get_file_path(file_name)
- parser = etree.XMLParser(remove_blank_text=True)
- tree = etree.parse(file_path, parser=parser)
- root = tree.getroot()
+ file_doc = frappe.get_doc("File", file_name)
+ file_doc.check_permission("read")
+ root = parse_genericode_content(file_doc.get_content(encodings=()))
# Construct the XPath expression
xpath_expr = ".//SimpleCodeList/Row"
filter_conditions = [
- f"Value[@ColumnRef='{column_ref}']/SimpleValue='{value}'" for column_ref, value in filters.items()
+ f"Value[@ColumnRef='{column_ref}']/SimpleValue='{value}'"
+ for column_ref, value in (filters or {}).items()
]
if filter_conditions:
xpath_expr += "[" + " and ".join(filter_conditions) + "]"
@@ -102,7 +104,7 @@ def import_genericode(code_list: str, file_name: str, column_map: dict, filters:
elements = root.xpath(xpath_expr)
total_elements = len(elements)
for i, xml_element in enumerate(elements, start=1):
- common_code: "CommonCode" = frappe.new_doc("Common Code")
+ common_code: CommonCode = frappe.new_doc("Common Code")
common_code.code_list = code_list
common_code.from_genericode(column_map, xml_element)
common_code.save()