Merge pull request #54466 from rohitwaghchaure/revamp-stock-entry

refactor: stock_entry file to improve readability and maintainability
This commit is contained in:
rohitwaghchaure
2026-05-21 19:47:04 +05:30
committed by GitHub
25 changed files with 3810 additions and 3056 deletions

View File

@@ -1403,16 +1403,18 @@ def make_rm_stock_entry(
items_dict = { items_dict = {
rm_item_code: { rm_item_code: {
rm_detail_field: rm_item.get("name"), rm_detail_field: rm_item.get("name"),
"item_code": rm_item_code,
"item_name": rm_item.get("item_name") "item_name": rm_item.get("item_name")
or item_wh.get(rm_item_code, {}).get("item_name", ""), or item_wh.get(rm_item_code, {}).get("item_name", ""),
"description": item_wh.get(rm_item_code, {}).get("description", ""), "description": item_wh.get(rm_item_code, {}).get("description", ""),
"qty": qty, "qty": qty,
"from_warehouse": rm_item.get("warehouse") "s_warehouse": rm_item.get("warehouse")
or rm_item.get("reserve_warehouse"), or rm_item.get("reserve_warehouse"),
"to_warehouse": source_doc.supplier_warehouse, "t_warehouse": source_doc.supplier_warehouse,
"stock_uom": rm_item.get("stock_uom"), "stock_uom": rm_item.get("stock_uom"),
"serial_and_batch_bundle": rm_item.get("serial_and_batch_bundle"), "serial_and_batch_bundle": rm_item.get("serial_and_batch_bundle"),
"main_item_code": fg_item_code, "main_item_code": fg_item_code,
"subcontracted_item": fg_item_code,
"allow_alternative_item": item_wh.get(rm_item_code, {}).get( "allow_alternative_item": item_wh.get(rm_item_code, {}).get(
"allow_alternative_item" "allow_alternative_item"
), ),
@@ -1426,7 +1428,7 @@ def make_rm_stock_entry(
} }
} }
target_doc.add_to_stock_entry_detail(items_dict) target_doc.append("items", items_dict[rm_item_code])
stock_entry = get_mapped_doc( stock_entry = get_mapped_doc(
order_doctype, order_doctype,

View File

@@ -2023,10 +2023,10 @@ def get_secondary_items_from_sub_assemblies(bom_no, company, qty, secondary_item
return secondary_items return secondary_items
def get_backflush_based_on(bom_no): def get_backflush_based_on(bom_no=None):
backflush_based_on = None backflush_based_on = None
if bom_no: if bom_no:
backflush_based_on = frappe.get_cached_value("BOM", bom_no, "backflush_based_on") backflush_based_on = frappe.db.get_value("BOM", bom_no, "backflush_based_on")
if not backflush_based_on: if not backflush_based_on:
backflush_based_on = frappe.db.get_single_value( backflush_based_on = frappe.db.get_single_value(

View File

@@ -1476,6 +1476,8 @@ class JobCard(Document):
@frappe.whitelist() @frappe.whitelist()
def make_stock_entry_for_semi_fg_item(self, auto_submit: bool = False): def make_stock_entry_for_semi_fg_item(self, auto_submit: bool = False):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import ManufactureStockEntry
def get_consumed_process_loss(): def get_consumed_process_loss():
table = frappe.qb.DocType("Stock Entry") table = frappe.qb.DocType("Stock Entry")
query = ( query = (
@@ -1511,9 +1513,7 @@ class JobCard(Document):
ste.stock_entry.flags.ignore_mandatory = True ste.stock_entry.flags.ignore_mandatory = True
wo_doc = frappe.get_doc("Work Order", self.work_order) wo_doc = frappe.get_doc("Work Order", self.work_order)
add_additional_cost(ste.stock_entry, wo_doc, self) add_additional_cost(ste.stock_entry, wo_doc, self)
ManufactureStockEntry(ste.stock_entry).add_secondary_items_from_job_card()
ste.stock_entry.pro_doc = frappe.get_doc("Work Order", self.work_order)
ste.stock_entry.set_secondary_items_from_job_card()
for row in ste.stock_entry.items: for row in ste.stock_entry.items:
if (row.type or row.is_legacy_scrap_item) and not row.t_warehouse: if (row.type or row.is_legacy_scrap_item) and not row.t_warehouse:
row.t_warehouse = self.target_warehouse row.t_warehouse = self.target_warehouse

View File

@@ -31,6 +31,7 @@ from erpnext.stock.doctype.serial_and_batch_bundle.test_serial_and_batch_bundle
) )
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.doctype.stock_entry import test_stock_entry from erpnext.stock.doctype.stock_entry import test_stock_entry
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import ManufactureStockEntry
from erpnext.stock.doctype.warehouse.test_warehouse import create_warehouse from erpnext.stock.doctype.warehouse.test_warehouse import create_warehouse
from erpnext.stock.utils import get_bin from erpnext.stock.utils import get_bin
from erpnext.tests.utils import ERPNextTestSuite from erpnext.tests.utils import ERPNextTestSuite
@@ -683,7 +684,10 @@ class TestWorkOrder(ERPNextTestSuite):
def test_cost_center_for_manufacture(self): def test_cost_center_for_manufacture(self):
wo_order = make_wo_order_test_record() wo_order = make_wo_order_test_record()
ste = make_stock_entry(wo_order.name, "Material Transfer for Manufacture", wo_order.qty) ste = frappe.get_doc(
make_stock_entry(wo_order.name, "Material Transfer for Manufacture", wo_order.qty)
)
ste.save()
self.assertEqual(ste.get("items")[0].get("cost_center"), "_Test Cost Center - _TC") self.assertEqual(ste.get("items")[0].get("cost_center"), "_Test Cost Center - _TC")
def test_operation_time_with_batch_size(self): def test_operation_time_with_batch_size(self):
@@ -1319,7 +1323,6 @@ class TestWorkOrder(ERPNextTestSuite):
stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10)) stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10))
stock_entry.set_work_order_details() stock_entry.set_work_order_details()
stock_entry.set_serial_no_batch_for_finished_good()
for row in stock_entry.items: for row in stock_entry.items:
if row.item_code == fg_item: if row.item_code == fg_item:
self.assertTrue(row.serial_and_batch_bundle) self.assertTrue(row.serial_and_batch_bundle)
@@ -1360,7 +1363,6 @@ class TestWorkOrder(ERPNextTestSuite):
stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10)) stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10))
stock_entry.set_work_order_details() stock_entry.set_work_order_details()
stock_entry.set_serial_no_batch_for_finished_good()
for row in stock_entry.items: for row in stock_entry.items:
if row.item_code == fg_item: if row.item_code == fg_item:
self.assertTrue(row.serial_and_batch_bundle) self.assertTrue(row.serial_and_batch_bundle)
@@ -4291,7 +4293,6 @@ class TestWorkOrder(ERPNextTestSuite):
) )
material_transfer_entry.submit() material_transfer_entry.submit()
manufacture_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 1)) manufacture_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 1))
manufacture_entry.save() manufacture_entry.save()

View File

@@ -2087,8 +2087,9 @@ class WorkOrder(Document):
additional_items = frappe._dict() additional_items = frappe._dict()
for row in stock_entry.items: for row in stock_entry.items:
if row.item_code not in required_items: item_code = row.original_item if row.original_item else row.item_code
additional_items.setdefault(row.item_code, []).append(row) if item_code not in required_items:
additional_items.setdefault(item_code, []).append(row)
self.flags.ignore_validate_update_after_submit = True self.flags.ignore_validate_update_after_submit = True
@@ -2453,10 +2454,6 @@ def make_stock_entry(
stock_entry.set_stock_entry_type() stock_entry.set_stock_entry_type()
stock_entry.is_additional_transfer_entry = is_additional_transfer_entry stock_entry.is_additional_transfer_entry = is_additional_transfer_entry
stock_entry.get_items() stock_entry.get_items()
stock_entry.set_secondary_items_from_job_card()
if purpose != "Disassemble":
stock_entry.set_serial_no_batch_for_finished_good()
return stock_entry.as_dict() return stock_entry.as_dict()
@@ -2817,11 +2814,9 @@ def get_reserved_qty_for_production(
@frappe.whitelist() @frappe.whitelist()
def make_stock_return_entry(work_order: str): def make_stock_return_entry(work_order: str):
from erpnext.stock.doctype.stock_entry.stock_entry import get_available_materials from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
ManufactureStockEntry,
non_consumed_items = get_available_materials(work_order) )
if not non_consumed_items:
return
wo_doc = frappe.get_cached_doc("Work Order", work_order) wo_doc = frappe.get_cached_doc("Work Order", work_order)
@@ -2831,9 +2826,11 @@ def make_stock_return_entry(work_order: str):
stock_entry.work_order = work_order stock_entry.work_order = work_order
stock_entry.purpose = "Material Transfer for Manufacture" stock_entry.purpose = "Material Transfer for Manufacture"
stock_entry.bom_no = wo_doc.bom_no stock_entry.bom_no = wo_doc.bom_no
stock_entry.add_transfered_raw_materials_in_items()
stock_entry.set_stock_entry_type() stock_entry.set_stock_entry_type()
ste_cls = ManufactureStockEntry(stock_entry)
ste_cls.add_raw_materials_based_on_transfer()
ste_cls.return_available_materials_in_source_wh()
return stock_entry return stock_entry

View File

@@ -91,14 +91,14 @@ class Project(Document):
def validate(self): def validate(self):
if not self.is_new(): if not self.is_new():
self.copy_from_template() # nosemgrep self.copy_from_template()
self.send_welcome_email() self.send_welcome_email()
self.update_costing() self.update_costing()
self.update_percent_complete() self.update_percent_complete()
self.validate_from_to_dates("expected_start_date", "expected_end_date") self.validate_from_to_dates("expected_start_date", "expected_end_date")
self.validate_from_to_dates("actual_start_date", "actual_end_date") self.validate_from_to_dates("actual_start_date", "actual_end_date")
def copy_from_template(self): # nosemgrep def copy_from_template(self, trigger=None):
""" """
Copy tasks from template Copy tasks from template
""" """
@@ -107,11 +107,15 @@ class Project(Document):
if not self.expected_start_date: if not self.expected_start_date:
# project starts today # project starts today
self.expected_start_date = today() self.expected_start_date = today()
if trigger == "after_insert":
self.db_set("expected_start_date", self.expected_start_date)
template = frappe.get_doc("Project Template", self.project_template) template = frappe.get_doc("Project Template", self.project_template)
if not self.project_type: if not self.project_type:
self.project_type = template.project_type self.project_type = template.project_type
if trigger == "after_insert":
self.db_set("project_type", self.project_type)
# create tasks from template # create tasks from template
project_tasks = [] project_tasks = []
@@ -164,6 +168,40 @@ class Project(Document):
self.check_depends_on_value(template_task, project_task, project_tasks) self.check_depends_on_value(template_task, project_task, project_tasks)
self.check_for_parent_tasks(template_task, project_task, project_tasks) self.check_for_parent_tasks(template_task, project_task, project_tasks)
def set_consumed_material_cost(self):
parent_doc = frappe.qb.DocType("Stock Entry")
child_doc = frappe.qb.DocType("Stock Entry Detail")
lcv_doc = frappe.qb.DocType("Landed Cost Taxes and Charges")
amount = (
qb.from_(child_doc)
.select(Sum(child_doc.amount))
.where(
(child_doc.project == self.name)
& (child_doc.docstatus == 1)
& ((child_doc.t_warehouse.isnull()) | (child_doc.t_warehouse == ""))
)
).run(as_list=1)
amount = flt(amount[0][0]) if amount else 0
additional_costs = (
qb.from_(parent_doc)
.join(lcv_doc)
.on(parent_doc.name == lcv_doc.parent)
.select(Sum(lcv_doc.base_amount))
.where(
(parent_doc.project == self.name)
& (parent_doc.docstatus == 1)
& (parent_doc.purpose == "Manufacture")
)
).run(as_list=1)
additional_cost_amt = flt(additional_costs[0][0]) if additional_costs else 0
amount += additional_cost_amt
self.total_consumed_material_cost = amount
def check_depends_on_value(self, template_task, project_task, project_tasks): def check_depends_on_value(self, template_task, project_task, project_tasks):
if template_task.get("depends_on") and not project_task.get("depends_on"): if template_task.get("depends_on") and not project_task.get("depends_on"):
project_template_map = {pt.template_task: pt for pt in project_tasks} project_template_map = {pt.template_task: pt for pt in project_tasks}
@@ -201,7 +239,7 @@ class Project(Document):
self.db_update() self.db_update()
def after_insert(self): def after_insert(self):
self.copy_from_template() # nosemgrep self.copy_from_template("after_insert")
if self.sales_order: if self.sales_order:
frappe.db.set_value("Sales Order", self.sales_order, "project", self.name) frappe.db.set_value("Sales Order", self.sales_order, "project", self.name)

View File

@@ -768,7 +768,6 @@ def make_stock_entry(source_name: str, target_doc: str | Document | None = None)
target.set_actual_qty() target.set_actual_qty()
target.calculate_rate_and_amount(raise_error_if_no_rate=False) target.calculate_rate_and_amount(raise_error_if_no_rate=False)
target.stock_entry_type = target.purpose target.stock_entry_type = target.purpose
target.set_job_card_data()
if source.job_card: if source.job_card:
job_card_details = frappe.get_all( job_card_details = frappe.get_all(

View File

@@ -342,7 +342,7 @@ erpnext.stock.PurchaseReceiptController = class PurchaseReceiptController extend
make_retention_stock_entry() { make_retention_stock_entry() {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.move_sample_to_retention_warehouse", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.move_sample_to_retention_warehouse",
args: { args: {
company: cur_frm.doc.company, company: cur_frm.doc.company,
items: cur_frm.doc.items, items: cur_frm.doc.items,
@@ -455,7 +455,7 @@ var validate_sample_quantity = function (frm, cdt, cdn) {
var d = locals[cdt][cdn]; var d = locals[cdt][cdn];
if (d.sample_quantity && d.qty) { if (d.sample_quantity && d.qty) {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.validate_sample_quantity", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.validate_sample_quantity",
args: { args: {
batch_no: d.batch_no, batch_no: d.batch_no,
item_code: d.item_code, item_code: d.item_code,

View File

@@ -496,7 +496,7 @@ frappe.ui.form.on("Stock Entry", {
__("Expired Batches"), __("Expired Batches"),
function () { function () {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.get_expired_batch_items", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.serial_batch.get_expired_batch_items",
freeze: true, freeze: true,
callback: function (r) { callback: function (r) {
if (!r.exc && r.message) { if (!r.exc && r.message) {
@@ -670,7 +670,7 @@ frappe.ui.form.on("Stock Entry", {
make_retention_stock_entry: function (frm) { make_retention_stock_entry: function (frm) {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.move_sample_to_retention_warehouse", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.move_sample_to_retention_warehouse",
args: { args: {
company: frm.doc.company, company: frm.doc.company,
items: frm.doc.items, items: frm.doc.items,
@@ -939,7 +939,7 @@ frappe.ui.form.on("Stock Entry", {
if (frm.doc.purchase_order) { if (frm.doc.purchase_order) {
frm.set_value("subcontracting_order", ""); frm.set_value("subcontracting_order", "");
erpnext.utils.map_current_doc({ erpnext.utils.map_current_doc({
method: "erpnext.stock.doctype.stock_entry.stock_entry.get_items_from_subcontract_order", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.subcontracting.get_items_from_subcontract_order",
source_name: frm.doc.purchase_order, source_name: frm.doc.purchase_order,
target_doc: frm, target_doc: frm,
freeze: true, freeze: true,
@@ -951,7 +951,7 @@ frappe.ui.form.on("Stock Entry", {
if (frm.doc.subcontracting_order) { if (frm.doc.subcontracting_order) {
frm.set_value("purchase_order", ""); frm.set_value("purchase_order", "");
erpnext.utils.map_current_doc({ erpnext.utils.map_current_doc({
method: "erpnext.stock.doctype.stock_entry.stock_entry.get_items_from_subcontract_order", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.subcontracting.get_items_from_subcontract_order",
source_name: frm.doc.subcontracting_order, source_name: frm.doc.subcontracting_order,
target_doc: frm, target_doc: frm,
freeze: true, freeze: true,
@@ -1150,7 +1150,7 @@ var validate_sample_quantity = function (frm, cdt, cdn) {
var d = locals[cdt][cdn]; var d = locals[cdt][cdn];
if (d.sample_quantity && d.transfer_qty && frm.doc.purpose == "Material Receipt") { if (d.sample_quantity && d.transfer_qty && frm.doc.purpose == "Material Receipt") {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.validate_sample_quantity", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.validate_sample_quantity",
args: { args: {
batch_no: d.batch_no, batch_no: d.batch_no,
item_code: d.item_code, item_code: d.item_code,

View File

@@ -46,6 +46,7 @@
"target_address_display", "target_address_display",
"sb0", "sb0",
"scan_barcode", "scan_barcode",
"column_break_menu",
"last_scanned_warehouse", "last_scanned_warehouse",
"items_section", "items_section",
"items", "items",
@@ -369,6 +370,7 @@
{ {
"fieldname": "sb0", "fieldname": "sb0",
"fieldtype": "Section Break", "fieldtype": "Section Break",
"hide_border": 1,
"options": "Simple" "options": "Simple"
}, },
{ {
@@ -646,7 +648,7 @@
"depends_on": "eval:in_list([\"Material Issue\", \"Manufacture\", \"Repack\", \"Send to Subcontractor\", \"Material Transfer for Manufacture\", \"Material Consumption for Manufacture\", \"Disassemble\"], doc.purpose)", "depends_on": "eval:in_list([\"Material Issue\", \"Manufacture\", \"Repack\", \"Send to Subcontractor\", \"Material Transfer for Manufacture\", \"Material Consumption for Manufacture\", \"Disassemble\"], doc.purpose)",
"fieldname": "bom_info_section", "fieldname": "bom_info_section",
"fieldtype": "Section Break", "fieldtype": "Section Break",
"label": "BOM Info" "label": "Bill of Materials"
}, },
{ {
"collapsible": 1, "collapsible": 1,
@@ -695,8 +697,7 @@
}, },
{ {
"fieldname": "items_section", "fieldname": "items_section",
"fieldtype": "Section Break", "fieldtype": "Section Break"
"label": "Items"
}, },
{ {
"depends_on": "eval:doc.asset_repair", "depends_on": "eval:doc.asset_repair",
@@ -761,6 +762,10 @@
"fieldtype": "Link", "fieldtype": "Link",
"label": "Cost Center", "label": "Cost Center",
"options": "Cost Center" "options": "Cost Center"
},
{
"fieldname": "column_break_menu",
"fieldtype": "Column Break"
} }
], ],
"grid_page_length": 50, "grid_page_length": 50,
@@ -769,7 +774,7 @@
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"is_submittable": 1, "is_submittable": 1,
"links": [], "links": [],
"modified": "2026-03-04 19:03:23.426082", "modified": "2026-04-21 13:31:48.817309",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Stock", "module": "Stock",
"name": "Stock Entry", "name": "Stock Entry",

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,41 @@
import frappe
from frappe import _
from frappe.utils import flt
from erpnext.manufacturing.doctype.bom.bom import get_backflush_based_on
class BaseStockEntry:
"""Shared foundation for all stock entry purpose handlers.
Provides common lazy-loaded work order document, backflush configuration,
and work order status validation used across multiple handler classes.
"""
def __init__(self, se_doc):
self.doc = se_doc
@property
def wo_doc(self):
if not getattr(self, "_wo_doc", None):
if self.doc.work_order:
self._wo_doc = frappe.get_doc("Work Order", self.doc.work_order)
return getattr(self, "_wo_doc", None)
@property
def backflush_based_on(self):
return get_backflush_based_on(self.doc.bom_no)
def _validate_work_order(self):
if not self.wo_doc:
return
msg = ""
if flt(self.wo_doc.docstatus) != 1:
msg = _("Work Order {0} must be submitted").format(self.doc.work_order)
if self.wo_doc.status == "Stopped":
msg = _("Transaction not allowed against stopped Work Order {0}").format(self.doc.work_order)
if msg:
frappe.throw(msg)

View File

@@ -0,0 +1,529 @@
from collections import defaultdict
import frappe
from frappe import _
from frappe.query_builder.functions import Sum
from frappe.utils import flt
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.serial_batch_bundle import SerialBatchCreation
from erpnext.stock.utils import get_combine_datetime
from .base import BaseStockEntry
from .manufacturing import (
ceil_qty_if_uom_has_whole_number,
get_bom_items,
get_production_item_details,
get_secondary_items,
)
class DisassembleStockEntry(BaseStockEntry):
def validate(self):
self.validate_warehouse()
def validate_warehouse(self):
for row in self.doc.items:
if not row.s_warehouse and not row.t_warehouse:
frappe.throw(_("Source or Target Warehouse is required for item {0}").format(row.item_code))
def validate_fg_completed_qty(self):
if not self.doc.source_stock_entry:
return
from erpnext.manufacturing.doctype.work_order.work_order import get_disassembly_available_qty
available_qty = get_disassembly_available_qty(self.doc.source_stock_entry, self.doc.name)
if flt(self.doc.fg_completed_qty) > available_qty:
frappe.throw(
_(
"Cannot disassemble {0} qty against Stock Entry {1}. Only {2} qty available to disassemble."
).format(
self.doc.fg_completed_qty,
self.doc.source_stock_entry,
available_qty,
),
title=_("Excess Disassembly"),
)
def add_items(self):
"""
Priority:
1. From a specific Manufacture Stock Entry (exact reversal)
2. From Work Order Manufacture Stock Entries (averaged reversal)
3. From BOM (standalone disassembly)
"""
# Auto-set source_stock_entry if WO has exactly one manufacture entry
if not self.doc.get("source_stock_entry") and self.doc.work_order:
manufacture_entries = frappe.get_all(
"Stock Entry",
filters={
"work_order": self.doc.work_order,
"purpose": "Manufacture",
"docstatus": 1,
},
pluck="name",
)
if len(manufacture_entries) == 1:
self.doc.source_stock_entry = manufacture_entries[0]
if self.doc.get("source_stock_entry"):
return self._add_items_for_disassembly_from_stock_entry()
if self.doc.work_order:
return self._add_items_for_disassembly_from_work_order()
return self._add_items_for_disassembly_from_bom()
def _add_items_for_disassembly_from_stock_entry(self):
source_fg_qty = frappe.db.get_value("Stock Entry", self.doc.source_stock_entry, "fg_completed_qty")
if not source_fg_qty:
frappe.throw(
_("Source Stock Entry {0} has no finished goods quantity").format(self.doc.source_stock_entry)
)
disassemble_qty = flt(self.doc.fg_completed_qty)
scale_factor = disassemble_qty / flt(source_fg_qty)
self._append_disassembly_row_from_source(
disassemble_qty=disassemble_qty,
scale_factor=scale_factor,
)
def _add_items_for_disassembly_from_work_order(self):
wo_produced_qty = frappe.db.get_value("Work Order", self.doc.work_order, "produced_qty")
wo_produced_qty = flt(wo_produced_qty)
if wo_produced_qty <= 0:
frappe.throw(_("Work Order {0} has no produced qty").format(self.doc.work_order))
disassemble_qty = flt(self.doc.fg_completed_qty)
if disassemble_qty <= 0:
frappe.throw(_("Disassemble Qty cannot be less than or equal to 0."))
scale_factor = disassemble_qty / wo_produced_qty
self._append_disassembly_row_from_source(
disassemble_qty=disassemble_qty,
scale_factor=scale_factor,
)
def _append_disassembly_row_from_source(self, disassemble_qty, scale_factor):
for source_row in self.get_items_from_manufacture_stock_entry():
self._append_disassembly_item(source_row, disassemble_qty, scale_factor)
def _get_disassembly_warehouses(self, source_row, disassemble_qty, scale_factor):
if source_row.is_finished_item:
return disassemble_qty, self.doc.from_warehouse or source_row.t_warehouse, ""
elif source_row.s_warehouse:
return flt(source_row.qty * scale_factor), "", self.doc.to_warehouse or source_row.s_warehouse
else:
return flt(source_row.qty * scale_factor), source_row.t_warehouse, ""
def _build_disassembly_item_dict(self, source_row, qty, s_warehouse, t_warehouse):
return {
"item_code": source_row.item_code,
"item_name": source_row.item_name,
"description": source_row.description,
"stock_uom": source_row.stock_uom,
"uom": source_row.uom,
"conversion_factor": source_row.conversion_factor,
"basic_rate": source_row.basic_rate,
"qty": qty,
"s_warehouse": s_warehouse,
"t_warehouse": t_warehouse,
"is_finished_item": source_row.is_finished_item,
"type": source_row.type,
"is_legacy_scrap_item": source_row.is_legacy_scrap_item,
"bom_secondary_item": source_row.bom_secondary_item,
"bom_no": source_row.bom_no,
"use_serial_batch_fields": 1 if (source_row.batch_no or source_row.serial_no) else 0,
}
def _append_disassembly_item(self, source_row, disassemble_qty, scale_factor):
qty, s_warehouse, t_warehouse = self._get_disassembly_warehouses(
source_row, disassemble_qty, scale_factor
)
item = self._build_disassembly_item_dict(source_row, qty, s_warehouse, t_warehouse)
if self.doc.source_stock_entry:
item.update({"against_stock_entry": self.doc.source_stock_entry, "ste_detail": source_row.name})
self.doc.append("items", item)
def _add_items_for_disassembly_from_bom(self):
if not self.doc.bom_no or not self.doc.fg_completed_qty:
frappe.throw(_("BOM and Finished Good Quantity is mandatory for Disassembly"))
self.add_raw_materials()
self.add_secondary_items()
self.add_finished_goods()
def add_raw_materials(self):
# Raw materials will be available after disassembly in target warehouse
items = get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom)
for row in items:
row["t_warehouse"] = self.doc.to_warehouse
row["from_warehouse"] = ""
row["is_finished_item"] = 0
row["qty"] = flt(row["qty"]) * flt(self.doc.fg_completed_qty)
row["uom"] = row.get("uom") or row.get("stock_uom")
self.doc.append("items", row)
def add_secondary_items(self):
# Secondary items will be removed from source warehouse
secondary_items = get_secondary_items(self.doc.bom_no, self.doc.work_order)
for row in secondary_items:
item_args = {}
fields = [
"item_code",
"item_name",
"uom",
"stock_uom",
"conversion_factor",
"item_group",
"description",
"type",
]
for field in fields:
item_args[field] = row.get(field)
item_args["is_legacy_scrap_item"] = row.get("is_legacy")
item_args["s_warehouse"] = self.doc.from_warehouse
item_args["uom"] = item_args.get("uom") or item_args.get("stock_uom")
item_args["bom_secondary_item"] = row.get("name")
row.qty = row.qty * self.doc.fg_completed_qty
if row.get("process_loss_per"):
row.qty -= flt(row.qty * row.get("process_loss_per") / 100)
item_args["qty"] = ceil_qty_if_uom_has_whole_number(row.qty, item_args["uom"])
self.doc.append("items", item_args)
def add_finished_goods(self):
item_details = get_production_item_details(self.doc.work_order, self.doc.bom_no)
item_details.update(
{
"conversion_factor": 1,
"uom": item_details.stock_uom,
"qty": self.doc.fg_completed_qty,
"t_warehouse": None,
"s_warehouse": self.doc.from_warehouse,
"is_finished_item": 1,
}
)
item_details["item_code"] = item_details["name"]
del item_details["name"]
self.doc.append("items", item_details)
def get_items_from_manufacture_stock_entry(self):
SE = frappe.qb.DocType("Stock Entry")
SED = frappe.qb.DocType("Stock Entry Detail")
query = frappe.qb.from_(SED).join(SE).on(SED.parent == SE.name).where(SE.docstatus == 1)
common_fields = [
SED.item_code,
SED.item_name,
SED.description,
SED.stock_uom,
SED.uom,
SED.basic_rate,
SED.conversion_factor,
SED.is_finished_item,
SED.type,
SED.is_legacy_scrap_item,
SED.bom_secondary_item,
SED.batch_no,
SED.serial_no,
SED.use_serial_batch_fields,
SED.s_warehouse,
SED.t_warehouse,
SED.bom_no,
]
if self.doc.source_stock_entry:
return (
query.select(SED.name, SED.qty, SED.transfer_qty, *common_fields)
.where(SE.name == self.doc.source_stock_entry)
.orderby(SED.idx)
.run(as_dict=True)
)
return (
query.select(Sum(SED.qty).as_("qty"), Sum(SED.transfer_qty).as_("transfer_qty"), *common_fields)
.where(SE.purpose == "Manufacture")
.where(SE.work_order == self.doc.work_order)
.groupby(SED.item_code)
.orderby(SED.idx)
.run(as_dict=True)
)
def on_submit(self):
self.set_serial_batch_for_disassembly()
self.update_disassembled_order()
def on_cancel(self):
self.update_disassembled_order()
def set_serial_batch_for_disassembly(self):
if self.doc.get("source_stock_entry"):
self._set_serial_batch_for_disassembly_from_stock_entry()
else:
self._set_serial_batch_for_disassembly_from_available_materials()
def _set_serial_batch_for_disassembly_from_stock_entry(self):
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import (
get_voucher_wise_serial_batch_from_bundle,
)
source_fg_qty = flt(
frappe.db.get_value("Stock Entry", self.doc.source_stock_entry, "fg_completed_qty")
)
scale_factor = flt(self.doc.fg_completed_qty) / source_fg_qty if source_fg_qty else 0
bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=[self.doc.source_stock_entry])
source_rows_by_name = {r.name: r for r in self.get_items_from_manufacture_stock_entry()}
for row in self.doc.items:
if not row.ste_detail:
continue
source_row = source_rows_by_name.get(row.ste_detail)
if source_row:
self._apply_bundle_to_disassembly_row(row, source_row, bundle_data, scale_factor)
def _apply_bundle_to_disassembly_row(self, row, source_row, bundle_data, scale_factor):
source_warehouse = source_row.s_warehouse or source_row.t_warehouse
key = (source_row.item_code, source_warehouse, self.doc.source_stock_entry)
source_bundle = bundle_data.get(key, {})
batches = self._extract_batches(source_row, source_bundle, row, scale_factor)
serial_nos = self._extract_serial_nos(source_row, source_bundle, row)
self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches)
def _extract_batches(self, source_row, source_bundle, row, scale_factor):
batches = defaultdict(float)
if source_bundle.get("batch_nos"):
self._allocate_batches(batches, source_bundle["batch_nos"], row.transfer_qty, scale_factor)
elif source_row.batch_no:
batches[source_row.batch_no] = row.transfer_qty
return batches
def _allocate_batches(self, batches, batch_nos, transfer_qty, scale_factor):
qty_remaining = transfer_qty
for batch_no, batch_qty in batch_nos.items():
if qty_remaining <= 0:
break
alloc = min(abs(flt(batch_qty)) * scale_factor, qty_remaining)
batches[batch_no] = alloc
qty_remaining -= alloc
def _extract_serial_nos(self, source_row, source_bundle, row):
if source_bundle.get("serial_nos"):
return get_serial_nos(source_bundle["serial_nos"])[: int(row.transfer_qty)]
elif source_row.serial_no:
return get_serial_nos(source_row.serial_no)[: int(row.transfer_qty)]
return []
def _set_serial_batch_for_disassembly_from_available_materials(self):
available_materials = get_available_materials(self.doc.work_order, self.doc)
for row in self.doc.items:
warehouse = row.s_warehouse or row.t_warehouse
materials = available_materials.get((row.item_code, warehouse))
if materials:
self._apply_available_material_bundle(row, materials)
def _apply_available_material_bundle(self, row, materials):
batches = self._collect_available_batches(materials.batch_details, row.transfer_qty)
serial_nos = materials.serial_nos[: int(row.transfer_qty)] if materials.serial_nos else []
self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches)
def _collect_available_batches(self, batch_details, transfer_qty):
batches, qty = defaultdict(float), transfer_qty
for batch_no, batch_qty in batch_details.items():
if qty <= 0:
break
batch_qty = abs(batch_qty)
if batch_qty <= qty:
batches[batch_no], qty = batch_qty, qty - batch_qty
else:
batches[batch_no], qty = qty, 0
return batches
def _set_serial_batch_bundle_for_disassembly_row(self, row, serial_nos, batches):
if not serial_nos and not batches:
return
warehouse = row.s_warehouse or row.t_warehouse
bundle_doc = SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": warehouse,
"posting_datetime": get_combine_datetime(self.doc.posting_date, self.doc.posting_time),
"voucher_type": self.doc.doctype,
"voucher_no": self.doc.name,
"voucher_detail_no": row.name,
"qty": row.transfer_qty,
"type_of_transaction": "Inward" if row.t_warehouse else "Outward",
"company": self.doc.company,
"do_not_submit": True,
}
).make_serial_and_batch_bundle(serial_nos=serial_nos, batch_nos=batches)
row.serial_and_batch_bundle = bundle_doc.name
row.use_serial_batch_fields = 0
def update_disassembled_order(self):
if not self.doc.work_order:
return
if self.doc.fg_completed_qty:
pro_doc = frappe.get_doc("Work Order", self.doc.work_order)
pro_doc.run_method(
"update_disassembled_qty", self.doc.fg_completed_qty, self.doc._action == "cancel"
)
def get_available_materials(work_order, stock_entry_doc=None) -> dict:
data = get_stock_entry_data(work_order, stock_entry_doc=stock_entry_doc)
available_materials = {}
for row in data:
key = _get_material_key(row, stock_entry_doc)
if key not in available_materials:
available_materials[key] = frappe._dict(
{"item_details": row, "batch_details": defaultdict(float), "qty": 0, "serial_nos": []}
)
_update_material_qty(available_materials[key], row, stock_entry_doc)
return available_materials
def _get_material_key(row, stock_entry_doc):
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
return (row.item_code, row.s_warehouse or row.warehouse)
if row.purpose != "Material Transfer for Manufacture":
return (row.item_code, row.s_warehouse)
return (row.item_code, row.warehouse)
def _update_material_qty(item_data, row, stock_entry_doc):
is_inward = row.purpose == "Material Transfer for Manufacture" or (
stock_entry_doc and stock_entry_doc.purpose == "Disassemble" and row.purpose == "Manufacture"
)
if is_inward:
_add_inward_material_qty(item_data, row)
else:
_deduct_consumed_material_qty(item_data, row)
def _add_inward_material_qty(item_data, row):
item_data.qty += row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] += row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
_extend_serial_nos_from_row(item_data, row)
def _extend_serial_nos_from_row(item_data, row):
sn = row.serial_no or row.serial_nos
if sn:
item_data.serial_nos.extend(get_serial_nos(sn))
item_data.serial_nos.sort()
def _deduct_consumed_material_qty(item_data, row):
item_data.qty -= row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] -= row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
_remove_serial_nos_from_available(item_data, row)
def _remove_serial_nos_from_available(item_data, row):
sn = row.serial_no or row.serial_nos
if not sn:
return
for serial_no in get_serial_nos(sn):
if serial_no in item_data.serial_nos:
item_data.serial_nos.remove(serial_no)
def get_stock_entry_data(work_order, stock_entry_doc=None):
data = _run_stock_entry_query(work_order, stock_entry_doc)
if not data:
return []
_enrich_with_bundle_data(data, stock_entry_doc)
return data
def _run_stock_entry_query(work_order, stock_entry_doc):
se = frappe.qb.DocType("Stock Entry")
sed = frappe.qb.DocType("Stock Entry Detail")
query = _build_stock_entry_base_query(se, sed, work_order)
query = _apply_stock_entry_purpose_filter(query, se, sed, stock_entry_doc)
return query.run(as_dict=1)
def _build_stock_entry_base_query(se, sed, work_order):
return (
frappe.qb.from_(se)
.from_(sed)
.select(
sed.item_name,
sed.original_item,
sed.item_code,
sed.qty,
sed.t_warehouse.as_("warehouse"),
sed.s_warehouse.as_("s_warehouse"),
sed.description,
sed.stock_uom,
sed.expense_account,
sed.cost_center,
sed.serial_and_batch_bundle,
sed.batch_no,
sed.serial_no,
se.purpose,
se.name,
)
.where((se.name == sed.parent) & (se.work_order == work_order) & (se.docstatus == 1))
.orderby(se.creation, sed.item_code, sed.idx)
)
def _apply_stock_entry_purpose_filter(query, se, sed, stock_entry_doc):
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
query = query.where(se.purpose.isin(["Disassemble", "Manufacture"]))
return query.where(se.name != stock_entry_doc.name)
query = query.where(
se.purpose.isin(
["Manufacture", "Material Consumption for Manufacture", "Material Transfer for Manufacture"]
)
)
return query.where(sed.s_warehouse.isnotnull())
def _enrich_with_bundle_data(data, stock_entry_doc):
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import (
get_voucher_wise_serial_batch_from_bundle,
)
voucher_nos = [row.get("name") for row in data if row.get("name")]
if not voucher_nos:
return
bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=voucher_nos)
for row in data:
key = _get_bundle_key(row, stock_entry_doc)
if bundle_data.get(key):
row.update(bundle_data.get(key))
def _get_bundle_key(row, stock_entry_doc):
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
return (row.item_code, row.s_warehouse or row.warehouse, row.name)
if row.purpose != "Material Transfer for Manufacture":
return (row.item_code, row.s_warehouse, row.name)
return (row.item_code, row.warehouse, row.name)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,83 @@
import frappe
from frappe import _
from frappe.query_builder.functions import Sum
from .base import BaseStockEntry
from .manufacturing import get_bom_items
class MaterialReceiptStockEntry(BaseStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
def set_default_warehouse(self):
for row in self.doc.items:
row.s_warehouse = None
if not row.t_warehouse and self.doc.to_warehouse:
row.t_warehouse = self.doc.to_warehouse
def validate_warehouse(self):
for row in self.doc.items:
if not row.t_warehouse:
frappe.throw(_("Target Warehouse is required for item {0}").format(row.item_code))
class BaseMaterialIssueStockEntry(BaseStockEntry):
def set_default_warehouse(self):
for row in self.doc.items:
row.t_warehouse = None
if not row.s_warehouse and self.doc.from_warehouse:
row.s_warehouse = self.doc.from_warehouse
def validate_warehouse(self):
for row in self.doc.items:
if not row.s_warehouse:
frappe.throw(_("Source Warehouse is required for item {0}").format(row.item_code))
class MaterialIssueStockEntry(BaseMaterialIssueStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
def add_items(self):
self.add_raw_materials_based_on_bom()
def add_raw_materials_based_on_bom(self):
bom_items = get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom)
for row in bom_items:
row.s_warehouse = self.doc.from_warehouse
row.qty = row.qty * self.doc.fg_completed_qty
if not row.uom:
row.uom = row.stock_uom
self.doc.append("items", row)
def get_consumed_items(work_order):
"""Get all raw materials consumed through consumption entries for a work order."""
parent = frappe.qb.DocType("Stock Entry")
child = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(parent)
.join(child)
.on(parent.name == child.parent)
.select(
child.item_code,
Sum(child.qty).as_("qty"),
child.original_item,
)
.where(
(parent.docstatus == 1)
& (parent.purpose == "Material Consumption for Manufacture")
& (parent.work_order == work_order)
)
.groupby(child.item_code, child.original_item)
).run(as_dict=True)

View File

@@ -0,0 +1,435 @@
import frappe
from frappe import _
from frappe.query_builder.functions import Sum
from frappe.utils import cstr, flt
from .base import BaseStockEntry
from .manufacturing import _check_bom_component_qty, get_bom_items
class BaseMaterialTransferStockEntry(BaseStockEntry):
def set_default_warehouse(self):
for row in self.doc.items:
if not row.t_warehouse and self.doc.to_warehouse:
row.t_warehouse = self.doc.to_warehouse
if not row.s_warehouse and self.doc.from_warehouse:
row.s_warehouse = self.doc.from_warehouse
def validate_warehouse(self):
for row in self.doc.items:
if not row.t_warehouse:
frappe.throw(_("Target Warehouse is required for item {0}").format(row.item_code))
if not row.s_warehouse:
frappe.throw(_("Source Warehouse is required for item {0}").format(row.item_code))
def validate_same_source_target_warehouse(self):
"""
Raises: frappe.ValidationError: If warehouses are same and no inventory dimensions differ
"""
if not frappe.get_single_value("Stock Settings", "validate_material_transfer_warehouses"):
return
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import get_inventory_dimensions
inventory_dimensions = get_inventory_dimensions()
for item in self.doc.items:
if cstr(item.s_warehouse) == cstr(item.t_warehouse):
if not inventory_dimensions:
frappe.throw(
_(
"Row #{0}: Source and Target Warehouse cannot be the same for Material Transfer"
).format(item.idx),
title=_("Invalid Source and Target Warehouse"),
)
else:
difference_found = False
for dimension in inventory_dimensions:
fieldname = (
dimension.source_fieldname
if dimension.source_fieldname.startswith("to_")
else f"to_{dimension.source_fieldname}"
)
if (
item.get(dimension.source_fieldname)
and item.get(fieldname)
and item.get(dimension.source_fieldname) != item.get(fieldname)
):
difference_found = True
break
if not difference_found:
frappe.throw(
_(
"Row #{0}: Source, Target Warehouse and Inventory Dimensions cannot be the exact same for Material Transfer"
).format(item.idx),
title=_("Invalid Source and Target Warehouse"),
)
class MaterialTransferStockEntry(BaseMaterialTransferStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
self.validate_same_source_target_warehouse()
def on_submit(self):
self.update_subcontract_order_supplied_items()
def on_cancel(self):
self.update_subcontract_order_supplied_items()
def update_subcontract_order_supplied_items(self):
if not self.doc.get(self.doc.subcontract_data.order_field):
return
from .subcontracting import SendToSubcontractorStockEntry
SendToSubcontractorStockEntry(self.doc).update_subcontract_order_supplied_items()
class MaterialTransferForManufactureStockEntry(BaseMaterialTransferStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
self.validate_component_and_quantities()
self.validate_same_source_target_warehouse()
def validate_component_and_quantities(self):
if not frappe.db.get_single_value("Manufacturing Settings", "validate_components_quantities_per_bom"):
return
if not self.doc.fg_completed_qty:
return
_check_bom_component_qty(self.doc, get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom))
def add_items(self):
item_dict = self.get_pending_raw_materials()
for item in item_dict.values():
item["s_warehouse"] = item.get("from_warehouse")
if self.wo_doc and not item.get("t_warehouse"):
item["t_warehouse"] = self.wo_doc.wip_warehouse
for item_code in item_dict:
self.doc.append("items", item_dict[item_code])
def get_pending_raw_materials(self):
"""Return pending raw material qty to transfer, capped at what's still needed."""
item_dict = self.get_work_order_required_items()
max_qty = flt(self.wo_doc.qty)
allow_overproduction = self._is_overproduction_allowed(max_qty)
for item, item_details in item_dict.items():
item_dict[item]["qty"] = self._calculate_item_transfer_qty(
item_details, allow_overproduction, max_qty
)
item_dict[item]["transfer_qty"] = flt(item_dict[item]["qty"]) * flt(
item_dict[item].get("conversion_factor") or 1
)
item_dict = {k: v for k, v in item_dict.items() if v["qty"]}
if not item_dict:
frappe.msgprint(_("All items have already been transferred for this Work Order."))
return item_dict
def _is_overproduction_allowed(self, max_qty):
overproduction_pct = flt(
frappe.db.get_single_value("Manufacturing Settings", "overproduction_percentage_for_work_order")
)
extra_materials_pct = flt(
frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage")
)
to_transfer_qty = flt(self.wo_doc.material_transferred_for_manufacturing) + flt(
self.doc.fg_completed_qty
)
limit_pct = extra_materials_pct or overproduction_pct
transfer_limit_qty = max_qty + (max_qty * limit_pct / 100)
return transfer_limit_qty >= to_transfer_qty
def _calculate_item_transfer_qty(self, item_details, allow_overproduction, max_qty):
pending_to_issue = flt(item_details.required_qty) - flt(item_details.transferred_qty)
desire_to_transfer = flt(self.doc.fg_completed_qty) * flt(item_details.required_qty) / max_qty
can_transfer = (
desire_to_transfer <= pending_to_issue
or (desire_to_transfer > 0 and self.backflush_based_on == "Material Transferred for Manufacture")
or allow_overproduction
)
return _resolve_transfer_qty(desire_to_transfer, pending_to_issue, can_transfer)
def get_work_order_required_items(self):
"""Gets Work Order Required Items for Material Transfer for Manufacture."""
work_order = self.wo_doc
consider_job_card = work_order.transfer_material_against == "Job Card" and self.doc.get("job_card")
job_card_items = self.get_job_card_item_codes() if consider_job_card else []
wip_warehouse = self._resolve_wip_warehouse(work_order)
extra_pct = flt(
frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage")
)
item_dict = frappe._dict()
for d in work_order.get("required_items"):
self._add_required_item(
item_dict, d, consider_job_card, job_card_items, wip_warehouse, extra_pct, work_order
)
return item_dict
def _resolve_wip_warehouse(self, work_order):
if not frappe.db.get_value("Warehouse", work_order.wip_warehouse, "is_group"):
return work_order.wip_warehouse
return None
def _add_required_item(
self, item_dict, d, consider_job_card, job_card_items, wip_warehouse, extra_pct, work_order
):
if consider_job_card and d.item_code not in job_card_items:
return
additional_qty = extra_pct * flt(d.required_qty) / 100 if extra_pct else 0.0
transfer_pending = (
(flt(d.required_qty) + additional_qty) > flt(d.transferred_qty)
if additional_qty
else flt(d.required_qty) > flt(d.transferred_qty)
)
can_transfer = transfer_pending or self.backflush_based_on == "Material Transferred for Manufacture"
if not can_transfer or not d.include_item_in_manufacturing:
return
self._build_required_item_row(item_dict, d, consider_job_card, wip_warehouse, work_order)
def _build_required_item_row(self, item_dict, d, consider_job_card, wip_warehouse, work_order):
item_row = d.as_dict()
item_row["idx"] = len(item_dict) + 1
if consider_job_card:
item_row["job_card_item"] = self._get_job_card_item(d.item_code)
if d.source_warehouse and not frappe.db.get_value("Warehouse", d.source_warehouse, "is_group"):
item_row["from_warehouse"] = d.source_warehouse
item_row["to_warehouse"] = wip_warehouse
if item_row["allow_alternative_item"]:
item_row["allow_alternative_item"] = work_order.allow_alternative_item
item_dict.setdefault(d.item_code, item_row)
def _get_job_card_item(self, item_code):
return (
frappe.db.get_value("Job Card Item", {"item_code": item_code, "parent": self.doc.get("job_card")})
or None
)
def get_job_card_item_codes(self):
if not self.doc.get("job_card"):
return []
return frappe.get_all(
"Job Card Item", filters={"parent": self.doc.get("job_card")}, pluck="item_code", distinct=True
)
def on_submit(self):
self.update_job_card_and_work_order()
def on_cancel(self):
self.update_job_card_and_work_order()
def update_job_card_and_work_order(self):
if self.doc.job_card:
job_doc = frappe.get_doc("Job Card", self.doc.job_card)
job_doc.set_transferred_qty(update_status=True)
job_doc.set_transferred_qty_in_job_card_item(self.doc)
if self.doc.work_order:
self._validate_work_order()
if self.doc.fg_completed_qty:
if self.doc.docstatus == 1:
self.wo_doc.add_additional_items(self.doc)
else:
self.wo_doc.remove_additional_items(self.doc)
self.wo_doc.run_method("update_work_order_qty")
self.wo_doc.run_method("update_status")
if not self.wo_doc.operations:
self.wo_doc.set_actual_dates()
class MaterialRequestStockEntry(BaseMaterialTransferStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
self.validate_material_request()
def get_material_request(self, item_row):
material_request = item_row.material_request or None
material_request_item = item_row.material_request_item or None
if self.doc.outgoing_stock_entry:
parent_se = frappe.get_value(
"Stock Entry Detail",
item_row.ste_detail,
["material_request", "material_request_item"],
as_dict=True,
)
if parent_se:
material_request = parent_se.material_request
material_request_item = parent_se.material_request_item
return material_request, material_request_item
def validate_material_request(self):
for row in self.doc.items:
material_request, material_request_item = self.get_material_request(row)
if not material_request:
return
mreq_item = frappe.db.get_value(
"Material Request Item",
{"name": material_request_item, "parent": material_request},
["item_code", "warehouse", "idx"],
as_dict=True,
)
if mreq_item.item_code != row.item_code:
frappe.throw(
_("Item for row {0} does not match Material Request").format(row.idx),
frappe.MappingMismatchError,
)
def on_submit(self):
self.update_transferred_qty()
if self.doc.add_to_transit:
self.set_material_request_transfer_status("In Transit")
if self.doc.outgoing_stock_entry:
self.set_material_request_transfer_status("Completed")
def on_cancel(self):
self.update_transferred_qty()
if self.doc.add_to_transit:
self.set_material_request_transfer_status("Not Started")
if self.doc.outgoing_stock_entry:
self.set_material_request_transfer_status("In Transit")
def update_transferred_qty(self):
if not self.doc.outgoing_stock_entry:
return
stock_entries, child_list = self._collect_transferred_qtys()
if not stock_entries:
return
self._bulk_update_transferred_qty(stock_entries, child_list)
self._update_per_transferred_field()
def _get_item_transferred_qty(self, item):
sed = frappe.qb.DocType("Stock Entry Detail")
result = (
frappe.qb.from_(sed)
.select(Sum(sed.transfer_qty).as_("qty"))
.where(
(sed.against_stock_entry == item.against_stock_entry)
& (sed.ste_detail == item.ste_detail)
& (sed.docstatus == 1)
)
).run(as_dict=True)
return result[0].qty if result and result[0].qty else 0.0
def _validate_item_transferred_qty(self, item, transferred_qty):
if item.docstatus != 1:
return
transfer_qty = frappe.get_value("Stock Entry Detail", item.ste_detail, "transfer_qty")
if transferred_qty > transfer_qty:
frappe.throw(
_("Row {0}: Transferred quantity cannot be greater than the requested quantity.").format(
item.idx
)
)
def _collect_transferred_qtys(self):
stock_entries, child_list = {}, []
for item in self.doc.items:
if not (item.against_stock_entry and item.ste_detail):
continue
transferred_qty = self._get_item_transferred_qty(item)
self._validate_item_transferred_qty(item, transferred_qty)
child_list.append(item.ste_detail)
stock_entries[(item.against_stock_entry, item.ste_detail)] = transferred_qty
return stock_entries, child_list
def _bulk_update_transferred_qty(self, stock_entries, child_list):
sed = frappe.qb.DocType("Stock Entry Detail")
case_expr = self._build_case_expr(sed, stock_entries)
(
frappe.qb.update(sed)
.set(sed.transferred_qty, case_expr.else_(sed.transferred_qty))
.where(sed.name.isin(child_list))
).run()
def _build_case_expr(self, sed, stock_entries):
from pypika import Case
case_expr = Case()
for (parent, name), qty in stock_entries.items():
case_expr = case_expr.when((sed.parent == parent) & (sed.name == name), qty)
return case_expr
def _update_per_transferred_field(self):
self.doc._update_percent_field_in_targets(self._get_per_transferred_config(), update_modified=True)
def _get_per_transferred_config(self):
return {
"source_dt": "Stock Entry Detail",
"target_field": "transferred_qty",
"target_ref_field": "qty",
"target_dt": "Stock Entry Detail",
"join_field": "ste_detail",
"target_parent_dt": "Stock Entry",
"target_parent_field": "per_transferred",
"source_field": "qty",
"percent_join_field": "against_stock_entry",
}
def set_material_request_transfer_status(self, status):
material_requests = []
parent_se = (
frappe.get_value("Stock Entry", self.doc.outgoing_stock_entry, "add_to_transit")
if self.doc.outgoing_stock_entry
else None
)
for item in self.doc.items:
mr = item.get("material_request")
if mr not in material_requests and self.doc.outgoing_stock_entry and parent_se:
mr = frappe.get_value("Stock Entry Detail", item.ste_detail, "material_request")
if mr and mr not in material_requests:
status = self._update_mr_transfer_status(mr, status, material_requests)
def _update_mr_transfer_status(self, material_request, status, material_requests):
material_requests.append(material_request)
if status == "Completed":
qty = get_transferred_qty(material_request)
if qty.get("transfer_qty") > qty.get("transferred_qty"):
status = "In Transit"
frappe.db.set_value("Material Request", material_request, "transfer_status", status)
return status
def _resolve_transfer_qty(desire_to_transfer, pending_to_issue, can_transfer):
# "No need for transfer but qty still pending" can occur when transferring multiple RM in different Stock Entries
if can_transfer:
return desire_to_transfer if desire_to_transfer > 0 else pending_to_issue
return pending_to_issue if pending_to_issue > 0 else 0
def get_transferred_qty(material_request):
sed = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(sed)
.select(Sum(sed.transfer_qty).as_("transfer_qty"), Sum(sed.transferred_qty).as_("transferred_qty"))
.where((sed.material_request == material_request) & (sed.docstatus == 1))
).run(as_dict=True)[0]

View File

@@ -0,0 +1,373 @@
from collections import defaultdict
import frappe
from frappe import _
from frappe.utils import cint, cstr, flt, nowdate
from erpnext.manufacturing.doctype.bom.bom import get_backflush_based_on
from erpnext.stock.serial_batch_bundle import SerialBatchCreation, get_serial_or_batch_items
from erpnext.stock.utils import get_combine_datetime
from .base import BaseStockEntry
class StockEntrySABB(BaseStockEntry):
def make_serial_and_batch_bundle_for_outward(self):
serial_or_batch_items = get_serial_or_batch_items(self.doc.items)
if not serial_or_batch_items:
return
serial_nos, batch_nos = self.get_serial_batch_fields_for_subcontracting_inward()
already_picked_serial_nos = []
for row in self.doc.items:
if row.use_serial_batch_fields or not row.s_warehouse:
continue
if row.item_code not in serial_or_batch_items:
continue
bundle_doc = self._create_or_update_bundle_for_row(
row, serial_nos, batch_nos, already_picked_serial_nos
)
if not bundle_doc:
continue
for entry in bundle_doc.entries:
if entry.serial_no:
already_picked_serial_nos.append(entry.serial_no)
row.serial_and_batch_bundle = bundle_doc.name
def _create_or_update_bundle_for_row(self, row, serial_nos, batch_nos, already_picked_serial_nos):
if row.serial_and_batch_bundle and abs(row.transfer_qty) != abs(
frappe.get_cached_value("Serial and Batch Bundle", row.serial_and_batch_bundle, "total_qty")
):
return SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"serial_and_batch_bundle": row.serial_and_batch_bundle,
"type_of_transaction": "Outward",
"ignore_serial_nos": already_picked_serial_nos,
"qty": row.transfer_qty * -1,
}
).update_serial_and_batch_entries(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
if not row.serial_and_batch_bundle and frappe.get_single_value(
"Stock Settings", "auto_create_serial_and_batch_bundle_for_outward"
):
return SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"posting_datetime": get_combine_datetime(self.doc.posting_date, self.doc.posting_time),
"voucher_type": self.doc.doctype,
"voucher_detail_no": row.name,
"qty": row.transfer_qty * -1,
"ignore_serial_nos": already_picked_serial_nos,
"type_of_transaction": "Outward",
"company": self.doc.company,
"do_not_submit": True,
}
).make_serial_and_batch_bundle(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
return None
def get_serial_nos_and_batches_from_sres(self, scio_detail, only_pending=True):
serial_nos, batch_nos = [], frappe._dict()
table = frappe.qb.DocType("Stock Reservation Entry")
child_table = frappe.qb.DocType("Serial and Batch Entry")
query = (
frappe.qb.from_(table)
.join(child_table)
.on(table.name == child_table.parent)
.select(child_table.serial_no, child_table.batch_no, child_table.qty)
.where((table.docstatus == 1) & (table.voucher_detail_no == scio_detail))
)
if only_pending:
query = query.where(child_table.qty != child_table.delivered_qty)
else:
query = query.where(child_table.delivered_qty > 0)
for d in query.run(as_dict=True):
if d.serial_no and d.serial_no not in serial_nos:
serial_nos.append(d.serial_no)
if d.batch_no and d.batch_no not in batch_nos:
batch_nos[d.batch_no] = d.qty
return serial_nos, batch_nos
def get_serial_batch_fields_for_subcontracting_inward(self):
serial_nos, batch_nos = frappe._dict(), frappe._dict()
for row in self.doc.items:
if self.doc.purpose in [
"Return Raw Material to Customer",
"Subcontracting Delivery",
"Subcontracting Return",
]:
if not row.serial_and_batch_bundle:
serial_nos_list, batch_nos_list = self.get_serial_nos_and_batches_from_sres(
row.scio_detail, only_pending=self.doc.purpose != "Subcontracting Return"
)
if len(batch_nos_list) > 1:
row.use_serial_batch_fields = 0
if row.use_serial_batch_fields:
if serial_nos_list and not row.serial_no:
row.serial_no = "\n".join(serial_nos_list)
if batch_nos_list and not row.batch_no:
row.batch_no = next(iter(batch_nos_list.keys()))
serial_nos[row.name], batch_nos[row.name] = serial_nos_list, batch_nos_list
return serial_nos, batch_nos
def get_available_reserved_materials(self):
from erpnext.stock.doctype.stock_reservation_entry.stock_reservation_entry import (
get_reserved_materials,
)
voucher_no = self.doc.work_order or self.doc.subcontracting_order
reserved_entries = get_reserved_materials(voucher_no)
if not reserved_entries:
return {}
itemwise_serial_batch_qty = frappe._dict()
for d in reserved_entries:
key = (d.item_code, d.warehouse)
if key not in itemwise_serial_batch_qty:
itemwise_serial_batch_qty[key] = frappe._dict(
{
"serial_no": [],
"batch_no": defaultdict(float),
"batchwise_sn": defaultdict(list),
}
)
details = itemwise_serial_batch_qty[key]
if d.batch_no:
details.batch_no[d.batch_no] += d.qty
if d.serial_no:
details.batchwise_sn[d.batch_no].extend(d.serial_no.split("\n"))
elif d.serial_no:
details.serial_no.append(d.serial_no)
return itemwise_serial_batch_qty
def set_serial_batch_based_on_reservation(self):
if self.doc.work_order and frappe.get_cached_value(
"Work Order", self.doc.work_order, "reserve_stock"
):
skip_transfer = frappe.get_cached_value("Work Order", self.doc.work_order, "skip_transfer")
backflush_based_on = get_backflush_based_on(self.doc.bom_no)
if (
self.doc.purpose not in ["Material Transfer for Manufacture"]
and backflush_based_on != "BOM"
and not skip_transfer
):
return
reservation_entries = self.get_available_reserved_materials()
if not reservation_entries:
return
new_items_to_add = []
for d in self.doc.items:
if d.serial_and_batch_bundle or d.serial_no or d.batch_no:
continue
key = (d.item_code, d.s_warehouse)
if details := reservation_entries.get(key):
self._apply_batch_reservation_to_item(d, details, new_items_to_add)
d.use_serial_batch_fields = 1
for new_row in new_items_to_add:
self.doc.append("items", new_row)
self._sort_and_reindex_items()
def _apply_batch_reservation_to_item(self, d, details, new_items_to_add):
original_qty = d.qty
if batches := details.get("batch_no"):
original_qty = self._distribute_batches_to_item(
d, batches, details, new_items_to_add, original_qty
)
if details.get("serial_no"):
d.serial_no = "\n".join(details.get("serial_no")[: cint(d.qty)])
def _distribute_batches_to_item(self, d, batches, details, new_items_to_add, original_qty):
for batch_no, qty in batches.items():
if original_qty <= 0:
break
if qty <= 0:
continue
if d.batch_no:
original_qty, _ = self._make_overflow_batch_row(
d, batches, details, new_items_to_add, batch_no, qty, original_qty
)
else:
self._assign_batch_to_item(d, batches, details, batch_no, qty)
return original_qty
def _make_overflow_batch_row(self, d, batches, details, new_items_to_add, batch_no, qty, original_qty):
new_row = frappe.copy_doc(d)
new_row.name = None
new_row.batch_no = batch_no
new_row.qty = qty
new_row.idx = d.idx + 1
if new_row.batch_no and details.get("batchwise_sn"):
new_row.serial_no = "\n".join(details.get("batchwise_sn")[new_row.batch_no][: cint(new_row.qty)])
new_items_to_add.append(new_row)
batches[batch_no] -= qty
return original_qty - qty, new_row
def _assign_batch_to_item(self, d, batches, details, batch_no, qty):
if qty >= d.qty:
d.batch_no = batch_no
batches[batch_no] -= d.qty
else:
d.batch_no = batch_no
d.qty = qty
batches[batch_no] = 0
if d.batch_no and details.get("batchwise_sn"):
d.serial_no = "\n".join(details.get("batchwise_sn")[d.batch_no][: cint(d.qty)])
def _sort_and_reindex_items(self):
sorted_items = sorted(self.doc.items, key=lambda x: x.item_code)
if self.doc.purpose == "Manufacture":
# ensure finished item at last
sorted_items = sorted(sorted_items, key=lambda x: cstr(x.t_warehouse))
for idx, row in enumerate(sorted_items, start=1):
row.idx = idx
self.doc.set("items", sorted_items)
def create_serial_and_batch_bundle(parent_doc, row, child, type_of_transaction=None):
item_details = frappe.get_cached_value(
"Item", child.item_code, ["has_serial_no", "has_batch_no"], as_dict=1
)
if not (item_details.has_serial_no or item_details.has_batch_no):
return
doc = _make_bundle_doc(parent_doc, child, type_of_transaction or "Inward")
_populate_bundle_entries(doc, row, child)
if not doc.entries:
return None
return doc.insert(ignore_permissions=True).name
def _make_bundle_doc(parent_doc, child, type_of_transaction):
return frappe.get_doc(
{
"doctype": "Serial and Batch Bundle",
"voucher_type": "Stock Entry",
"item_code": child.item_code,
"warehouse": child.warehouse,
"type_of_transaction": type_of_transaction,
"posting_date": parent_doc.posting_date,
"posting_time": parent_doc.posting_time,
}
)
def _populate_bundle_entries(doc, row, child):
precision = frappe.get_precision("Stock Entry Detail", "qty")
if row.serial_nos and row.batches_to_be_consume:
_append_serial_batch_entries(doc, row, child, precision)
elif row.serial_nos:
doc.has_serial_no = 1
for serial_no in row.serial_nos:
doc.append("entries", {"serial_no": serial_no, "warehouse": row.warehouse, "qty": -1})
elif row.batches_to_be_consume:
_append_batch_entries(doc, row)
def _append_serial_batch_entries(doc, row, child, precision):
doc.has_serial_no = 1
doc.has_batch_no = 1
batchwise_serial_nos = get_batchwise_serial_nos(child.item_code, row)
for batch_no, qty in row.batches_to_be_consume.items():
while flt(qty, precision) > 0:
qty -= 1
doc.append(
"entries",
{
"batch_no": batch_no,
"serial_no": batchwise_serial_nos.get(batch_no).pop(0),
"warehouse": row.warehouse,
"qty": -1,
},
)
def _append_batch_entries(doc, row):
precision = frappe.get_precision("Serial and Batch Entry", "qty")
doc.has_batch_no = 1
for batch_no, qty in row.batches_to_be_consume.items():
if flt(qty, precision) > 0:
doc.append(
"entries", {"batch_no": batch_no, "warehouse": row.warehouse, "qty": flt(qty, precision) * -1}
)
def get_batchwise_serial_nos(item_code, row):
batchwise_serial_nos = {}
for batch_no in row.batches_to_be_consume:
serial_nos = frappe.get_all(
"Serial No",
filters={"item_code": item_code, "batch_no": batch_no, "name": ("in", row.serial_nos)},
)
if serial_nos:
batchwise_serial_nos[batch_no] = sorted([serial_no.name for serial_no in serial_nos])
return batchwise_serial_nos
@frappe.whitelist()
def get_expired_batch_items():
expired_batches = get_expired_batches()
if not expired_batches:
return []
return _enrich_expired_batches_with_stock(expired_batches)
def _enrich_expired_batches_with_stock(expired_batches):
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import get_auto_batch_nos
expired_batches_stock = get_auto_batch_nos(
frappe._dict({"batch_no": list(expired_batches.keys()), "for_stock_levels": True})
)
for row in expired_batches_stock:
row.update(expired_batches.get(row.batch_no))
return expired_batches_stock
def get_expired_batches():
batch = frappe.qb.DocType("Batch")
data = (
frappe.qb.from_(batch)
.select(batch.item, batch.name.as_("batch_no"), batch.stock_uom)
.where((batch.expiry_date <= nowdate()) & (batch.expiry_date.isnotnull()))
).run(as_dict=True)
if not data:
return []
expired_batches = frappe._dict()
for row in data:
expired_batches[row.batch_no] = row
return expired_batches

View File

@@ -0,0 +1,264 @@
import json
import frappe
from frappe import _, bold
from frappe.model.document import Document
from frappe.query_builder.functions import Sum
from frappe.utils import flt
from erpnext.stock.utils import get_bin
from .base import BaseStockEntry
class SendToSubcontractorStockEntry(BaseStockEntry):
def validate(self):
self.validate_subcontract_order()
def validate_subcontract_order(self):
"""Throw exception if more raw material is transferred against Subcontract Order than in
the raw materials supplied table"""
backflush_raw_materials_based_on = frappe.db.get_single_value(
"Buying Settings", "backflush_raw_materials_of_subcontract_based_on"
)
if backflush_raw_materials_based_on == "BOM":
subcontract_order = frappe.get_doc(
self.doc.subcontract_data.order_doctype, self.doc.get(self.doc.subcontract_data.order_field)
)
for se_item in self.doc.items:
self.validate_subcontracting_order_for_bom(se_item, subcontract_order)
elif backflush_raw_materials_based_on == "Material Transferred for Subcontract":
for row in self.doc.items:
self.validate_subcontracting_order_for_transfer(row)
def validate_subcontracting_order_for_bom(self, child_row, subcontract_order):
item_code = child_row.original_item or child_row.item_code
required_qty = self._get_required_qty_for_bom(item_code, child_row, subcontract_order)
qty_allowance = flt(frappe.db.get_single_value("Buying Settings", "over_transfer_allowance"))
total_allowed = required_qty + (required_qty * qty_allowance / 100)
self._validate_transfer_qty(child_row, item_code, total_allowed)
self._link_rm_detail_if_missing(child_row, item_code)
def _get_required_qty_for_bom(self, item_code, child_row, subcontract_order):
required_qty = sum(
flt(d.required_qty) for d in subcontract_order.supplied_items if d.rm_item_code == item_code
)
if not required_qty and child_row.allow_alternative_item:
original_item_code = frappe.get_value(
"Item Alternative", {"alternative_item_code": item_code}, "item_code"
)
required_qty = sum(
flt(d.required_qty)
for d in subcontract_order.supplied_items
if d.rm_item_code == original_item_code
)
if not required_qty:
frappe.throw(
_("Item {0} not found in 'Raw Materials Supplied' table in {1} {2}").format(
item_code,
self.doc.subcontract_data.order_doctype,
self.doc.get(self.doc.subcontract_data.order_field),
)
)
return required_qty
def _validate_transfer_qty(self, child_row, item_code, total_allowed):
total_supplied = self.get_total_supplied_qty(child_row)
total_returned = (
self.get_total_returned_qty(child_row)
if self.doc.subcontract_data.order_doctype == "Subcontracting Order"
else 0
)
if flt(
total_supplied + child_row.transfer_qty - total_returned, child_row.precision("transfer_qty")
) > flt(total_allowed, child_row.precision("transfer_qty")):
frappe.throw(
_("Row #{0}: Item {1} cannot be transferred more than {2} against {3} {4}").format(
child_row.idx,
item_code,
total_allowed,
self.doc.subcontract_data.order_doctype,
self.doc.get(self.doc.subcontract_data.order_field),
)
)
def _link_rm_detail_if_missing(self, child_row, item_code):
if not child_row.get(self.doc.subcontract_data.rm_detail_field):
order_rm_detail = self.get_order_rm_detail(child_row)
if order_rm_detail:
child_row.db_set(self.doc.subcontract_data.rm_detail_field, order_rm_detail)
elif not child_row.allow_alternative_item:
frappe.throw(
_("Row {0}# Item {1} not found in 'Raw Materials Supplied' table in {2} {3}").format(
child_row.idx,
item_code,
self.doc.subcontract_data.order_doctype,
self.doc.get(self.doc.subcontract_data.order_field),
)
)
def validate_subcontracting_order_for_transfer(self, child_row):
if not child_row.subcontracted_item:
frappe.throw(
_("Row {0}: Subcontracted Item is mandatory for the raw material {1}").format(
child_row.idx, bold(child_row.item_code)
)
)
elif not child_row.get(self.doc.subcontract_data.rm_detail_field):
order_rm_detail = self.get_order_rm_detail(child_row)
if order_rm_detail:
child_row.db_set(self.doc.subcontract_data.rm_detail_field, order_rm_detail)
def get_total_supplied_qty(self, child_row):
se = frappe.qb.DocType("Stock Entry")
sed = frappe.qb.DocType("Stock Entry Detail")
order_filter = self._get_supplied_qty_order_filter(se, sed, child_row)
return (
frappe.qb.from_(se)
.inner_join(sed)
.on(se.name == sed.parent)
.select(Sum(sed.transfer_qty))
.where(
(se.purpose == "Send to Subcontractor")
& (se.docstatus == 1)
& (sed.item_code == child_row.item_code)
& order_filter
)
).run()[0][0] or 0
def _get_supplied_qty_order_filter(self, se, sed, child_row):
if self.doc.subcontract_data.order_doctype == "Purchase Order":
return (se.purchase_order == self.doc.purchase_order) & (sed.po_detail == self.doc.po_detail)
return (se.subcontracting_order == self.doc.subcontracting_order) & (
sed.sco_rm_detail == child_row.sco_rm_detail
)
def get_total_returned_qty(self, child_row):
se = frappe.qb.DocType("Stock Entry")
sed = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(se)
.inner_join(sed)
.on(se.name == sed.parent)
.select(Sum(sed.transfer_qty))
.where(
(se.purpose == "Material Transfer")
& (se.docstatus == 1)
& (se.is_return == 1)
& (sed.item_code == child_row.item_code)
& (sed.sco_rm_detail == child_row.sco_rm_detail)
& (se.subcontracting_order == self.doc.subcontracting_order)
)
).run()[0][0] or 0
def get_order_rm_detail(self, child_row):
filters = {
"parent": self.doc.get(self.doc.subcontract_data.order_field),
"docstatus": 1,
"rm_item_code": child_row.item_code,
"main_item_code": child_row.subcontracted_item,
}
return frappe.db.get_value(self.doc.subcontract_data.order_supplied_items_field, filters, "name")
def on_submit(self):
self.update_subcontract_order_supplied_items()
def on_cancel(self):
self.update_subcontract_order_supplied_items()
def update_subcontract_order_supplied_items(self):
if not self.doc.get(self.doc.subcontract_data.order_field):
return
order_supplied_items = self._get_order_supplied_items()
supplied_items = self._get_supplied_items_details()
self._update_supplied_items_in_order(order_supplied_items, supplied_items)
self._update_reserved_qty_for_subcontracting(order_supplied_items)
def _get_order_supplied_items(self):
return frappe.db.get_all(
self.doc.subcontract_data.order_supplied_items_field,
filters={"parent": self.doc.get(self.doc.subcontract_data.order_field)},
fields=["name", "rm_item_code", "reserve_warehouse"],
)
def _get_supplied_items_details(self):
return get_supplied_items(
self.doc.get(self.doc.subcontract_data.order_field),
self.doc.subcontract_data.rm_detail_field,
self.doc.subcontract_data.order_field,
)
def _update_supplied_items_in_order(self, order_supplied_items, supplied_items):
for row in order_supplied_items:
item = supplied_items.get(row.name) or {
"supplied_qty": 0,
"returned_qty": 0,
"total_supplied_qty": 0,
}
frappe.db.set_value(self.doc.subcontract_data.order_supplied_items_field, row.name, item)
def _update_reserved_qty_for_subcontracting(self, order_supplied_items):
item_wh = {x.get("rm_item_code"): x.get("reserve_warehouse") for x in order_supplied_items}
for d in self.doc.get("items"):
item_code = d.get("original_item") or d.get("item_code")
reserve_warehouse = item_wh.get(item_code)
if not (reserve_warehouse and item_code):
continue
stock_bin = get_bin(item_code, reserve_warehouse)
stock_bin.update_reserved_qty_for_sub_contracting()
def get_supplied_items(
subcontract_order, rm_detail_field="sco_rm_detail", subcontract_order_field="subcontracting_order"
):
fields = [
"`tabStock Entry Detail`.`transfer_qty`",
"`tabStock Entry`.`is_return`",
f"`tabStock Entry Detail`.`{rm_detail_field}`",
"`tabStock Entry Detail`.`item_code`",
]
filters = [
["Stock Entry", "docstatus", "=", 1],
["Stock Entry", subcontract_order_field, "=", subcontract_order],
]
supplied_item_details = {}
for row in frappe.get_all("Stock Entry", fields=fields, filters=filters):
if not row.get(rm_detail_field):
continue
key = row.get(rm_detail_field)
if key not in supplied_item_details:
supplied_item_details.setdefault(
key, frappe._dict({"supplied_qty": 0, "returned_qty": 0, "total_supplied_qty": 0})
)
supplied_item = supplied_item_details[key]
if row.is_return:
supplied_item.returned_qty += row.transfer_qty
else:
supplied_item.supplied_qty += row.transfer_qty
supplied_item.total_supplied_qty = flt(supplied_item.supplied_qty) - flt(supplied_item.returned_qty)
return supplied_item_details
@frappe.whitelist()
def get_items_from_subcontract_order(source_name: str, target_doc: str | Document | None = None):
from erpnext.controllers.subcontracting_controller import make_rm_stock_entry
if isinstance(target_doc, str):
target_doc = frappe.get_doc(json.loads(target_doc))
order_doctype = "Purchase Order" if target_doc.purchase_order else "Subcontracting Order"
target_doc = make_rm_stock_entry(
subcontract_order=source_name, order_doctype=order_doctype, target_doc=target_doc
)
return target_doc

View File

@@ -909,6 +909,7 @@ class TestStockEntry(ERPNextTestSuite):
rm_cost += d.amount rm_cost += d.amount
fg_cost = next(filter(lambda x: x.item_code == "_Test FG Item", s.get("items"))).amount fg_cost = next(filter(lambda x: x.item_code == "_Test FG Item", s.get("items"))).amount
secondary_item_cost = next(filter(lambda x: x.type or x.is_legacy_scrap_item, s.get("items"))).amount secondary_item_cost = next(filter(lambda x: x.type or x.is_legacy_scrap_item, s.get("items"))).amount
self.assertEqual(fg_cost, flt(rm_cost - secondary_item_cost, 2)) self.assertEqual(fg_cost, flt(rm_cost - secondary_item_cost, 2))
# When Stock Entry has only FG + Scrap # When Stock Entry has only FG + Scrap
@@ -1783,7 +1784,7 @@ class TestStockEntry(ERPNextTestSuite):
def test_use_serial_and_batch_fields(self): def test_use_serial_and_batch_fields(self):
item = make_item( item = make_item(
"Test Use Serial and Batch Item SN Item", "Test Use Serial and Batch Item SN Item - A",
{"has_serial_no": 1, "is_stock_item": 1}, {"has_serial_no": 1, "is_stock_item": 1},
) )
@@ -2266,7 +2267,7 @@ class TestStockEntry(ERPNextTestSuite):
make_stock_entry(item_code=rm_item2, target=warehouse, qty=5, purpose="Material Receipt") make_stock_entry(item_code=rm_item2, target=warehouse, qty=5, purpose="Material Receipt")
bom_no = make_bom(item=fg_item, raw_materials=[rm_item1, rm_item2]).name bom_no = make_bom(item=fg_item, raw_materials=[rm_item1, rm_item2]).name
se = make_stock_entry(item_code=fg_item, qty=1, purpose="Manufacture", do_not_save=True) se = make_stock_entry(item_code=fg_item, qty=1, purpose="Repack", do_not_save=True)
se.from_bom = 1 se.from_bom = 1
se.use_multi_level_bom = 1 se.use_multi_level_bom = 1
se.bom_no = bom_no se.bom_no = bom_no
@@ -2304,7 +2305,6 @@ class TestStockEntry(ERPNextTestSuite):
se.to_warehouse = warehouse se.to_warehouse = warehouse
se.get_items() se.get_items()
# Verify FG as source (being consumed) # Verify FG as source (being consumed)
fg_items = [d for d in se.items if d.is_finished_item] fg_items = [d for d in se.items if d.is_finished_item]
self.assertEqual(len(fg_items), 1) self.assertEqual(len(fg_items), 1)
@@ -2331,7 +2331,9 @@ class TestStockEntry(ERPNextTestSuite):
"Stock Settings", {"sample_retention_warehouse": "_Test Warehouse 1 - _TC"} "Stock Settings", {"sample_retention_warehouse": "_Test Warehouse 1 - _TC"}
) )
def test_sample_retention_stock_entry(self): def test_sample_retention_stock_entry(self):
from erpnext.stock.doctype.stock_entry.stock_entry import move_sample_to_retention_warehouse from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
move_sample_to_retention_warehouse,
)
warehouse = "_Test Warehouse - _TC" warehouse = "_Test Warehouse - _TC"
retain_sample_item = make_item( retain_sample_item = make_item(
@@ -2493,6 +2495,390 @@ class TestStockEntry(ERPNextTestSuite):
self.assertEqual(se.items[2].amount, 5) self.assertEqual(se.items[2].amount, 5)
class TestStockEntryCoverage(ERPNextTestSuite):
"""Tests for functions previously lacking dedicated coverage."""
# ── ceil_qty_if_uom_has_whole_number ──────────────────────────────────────
def test_ceil_qty_rounds_up_for_whole_number_uom(self):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
ceil_qty_if_uom_has_whole_number,
)
frappe.set_value("UOM", "Nos", "must_be_whole_number", 1)
self.assertEqual(ceil_qty_if_uom_has_whole_number(2.3, "Nos"), 3)
frappe.set_value("UOM", "Nos", "must_be_whole_number", 0)
def test_ceil_qty_no_rounding_for_decimal_uom(self):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
ceil_qty_if_uom_has_whole_number,
)
frappe.set_value("UOM", "Nos", "must_be_whole_number", 0)
self.assertEqual(ceil_qty_if_uom_has_whole_number(2.3, "Nos"), 2.3)
# ── get_uom_details ────────────────────────────────────────────────────────
def test_get_uom_details_returns_conversion_factor_and_transfer_qty(self):
from erpnext.stock.doctype.stock_entry.stock_entry import get_uom_details
result = get_uom_details("_Test Item", "Nos", 5)
self.assertEqual(flt(result.get("conversion_factor")), 1.0)
self.assertEqual(flt(result.get("transfer_qty")), 5.0)
# ── get_warehouse_details ──────────────────────────────────────────────────
def test_get_warehouse_details_returns_actual_qty_and_rate(self):
import json
from frappe.utils import nowdate, nowtime
from erpnext.stock.doctype.stock_entry.stock_entry import get_warehouse_details
make_stock_entry(
item_code="_Test Item",
target="_Test Warehouse - _TC",
qty=10,
basic_rate=100,
)
args = {
"item_code": "_Test Item",
"warehouse": "_Test Warehouse - _TC",
"posting_date": nowdate(),
"posting_time": nowtime(),
}
result = get_warehouse_details(json.dumps(args))
self.assertGreater(result.get("actual_qty", 0), 0)
self.assertGreater(result.get("basic_rate", 0), 0)
def test_get_warehouse_details_empty_args_returns_empty_dict(self):
from erpnext.stock.doctype.stock_entry.stock_entry import get_warehouse_details
self.assertEqual(get_warehouse_details({}), {})
# ── get_work_order_details ─────────────────────────────────────────────────
def test_get_work_order_details_returns_correct_fields(self):
from erpnext.stock.doctype.stock_entry.stock_entry import get_work_order_details
bom_no = frappe.db.get_value("BOM", {"item": "_Test FG Item 2", "is_default": 1, "docstatus": 1})
wo = frappe.new_doc("Work Order")
wo.update(
{
"company": "_Test Company",
"fg_warehouse": "_Test Warehouse 1 - _TC",
"production_item": "_Test FG Item 2",
"bom_no": bom_no,
"qty": 3.0,
"stock_uom": "_Test UOM",
"wip_warehouse": "_Test Warehouse - _TC",
}
)
wo.insert()
wo.submit()
result = get_work_order_details(wo.name, "_Test Company")
self.assertEqual(result["bom_no"], bom_no)
self.assertEqual(result["fg_completed_qty"], 3.0)
self.assertEqual(result["from_bom"], 1)
self.assertEqual(result["wip_warehouse"], wo.wip_warehouse)
# ── get_production_item_details ────────────────────────────────────────────
def test_get_production_item_details_from_bom(self):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
get_production_item_details,
)
bom_no = frappe.db.get_value("BOM", {"item": "_Test FG Item 2", "is_default": 1, "docstatus": 1})
result = get_production_item_details(bom_no=bom_no)
self.assertEqual(result.name, "_Test FG Item 2")
self.assertIsNotNone(result.stock_uom)
def test_get_production_item_details_from_work_order(self):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
get_production_item_details,
)
bom_no = frappe.db.get_value("BOM", {"item": "_Test FG Item 2", "is_default": 1, "docstatus": 1})
wo = frappe.new_doc("Work Order")
wo.update(
{
"company": "_Test Company",
"fg_warehouse": "_Test Warehouse 1 - _TC",
"production_item": "_Test FG Item 2",
"bom_no": bom_no,
"qty": 1.0,
"stock_uom": "_Test UOM",
"wip_warehouse": "_Test Warehouse - _TC",
}
)
wo.insert()
wo.submit()
result = get_production_item_details(work_order=wo.name)
self.assertEqual(result.name, "_Test FG Item 2")
# ── get_bom_items ──────────────────────────────────────────────────────────
def test_get_bom_items_returns_raw_materials_with_structure(self):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import get_bom_items
bom_no = frappe.db.get_value("BOM", {"item": "_Test FG Item 2", "is_default": 1, "docstatus": 1})
items = get_bom_items(bom_no)
self.assertGreater(len(items), 0)
for item in items:
self.assertIn("item_code", item)
self.assertIn("qty", item)
def test_get_bom_items_scales_qty_proportionally(self):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import get_bom_items
bom_no = frappe.db.get_value("BOM", {"item": "_Test FG Item 2", "is_default": 1, "docstatus": 1})
items_1 = {i["item_code"]: i["qty"] for i in get_bom_items(bom_no, qty=1)}
items_2 = {i["item_code"]: i["qty"] for i in get_bom_items(bom_no, qty=2)}
for item_code, qty_at_1 in items_1.items():
self.assertAlmostEqual(items_2[item_code], qty_at_1 * 2, places=4)
# ── validate_sample_quantity ───────────────────────────────────────────────
@ERPNextTestSuite.change_settings(
"Stock Settings", {"sample_retention_warehouse": "_Test Warehouse 1 - _TC"}
)
def test_validate_sample_quantity_raises_when_sample_exceeds_received_qty(self):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
validate_sample_quantity,
)
item = make_item(
"_Sample Qty Excess Item",
{"is_stock_item": 1, "retain_sample": 1, "sample_quantity": 2},
)
self.assertRaises(frappe.ValidationError, validate_sample_quantity, item.name, 10, 5)
# ── get_expired_batches ────────────────────────────────────────────────────
def test_get_expired_batches_includes_expired_batch(self):
from erpnext.stock.doctype.batch.test_batch import make_new_batch
from erpnext.stock.doctype.stock_entry.stock_entry_handler.serial_batch import (
get_expired_batches,
)
item = make_item("_Test Expired Batch Item", {"is_stock_item": 1, "has_batch_no": 1})
batch = make_new_batch(
batch_id=frappe.generate_hash("", 5),
item_code=item.name,
expiry_date=add_days(today(), -1),
)
expired = get_expired_batches()
self.assertIn(batch.name, expired)
self.assertEqual(expired[batch.name].item, item.name)
def test_get_expired_batches_excludes_future_batch(self):
from erpnext.stock.doctype.batch.test_batch import make_new_batch
from erpnext.stock.doctype.stock_entry.stock_entry_handler.serial_batch import (
get_expired_batches,
)
item = make_item("_Test Future Batch Item", {"is_stock_item": 1, "has_batch_no": 1})
future_batch = make_new_batch(
batch_id=frappe.generate_hash("", 5),
item_code=item.name,
expiry_date=add_days(today(), 10),
)
expired = get_expired_batches()
self.assertNotIn(future_batch.name, expired)
# ── validate_source_stock_entry ────────────────────────────────────────────
def test_validate_source_stock_entry_skips_when_no_source(self):
se = frappe.new_doc("Stock Entry")
se.source_stock_entry = None
se.validate_source_stock_entry() # must not raise
def test_validate_source_stock_entry_throws_on_work_order_mismatch(self):
source_se = make_stock_entry(
item_code="_Test Item",
target="_Test Warehouse - _TC",
qty=1,
basic_rate=100,
)
frappe.db.set_value("Stock Entry", source_se.name, "work_order", "WO-FAKE-SOURCE-001")
se = frappe.new_doc("Stock Entry")
se.source_stock_entry = source_se.name
se.work_order = "WO-FAKE-TARGET-999"
self.assertRaises(frappe.ValidationError, se.validate_source_stock_entry)
def test_validate_source_stock_entry_passes_with_matching_work_order(self):
source_se = make_stock_entry(
item_code="_Test Item",
target="_Test Warehouse - _TC",
qty=1,
basic_rate=100,
)
frappe.db.set_value("Stock Entry", source_se.name, "work_order", "WO-SAME-001")
se = frappe.new_doc("Stock Entry")
se.source_stock_entry = source_se.name
se.work_order = "WO-SAME-001"
se.validate_source_stock_entry() # must not raise
# ── validate_job_card_fg_item ──────────────────────────────────────────────
def test_validate_job_card_fg_item_skips_when_no_job_card(self):
se = frappe.new_doc("Stock Entry")
se.job_card = None
se.validate_job_card_fg_item() # must not raise
def test_validate_job_card_fg_item_throws_when_fg_item_mismatches(self):
wrong_fg = make_item("_JC Wrong FG Item", {"is_stock_item": 1}).name
jc_name = frappe.db.get_value("Job Card", {"docstatus": 1, "finished_good": ("!=", "")})
if not jc_name:
return # skip if no suitable job card in test data
jc = frappe.db.get_value("Job Card", jc_name, ["finished_good"], as_dict=1)
if jc.finished_good == wrong_fg:
return # skip if the wrong_fg happens to match
se = frappe.new_doc("Stock Entry")
se.job_card = jc_name
se.append("items", {"item_code": wrong_fg, "is_finished_item": 1, "qty": 1})
self.assertRaises(frappe.ValidationError, se.validate_job_card_fg_item)
# ── validate_job_card_item ─────────────────────────────────────────────────
def test_validate_job_card_item_skips_when_no_job_card(self):
se = frappe.new_doc("Stock Entry")
se.job_card = None
se.validate_job_card_item() # must not raise
def test_validate_job_card_item_skips_for_manufacture_purpose(self):
se = frappe.new_doc("Stock Entry")
se.job_card = "SOME-JC-001"
se.purpose = "Manufacture"
se.validate_job_card_item() # must not raise even with a job card set
@ERPNextTestSuite.change_settings("Manufacturing Settings", {"job_card_excess_transfer": 0})
def test_validate_job_card_item_throws_when_job_card_item_ref_missing(self):
jc_name = frappe.db.get_value("Job Card", {"docstatus": 1})
if not jc_name:
return # skip if no job cards in test data
se = frappe.new_doc("Stock Entry")
se.job_card = jc_name
se.purpose = "Material Transfer for Manufacture"
se.append(
"items",
{
"item_code": "_Test Item",
"s_warehouse": "_Test Warehouse - _TC",
"qty": 1,
"job_card_item": None,
},
)
self.assertRaises(frappe.ValidationError, se.validate_job_card_item)
# ── get_available_materials ────────────────────────────────────────────────
def test_get_available_materials_tracks_transferred_qty(self):
from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom
from erpnext.manufacturing.doctype.work_order.work_order import (
make_stock_entry as _make_stock_entry,
)
from erpnext.stock.doctype.stock_entry.stock_entry_handler.disassemble import (
get_available_materials,
)
fg = make_item("_AM FG Item", {"is_stock_item": 1}).name
rm = make_item("_AM RM Item", {"is_stock_item": 1}).name
source_wh = "_Test Warehouse - _TC"
wip_wh = "_Test Warehouse 1 - _TC"
make_stock_entry(item_code=rm, target=source_wh, qty=20, purpose="Material Receipt")
bom_no = make_bom(item=fg, raw_materials=[rm]).name
wo = frappe.new_doc("Work Order")
wo.update(
{
"company": "_Test Company",
"fg_warehouse": "_Test Warehouse 1 - _TC",
"production_item": fg,
"bom_no": bom_no,
"qty": 2.0,
"stock_uom": "Nos",
"wip_warehouse": wip_wh,
}
)
wo.insert()
wo.submit()
transfer_se = frappe.get_doc(_make_stock_entry(wo.name, "Material Transfer for Manufacture", 2))
for d in transfer_se.items:
d.s_warehouse = source_wh
transfer_se.insert()
transfer_se.submit()
materials = get_available_materials(wo.name)
self.assertGreater(len(materials), 0)
key = (rm, wip_wh)
self.assertIn(key, materials)
self.assertGreater(materials[key].qty, 0)
def test_get_available_materials_reduces_qty_after_consumption(self):
from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom
from erpnext.manufacturing.doctype.work_order.work_order import (
make_stock_entry as _make_stock_entry,
)
from erpnext.stock.doctype.stock_entry.stock_entry_handler.disassemble import (
get_available_materials,
)
fg = make_item("_AM2 FG Item", {"is_stock_item": 1}).name
rm = make_item("_AM2 RM Item", {"is_stock_item": 1}).name
source_wh = "_Test Warehouse - _TC"
wip_wh = "_Test Warehouse 1 - _TC"
make_stock_entry(item_code=rm, target=source_wh, qty=20, purpose="Material Receipt")
bom_no = make_bom(item=fg, raw_materials=[rm]).name
wo = frappe.new_doc("Work Order")
wo.update(
{
"company": "_Test Company",
"fg_warehouse": "_Test Warehouse 1 - _TC",
"production_item": fg,
"bom_no": bom_no,
"qty": 2.0,
"stock_uom": "Nos",
"wip_warehouse": wip_wh,
}
)
wo.insert()
wo.submit()
transfer_se = frappe.get_doc(_make_stock_entry(wo.name, "Material Transfer for Manufacture", 2))
for d in transfer_se.items:
d.s_warehouse = source_wh
transfer_se.insert()
transfer_se.submit()
manufacture_se = frappe.get_doc(_make_stock_entry(wo.name, "Manufacture", 2))
manufacture_se.insert()
manufacture_se.submit()
materials = get_available_materials(wo.name)
key = (rm, wip_wh)
if key in materials:
self.assertEqual(materials[key].qty, 0)
def make_serialized_item(self, **args): def make_serialized_item(self, **args):
args = frappe._dict(args) args = frappe._dict(args)
se = frappe.copy_doc(self.globalTestRecords["Stock Entry"][0]) se = frappe.copy_doc(self.globalTestRecords["Stock Entry"][0])

View File

@@ -1,8 +1,24 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt # License: GNU General Public License v3. See license.txt
import frappe
from frappe import _, bold
from frappe.model.document import Document from frappe.model.document import Document
from frappe.utils import (
cint,
cstr,
flt,
format_time,
formatdate,
get_link_to_form,
getdate,
nowdate,
)
from erpnext.stock.doctype.stock_reconciliation.stock_reconciliation import (
OpeningEntryAccountError,
)
from erpnext.stock.stock_ledger import NegativeStockError, get_previous_sle, is_negative_stock_allowed
class StockEntryDetail(Document): class StockEntryDetail(Document):
@@ -73,4 +89,143 @@ class StockEntryDetail(Document):
valuation_rate: DF.Currency valuation_rate: DF.Currency
# end: auto-generated types # end: auto-generated types
pass def validate_batch(self):
if not self.batch_no:
return
disabled = frappe.db.get_value("Batch", self.batch_no, "disabled")
if disabled:
frappe.throw(_("Batch {0} of Item {1} is disabled.").format(self.batch_no, self.item_code))
return
expiry_date = frappe.db.get_value("Batch", self.batch_no, "expiry_date")
if expiry_date and getdate(self.parent_doc.posting_date) > getdate(expiry_date):
frappe.throw(_("Batch {0} of Item {1} has expired.").format(self.batch_no, self.item_code))
def validate_and_update_item_details(self, item_details, company, purpose):
if flt(self.qty) and flt(self.qty) < 0:
frappe.throw(
_("Row {0}: The item {1}, quantity must be positive number").format(
self.idx, bold(self.item_code)
)
)
if item_details.get("is_stock_item") != 1:
frappe.throw(_("{0} is not a stock Item").format(self.item_code))
reset_fields = ("stock_uom", "item_name")
for field in reset_fields:
self.set(field, item_details.get(field))
update_fields = (
"uom",
"description",
"expense_account",
"cost_center",
"conversion_factor",
"barcode",
)
for field in update_fields:
if not self.get(field):
self.set(field, item_details.get(field))
if field == "conversion_factor" and self.uom == item_details.get("stock_uom"):
self.set(field, item_details.get(field))
if not self.transfer_qty and self.qty:
self.transfer_qty = flt(
flt(self.qty) * flt(self.conversion_factor), self.precision("transfer_qty")
)
if purpose == "Subcontracting Delivery":
self.expense_account = frappe.get_value("Company", company, "default_expense_account")
def validate_expense_account(self, is_opening, purpose):
if not self.expense_account:
frappe.throw(
_(
"Please enter <b>Difference Account</b> or set default "
"<b>Stock Adjustment Account</b> for company {0}"
).format(bold(self.parent_doc.company))
)
acc_details = frappe.get_cached_value(
"Account",
self.expense_account,
["account_type", "report_type"],
as_dict=True,
)
if is_opening == "Yes" and acc_details.report_type == "Profit and Loss":
frappe.throw(
_(
"Difference Account must be a Asset/Liability type account "
"(Temporary Opening), since this Stock Entry is an Opening Entry"
),
OpeningEntryAccountError,
)
if acc_details.account_type == "Stock":
frappe.throw(
_("At row #{0}: the Difference Account must not be a Stock type account...").format(
self.idx, get_link_to_form("Account", self.expense_account)
),
title=_("Difference Account in Items Table"),
)
if (
purpose not in ["Material Issue", "Subcontracting Delivery"]
and acc_details.account_type == "Cost of Goods Sold"
):
frappe.msgprint(
_("At row #{0}: you have selected the Difference Account {1}...").format(
self.idx, bold(get_link_to_form("Account", self.expense_account))
),
indicator="orange",
alert=1,
)
def set_transfer_qty(self):
if not flt(self.conversion_factor):
frappe.throw(_("Row {0}: UOM Conversion Factor is mandatory").format(self.idx))
self.transfer_qty = flt(flt(self.qty) * flt(self.conversion_factor), self.precision("transfer_qty"))
if not flt(self.transfer_qty):
frappe.throw(
_("Row {0}: Qty in Stock UOM can not be zero.").format(self.idx), title=_("Zero quantity")
)
def set_actual_qty(self, posting_date, posting_time):
previous_sle = get_previous_sle(
{
"item_code": self.item_code,
"warehouse": self.s_warehouse or self.t_warehouse,
"posting_date": posting_date,
"posting_time": posting_time,
}
)
# get actual stock at source warehouse
self.actual_qty = previous_sle.get("qty_after_transaction") or 0
def delink_asset_repair_sabb(self, asset_repair):
if not self.serial_and_batch_bundle:
return
voucher_detail_no = frappe.db.get_value(
"Asset Repair Consumed Item",
{"parent": asset_repair, "serial_and_batch_bundle": self.serial_and_batch_bundle},
"name",
)
if not voucher_detail_no:
return
doc = frappe.get_doc("Serial and Batch Bundle", self.serial_and_batch_bundle)
doc.db_set(
{
"voucher_type": "Asset Repair",
"voucher_no": asset_repair,
"voucher_detail_no": voucher_detail_no,
}
)

View File

@@ -123,8 +123,9 @@ class ManufactureEntry:
available_serial_batches = self.get_transferred_serial_batches() available_serial_batches = self.get_transferred_serial_batches()
for item_code, _dict in item_dict.items(): for item_code, _dict in item_dict.items():
_dict.from_warehouse = self.source_wh.get(item_code) or self.wip_warehouse _dict.s_warehouse = self.source_wh.get(item_code) or self.wip_warehouse
_dict.to_warehouse = "" _dict.t_warehouse = ""
_dict.item_code = item_code
if backflush_based_on != "BOM" and not frappe.db.get_value( if backflush_based_on != "BOM" and not frappe.db.get_value(
"Job Card", self.job_card, "skip_material_transfer" "Job Card", self.job_card, "skip_material_transfer"
@@ -138,7 +139,7 @@ class ManufactureEntry:
_dict.qty = calculated_qty _dict.qty = calculated_qty
self.update_available_serial_batches(_dict, available_serial_batches) self.update_available_serial_batches(_dict, available_serial_batches)
self.stock_entry.add_to_stock_entry_detail(item_dict) self.stock_entry.append("items", _dict)
def parse_available_serial_batches(self, item_dict, available_serial_batches): def parse_available_serial_batches(self, item_dict, available_serial_batches):
key = (item_dict.item_code, item_dict.from_warehouse) key = (item_dict.item_code, item_dict.from_warehouse)
@@ -309,8 +310,8 @@ class ManufactureEntry:
item = get_item_defaults(self.production_item, self.company) item = get_item_defaults(self.production_item, self.company)
args = { args = {
"to_warehouse": self.fg_warehouse, "t_warehouse": self.fg_warehouse,
"from_warehouse": "", "s_warehouse": "",
"qty": self.for_quantity - self.process_loss_qty, "qty": self.for_quantity - self.process_loss_qty,
"item_name": item.item_name, "item_name": item.item_name,
"description": item.description, "description": item.description,
@@ -318,6 +319,7 @@ class ManufactureEntry:
"expense_account": item.get("expense_account"), "expense_account": item.get("expense_account"),
"cost_center": item.get("buying_cost_center"), "cost_center": item.get("buying_cost_center"),
"is_finished_item": 1, "is_finished_item": 1,
"item_code": self.production_item,
} }
self.stock_entry.add_to_stock_entry_detail({self.production_item: args}, bom_no=self.bom_no) self.stock_entry.append("items", args)

View File

@@ -1898,3 +1898,32 @@ def update_serial_batch_delivered_qty(row, name, is_cancelled=False):
) )
query.run() query.run()
def get_reserved_materials(voucher_no):
doctype = frappe.qb.DocType("Stock Reservation Entry")
serial_batch_doc = frappe.qb.DocType("Serial and Batch Entry")
query = (
frappe.qb.from_(doctype)
.inner_join(serial_batch_doc)
.on(doctype.name == serial_batch_doc.parent)
.select(
serial_batch_doc.serial_no,
serial_batch_doc.batch_no,
serial_batch_doc.qty,
doctype.item_code,
doctype.warehouse,
doctype.name,
doctype.transferred_qty,
doctype.consumed_qty,
)
.where(
(doctype.docstatus == 1)
& (doctype.voucher_no == voucher_no)
& (serial_batch_doc.delivered_qty < serial_batch_doc.qty)
)
.orderby(serial_batch_doc.idx)
)
return query.run(as_dict=True)

View File

@@ -372,13 +372,14 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = { items_dict = {
rm_item.get("rm_item_code"): { rm_item.get("rm_item_code"): {
"scio_detail": rm_item.get("name"), "scio_detail": rm_item.get("name"),
"item_code": rm_item.get("rm_item_code"),
"qty": calculate_qty_as_per_bom(rm_item), "qty": calculate_qty_as_per_bom(rm_item),
"to_warehouse": rm_item.get("warehouse"), "t_warehouse": rm_item.get("warehouse"),
"stock_uom": rm_item.get("stock_uom"), "stock_uom": rm_item.get("stock_uom"),
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[rm_item.get("rm_item_code")])
if target_doc: if target_doc:
return stock_entry return stock_entry
@@ -413,13 +414,16 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = { items_dict = {
rm_item.get("rm_item_code"): { rm_item.get("rm_item_code"): {
"scio_detail": rm_item.get("name"), "scio_detail": rm_item.get("name"),
"item_code": rm_item.get("rm_item_code"),
"qty": rm_item.received_qty - rm_item.work_order_qty - rm_item.returned_qty, "qty": rm_item.received_qty - rm_item.work_order_qty - rm_item.returned_qty,
"from_warehouse": rm_item.get("warehouse"), "s_warehouse": rm_item.get("warehouse"),
"stock_uom": rm_item.get("stock_uom"), "stock_uom": rm_item.get("stock_uom"),
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) ste_item = items_dict[rm_item.get("rm_item_code")]
if ste_item.get("qty"):
stock_entry.append("items", ste_item)
if target_doc: if target_doc:
return stock_entry return stock_entry
@@ -465,14 +469,15 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = { items_dict = {
fg_item.item_code: { fg_item.item_code: {
"qty": qty, "qty": qty,
"from_warehouse": fg_item.delivery_warehouse, "item_code": fg_item.item_code,
"s_warehouse": fg_item.delivery_warehouse,
"stock_uom": fg_item.stock_uom, "stock_uom": fg_item.stock_uom,
"scio_detail": fg_item.name, "scio_detail": fg_item.name,
"is_finished_item": 1, "is_finished_item": 1,
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[fg_item.item_code])
if ( if (
frappe.get_single_value("Selling Settings", "deliver_secondary_items") frappe.get_single_value("Selling Settings", "deliver_secondary_items")
@@ -490,14 +495,15 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = { items_dict = {
secondary_item.item_code: { secondary_item.item_code: {
"qty": secondary_item.produced_qty - secondary_item.delivered_qty, "qty": secondary_item.produced_qty - secondary_item.delivered_qty,
"from_warehouse": secondary_item.warehouse, "item_code": secondary_item.item_code,
"s_warehouse": secondary_item.warehouse,
"stock_uom": secondary_item.stock_uom, "stock_uom": secondary_item.stock_uom,
"scio_detail": secondary_item.name, "scio_detail": secondary_item.name,
"type": secondary_item.type, "type": secondary_item.type,
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[secondary_item.item_code])
if target_doc: if target_doc:
return stock_entry return stock_entry
@@ -536,13 +542,14 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = { items_dict = {
fg_item.item_code: { fg_item.item_code: {
"qty": qty, "qty": qty,
"item_code": fg_item.item_code,
"stock_uom": fg_item.stock_uom, "stock_uom": fg_item.stock_uom,
"scio_detail": fg_item.name, "scio_detail": fg_item.name,
"is_finished_item": 1, "is_finished_item": 1,
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[fg_item.item_code])
if target_doc: if target_doc:
return stock_entry return stock_entry

View File

@@ -240,6 +240,7 @@ class IntegrationTestSubcontractingInwardOrder(ERPNextTestSuite):
delivery = frappe.new_doc("Stock Entry").update(scio.make_subcontracting_delivery()) delivery = frappe.new_doc("Stock Entry").update(scio.make_subcontracting_delivery())
delivery.items[0].use_serial_batch_fields = 1 delivery.items[0].use_serial_batch_fields = 1
delivery.save() delivery.save()
delivery.submit()
delivery_serial_list, _ = get_serial_batch_list_from_item(delivery.items[0]) delivery_serial_list, _ = get_serial_batch_list_from_item(delivery.items[0])
self.assertEqual(sorted(serial_list), sorted(delivery_serial_list)) self.assertEqual(sorted(serial_list), sorted(delivery_serial_list))