fix(edi): restrict Code List imports to files and trusted backend URLs (#54137)

This commit is contained in:
Raffael Meyer
2026-04-13 18:14:24 +02:00
committed by GitHub
parent 2f025272d7
commit 8a72d7fafe
4 changed files with 317 additions and 52 deletions

View File

@@ -10,6 +10,7 @@ erpnext.edi.import_genericode = function (listview_or_form) {
method: "erpnext.edi.doctype.code_list.code_list_import.import_genericode",
doctype: doctype,
docname: docname,
allow_web_link: false,
allow_toggle_private: false,
allow_take_photo: false,
on_success: function (_file_doc, r) {

View File

@@ -1,48 +1,106 @@
import json
from urllib.parse import urlsplit
import frappe
import requests
from frappe import _
from frappe.utils import escape_html
from frappe.utils.file_manager import save_file
from lxml import etree
URL_PREFIXES = ("http://", "https://")
GENERICODE_FETCH_TIMEOUT = 15
LOCAL_FILE_PREFIXES = ("/files/", "/private/files/")
class RemoteGenericodeUrlNotAllowedError(Exception):
pass
class CodeListSelectionMismatchError(Exception):
pass
@frappe.whitelist()
def import_genericode():
doctype = frappe.form_dict.doctype
docname = frappe.form_dict.docname
content = frappe.local.uploaded_file
# recover the content, if it's a link
if (file_url := frappe.local.uploaded_file_url) and file_url.startswith(URL_PREFIXES):
try:
# If it's a URL, fetch the content and make it a local file (for durable audit)
response = requests.get(frappe.local.uploaded_file_url)
response.raise_for_status()
frappe.local.uploaded_file = content = response.content
frappe.local.uploaded_filename = frappe.local.uploaded_file_url.split("/")[-1]
frappe.local.uploaded_file_url = None
except Exception as e:
frappe.throw(f"<pre>{e!s}</pre>", title=_("Fetching Error"))
if file_url := frappe.local.uploaded_file_url:
file_path = frappe.utils.file_manager.get_file_path(file_url)
with open(file_path.encode(), mode="rb") as f:
content = f.read()
# Parse the xml content
parser = etree.XMLParser(
remove_blank_text=True,
resolve_entities=False,
load_dtd=False,
no_network=True,
)
try:
root = etree.fromstring(content, parser=parser)
except Exception as e:
frappe.throw(f"<pre>{e!s}</pre>", title=_("Parsing Error"))
content, file_name = get_uploaded_genericode_file()
return import_genericode_content(
doctype=frappe.form_dict.doctype,
docname=frappe.form_dict.docname,
content=content,
file_name=file_name,
)
except RemoteGenericodeUrlNotAllowedError:
frappe.throw(
_("Importing Code Lists from remote URLs is not allowed."),
title=_("Invalid Upload"),
)
except CodeListSelectionMismatchError:
frappe.throw(_("The uploaded file does not match the selected Code List."))
except etree.XMLSyntaxError:
frappe.throw(
_("The uploaded file could not be parsed as a genericode XML document."),
title=_("Parsing Error"),
)
def import_genericode_from_url(
url: str,
doctype: str = "Code List",
docname: str | None = None,
):
"""Import a Code List from a trusted backend URL."""
content = fetch_genericode_from_url(url)
file_name = urlsplit(url).path.rsplit("/", 1)[-1] or "genericode.xml"
return import_genericode_content(
doctype=doctype,
docname=docname,
content=content,
file_name=file_name,
)
def get_uploaded_genericode_file() -> tuple[bytes, str | None]:
uploaded_data = frappe.local.uploaded_file
file_name = frappe.local.uploaded_filename
if uploaded_data and file_name:
return uploaded_data, file_name
file_url = frappe.local.uploaded_file_url
if not file_url:
raise frappe.ValidationError(_("No file uploaded or URL provided."))
if not is_local_file_url(file_url):
raise RemoteGenericodeUrlNotAllowedError
file_doc = frappe.get_doc("File", {"file_url": file_url})
file_doc.check_permission("read")
return file_doc.get_content(encodings=()), file_name
def is_local_file_url(file_url: str | None) -> bool:
if not file_url:
return False
parsed = urlsplit(file_url.strip())
return not parsed.scheme and not parsed.netloc and parsed.path.startswith(LOCAL_FILE_PREFIXES)
def fetch_genericode_from_url(url: str) -> bytes:
response = requests.get(url, timeout=GENERICODE_FETCH_TIMEOUT)
response.raise_for_status()
return response.content
def import_genericode_content(
doctype: str,
docname: str | None,
content: bytes,
file_name: str | None,
):
root = parse_genericode_content(content)
# Extract the name (CanonicalVersionUri) from the parsed XML
name = root.find(".//CanonicalVersionUri").text
@@ -51,7 +109,7 @@ def import_genericode():
if frappe.db.exists(doctype, docname):
code_list = frappe.get_doc(doctype, docname)
if code_list.name != name:
frappe.throw(_("The uploaded file does not match the selected Code List."))
raise CodeListSelectionMismatchError
else:
# Create a new Code List document with the extracted name
code_list = frappe.new_doc(doctype)
@@ -60,19 +118,13 @@ def import_genericode():
code_list.from_genericode(root)
code_list.save()
# Attach the file and provide a recoverable identifier
file_doc = frappe.get_doc(
{
"doctype": "File",
"attached_to_doctype": "Code List",
"attached_to_name": code_list.name,
"folder": frappe.db.get_value("File", {"is_attachments_folder": 1}),
"file_name": frappe.local.uploaded_filename,
"file_url": frappe.local.uploaded_file_url,
"is_private": 1,
"content": content,
}
).save()
file_doc = save_file(
fname=file_name,
content=content,
dt=doctype,
dn=code_list.name,
is_private=1,
)
# Get available columns and example values
columns, example_values, filterable_columns = get_genericode_columns_and_examples(root)
@@ -87,6 +139,16 @@ def import_genericode():
}
def parse_genericode_content(content: bytes):
parser = etree.XMLParser(
remove_blank_text=True,
resolve_entities=False,
load_dtd=False,
no_network=True,
)
return etree.fromstring(content, parser=parser)
@frappe.whitelist()
def process_genericode_import(
code_list_name: str,

View File

@@ -0,0 +1,200 @@
# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
from unittest.mock import Mock, patch
import frappe
import requests
from erpnext.edi.doctype.code_list import code_list_import
from erpnext.tests.utils import ERPNextTestSuite
SAMPLE_GENERICODE = b"""<?xml version="1.0" encoding="UTF-8"?>
<CodeList>
<Identification>
<ShortName>Test Code List</ShortName>
<Version>1.0</Version>
<CanonicalUri>test-code-list</CanonicalUri>
<LongName>Code list for tests</LongName>
<Agency>
<ShortName>Test Agency</ShortName>
<Identifier>TEST</Identifier>
</Agency>
<LocationUri>https://example.com/codelists/test.xml</LocationUri>
</Identification>
<CanonicalVersionUri>test-code-list-v1</CanonicalVersionUri>
<ColumnSet>
<Column Id="code" />
<Column Id="name" />
<Column Id="category" />
</ColumnSet>
<SimpleCodeList>
<Row>
<Value ColumnRef="code"><SimpleValue>A</SimpleValue></Value>
<Value ColumnRef="name"><SimpleValue>Alpha</SimpleValue></Value>
<Value ColumnRef="category"><SimpleValue>Group 1</SimpleValue></Value>
</Row>
<Row>
<Value ColumnRef="code"><SimpleValue>B</SimpleValue></Value>
<Value ColumnRef="name"><SimpleValue>Beta</SimpleValue></Value>
<Value ColumnRef="category"><SimpleValue>Group 2</SimpleValue></Value>
</Row>
<Row>
<Value ColumnRef="code"><SimpleValue>C</SimpleValue></Value>
<Value ColumnRef="name"><SimpleValue>Gamma</SimpleValue></Value>
<Value ColumnRef="category"><SimpleValue>Group 1</SimpleValue></Value>
</Row>
</SimpleCodeList>
</CodeList>
"""
class TestCodeListImport(ERPNextTestSuite):
def test_import_genericode_rejects_remote_file_url(self):
self.set_upload_context(
file_name="trusted.xml",
file_url="https://example.com/codelists/trusted.xml",
)
with patch("erpnext.edi.doctype.code_list.code_list_import.requests.get") as mock_get:
with self.assertRaisesRegex(
frappe.ValidationError, "Importing Code Lists from remote URLs is not allowed."
):
code_list_import.import_genericode()
mock_get.assert_not_called()
def test_import_genericode_rejects_file_scheme_url(self):
self.set_upload_context(
file_name="trusted.xml",
file_url="file:///tmp/trusted.xml",
)
with patch("erpnext.edi.doctype.code_list.code_list_import.requests.get") as mock_get:
with self.assertRaisesRegex(
frappe.ValidationError, "Importing Code Lists from remote URLs is not allowed."
):
code_list_import.import_genericode()
mock_get.assert_not_called()
def test_import_genericode_from_trusted_url(self):
response = Mock()
response.content = SAMPLE_GENERICODE
response.raise_for_status.return_value = None
with patch(
"erpnext.edi.doctype.code_list.code_list_import.requests.get",
return_value=response,
) as mock_get:
import_result = code_list_import.import_genericode_from_url(
"https://example.com/codelists/trusted.xml"
)
self.assert_import_response(import_result)
mock_get.assert_called_once_with(
"https://example.com/codelists/trusted.xml",
timeout=code_list_import.GENERICODE_FETCH_TIMEOUT,
)
file_doc = frappe.get_doc("File", import_result["file"])
self.assertEqual(file_doc.file_name, "trusted.xml")
self.assertFalse(file_doc.file_url.startswith("https://"))
def test_import_genericode_from_trusted_url_propagates_fetch_errors(self):
with patch(
"erpnext.edi.doctype.code_list.code_list_import.requests.get",
side_effect=requests.Timeout,
):
with self.assertRaises(requests.Timeout):
code_list_import.import_genericode_from_url("https://example.com/codelists/trusted.xml")
def test_import_genericode_from_uploaded_file_returns_metadata(self):
self.set_upload_context(content=SAMPLE_GENERICODE, file_name="uploaded_genericode.xml")
import_result = code_list_import.import_genericode()
self.assert_import_response(import_result)
file_doc = frappe.get_doc("File", import_result["file"])
self.assertEqual(file_doc.file_name, "uploaded_genericode.xml")
def test_process_genericode_import_reads_file_doc_content(self):
self.set_upload_context(content=SAMPLE_GENERICODE, file_name="uploaded_genericode.xml")
import_result = code_list_import.import_genericode()
count = code_list_import.process_genericode_import(
code_list_name=import_result["code_list"],
file_name=import_result["file"],
code_column="code",
title_column="name",
)
self.assertEqual(count, 3)
self.assertEqual(frappe.db.count("Common Code", {"code_list": import_result["code_list"]}), 3)
self.assertEqual(
frappe.db.get_value(
"Common Code",
{"code_list": import_result["code_list"], "common_code": "A"},
"title",
),
"Alpha",
)
def test_import_genericode_from_local_file_url(self):
source_file = frappe.get_doc(
{
"doctype": "File",
"file_name": "library_genericode.xml",
"content": SAMPLE_GENERICODE,
"is_private": 1,
}
).insert()
self.set_upload_context(file_name=source_file.file_name, file_url=source_file.file_url)
import_result = code_list_import.import_genericode()
self.assert_import_response(import_result)
def set_upload_context(
self,
content: bytes | None = None,
file_name: str = "genericode.xml",
file_url: str | None = None,
docname: str | None = None,
):
attrs = ("form_dict", "uploaded_file", "uploaded_file_url", "uploaded_filename")
originals = {attr: getattr(frappe.local, attr, None) for attr in attrs}
frappe.local.form_dict = frappe._dict(doctype="Code List", docname=docname)
frappe.local.uploaded_file = content
frappe.local.uploaded_file_url = file_url
frappe.local.uploaded_filename = file_name
def restore():
for attr, value in originals.items():
setattr(frappe.local, attr, value)
self.addCleanup(restore)
def assert_import_response(self, import_result):
self.assertEqual(
set(import_result),
{
"code_list",
"code_list_title",
"file",
"columns",
"example_values",
"filterable_columns",
},
)
self.assertEqual(import_result["code_list"], "test-code-list-v1")
self.assertEqual(import_result["code_list_title"], "Test Code List")
self.assertEqual(import_result["columns"], ["code", "name", "category"])
self.assertEqual(import_result["example_values"]["code"], ["A", "B", "C"])
self.assertEqual(import_result["example_values"]["name"], ["Alpha", "Beta", "Gamma"])
self.assertEqual(import_result["example_values"]["category"], ["Group 1", "Group 2", "Group 1"])
self.assertCountEqual(import_result["filterable_columns"]["category"], ["Group 1", "Group 2"])
self.assertTrue(frappe.db.exists("Code List", import_result["code_list"]))
self.assertTrue(frappe.db.exists("File", import_result["file"]))

View File

@@ -9,6 +9,8 @@ from frappe.model.document import Document
from frappe.utils.data import get_link_to_form
from lxml import etree
from erpnext.edi.doctype.code_list.code_list_import import parse_genericode_content
class CommonCode(Document):
# begin: auto-generated types
@@ -86,15 +88,15 @@ def simple_hash(input_string, length=6):
def import_genericode(code_list: str, file_name: str, column_map: dict, filters: dict | None = None):
"""Import genericode file and create Common Code entries"""
file_path = frappe.utils.file_manager.get_file_path(file_name)
parser = etree.XMLParser(remove_blank_text=True)
tree = etree.parse(file_path, parser=parser)
root = tree.getroot()
file_doc = frappe.get_doc("File", file_name)
file_doc.check_permission("read")
root = parse_genericode_content(file_doc.get_content(encodings=()))
# Construct the XPath expression
xpath_expr = ".//SimpleCodeList/Row"
filter_conditions = [
f"Value[@ColumnRef='{column_ref}']/SimpleValue='{value}'" for column_ref, value in filters.items()
f"Value[@ColumnRef='{column_ref}']/SimpleValue='{value}'"
for column_ref, value in (filters or {}).items()
]
if filter_conditions:
xpath_expr += "[" + " and ".join(filter_conditions) + "]"
@@ -102,7 +104,7 @@ def import_genericode(code_list: str, file_name: str, column_map: dict, filters:
elements = root.xpath(xpath_expr)
total_elements = len(elements)
for i, xml_element in enumerate(elements, start=1):
common_code: "CommonCode" = frappe.new_doc("Common Code")
common_code: CommonCode = frappe.new_doc("Common Code")
common_code.code_list = code_list
common_code.from_genericode(column_map, xml_element)
common_code.save()