mirror of
https://github.com/frappe/erpnext.git
synced 2026-05-01 20:48:27 +00:00
Co-authored-by: Raffael Meyer <14891507+barredterra@users.noreply.github.com> fix(edi): restrict Code List imports to files and trusted backend URLs (#54137) fix(edi): hardcode "Code List" DocType in importer (#54488)
This commit is contained in:
@@ -10,6 +10,7 @@ erpnext.edi.import_genericode = function (listview_or_form) {
|
|||||||
method: "erpnext.edi.doctype.code_list.code_list_import.import_genericode",
|
method: "erpnext.edi.doctype.code_list.code_list_import.import_genericode",
|
||||||
doctype: doctype,
|
doctype: doctype,
|
||||||
docname: docname,
|
docname: docname,
|
||||||
|
allow_web_link: false,
|
||||||
allow_toggle_private: false,
|
allow_toggle_private: false,
|
||||||
allow_take_photo: false,
|
allow_take_photo: false,
|
||||||
on_success: function (_file_doc, r) {
|
on_success: function (_file_doc, r) {
|
||||||
|
|||||||
@@ -1,48 +1,106 @@
|
|||||||
import json
|
import json
|
||||||
|
from urllib.parse import urlsplit
|
||||||
|
|
||||||
import frappe
|
import frappe
|
||||||
import requests
|
import requests
|
||||||
from frappe import _
|
from frappe import _
|
||||||
from frappe.utils import escape_html
|
from frappe.utils import escape_html
|
||||||
|
from frappe.utils.file_manager import save_file
|
||||||
from lxml import etree
|
from lxml import etree
|
||||||
|
|
||||||
URL_PREFIXES = ("http://", "https://")
|
GENERICODE_FETCH_TIMEOUT = 15
|
||||||
|
LOCAL_FILE_PREFIXES = ("/files/", "/private/files/")
|
||||||
|
|
||||||
|
|
||||||
|
class RemoteGenericodeUrlNotAllowedError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CodeListSelectionMismatchError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
@frappe.whitelist()
|
@frappe.whitelist()
|
||||||
def import_genericode():
|
def import_genericode():
|
||||||
doctype = frappe.form_dict.doctype
|
|
||||||
docname = frappe.form_dict.docname
|
|
||||||
content = frappe.local.uploaded_file
|
|
||||||
|
|
||||||
# recover the content, if it's a link
|
|
||||||
if (file_url := frappe.local.uploaded_file_url) and file_url.startswith(URL_PREFIXES):
|
|
||||||
try:
|
|
||||||
# If it's a URL, fetch the content and make it a local file (for durable audit)
|
|
||||||
response = requests.get(frappe.local.uploaded_file_url)
|
|
||||||
response.raise_for_status()
|
|
||||||
frappe.local.uploaded_file = content = response.content
|
|
||||||
frappe.local.uploaded_filename = frappe.local.uploaded_file_url.split("/")[-1]
|
|
||||||
frappe.local.uploaded_file_url = None
|
|
||||||
except Exception as e:
|
|
||||||
frappe.throw(f"<pre>{e!s}</pre>", title=_("Fetching Error"))
|
|
||||||
|
|
||||||
if file_url := frappe.local.uploaded_file_url:
|
|
||||||
file_path = frappe.utils.file_manager.get_file_path(file_url)
|
|
||||||
with open(file_path.encode(), mode="rb") as f:
|
|
||||||
content = f.read()
|
|
||||||
|
|
||||||
# Parse the xml content
|
|
||||||
parser = etree.XMLParser(
|
|
||||||
remove_blank_text=True,
|
|
||||||
resolve_entities=False,
|
|
||||||
load_dtd=False,
|
|
||||||
no_network=True,
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
root = etree.fromstring(content, parser=parser)
|
content, file_name = get_uploaded_genericode_file()
|
||||||
except Exception as e:
|
|
||||||
frappe.throw(f"<pre>{e!s}</pre>", title=_("Parsing Error"))
|
return import_genericode_content(
|
||||||
|
doctype="Code List",
|
||||||
|
docname=frappe.form_dict.docname,
|
||||||
|
content=content,
|
||||||
|
file_name=file_name,
|
||||||
|
)
|
||||||
|
except RemoteGenericodeUrlNotAllowedError:
|
||||||
|
frappe.throw(
|
||||||
|
_("Importing Code Lists from remote URLs is not allowed."),
|
||||||
|
title=_("Invalid Upload"),
|
||||||
|
)
|
||||||
|
except CodeListSelectionMismatchError:
|
||||||
|
frappe.throw(_("The uploaded file does not match the selected Code List."))
|
||||||
|
except etree.XMLSyntaxError:
|
||||||
|
frappe.throw(
|
||||||
|
_("The uploaded file could not be parsed as a genericode XML document."),
|
||||||
|
title=_("Parsing Error"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def import_genericode_from_url(
|
||||||
|
url: str,
|
||||||
|
doctype: str = "Code List",
|
||||||
|
docname: str | None = None,
|
||||||
|
):
|
||||||
|
"""Import a Code List from a trusted backend URL."""
|
||||||
|
content = fetch_genericode_from_url(url)
|
||||||
|
file_name = urlsplit(url).path.rsplit("/", 1)[-1] or "genericode.xml"
|
||||||
|
|
||||||
|
return import_genericode_content(
|
||||||
|
doctype=doctype,
|
||||||
|
docname=docname,
|
||||||
|
content=content,
|
||||||
|
file_name=file_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_uploaded_genericode_file() -> tuple[bytes, str | None]:
|
||||||
|
uploaded_data = frappe.local.uploaded_file
|
||||||
|
file_name = frappe.local.uploaded_filename
|
||||||
|
if uploaded_data and file_name:
|
||||||
|
return uploaded_data, file_name
|
||||||
|
|
||||||
|
file_url = frappe.local.uploaded_file_url
|
||||||
|
if not file_url:
|
||||||
|
raise frappe.ValidationError(_("No file uploaded or URL provided."))
|
||||||
|
|
||||||
|
if not is_local_file_url(file_url):
|
||||||
|
raise RemoteGenericodeUrlNotAllowedError
|
||||||
|
|
||||||
|
file_doc = frappe.get_doc("File", {"file_url": file_url})
|
||||||
|
file_doc.check_permission("read")
|
||||||
|
return file_doc.get_content(encodings=()), file_name
|
||||||
|
|
||||||
|
|
||||||
|
def is_local_file_url(file_url: str | None) -> bool:
|
||||||
|
if not file_url:
|
||||||
|
return False
|
||||||
|
|
||||||
|
parsed = urlsplit(file_url.strip())
|
||||||
|
return not parsed.scheme and not parsed.netloc and parsed.path.startswith(LOCAL_FILE_PREFIXES)
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_genericode_from_url(url: str) -> bytes:
|
||||||
|
response = requests.get(url, timeout=GENERICODE_FETCH_TIMEOUT)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.content
|
||||||
|
|
||||||
|
|
||||||
|
def import_genericode_content(
|
||||||
|
doctype: str,
|
||||||
|
docname: str | None,
|
||||||
|
content: bytes,
|
||||||
|
file_name: str | None,
|
||||||
|
):
|
||||||
|
root = parse_genericode_content(content)
|
||||||
|
|
||||||
# Extract the name (CanonicalVersionUri) from the parsed XML
|
# Extract the name (CanonicalVersionUri) from the parsed XML
|
||||||
name = root.find(".//CanonicalVersionUri").text
|
name = root.find(".//CanonicalVersionUri").text
|
||||||
@@ -51,7 +109,7 @@ def import_genericode():
|
|||||||
if frappe.db.exists(doctype, docname):
|
if frappe.db.exists(doctype, docname):
|
||||||
code_list = frappe.get_doc(doctype, docname)
|
code_list = frappe.get_doc(doctype, docname)
|
||||||
if code_list.name != name:
|
if code_list.name != name:
|
||||||
frappe.throw(_("The uploaded file does not match the selected Code List."))
|
raise CodeListSelectionMismatchError
|
||||||
else:
|
else:
|
||||||
# Create a new Code List document with the extracted name
|
# Create a new Code List document with the extracted name
|
||||||
code_list = frappe.new_doc(doctype)
|
code_list = frappe.new_doc(doctype)
|
||||||
@@ -60,19 +118,13 @@ def import_genericode():
|
|||||||
code_list.from_genericode(root)
|
code_list.from_genericode(root)
|
||||||
code_list.save()
|
code_list.save()
|
||||||
|
|
||||||
# Attach the file and provide a recoverable identifier
|
file_doc = save_file(
|
||||||
file_doc = frappe.get_doc(
|
fname=file_name,
|
||||||
{
|
content=content,
|
||||||
"doctype": "File",
|
dt=doctype,
|
||||||
"attached_to_doctype": "Code List",
|
dn=code_list.name,
|
||||||
"attached_to_name": code_list.name,
|
is_private=1,
|
||||||
"folder": frappe.db.get_value("File", {"is_attachments_folder": 1}),
|
)
|
||||||
"file_name": frappe.local.uploaded_filename,
|
|
||||||
"file_url": frappe.local.uploaded_file_url,
|
|
||||||
"is_private": 1,
|
|
||||||
"content": content,
|
|
||||||
}
|
|
||||||
).save()
|
|
||||||
|
|
||||||
# Get available columns and example values
|
# Get available columns and example values
|
||||||
columns, example_values, filterable_columns = get_genericode_columns_and_examples(root)
|
columns, example_values, filterable_columns = get_genericode_columns_and_examples(root)
|
||||||
@@ -87,6 +139,16 @@ def import_genericode():
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_genericode_content(content: bytes):
|
||||||
|
parser = etree.XMLParser(
|
||||||
|
remove_blank_text=True,
|
||||||
|
resolve_entities=False,
|
||||||
|
load_dtd=False,
|
||||||
|
no_network=True,
|
||||||
|
)
|
||||||
|
return etree.fromstring(content, parser=parser)
|
||||||
|
|
||||||
|
|
||||||
@frappe.whitelist()
|
@frappe.whitelist()
|
||||||
def process_genericode_import(
|
def process_genericode_import(
|
||||||
code_list_name: str,
|
code_list_name: str,
|
||||||
|
|||||||
200
erpnext/edi/doctype/code_list/test_code_list_import.py
Normal file
200
erpnext/edi/doctype/code_list/test_code_list_import.py
Normal file
@@ -0,0 +1,200 @@
|
|||||||
|
# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors
|
||||||
|
# See license.txt
|
||||||
|
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
import frappe
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from erpnext.edi.doctype.code_list import code_list_import
|
||||||
|
from erpnext.tests.utils import ERPNextTestSuite
|
||||||
|
|
||||||
|
SAMPLE_GENERICODE = b"""<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<CodeList>
|
||||||
|
<Identification>
|
||||||
|
<ShortName>Test Code List</ShortName>
|
||||||
|
<Version>1.0</Version>
|
||||||
|
<CanonicalUri>test-code-list</CanonicalUri>
|
||||||
|
<LongName>Code list for tests</LongName>
|
||||||
|
<Agency>
|
||||||
|
<ShortName>Test Agency</ShortName>
|
||||||
|
<Identifier>TEST</Identifier>
|
||||||
|
</Agency>
|
||||||
|
<LocationUri>https://example.com/codelists/test.xml</LocationUri>
|
||||||
|
</Identification>
|
||||||
|
<CanonicalVersionUri>test-code-list-v1</CanonicalVersionUri>
|
||||||
|
<ColumnSet>
|
||||||
|
<Column Id="code" />
|
||||||
|
<Column Id="name" />
|
||||||
|
<Column Id="category" />
|
||||||
|
</ColumnSet>
|
||||||
|
<SimpleCodeList>
|
||||||
|
<Row>
|
||||||
|
<Value ColumnRef="code"><SimpleValue>A</SimpleValue></Value>
|
||||||
|
<Value ColumnRef="name"><SimpleValue>Alpha</SimpleValue></Value>
|
||||||
|
<Value ColumnRef="category"><SimpleValue>Group 1</SimpleValue></Value>
|
||||||
|
</Row>
|
||||||
|
<Row>
|
||||||
|
<Value ColumnRef="code"><SimpleValue>B</SimpleValue></Value>
|
||||||
|
<Value ColumnRef="name"><SimpleValue>Beta</SimpleValue></Value>
|
||||||
|
<Value ColumnRef="category"><SimpleValue>Group 2</SimpleValue></Value>
|
||||||
|
</Row>
|
||||||
|
<Row>
|
||||||
|
<Value ColumnRef="code"><SimpleValue>C</SimpleValue></Value>
|
||||||
|
<Value ColumnRef="name"><SimpleValue>Gamma</SimpleValue></Value>
|
||||||
|
<Value ColumnRef="category"><SimpleValue>Group 1</SimpleValue></Value>
|
||||||
|
</Row>
|
||||||
|
</SimpleCodeList>
|
||||||
|
</CodeList>
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class TestCodeListImport(ERPNextTestSuite):
|
||||||
|
def test_import_genericode_rejects_remote_file_url(self):
|
||||||
|
self.set_upload_context(
|
||||||
|
file_name="trusted.xml",
|
||||||
|
file_url="https://example.com/codelists/trusted.xml",
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("erpnext.edi.doctype.code_list.code_list_import.requests.get") as mock_get:
|
||||||
|
with self.assertRaisesRegex(
|
||||||
|
frappe.ValidationError, "Importing Code Lists from remote URLs is not allowed."
|
||||||
|
):
|
||||||
|
code_list_import.import_genericode()
|
||||||
|
|
||||||
|
mock_get.assert_not_called()
|
||||||
|
|
||||||
|
def test_import_genericode_rejects_file_scheme_url(self):
|
||||||
|
self.set_upload_context(
|
||||||
|
file_name="trusted.xml",
|
||||||
|
file_url="file:///tmp/trusted.xml",
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("erpnext.edi.doctype.code_list.code_list_import.requests.get") as mock_get:
|
||||||
|
with self.assertRaisesRegex(
|
||||||
|
frappe.ValidationError, "Importing Code Lists from remote URLs is not allowed."
|
||||||
|
):
|
||||||
|
code_list_import.import_genericode()
|
||||||
|
|
||||||
|
mock_get.assert_not_called()
|
||||||
|
|
||||||
|
def test_import_genericode_from_trusted_url(self):
|
||||||
|
response = Mock()
|
||||||
|
response.content = SAMPLE_GENERICODE
|
||||||
|
response.raise_for_status.return_value = None
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"erpnext.edi.doctype.code_list.code_list_import.requests.get",
|
||||||
|
return_value=response,
|
||||||
|
) as mock_get:
|
||||||
|
import_result = code_list_import.import_genericode_from_url(
|
||||||
|
"https://example.com/codelists/trusted.xml"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assert_import_response(import_result)
|
||||||
|
mock_get.assert_called_once_with(
|
||||||
|
"https://example.com/codelists/trusted.xml",
|
||||||
|
timeout=code_list_import.GENERICODE_FETCH_TIMEOUT,
|
||||||
|
)
|
||||||
|
|
||||||
|
file_doc = frappe.get_doc("File", import_result["file"])
|
||||||
|
self.assertEqual(file_doc.get_content(encodings=()), SAMPLE_GENERICODE)
|
||||||
|
self.assertFalse(file_doc.file_url.startswith("https://"))
|
||||||
|
|
||||||
|
def test_import_genericode_from_trusted_url_propagates_fetch_errors(self):
|
||||||
|
with patch(
|
||||||
|
"erpnext.edi.doctype.code_list.code_list_import.requests.get",
|
||||||
|
side_effect=requests.Timeout,
|
||||||
|
):
|
||||||
|
with self.assertRaises(requests.Timeout):
|
||||||
|
code_list_import.import_genericode_from_url("https://example.com/codelists/trusted.xml")
|
||||||
|
|
||||||
|
def test_import_genericode_from_uploaded_file_returns_metadata(self):
|
||||||
|
self.set_upload_context(content=SAMPLE_GENERICODE, file_name="uploaded_genericode.xml")
|
||||||
|
|
||||||
|
import_result = code_list_import.import_genericode()
|
||||||
|
|
||||||
|
self.assert_import_response(import_result)
|
||||||
|
|
||||||
|
file_doc = frappe.get_doc("File", import_result["file"])
|
||||||
|
self.assertEqual(file_doc.get_content(encodings=()), SAMPLE_GENERICODE)
|
||||||
|
|
||||||
|
def test_process_genericode_import_reads_file_doc_content(self):
|
||||||
|
self.set_upload_context(content=SAMPLE_GENERICODE, file_name="uploaded_genericode.xml")
|
||||||
|
|
||||||
|
import_result = code_list_import.import_genericode()
|
||||||
|
count = code_list_import.process_genericode_import(
|
||||||
|
code_list_name=import_result["code_list"],
|
||||||
|
file_name=import_result["file"],
|
||||||
|
code_column="code",
|
||||||
|
title_column="name",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(count, 3)
|
||||||
|
self.assertEqual(frappe.db.count("Common Code", {"code_list": import_result["code_list"]}), 3)
|
||||||
|
self.assertEqual(
|
||||||
|
frappe.db.get_value(
|
||||||
|
"Common Code",
|
||||||
|
{"code_list": import_result["code_list"], "common_code": "A"},
|
||||||
|
"title",
|
||||||
|
),
|
||||||
|
"Alpha",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_import_genericode_from_local_file_url(self):
|
||||||
|
source_file = frappe.get_doc(
|
||||||
|
{
|
||||||
|
"doctype": "File",
|
||||||
|
"file_name": "library_genericode.xml",
|
||||||
|
"content": SAMPLE_GENERICODE,
|
||||||
|
"is_private": 1,
|
||||||
|
}
|
||||||
|
).insert()
|
||||||
|
self.set_upload_context(file_name=source_file.file_name, file_url=source_file.file_url)
|
||||||
|
|
||||||
|
import_result = code_list_import.import_genericode()
|
||||||
|
|
||||||
|
self.assert_import_response(import_result)
|
||||||
|
|
||||||
|
def set_upload_context(
|
||||||
|
self,
|
||||||
|
content: bytes | None = None,
|
||||||
|
file_name: str = "genericode.xml",
|
||||||
|
file_url: str | None = None,
|
||||||
|
docname: str | None = None,
|
||||||
|
):
|
||||||
|
attrs = ("form_dict", "uploaded_file", "uploaded_file_url", "uploaded_filename")
|
||||||
|
originals = {attr: getattr(frappe.local, attr, None) for attr in attrs}
|
||||||
|
|
||||||
|
frappe.local.form_dict = frappe._dict(doctype="Code List", docname=docname)
|
||||||
|
frappe.local.uploaded_file = content
|
||||||
|
frappe.local.uploaded_file_url = file_url
|
||||||
|
frappe.local.uploaded_filename = file_name
|
||||||
|
|
||||||
|
def restore():
|
||||||
|
for attr, value in originals.items():
|
||||||
|
setattr(frappe.local, attr, value)
|
||||||
|
|
||||||
|
self.addCleanup(restore)
|
||||||
|
|
||||||
|
def assert_import_response(self, import_result):
|
||||||
|
self.assertEqual(
|
||||||
|
set(import_result),
|
||||||
|
{
|
||||||
|
"code_list",
|
||||||
|
"code_list_title",
|
||||||
|
"file",
|
||||||
|
"columns",
|
||||||
|
"example_values",
|
||||||
|
"filterable_columns",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
self.assertEqual(import_result["code_list"], "test-code-list-v1")
|
||||||
|
self.assertEqual(import_result["code_list_title"], "Test Code List")
|
||||||
|
self.assertEqual(import_result["columns"], ["code", "name", "category"])
|
||||||
|
self.assertEqual(import_result["example_values"]["code"], ["A", "B", "C"])
|
||||||
|
self.assertEqual(import_result["example_values"]["name"], ["Alpha", "Beta", "Gamma"])
|
||||||
|
self.assertEqual(import_result["example_values"]["category"], ["Group 1", "Group 2", "Group 1"])
|
||||||
|
self.assertCountEqual(import_result["filterable_columns"]["category"], ["Group 1", "Group 2"])
|
||||||
|
self.assertTrue(frappe.db.exists("Code List", import_result["code_list"]))
|
||||||
|
self.assertTrue(frappe.db.exists("File", import_result["file"]))
|
||||||
@@ -9,6 +9,8 @@ from frappe.model.document import Document
|
|||||||
from frappe.utils.data import get_link_to_form
|
from frappe.utils.data import get_link_to_form
|
||||||
from lxml import etree
|
from lxml import etree
|
||||||
|
|
||||||
|
from erpnext.edi.doctype.code_list.code_list_import import parse_genericode_content
|
||||||
|
|
||||||
|
|
||||||
class CommonCode(Document):
|
class CommonCode(Document):
|
||||||
# begin: auto-generated types
|
# begin: auto-generated types
|
||||||
@@ -86,15 +88,15 @@ def simple_hash(input_string, length=6):
|
|||||||
|
|
||||||
def import_genericode(code_list: str, file_name: str, column_map: dict, filters: dict | None = None):
|
def import_genericode(code_list: str, file_name: str, column_map: dict, filters: dict | None = None):
|
||||||
"""Import genericode file and create Common Code entries"""
|
"""Import genericode file and create Common Code entries"""
|
||||||
file_path = frappe.utils.file_manager.get_file_path(file_name)
|
file_doc = frappe.get_doc("File", file_name)
|
||||||
parser = etree.XMLParser(remove_blank_text=True)
|
file_doc.check_permission("read")
|
||||||
tree = etree.parse(file_path, parser=parser)
|
root = parse_genericode_content(file_doc.get_content(encodings=()))
|
||||||
root = tree.getroot()
|
|
||||||
|
|
||||||
# Construct the XPath expression
|
# Construct the XPath expression
|
||||||
xpath_expr = ".//SimpleCodeList/Row"
|
xpath_expr = ".//SimpleCodeList/Row"
|
||||||
filter_conditions = [
|
filter_conditions = [
|
||||||
f"Value[@ColumnRef='{column_ref}']/SimpleValue='{value}'" for column_ref, value in filters.items()
|
f"Value[@ColumnRef='{column_ref}']/SimpleValue='{value}'"
|
||||||
|
for column_ref, value in (filters or {}).items()
|
||||||
]
|
]
|
||||||
if filter_conditions:
|
if filter_conditions:
|
||||||
xpath_expr += "[" + " and ".join(filter_conditions) + "]"
|
xpath_expr += "[" + " and ".join(filter_conditions) + "]"
|
||||||
@@ -102,7 +104,7 @@ def import_genericode(code_list: str, file_name: str, column_map: dict, filters:
|
|||||||
elements = root.xpath(xpath_expr)
|
elements = root.xpath(xpath_expr)
|
||||||
total_elements = len(elements)
|
total_elements = len(elements)
|
||||||
for i, xml_element in enumerate(elements, start=1):
|
for i, xml_element in enumerate(elements, start=1):
|
||||||
common_code: "CommonCode" = frappe.new_doc("Common Code")
|
common_code: CommonCode = frappe.new_doc("Common Code")
|
||||||
common_code.code_list = code_list
|
common_code.code_list = code_list
|
||||||
common_code.from_genericode(column_map, xml_element)
|
common_code.from_genericode(column_map, xml_element)
|
||||||
common_code.save()
|
common_code.save()
|
||||||
|
|||||||
Reference in New Issue
Block a user