refactor: split stock_entry.py into multiple files for better readability

This commit is contained in:
Rohit Waghchaure
2026-05-18 20:16:13 +05:30
parent ae9c632e39
commit e4b5e6bd1e
24 changed files with 3126 additions and 2781 deletions

View File

@@ -1426,7 +1426,7 @@ def make_rm_stock_entry(
} }
} }
target_doc.add_to_stock_entry_detail(items_dict) target_doc.append("items", items_dict[rm_item_code])
stock_entry = get_mapped_doc( stock_entry = get_mapped_doc(
order_doctype, order_doctype,

View File

@@ -2002,7 +2002,7 @@ def get_secondary_items_from_sub_assemblies(bom_no, company, qty, secondary_item
return secondary_items return secondary_items
def get_backflush_based_on(bom_no): def get_backflush_based_on(bom_no=None):
backflush_based_on = None backflush_based_on = None
if bom_no: if bom_no:
backflush_based_on = frappe.get_cached_value("BOM", bom_no, "backflush_based_on") backflush_based_on = frappe.get_cached_value("BOM", bom_no, "backflush_based_on")

View File

@@ -1476,6 +1476,8 @@ class JobCard(Document):
@frappe.whitelist() @frappe.whitelist()
def make_stock_entry_for_semi_fg_item(self, auto_submit: bool = False): def make_stock_entry_for_semi_fg_item(self, auto_submit: bool = False):
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import ManufactureStockEntry
def get_consumed_process_loss(): def get_consumed_process_loss():
table = frappe.qb.DocType("Stock Entry") table = frappe.qb.DocType("Stock Entry")
query = ( query = (
@@ -1511,9 +1513,7 @@ class JobCard(Document):
ste.stock_entry.flags.ignore_mandatory = True ste.stock_entry.flags.ignore_mandatory = True
wo_doc = frappe.get_doc("Work Order", self.work_order) wo_doc = frappe.get_doc("Work Order", self.work_order)
add_additional_cost(ste.stock_entry, wo_doc, self) add_additional_cost(ste.stock_entry, wo_doc, self)
ManufactureStockEntry(ste.stock_entry).add_secondary_items_from_job_card()
ste.stock_entry.pro_doc = frappe.get_doc("Work Order", self.work_order)
ste.stock_entry.set_secondary_items_from_job_card()
for row in ste.stock_entry.items: for row in ste.stock_entry.items:
if (row.type or row.is_legacy_scrap_item) and not row.t_warehouse: if (row.type or row.is_legacy_scrap_item) and not row.t_warehouse:
row.t_warehouse = self.target_warehouse row.t_warehouse = self.target_warehouse

View File

@@ -31,6 +31,7 @@ from erpnext.stock.doctype.serial_and_batch_bundle.test_serial_and_batch_bundle
) )
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.doctype.stock_entry import test_stock_entry from erpnext.stock.doctype.stock_entry import test_stock_entry
from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import ManufactureStockEntry
from erpnext.stock.doctype.warehouse.test_warehouse import create_warehouse from erpnext.stock.doctype.warehouse.test_warehouse import create_warehouse
from erpnext.stock.utils import get_bin from erpnext.stock.utils import get_bin
from erpnext.tests.utils import ERPNextTestSuite from erpnext.tests.utils import ERPNextTestSuite
@@ -1319,7 +1320,7 @@ class TestWorkOrder(ERPNextTestSuite):
stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10)) stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10))
stock_entry.set_work_order_details() stock_entry.set_work_order_details()
stock_entry.set_serial_no_batch_for_finished_good() ManufactureStockEntry(stock_entry).set_serial_nos_for_finished_good()
for row in stock_entry.items: for row in stock_entry.items:
if row.item_code == fg_item: if row.item_code == fg_item:
self.assertTrue(row.serial_and_batch_bundle) self.assertTrue(row.serial_and_batch_bundle)
@@ -1360,7 +1361,7 @@ class TestWorkOrder(ERPNextTestSuite):
stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10)) stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10))
stock_entry.set_work_order_details() stock_entry.set_work_order_details()
stock_entry.set_serial_no_batch_for_finished_good() ManufactureStockEntry(stock_entry).set_serial_nos_for_finished_good()
for row in stock_entry.items: for row in stock_entry.items:
if row.item_code == fg_item: if row.item_code == fg_item:
self.assertTrue(row.serial_and_batch_bundle) self.assertTrue(row.serial_and_batch_bundle)

View File

@@ -2453,10 +2453,6 @@ def make_stock_entry(
stock_entry.set_stock_entry_type() stock_entry.set_stock_entry_type()
stock_entry.is_additional_transfer_entry = is_additional_transfer_entry stock_entry.is_additional_transfer_entry = is_additional_transfer_entry
stock_entry.get_items() stock_entry.get_items()
stock_entry.set_secondary_items_from_job_card()
if purpose != "Disassemble":
stock_entry.set_serial_no_batch_for_finished_good()
return stock_entry.as_dict() return stock_entry.as_dict()
@@ -2817,11 +2813,9 @@ def get_reserved_qty_for_production(
@frappe.whitelist() @frappe.whitelist()
def make_stock_return_entry(work_order: str): def make_stock_return_entry(work_order: str):
from erpnext.stock.doctype.stock_entry.stock_entry import get_available_materials from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
ManufactureStockEntry,
non_consumed_items = get_available_materials(work_order) )
if not non_consumed_items:
return
wo_doc = frappe.get_cached_doc("Work Order", work_order) wo_doc = frappe.get_cached_doc("Work Order", work_order)
@@ -2831,9 +2825,11 @@ def make_stock_return_entry(work_order: str):
stock_entry.work_order = work_order stock_entry.work_order = work_order
stock_entry.purpose = "Material Transfer for Manufacture" stock_entry.purpose = "Material Transfer for Manufacture"
stock_entry.bom_no = wo_doc.bom_no stock_entry.bom_no = wo_doc.bom_no
stock_entry.add_transfered_raw_materials_in_items()
stock_entry.set_stock_entry_type() stock_entry.set_stock_entry_type()
ste_cls = ManufactureStockEntry(stock_entry)
ste_cls.add_raw_materials_based_on_transfer()
ste_cls.return_available_materials_in_source_wh()
return stock_entry return stock_entry

View File

@@ -164,6 +164,40 @@ class Project(Document):
self.check_depends_on_value(template_task, project_task, project_tasks) self.check_depends_on_value(template_task, project_task, project_tasks)
self.check_for_parent_tasks(template_task, project_task, project_tasks) self.check_for_parent_tasks(template_task, project_task, project_tasks)
def set_consumed_material_cost(self):
parent_doc = frappe.qb.DocType("Stock Entry")
child_doc = frappe.qb.DocType("Stock Entry Detail")
lcv_doc = frappe.qb.DocType("Landed Cost Taxes and Charges")
amount = (
qb.from_(child_doc)
.select(Sum(child_doc.amount))
.where(
(child_doc.project == self.name)
& (child_doc.docstatus == 1)
& ((child_doc.t_warehouse.isnull()) | (child_doc.t_warehouse == ""))
)
).run(as_list=1)
amount = flt(amount[0][0]) if amount else 0
additional_costs = (
qb.from_(parent_doc)
.join(lcv_doc)
.on(parent_doc.name == lcv_doc.parent)
.select(Sum(lcv_doc.base_amount))
.where(
(parent_doc.project == self.name)
& (parent_doc.docstatus == 1)
& (parent_doc.purpose == "Manufacture")
)
).run(as_list=1)
additional_cost_amt = flt(additional_costs[0][0]) if additional_costs else 0
amount += additional_cost_amt
self.total_consumed_material_cost = amount
def check_depends_on_value(self, template_task, project_task, project_tasks): def check_depends_on_value(self, template_task, project_task, project_tasks):
if template_task.get("depends_on") and not project_task.get("depends_on"): if template_task.get("depends_on") and not project_task.get("depends_on"):
project_template_map = {pt.template_task: pt for pt in project_tasks} project_template_map = {pt.template_task: pt for pt in project_tasks}

View File

@@ -768,7 +768,6 @@ def make_stock_entry(source_name: str, target_doc: str | Document | None = None)
target.set_actual_qty() target.set_actual_qty()
target.calculate_rate_and_amount(raise_error_if_no_rate=False) target.calculate_rate_and_amount(raise_error_if_no_rate=False)
target.stock_entry_type = target.purpose target.stock_entry_type = target.purpose
target.set_job_card_data()
if source.job_card: if source.job_card:
job_card_details = frappe.get_all( job_card_details = frappe.get_all(

View File

@@ -342,7 +342,7 @@ erpnext.stock.PurchaseReceiptController = class PurchaseReceiptController extend
make_retention_stock_entry() { make_retention_stock_entry() {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.move_sample_to_retention_warehouse", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.move_sample_to_retention_warehouse",
args: { args: {
company: cur_frm.doc.company, company: cur_frm.doc.company,
items: cur_frm.doc.items, items: cur_frm.doc.items,
@@ -455,7 +455,7 @@ var validate_sample_quantity = function (frm, cdt, cdn) {
var d = locals[cdt][cdn]; var d = locals[cdt][cdn];
if (d.sample_quantity && d.qty) { if (d.sample_quantity && d.qty) {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.validate_sample_quantity", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.validate_sample_quantity",
args: { args: {
batch_no: d.batch_no, batch_no: d.batch_no,
item_code: d.item_code, item_code: d.item_code,

View File

@@ -496,7 +496,7 @@ frappe.ui.form.on("Stock Entry", {
__("Expired Batches"), __("Expired Batches"),
function () { function () {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.get_expired_batch_items", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.serial_batch.get_expired_batch_items",
freeze: true, freeze: true,
callback: function (r) { callback: function (r) {
if (!r.exc && r.message) { if (!r.exc && r.message) {
@@ -670,7 +670,7 @@ frappe.ui.form.on("Stock Entry", {
make_retention_stock_entry: function (frm) { make_retention_stock_entry: function (frm) {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.move_sample_to_retention_warehouse", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.move_sample_to_retention_warehouse",
args: { args: {
company: frm.doc.company, company: frm.doc.company,
items: frm.doc.items, items: frm.doc.items,
@@ -1150,7 +1150,7 @@ var validate_sample_quantity = function (frm, cdt, cdn) {
var d = locals[cdt][cdn]; var d = locals[cdt][cdn];
if (d.sample_quantity && d.transfer_qty && frm.doc.purpose == "Material Receipt") { if (d.sample_quantity && d.transfer_qty && frm.doc.purpose == "Material Receipt") {
frappe.call({ frappe.call({
method: "erpnext.stock.doctype.stock_entry.stock_entry.validate_sample_quantity", method: "erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing.validate_sample_quantity",
args: { args: {
batch_no: d.batch_no, batch_no: d.batch_no,
item_code: d.item_code, item_code: d.item_code,

View File

@@ -46,6 +46,7 @@
"target_address_display", "target_address_display",
"sb0", "sb0",
"scan_barcode", "scan_barcode",
"column_break_menu",
"last_scanned_warehouse", "last_scanned_warehouse",
"items_section", "items_section",
"items", "items",
@@ -369,6 +370,7 @@
{ {
"fieldname": "sb0", "fieldname": "sb0",
"fieldtype": "Section Break", "fieldtype": "Section Break",
"hide_border": 1,
"options": "Simple" "options": "Simple"
}, },
{ {
@@ -646,7 +648,7 @@
"depends_on": "eval:in_list([\"Material Issue\", \"Manufacture\", \"Repack\", \"Send to Subcontractor\", \"Material Transfer for Manufacture\", \"Material Consumption for Manufacture\", \"Disassemble\"], doc.purpose)", "depends_on": "eval:in_list([\"Material Issue\", \"Manufacture\", \"Repack\", \"Send to Subcontractor\", \"Material Transfer for Manufacture\", \"Material Consumption for Manufacture\", \"Disassemble\"], doc.purpose)",
"fieldname": "bom_info_section", "fieldname": "bom_info_section",
"fieldtype": "Section Break", "fieldtype": "Section Break",
"label": "BOM Info" "label": "Bill of Materials"
}, },
{ {
"collapsible": 1, "collapsible": 1,
@@ -695,8 +697,7 @@
}, },
{ {
"fieldname": "items_section", "fieldname": "items_section",
"fieldtype": "Section Break", "fieldtype": "Section Break"
"label": "Items"
}, },
{ {
"depends_on": "eval:doc.asset_repair", "depends_on": "eval:doc.asset_repair",
@@ -761,6 +762,10 @@
"fieldtype": "Link", "fieldtype": "Link",
"label": "Cost Center", "label": "Cost Center",
"options": "Cost Center" "options": "Cost Center"
},
{
"fieldname": "column_break_menu",
"fieldtype": "Column Break"
} }
], ],
"grid_page_length": 50, "grid_page_length": 50,
@@ -769,7 +774,7 @@
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"is_submittable": 1, "is_submittable": 1,
"links": [], "links": [],
"modified": "2026-03-04 19:03:23.426082", "modified": "2026-04-21 13:31:48.817309",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Stock", "module": "Stock",
"name": "Stock Entry", "name": "Stock Entry",

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,535 @@
from collections import defaultdict
import frappe
from frappe import _
from frappe.query_builder.functions import Sum
from frappe.utils import flt
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.serial_batch_bundle import SerialBatchCreation
from erpnext.stock.utils import get_combine_datetime
from .manufacturing import ceil_qty_if_uom_has_whole_number, get_bom_items, get_secondary_items
class DisassembleStockEntry:
def __init__(self, se_doc):
self.doc = se_doc
def validate(self):
self.validate_warehouse()
def validate_warehouse(self):
for row in self.doc.items:
if not row.s_warehouse and not row.t_warehouse:
frappe.throw(_("Source or Target Warehouse is required for item {0}").format(row.item_code))
def validate_fg_completed_qty(self):
if not self.doc.source_stock_entry:
return
from erpnext.manufacturing.doctype.work_order.work_order import get_disassembly_available_qty
available_qty = get_disassembly_available_qty(self.doc.source_stock_entry, self.doc.name)
if flt(self.doc.fg_completed_qty) > available_qty:
frappe.throw(
_(
"Cannot disassemble {0} qty against Stock Entry {1}. Only {2} qty available to disassemble."
).format(
self.doc.fg_completed_qty,
self.doc.source_stock_entry,
available_qty,
),
title=_("Excess Disassembly"),
)
def add_items(self):
"""
Priority:
1. From a specific Manufacture Stock Entry (exact reversal)
2. From Work Order Manufacture Stock Entries (averaged reversal)
3. From BOM (standalone disassembly)
"""
# Auto-set source_stock_entry if WO has exactly one manufacture entry
if not self.doc.get("source_stock_entry") and self.doc.work_order:
manufacture_entries = frappe.get_all(
"Stock Entry",
filters={
"work_order": self.doc.work_order,
"purpose": "Manufacture",
"docstatus": 1,
},
pluck="name",
limit_page_length=2,
)
if len(manufacture_entries) == 1:
self.doc.source_stock_entry = manufacture_entries[0]
if self.doc.get("source_stock_entry"):
return self._add_items_for_disassembly_from_stock_entry()
if self.doc.work_order:
return self._add_items_for_disassembly_from_work_order()
return self._add_items_for_disassembly_from_bom()
def _add_items_for_disassembly_from_stock_entry(self):
source_fg_qty = frappe.db.get_value("Stock Entry", self.doc.source_stock_entry, "fg_completed_qty")
if not source_fg_qty:
frappe.throw(
_("Source Stock Entry {0} has no finished goods quantity").format(self.doc.source_stock_entry)
)
disassemble_qty = flt(self.doc.fg_completed_qty)
scale_factor = disassemble_qty / flt(source_fg_qty)
self._append_disassembly_row_from_source(
disassemble_qty=disassemble_qty,
scale_factor=scale_factor,
)
def _add_items_for_disassembly_from_work_order(self):
wo_produced_qty = frappe.db.get_value("Work Order", self.doc.work_order, "produced_qty")
wo_produced_qty = flt(wo_produced_qty)
if wo_produced_qty <= 0:
frappe.throw(_("Work Order {0} has no produced qty").format(self.doc.work_order))
disassemble_qty = flt(self.doc.fg_completed_qty)
if disassemble_qty <= 0:
frappe.throw(_("Disassemble Qty cannot be less than or equal to 0."))
scale_factor = disassemble_qty / wo_produced_qty
self._append_disassembly_row_from_source(
disassemble_qty=disassemble_qty,
scale_factor=scale_factor,
)
def _append_disassembly_row_from_source(self, disassemble_qty, scale_factor):
for source_row in self.get_items_from_manufacture_stock_entry():
if source_row.is_finished_item:
qty = disassemble_qty
s_warehouse = self.doc.from_warehouse or source_row.t_warehouse
t_warehouse = ""
elif source_row.s_warehouse:
# RM: was consumed FROM s_warehouse -> return TO s_warehouse
qty = flt(source_row.qty * scale_factor)
s_warehouse = ""
t_warehouse = self.doc.to_warehouse or source_row.s_warehouse
else:
# Scrap/secondary: was produced TO t_warehouse -> take FROM t_warehouse
qty = flt(source_row.qty * scale_factor)
s_warehouse = source_row.t_warehouse
t_warehouse = ""
item = {
"item_code": source_row.item_code,
"item_name": source_row.item_name,
"description": source_row.description,
"stock_uom": source_row.stock_uom,
"uom": source_row.uom,
"conversion_factor": source_row.conversion_factor,
"basic_rate": source_row.basic_rate,
"qty": qty,
"s_warehouse": s_warehouse,
"t_warehouse": t_warehouse,
"is_finished_item": source_row.is_finished_item,
"type": source_row.type,
"is_legacy_scrap_item": source_row.is_legacy_scrap_item,
"bom_secondary_item": source_row.bom_secondary_item,
"bom_no": source_row.bom_no,
# batch and serial bundles built on submit
"use_serial_batch_fields": 1 if (source_row.batch_no or source_row.serial_no) else 0,
}
if self.doc.source_stock_entry:
item.update(
{
"against_stock_entry": self.doc.source_stock_entry,
"ste_detail": source_row.name,
}
)
self.doc.append("items", item)
def _add_items_for_disassembly_from_bom(self):
if not self.doc.bom_no or not self.doc.fg_completed_qty:
frappe.throw(_("BOM and Finished Good Quantity is mandatory for Disassembly"))
self.add_raw_materials()
self.add_secondary_items()
self.add_finished_goods()
def add_raw_materials(self):
# Raw materials will be available after disassembly in target warehouse
items = get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom)
for row in items:
row["t_warehouse"] = self.doc.to_warehouse
row["from_warehouse"] = ""
row["is_finished_item"] = 0
row["qty"] = flt(row["qty"]) * flt(self.doc.fg_completed_qty)
row["uom"] = row.get("uom") or row.get("stock_uom")
self.doc.append("items", row)
def add_secondary_items(self):
# Secondary items will be removed from source warehouse
secondary_items = get_secondary_items(self.doc.bom_no, self.doc.work_order)
for row in secondary_items:
item_args = {}
fields = [
"item_code",
"item_name",
"uom",
"stock_uom",
"conversion_factor",
"item_group",
"description",
"type",
]
for field in fields:
item_args[field] = row.get(field)
item_args["is_legacy_scrap_item"] = row.get("is_legacy")
item_args["s_warehouse"] = self.doc.from_warehouse
item_args["uom"] = item_args.get("uom") or item_args.get("stock_uom")
item_args["bom_secondary_item"] = row.get("name")
row.qty = row.qty * self.doc.fg_completed_qty
item_args["qty"] = ceil_qty_if_uom_has_whole_number(row.qty, item_args["uom"])
self.doc.append("items", item_args)
def get_production_item_details(self):
if self.doc.work_order:
production_item = frappe.get_cached_value("Work Order", self.doc.work_order, "production_item")
else:
production_item = frappe.get_cached_value("BOM", self.doc.bom_no, "item")
item_details = frappe.get_cached_value(
"Item",
production_item,
["item_name", "item_group", "description", "stock_uom", "name"],
as_dict=1,
)
return item_details
def add_finished_goods(self):
# Fininshed good will be removed from source warehouse
item_details = self.get_production_item_details()
item_details.update(
{
"conversion_factor": 1,
"uom": item_details.stock_uom,
"qty": self.doc.fg_completed_qty,
"t_warehouse": None,
"s_warehouse": self.doc.from_warehouse,
"is_finished_item": 1,
}
)
item_details["item_code"] = item_details["name"]
del item_details["name"]
self.doc.append("items", item_details)
def get_items_from_manufacture_stock_entry(self):
SE = frappe.qb.DocType("Stock Entry")
SED = frappe.qb.DocType("Stock Entry Detail")
query = frappe.qb.from_(SED).join(SE).on(SED.parent == SE.name).where(SE.docstatus == 1)
common_fields = [
SED.item_code,
SED.item_name,
SED.description,
SED.stock_uom,
SED.uom,
SED.basic_rate,
SED.conversion_factor,
SED.is_finished_item,
SED.type,
SED.is_legacy_scrap_item,
SED.bom_secondary_item,
SED.batch_no,
SED.serial_no,
SED.use_serial_batch_fields,
SED.s_warehouse,
SED.t_warehouse,
SED.bom_no,
]
if self.doc.source_stock_entry:
return (
query.select(SED.name, SED.qty, SED.transfer_qty, *common_fields)
.where(SE.name == self.doc.source_stock_entry)
.orderby(SED.idx)
.run(as_dict=True)
)
return (
query.select(Sum(SED.qty).as_("qty"), Sum(SED.transfer_qty).as_("transfer_qty"), *common_fields)
.where(SE.purpose == "Manufacture")
.where(SE.work_order == self.doc.work_order)
.groupby(SED.item_code)
.orderby(SED.idx)
.run(as_dict=True)
)
def on_submit(self):
self.set_serial_batch_for_disassembly()
def set_serial_batch_for_disassembly(self):
if self.doc.get("source_stock_entry"):
self._set_serial_batch_for_disassembly_from_stock_entry()
else:
self._set_serial_batch_for_disassembly_from_available_materials()
def _set_serial_batch_for_disassembly_from_stock_entry(self):
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import (
get_voucher_wise_serial_batch_from_bundle,
)
source_fg_qty = flt(
frappe.db.get_value("Stock Entry", self.doc.source_stock_entry, "fg_completed_qty")
)
scale_factor = flt(self.doc.fg_completed_qty) / source_fg_qty if source_fg_qty else 0
bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=[self.doc.source_stock_entry])
source_rows_by_name = {r.name: r for r in self.get_items_from_manufacture_stock_entry()}
for row in self.doc.items:
if not row.ste_detail:
continue
source_row = source_rows_by_name.get(row.ste_detail)
if not source_row:
continue
source_warehouse = source_row.s_warehouse or source_row.t_warehouse
key = (source_row.item_code, source_warehouse, self.doc.source_stock_entry)
source_bundle = bundle_data.get(key, {})
batches = defaultdict(float)
serial_nos = []
if source_bundle.get("batch_nos"):
qty_remaining = row.transfer_qty
for batch_no, batch_qty in source_bundle["batch_nos"].items():
if qty_remaining <= 0:
break
alloc = min(abs(flt(batch_qty)) * scale_factor, qty_remaining)
batches[batch_no] = alloc
qty_remaining -= alloc
elif source_row.batch_no:
batches[source_row.batch_no] = row.transfer_qty
if source_bundle.get("serial_nos"):
serial_nos = get_serial_nos(source_bundle["serial_nos"])[: int(row.transfer_qty)]
elif source_row.serial_no:
serial_nos = get_serial_nos(source_row.serial_no)[: int(row.transfer_qty)]
self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches)
def _set_serial_batch_for_disassembly_from_available_materials(self):
available_materials = get_available_materials(self.doc.work_order, self.doc)
for row in self.doc.items:
warehouse = row.s_warehouse or row.t_warehouse
materials = available_materials.get((row.item_code, warehouse))
if not materials:
continue
batches = defaultdict(float)
serial_nos = []
qty = row.transfer_qty
for batch_no, batch_qty in materials.batch_details.items():
if qty <= 0:
break
batch_qty = abs(batch_qty)
if batch_qty <= qty:
batches[batch_no] = batch_qty
qty -= batch_qty
else:
batches[batch_no] = qty
qty = 0
if materials.serial_nos:
serial_nos = materials.serial_nos[: int(row.transfer_qty)]
self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches)
def _set_serial_batch_bundle_for_disassembly_row(self, row, serial_nos, batches):
if not serial_nos and not batches:
return
warehouse = row.s_warehouse or row.t_warehouse
bundle_doc = SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": warehouse,
"posting_datetime": get_combine_datetime(self.doc.posting_date, self.doc.posting_time),
"voucher_type": self.doc.doctype,
"voucher_no": self.doc.name,
"voucher_detail_no": row.name,
"qty": row.transfer_qty,
"type_of_transaction": "Inward" if row.t_warehouse else "Outward",
"company": self.doc.company,
"do_not_submit": True,
}
).make_serial_and_batch_bundle(serial_nos=serial_nos, batch_nos=batches)
row.serial_and_batch_bundle = bundle_doc.name
row.use_serial_batch_fields = 0
def get_available_materials(work_order, stock_entry_doc=None) -> dict:
data = get_stock_entry_data(work_order, stock_entry_doc=stock_entry_doc)
available_materials = {}
for row in data:
key = (row.item_code, row.warehouse)
if row.purpose != "Material Transfer for Manufacture":
key = (row.item_code, row.s_warehouse)
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
key = (row.item_code, row.s_warehouse or row.warehouse)
if key not in available_materials:
available_materials.setdefault(
key,
frappe._dict(
{"item_details": row, "batch_details": defaultdict(float), "qty": 0, "serial_nos": []}
),
)
item_data = available_materials[key]
if row.purpose == "Material Transfer for Manufacture" or (
stock_entry_doc and stock_entry_doc.purpose == "Disassemble" and row.purpose == "Manufacture"
):
item_data.qty += row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] += row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
if row.serial_no:
item_data.serial_nos.extend(get_serial_nos(row.serial_no))
item_data.serial_nos.sort()
elif row.serial_nos:
item_data.serial_nos.extend(get_serial_nos(row.serial_nos))
item_data.serial_nos.sort()
else:
# Consume raw material qty in case of 'Manufacture' or 'Material Consumption for Manufacture'
item_data.qty -= row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] -= row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
if row.serial_no:
for serial_no in get_serial_nos(row.serial_no):
if serial_no in item_data.serial_nos:
item_data.serial_nos.remove(serial_no)
elif row.serial_nos:
for serial_no in get_serial_nos(row.serial_nos):
if serial_no in item_data.serial_nos:
item_data.serial_nos.remove(serial_no)
return available_materials
def get_stock_entry_data(work_order, stock_entry_doc=None):
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import (
get_voucher_wise_serial_batch_from_bundle,
)
stock_entry = frappe.qb.DocType("Stock Entry")
stock_entry_detail = frappe.qb.DocType("Stock Entry Detail")
data = (
frappe.qb.from_(stock_entry)
.from_(stock_entry_detail)
.select(
stock_entry_detail.item_name,
stock_entry_detail.original_item,
stock_entry_detail.item_code,
stock_entry_detail.qty,
(stock_entry_detail.t_warehouse).as_("warehouse"),
(stock_entry_detail.s_warehouse).as_("s_warehouse"),
stock_entry_detail.description,
stock_entry_detail.stock_uom,
stock_entry_detail.expense_account,
stock_entry_detail.cost_center,
stock_entry_detail.serial_and_batch_bundle,
stock_entry_detail.batch_no,
stock_entry_detail.serial_no,
stock_entry.purpose,
stock_entry.name,
)
.where(
(stock_entry.name == stock_entry_detail.parent)
& (stock_entry.work_order == work_order)
& (stock_entry.docstatus == 1)
)
.orderby(stock_entry.creation, stock_entry_detail.item_code, stock_entry_detail.idx)
)
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
data = data.where(
stock_entry.purpose.isin(
[
"Disassemble",
"Manufacture",
]
)
)
data = data.where(stock_entry.name != stock_entry_doc.name)
else:
data = data.where(
stock_entry.purpose.isin(
[
"Manufacture",
"Material Consumption for Manufacture",
"Material Transfer for Manufacture",
]
)
)
data = data.where(stock_entry_detail.s_warehouse.isnotnull())
data = data.run(as_dict=1)
if not data:
return []
voucher_nos = [row.get("name") for row in data if row.get("name")]
if voucher_nos:
bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=voucher_nos)
for row in data:
key = (row.item_code, row.warehouse, row.name)
if row.purpose != "Material Transfer for Manufacture":
key = (row.item_code, row.s_warehouse, row.name)
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
key = (row.item_code, row.s_warehouse or row.warehouse, row.name)
if bundle_data.get(key):
row.update(bundle_data.get(key))
return data

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,90 @@
import frappe
from frappe import _
from frappe.query_builder.functions import Sum
from .manufacturing import get_bom_items
class MaterialReceiptStockEntry:
def __init__(self, se_doc):
self.doc = se_doc
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
def set_default_warehouse(self):
for row in self.doc.items:
if not row.t_warehouse and self.doc.to_warehouse:
row.t_warehouse = self.doc.to_warehouse
row.s_warehouse = None
def validate_warehouse(self):
for row in self.doc.items:
if not row.t_warehouse:
frappe.throw(_("Target Warehouse is required for item {0}").format(row.item_code))
class BaseMaterialIssueStockEntry:
def __init__(self, se_doc):
self.doc = se_doc
def set_default_warehouse(self):
for row in self.doc.items:
if not row.s_warehouse and self.doc.from_warehouse:
row.s_warehouse = self.doc.from_warehouse
row.t_warehouse = None
def validate_warehouse(self):
for row in self.doc.items:
if not row.s_warehouse:
frappe.throw(_("Source Warehouse is required for item {0}").format(row.item_code))
class MaterialIssueStockEntry(BaseMaterialIssueStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
def add_items(self):
self.add_raw_materials_based_on_bom()
def add_raw_materials_based_on_bom(self):
bom_items = get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom)
for row in bom_items:
row.s_warehouse = self.doc.from_warehouse
row.qty = row.qty * self.doc.fg_completed_qty
if not row.uom:
row.uom = row.stock_uom
self.doc.append("items", row)
def get_consumed_items(self):
"""Get all raw materials consumed through consumption entries"""
parent = frappe.qb.DocType("Stock Entry")
child = frappe.qb.DocType("Stock Entry Detail")
query = (
frappe.qb.from_(parent)
.join(child)
.on(parent.name == child.parent)
.select(
child.item_code,
Sum(child.qty).as_("qty"),
child.original_item,
)
.where(
(parent.docstatus == 1)
& (parent.purpose == "Material Consumption for Manufacture")
& (parent.work_order == self.work_order)
)
.groupby(child.item_code, child.original_item)
)
return query.run(as_dict=True)

View File

@@ -0,0 +1,486 @@
import frappe
from frappe import _, bold
from frappe.query_builder.functions import Sum
from frappe.utils import cstr, flt, get_link_to_form
from erpnext.manufacturing.doctype.bom.bom import get_backflush_based_on
from .manufacturing import get_bom_items
class BaseMaterialTransferStockEntry:
def __init__(self, se_doc):
self.doc = se_doc
def set_default_warehouse(self):
for row in self.doc.items:
if not row.t_warehouse and self.doc.to_warehouse:
row.t_warehouse = self.doc.to_warehouse
if not row.s_warehouse and self.doc.from_warehouse:
row.s_warehouse = self.doc.from_warehouse
def validate_warehouse(self):
for row in self.doc.items:
if not row.t_warehouse:
frappe.throw(_("Target Warehouse is required for item {0}").format(row.item_code))
if not row.s_warehouse:
frappe.throw(_("Source Warehouse is required for item {0}").format(row.item_code))
def validate_same_source_target_warehouse(self):
"""
Raises: frappe.ValidationError: If warehouses are same and no inventory dimensions differ
"""
if not frappe.get_single_value("Stock Settings", "validate_material_transfer_warehouses"):
return
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import get_inventory_dimensions
inventory_dimensions = get_inventory_dimensions()
for item in self.doc.items:
if cstr(item.s_warehouse) == cstr(item.t_warehouse):
if not inventory_dimensions:
frappe.throw(
_(
"Row #{0}: Source and Target Warehouse cannot be the same for Material Transfer"
).format(item.idx),
title=_("Invalid Source and Target Warehouse"),
)
else:
difference_found = False
for dimension in inventory_dimensions:
fieldname = (
dimension.source_fieldname
if dimension.source_fieldname.startswith("to_")
else f"to_{dimension.source_fieldname}"
)
if (
item.get(dimension.source_fieldname)
and item.get(fieldname)
and item.get(dimension.source_fieldname) != item.get(fieldname)
):
difference_found = True
break
if not difference_found:
frappe.throw(
_(
"Row #{0}: Source, Target Warehouse and Inventory Dimensions cannot be the exact same for Material Transfer"
).format(item.idx),
title=_("Invalid Source and Target Warehouse"),
)
@property
def wo_doc(self):
if not getattr(self, "_wo_doc", None):
if self.doc.work_order:
self._wo_doc = frappe.get_doc("Work Order", self.doc.work_order)
return getattr(self, "_wo_doc", None)
@property
def backflush_based_on(self):
return get_backflush_based_on(self.doc.bom_no)
class MaterialTransferStockEntry(BaseMaterialTransferStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
self.validate_same_source_target_warehouse()
class MaterialTransferForManufactureStockEntry(BaseMaterialTransferStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
self.validate_component_and_quantities()
self.validate_same_source_target_warehouse()
def validate_component_and_quantities(self):
if not frappe.db.get_single_value("Manufacturing Settings", "validate_components_quantities_per_bom"):
return
if not self.doc.fg_completed_qty:
return
precision = frappe.get_precision("Stock Entry Detail", "qty")
bom_items = get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom)
for row in bom_items:
row.qty = row.qty * self.doc.fg_completed_qty
if matched_item := self.get_matched_items(row.item_code):
if flt(row.qty, precision) != flt(matched_item.qty, precision):
frappe.throw(
_(
"For the item {0}, the consumed quantity should be {1} according to the BOM {2}."
).format(
bold(row.item_code),
flt(row.qty),
get_link_to_form("BOM", self.doc.bom_no),
),
title=_("Incorrect Component Quantity"),
)
else:
frappe.throw(
_("According to the BOM {0}, the Item '{1}' is missing in the stock entry.").format(
get_link_to_form("BOM", self.doc.bom_no), bold(row.item_code)
),
title=_("Missing Item"),
)
def add_items(self):
item_dict = self.get_pending_raw_materials()
if self.doc.to_warehouse and self.wo_doc:
for item in item_dict.values():
item["s_warehouse"] = item.get("from_warehouse")
item["t_warehouse"] = self.wo_doc.wip_warehouse
for item_code in item_dict:
self.doc.append("items", item_dict[item_code])
def get_pending_raw_materials(self):
"""
issue (item quantity) that is pending to issue or desire to transfer,
whichever is less
"""
item_dict = self.get_work_order_required_items()
max_qty = flt(self.wo_doc.qty)
allow_overproduction = False
overproduction_percentage = flt(
frappe.db.get_single_value("Manufacturing Settings", "overproduction_percentage_for_work_order")
)
transfer_extra_materials_percentage = flt(
frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage")
)
to_transfer_qty = flt(self.wo_doc.material_transferred_for_manufacturing) + flt(
self.doc.fg_completed_qty
)
transfer_limit_qty = max_qty + ((max_qty * overproduction_percentage) / 100)
if transfer_extra_materials_percentage:
transfer_limit_qty = max_qty + ((max_qty * transfer_extra_materials_percentage) / 100)
if transfer_limit_qty >= to_transfer_qty:
allow_overproduction = True
for item, item_details in item_dict.items():
pending_to_issue = flt(item_details.required_qty) - flt(item_details.transferred_qty)
desire_to_transfer = flt(self.doc.fg_completed_qty) * flt(item_details.required_qty) / max_qty
if (
desire_to_transfer <= pending_to_issue
or (
desire_to_transfer > 0
and self.backflush_based_on == "Material Transferred for Manufacture"
)
or allow_overproduction
):
# "No need for transfer but qty still pending to transfer" case can occur
# when transferring multiple RM in different Stock Entries
item_dict[item]["qty"] = desire_to_transfer if (desire_to_transfer > 0) else pending_to_issue
elif pending_to_issue > 0:
item_dict[item]["qty"] = pending_to_issue
else:
item_dict[item]["qty"] = 0
# delete items with 0 qty
list_of_items = list(item_dict.keys())
for item in list_of_items:
if not item_dict[item]["qty"]:
del item_dict[item]
# show some message
if not len(item_dict):
frappe.msgprint(_("""All items have already been transferred for this Work Order."""))
return item_dict
def get_work_order_required_items(self):
"""
Gets Work Order Required Items only if Stock Entry purpose is **Material Transferred for Manufacture**.
"""
item_dict, job_card_items = frappe._dict(), []
work_order = self.wo_doc
consider_job_card = work_order.transfer_material_against == "Job Card" and self.doc.get("job_card")
if consider_job_card:
job_card_items = self.get_job_card_item_codes()
if not frappe.db.get_value("Warehouse", work_order.wip_warehouse, "is_group"):
wip_warehouse = work_order.wip_warehouse
else:
wip_warehouse = None
transfer_extra_materials_percentage = flt(
frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage")
)
for d in work_order.get("required_items"):
if consider_job_card and (d.item_code not in job_card_items):
continue
additional_qty = 0.0
if transfer_extra_materials_percentage:
additional_qty = transfer_extra_materials_percentage * flt(d.required_qty) / 100
transfer_pending = flt(d.required_qty) > flt(d.transferred_qty)
if additional_qty:
transfer_pending = (flt(d.required_qty) + additional_qty) > flt(d.transferred_qty)
can_transfer = transfer_pending or (
self.backflush_based_on == "Material Transferred for Manufacture"
)
if not can_transfer:
continue
if d.include_item_in_manufacturing:
item_row = d.as_dict()
item_row["idx"] = len(item_dict) + 1
if consider_job_card:
job_card_item = frappe.db.get_value(
"Job Card Item", {"item_code": d.item_code, "parent": self.doc.get("job_card")}
)
item_row["job_card_item"] = job_card_item or None
if d.source_warehouse and not frappe.db.get_value(
"Warehouse", d.source_warehouse, "is_group"
):
item_row["from_warehouse"] = d.source_warehouse
item_row["to_warehouse"] = wip_warehouse
if item_row["allow_alternative_item"]:
item_row["allow_alternative_item"] = work_order.allow_alternative_item
item_dict.setdefault(d.item_code, item_row)
return item_dict
def get_job_card_item_codes(self):
if not self.doc.get("job_card"):
return []
return frappe.get_all(
"Job Card Item", filters={"parent": self.doc.get("job_card")}, pluck="item_code", distinct=True
)
def on_submit(self):
self.update_job_card_and_work_order()
def on_cancel(self):
self.update_job_card_and_work_order()
def update_job_card_and_work_order(self):
def _validate_work_order(pro_doc):
msg, title = "", ""
if flt(pro_doc.docstatus) != 1:
msg = _("Work Order {0} must be submitted").format(self.doc.work_order)
if pro_doc.status == "Stopped":
msg = _("Transaction not allowed against stopped Work Order {0}").format(self.doc.work_order)
if msg:
frappe.throw(_(msg), title=title)
if self.doc.job_card:
job_doc = frappe.get_doc("Job Card", self.doc.job_card)
job_doc.set_transferred_qty(update_status=True)
job_doc.set_transferred_qty_in_job_card_item(self.doc)
if self.doc.work_order:
_validate_work_order(self.wo_doc)
if self.doc.fg_completed_qty:
if self.doc.docstatus == 1:
self.wo_doc.add_additional_items(self.doc)
else:
self.wo_doc.remove_additional_items(self.doc)
self.wo_doc.run_method("update_work_order_qty")
self.wo_doc.run_method("update_status")
if not self.wo_doc.operations:
self.wo_doc.set_actual_dates()
class MaterialRequestStockEntry(BaseMaterialTransferStockEntry):
def before_validate(self):
self.set_default_warehouse()
def validate(self):
self.validate_warehouse()
self.validate_material_request()
def get_material_request(self, item_row):
material_request = item_row.material_request or None
material_request_item = item_row.material_request_item or None
if self.doc.outgoing_stock_entry:
parent_se = frappe.get_value(
"Stock Entry Detail",
item_row.ste_detail,
["material_request", "material_request_item"],
as_dict=True,
)
if parent_se:
material_request = parent_se.material_request
material_request_item = parent_se.material_request_item
return material_request, material_request_item
def validate_material_request(self):
for row in self.doc.items:
material_request, material_request_item = self.get_material_request(row)
if not material_request:
return
mreq_item = frappe.db.get_value(
"Material Request Item",
{"name": material_request_item, "parent": material_request},
["item_code", "warehouse", "idx"],
as_dict=True,
)
if mreq_item.item_code != row.item_code:
frappe.throw(
_("Item for row {0} does not match Material Request").format(self.idx),
frappe.MappingMismatchError,
)
def on_submit(self):
self.update_transferred_qty()
if self.doc.add_to_transit:
self.set_material_request_transfer_status("In Transit")
if self.doc.outgoing_stock_entry:
self.set_material_request_transfer_status("Completed")
def on_cancel(self):
self.update_transferred_qty()
if self.doc.add_to_transit:
self.set_material_request_transfer_status("Not Started")
if self.doc.outgoing_stock_entry:
self.set_material_request_transfer_status("In Transit")
def update_transferred_qty(self):
if not self.doc.outgoing_stock_entry:
return
stock_entries, child_list = self._collect_transferred_qtys()
if not stock_entries:
return
self._bulk_update_transferred_qty(stock_entries, child_list)
self._update_per_transferred_field()
def _get_item_transferred_qty(self, item):
sed = frappe.qb.DocType("Stock Entry Detail")
result = (
frappe.qb.from_(sed)
.select(Sum(sed.transfer_qty).as_("qty"))
.where(
(sed.against_stock_entry == item.against_stock_entry)
& (sed.ste_detail == item.ste_detail)
& (sed.docstatus == 1)
)
).run(as_dict=True)
return result[0].qty if result and result[0].qty else 0.0
def _validate_item_transferred_qty(self, item, transferred_qty):
if item.docstatus != 1:
return
transfer_qty = frappe.get_value("Stock Entry Detail", item.ste_detail, "transfer_qty")
if transferred_qty > transfer_qty:
frappe.throw(
_("Row {0}: Transferred quantity cannot be greater than the requested quantity.").format(
item.idx
)
)
def _collect_transferred_qtys(self):
stock_entries, child_list = {}, []
for item in self.doc.items:
if not (item.against_stock_entry and item.ste_detail):
continue
transferred_qty = self._get_item_transferred_qty(item)
self._validate_item_transferred_qty(item, transferred_qty)
child_list.append(item.ste_detail)
stock_entries[(item.against_stock_entry, item.ste_detail)] = transferred_qty
return stock_entries, child_list
def _bulk_update_transferred_qty(self, stock_entries, child_list):
from pypika import Case
sed = frappe.qb.DocType("Stock Entry Detail")
case_expr = Case()
for (parent, name), qty in stock_entries.items():
case_expr = case_expr.when((sed.parent == parent) & (sed.name == name), qty)
(
frappe.qb.update(sed)
.set(sed.transferred_qty, case_expr.else_(sed.transferred_qty))
.where(sed.name.isin(child_list))
).run()
def _update_per_transferred_field(self):
self.doc._update_percent_field_in_targets(
{
"source_dt": "Stock Entry Detail",
"target_field": "transferred_qty",
"target_ref_field": "qty",
"target_dt": "Stock Entry Detail",
"join_field": "ste_detail",
"target_parent_dt": "Stock Entry",
"target_parent_field": "per_transferred",
"source_field": "qty",
"percent_join_field": "against_stock_entry",
},
update_modified=True,
)
def set_material_request_transfer_status(self, status):
material_requests = []
parent_se = None
if self.doc.outgoing_stock_entry:
parent_se = frappe.get_value("Stock Entry", self.doc.outgoing_stock_entry, "add_to_transit")
for item in self.doc.items:
material_request = item.get("material_request")
if material_request not in material_requests:
if self.doc.outgoing_stock_entry and parent_se:
material_request = frappe.get_value(
"Stock Entry Detail", item.ste_detail, "material_request"
)
if material_request and material_request not in material_requests:
material_requests.append(material_request)
if status == "Completed":
qty = get_transferred_qty(material_request)
if qty.get("transfer_qty") > qty.get("transferred_qty"):
status = "In Transit"
frappe.db.set_value("Material Request", material_request, "transfer_status", status)
def get_transferred_qty(material_request):
sed = frappe.qb.DocType("Stock Entry Detail")
query = (
frappe.qb.from_(sed)
.select(
Sum(sed.transfer_qty).as_("transfer_qty"),
Sum(sed.transferred_qty).as_("transferred_qty"),
)
.where((sed.material_request == material_request) & (sed.docstatus == 1))
).run(as_dict=True)
return query[0]

View File

@@ -0,0 +1,370 @@
from collections import defaultdict
import frappe
from frappe import _
from frappe.utils import cint, cstr, flt, nowdate
from erpnext.manufacturing.doctype.bom.bom import get_backflush_based_on
from erpnext.stock.serial_batch_bundle import SerialBatchCreation, get_serial_or_batch_items
from erpnext.stock.utils import get_combine_datetime
class StockEntrySABB:
def __init__(self, se_doc):
self.doc = se_doc
def make_serial_and_batch_bundle_for_outward(self):
serial_or_batch_items = get_serial_or_batch_items(self.doc.items)
if not serial_or_batch_items:
return
serial_nos, batch_nos = self.get_serial_batch_fields_for_subcontracting_inward()
already_picked_serial_nos = []
for row in self.doc.items:
if row.use_serial_batch_fields:
continue
if not row.s_warehouse:
continue
if row.item_code not in serial_or_batch_items:
continue
bundle_doc = None
if row.serial_and_batch_bundle and abs(row.transfer_qty) != abs(
frappe.get_cached_value("Serial and Batch Bundle", row.serial_and_batch_bundle, "total_qty")
):
bundle_doc = SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"serial_and_batch_bundle": row.serial_and_batch_bundle,
"type_of_transaction": "Outward",
"ignore_serial_nos": already_picked_serial_nos,
"qty": row.transfer_qty * -1,
}
).update_serial_and_batch_entries(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
elif not row.serial_and_batch_bundle and frappe.get_single_value(
"Stock Settings", "auto_create_serial_and_batch_bundle_for_outward"
):
bundle_doc = SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"posting_datetime": get_combine_datetime(
self.doc.posting_date, self.doc.posting_time
),
"voucher_type": self.doc.doctype,
"voucher_detail_no": row.name,
"qty": row.transfer_qty * -1,
"ignore_serial_nos": already_picked_serial_nos,
"type_of_transaction": "Outward",
"company": self.doc.company,
"do_not_submit": True,
}
).make_serial_and_batch_bundle(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
if not bundle_doc:
continue
for entry in bundle_doc.entries:
if not entry.serial_no:
continue
already_picked_serial_nos.append(entry.serial_no)
row.serial_and_batch_bundle = bundle_doc.name
def get_serial_nos_and_batches_from_sres(self, scio_detail, only_pending=True):
serial_nos, batch_nos = [], frappe._dict()
table = frappe.qb.DocType("Stock Reservation Entry")
child_table = frappe.qb.DocType("Serial and Batch Entry")
query = (
frappe.qb.from_(table)
.join(child_table)
.on(table.name == child_table.parent)
.select(child_table.serial_no, child_table.batch_no, child_table.qty)
.where((table.docstatus == 1) & (table.voucher_detail_no == scio_detail))
)
if only_pending:
query = query.where(child_table.qty != child_table.delivered_qty)
else:
query = query.where(child_table.delivered_qty > 0)
for d in query.run(as_dict=True):
if d.serial_no and d.serial_no not in serial_nos:
serial_nos.append(d.serial_no)
if d.batch_no and d.batch_no not in batch_nos:
batch_nos[d.batch_no] = d.qty
return serial_nos, batch_nos
def get_serial_batch_fields_for_subcontracting_inward(self):
serial_nos, batch_nos = frappe._dict(), frappe._dict()
for row in self.doc.items:
if self.doc.purpose in [
"Return Raw Material to Customer",
"Subcontracting Delivery",
"Subcontracting Return",
]:
if not row.serial_and_batch_bundle:
serial_nos_list, batch_nos_list = self.get_serial_nos_and_batches_from_sres(
row.scio_detail, only_pending=self.doc.purpose != "Subcontracting Return"
)
if len(batch_nos_list) > 1:
row.use_serial_batch_fields = 0
if row.use_serial_batch_fields:
if serial_nos_list and not row.serial_no:
row.serial_no = "\n".join(serial_nos_list)
if batch_nos_list and not row.batch_no:
row.batch_no = next(iter(batch_nos_list.keys()))
serial_nos[row.name], batch_nos[row.name] = serial_nos_list, batch_nos_list
return serial_nos, batch_nos
def get_available_reserved_materials(self):
from erpnext.stock.doctype.stock_reservation_entry.stock_reservation_entry import (
get_reserved_materials,
)
voucher_no = self.doc.work_order or self.doc.subcontracting_order
reserved_entries = get_reserved_materials(voucher_no)
if not reserved_entries:
return {}
itemwise_serial_batch_qty = frappe._dict()
for d in reserved_entries:
key = (d.item_code, d.warehouse)
if key not in itemwise_serial_batch_qty:
itemwise_serial_batch_qty[key] = frappe._dict(
{
"serial_no": [],
"batch_no": defaultdict(float),
"batchwise_sn": defaultdict(list),
}
)
details = itemwise_serial_batch_qty[key]
if d.batch_no:
details.batch_no[d.batch_no] += d.qty
if d.serial_no:
details.batchwise_sn[d.batch_no].extend(d.serial_no.split("\n"))
elif d.serial_no:
details.serial_no.append(d.serial_no)
return itemwise_serial_batch_qty
def set_serial_batch_based_on_reservation(self):
if self.doc.work_order and frappe.get_cached_value(
"Work Order", self.doc.work_order, "reserve_stock"
):
skip_transfer = frappe.get_cached_value("Work Order", self.doc.work_order, "skip_transfer")
backflush_based_on = get_backflush_based_on(self.doc.bom_no)
if (
self.doc.purpose not in ["Material Transfer for Manufacture"]
and backflush_based_on != "BOM"
and not skip_transfer
):
return
reservation_entries = self.get_available_reserved_materials()
if not reservation_entries:
return
new_items_to_add = []
for d in self.doc.items:
if d.serial_and_batch_bundle or d.serial_no or d.batch_no:
continue
key = (d.item_code, d.s_warehouse)
if details := reservation_entries.get(key):
original_qty = d.qty
if batches := details.get("batch_no"):
for batch_no, qty in batches.items():
if original_qty <= 0:
break
if qty <= 0:
continue
if d.batch_no and original_qty > 0:
new_row = frappe.copy_doc(d)
new_row.name = None
new_row.batch_no = batch_no
new_row.qty = qty
new_row.idx = d.idx + 1
if new_row.batch_no and details.get("batchwise_sn"):
new_row.serial_no = "\n".join(
details.get("batchwise_sn")[new_row.batch_no][: cint(new_row.qty)]
)
new_items_to_add.append(new_row)
original_qty -= qty
batches[batch_no] -= qty
if qty >= d.qty and not d.batch_no:
d.batch_no = batch_no
batches[batch_no] -= d.qty
if d.batch_no and details.get("batchwise_sn"):
d.serial_no = "\n".join(
details.get("batchwise_sn")[d.batch_no][: cint(d.qty)]
)
elif not d.batch_no:
d.batch_no = batch_no
d.qty = qty
original_qty -= qty
batches[batch_no] = 0
if d.batch_no and details.get("batchwise_sn"):
d.serial_no = "\n".join(
details.get("batchwise_sn")[d.batch_no][: cint(d.qty)]
)
if details.get("serial_no"):
d.serial_no = "\n".join(details.get("serial_no")[: cint(d.qty)])
d.use_serial_batch_fields = 1
for new_row in new_items_to_add:
self.doc.append("items", new_row)
sorted_items = sorted(self.doc.items, key=lambda x: x.item_code)
if self.doc.purpose == "Manufacture":
# ensure finished item at last
sorted_items = sorted(sorted_items, key=lambda x: cstr(x.t_warehouse))
idx = 0
for row in sorted_items:
idx += 1
row.idx = idx
self.doc.set("items", sorted_items)
def create_serial_and_batch_bundle(parent_doc, row, child, type_of_transaction=None):
item_details = frappe.get_cached_value(
"Item", child.item_code, ["has_serial_no", "has_batch_no"], as_dict=1
)
if not (item_details.has_serial_no or item_details.has_batch_no):
return
if not type_of_transaction:
type_of_transaction = "Inward"
doc = frappe.get_doc(
{
"doctype": "Serial and Batch Bundle",
"voucher_type": "Stock Entry",
"item_code": child.item_code,
"warehouse": child.warehouse,
"type_of_transaction": type_of_transaction,
"posting_date": parent_doc.posting_date,
"posting_time": parent_doc.posting_time,
}
)
precision = frappe.get_precision("Stock Entry Detail", "qty")
if row.serial_nos and row.batches_to_be_consume:
doc.has_serial_no = 1
doc.has_batch_no = 1
batchwise_serial_nos = get_batchwise_serial_nos(child.item_code, row)
for batch_no, qty in row.batches_to_be_consume.items():
while flt(qty, precision) > 0:
qty -= 1
doc.append(
"entries",
{
"batch_no": batch_no,
"serial_no": batchwise_serial_nos.get(batch_no).pop(0),
"warehouse": row.warehouse,
"qty": -1,
},
)
elif row.serial_nos:
doc.has_serial_no = 1
for serial_no in row.serial_nos:
doc.append("entries", {"serial_no": serial_no, "warehouse": row.warehouse, "qty": -1})
elif row.batches_to_be_consume:
precision = frappe.get_precision("Serial and Batch Entry", "qty")
doc.has_batch_no = 1
for batch_no, qty in row.batches_to_be_consume.items():
if flt(qty, precision) > 0:
qty = flt(qty, precision)
doc.append("entries", {"batch_no": batch_no, "warehouse": row.warehouse, "qty": qty * -1})
if not doc.entries:
return None
return doc.insert(ignore_permissions=True).name
def get_batchwise_serial_nos(item_code, row):
batchwise_serial_nos = {}
for batch_no in row.batches_to_be_consume:
serial_nos = frappe.get_all(
"Serial No",
filters={"item_code": item_code, "batch_no": batch_no, "name": ("in", row.serial_nos)},
)
if serial_nos:
batchwise_serial_nos[batch_no] = sorted([serial_no.name for serial_no in serial_nos])
return batchwise_serial_nos
@frappe.whitelist()
def get_expired_batch_items():
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import get_auto_batch_nos
expired_batches = get_expired_batches()
if not expired_batches:
return []
expired_batches_stock = get_auto_batch_nos(
frappe._dict(
{
"batch_no": list(expired_batches.keys()),
"for_stock_levels": True,
}
)
)
for row in expired_batches_stock:
row.update(expired_batches.get(row.batch_no))
return expired_batches_stock
def get_expired_batches():
batch = frappe.qb.DocType("Batch")
data = (
frappe.qb.from_(batch)
.select(batch.item, batch.name.as_("batch_no"), batch.stock_uom)
.where((batch.expiry_date <= nowdate()) & (batch.expiry_date.isnotnull()))
).run(as_dict=True)
if not data:
return []
expired_batches = frappe._dict()
for row in data:
expired_batches[row.batch_no] = row
return expired_batches

View File

@@ -0,0 +1,243 @@
import frappe
from frappe import _, bold
from frappe.query_builder.functions import Sum
from frappe.utils import flt
from erpnext.stock.utils import get_bin
class SendToSubcontractorStockEntry:
def __init__(self, se_doc):
self.doc = se_doc
def validate(self):
self.validate_subcontract_order()
def validate_subcontract_order(self):
"""Throw exception if more raw material is transferred against Subcontract Order than in
the raw materials supplied table"""
backflush_raw_materials_based_on = frappe.db.get_single_value(
"Buying Settings", "backflush_raw_materials_of_subcontract_based_on"
)
if backflush_raw_materials_based_on == "BOM":
subcontract_order = frappe.get_doc(
self.doc.subcontract_data.order_doctype, self.doc.get(self.doc.subcontract_data.order_field)
)
for se_item in self.doc.items:
self.validate_subcontracting_order_for_bom(se_item, subcontract_order)
elif backflush_raw_materials_based_on == "Material Transferred for Subcontract":
for row in self.doc.items:
self.validate_subcontracting_order_for_transfer(row)
def validate_subcontracting_order_for_bom(self, child_row, subcontract_order):
def get_required_qty(item_code):
return sum(
flt(d.required_qty) for d in subcontract_order.supplied_items if d.rm_item_code == item_code
)
qty_allowance = flt(frappe.db.get_single_value("Buying Settings", "over_transfer_allowance"))
item_code = child_row.original_item or child_row.item_code
required_qty = get_required_qty(item_code)
if not required_qty and child_row.allow_alternative_item:
original_item_code = frappe.get_value(
"Item Alternative", {"alternative_item_code": item_code}, "item_code"
)
required_qty = get_required_qty(original_item_code)
if not required_qty:
frappe.throw(
_("Item {0} not found in 'Raw Materials Supplied' table in {1} {2}").format(
item_code,
self.doc.subcontract_data.order_doctype,
self.doc.get(self.doc.subcontract_data.order_field),
)
)
total_allowed = required_qty + (required_qty * (qty_allowance / 100))
total_supplied = self.get_total_supplied_qty(child_row)
total_returned = 0
if self.doc.subcontract_data.order_doctype == "Subcontracting Order":
total_returned = self.get_total_returned_qty(child_row)
if flt(
total_supplied + child_row.transfer_qty - total_returned, child_row.precision("transfer_qty")
) > flt(total_allowed, child_row.precision("transfer_qty")):
frappe.throw(
_("Row #{0}: Item {1} cannot be transferred more than {2} against {3} {4}").format(
child_row.idx,
item_code,
total_allowed,
self.doc.subcontract_data.order_doctype,
self.doc.get(self.doc.subcontract_data.order_field),
)
)
elif not self.doc.get(self.doc.subcontract_data.rm_detail_field):
order_rm_detail = self.get_order_rm_detail(child_row)
if order_rm_detail:
child_row.db_set(self.doc.subcontract_data.rm_detail_field, order_rm_detail)
else:
if not child_row.allow_alternative_item:
frappe.throw(
_("Row {0}# Item {1} not found in 'Raw Materials Supplied' table in {2} {3}").format(
child_row.idx,
item_code,
self.doc.subcontract_data.order_doctype,
self.doc.get(self.doc.subcontract_data.order_field),
)
)
def validate_subcontracting_order_for_transfer(self, child_row):
if not self.doc.subcontracted_item:
frappe.throw(
_("Row {0}: Subcontracted Item is mandatory for the raw material {1}").format(
child_row.idx, bold(child_row.item_code)
)
)
elif not self.doc.get(self.doc.subcontract_data.rm_detail_field):
order_rm_detail = self.get_order_rm_detail(child_row)
if order_rm_detail:
child_row.db_set(self.doc.subcontract_data.rm_detail_field, order_rm_detail)
def get_total_supplied_qty(self, child_row):
se = frappe.qb.DocType("Stock Entry")
se_detail = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(se)
.inner_join(se_detail)
.on(se.name == se_detail.parent)
.select(Sum(se_detail.transfer_qty))
.where(
(se.purpose == "Send to Subcontractor")
& (se.docstatus == 1)
& (se_detail.item_code == child_row.item_code)
& (
(
(se.purchase_order == self.doc.purchase_order)
& (se_detail.po_detail == self.doc.po_detail)
)
if self.doc.subcontract_data.order_doctype == "Purchase Order"
else (
(se.subcontracting_order == self.doc.subcontracting_order)
& (se_detail.sco_rm_detail == child_row.sco_rm_detail)
)
)
)
).run()[0][0] or 0
def get_total_returned_qty(self, child_row):
se = frappe.qb.DocType("Stock Entry")
se_detail = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(se)
.inner_join(se_detail)
.on(se.name == se_detail.parent)
.select(Sum(se_detail.transfer_qty))
.where(
(se.purpose == "Material Transfer")
& (se.docstatus == 1)
& (se.is_return == 1)
& (se_detail.item_code == child_row.item_code)
& (se_detail.sco_rm_detail == child_row.sco_rm_detail)
& (se.subcontracting_order == self.doc.subcontracting_order)
)
).run()[0][0] or 0
def get_order_rm_detail(self, child_row):
filters = {
"parent": self.doc.get(self.doc.subcontract_data.order_field),
"docstatus": 1,
"rm_item_code": child_row.item_code,
"main_item_code": child_row.subcontracted_item,
}
return frappe.db.get_value(self.doc.subcontract_data.order_supplied_items_field, filters, "name")
def on_submit(self):
self.update_subcontract_order_supplied_items()
def on_cancel(self):
self.update_subcontract_order_supplied_items()
def update_subcontract_order_supplied_items(self):
if not self.doc.get(self.doc.subcontract_data.order_field):
return
# Get Subcontract Order Supplied Items Details
order_supplied_items = frappe.db.get_all(
self.doc.subcontract_data.order_supplied_items_field,
filters={"parent": self.doc.get(self.doc.subcontract_data.order_field)},
fields=["name", "rm_item_code", "reserve_warehouse"],
)
# Get Items Supplied in Stock Entries against Subcontract Order
supplied_items = get_supplied_items(
self.doc.get(self.doc.subcontract_data.order_field),
self.doc.subcontract_data.rm_detail_field,
self.doc.subcontract_data.order_field,
)
for row in order_supplied_items:
key, item = row.name, {}
if not supplied_items.get(key):
# no stock transferred against Subcontract Order Supplied Items row
item = {"supplied_qty": 0, "returned_qty": 0, "total_supplied_qty": 0}
else:
item = supplied_items.get(key)
frappe.db.set_value(self.doc.subcontract_data.order_supplied_items_field, row.name, item)
# RM Item-Reserve Warehouse Dict
item_wh = {x.get("rm_item_code"): x.get("reserve_warehouse") for x in order_supplied_items}
for d in self.doc.get("items"):
# Update reserved sub contracted quantity in bin based on Supplied Item Details and
item_code = d.get("original_item") or d.get("item_code")
reserve_warehouse = item_wh.get(item_code)
if not (reserve_warehouse and item_code):
continue
stock_bin = get_bin(item_code, reserve_warehouse)
stock_bin.update_reserved_qty_for_sub_contracting()
def get_supplied_items(
subcontract_order, rm_detail_field="sco_rm_detail", subcontract_order_field="subcontracting_order"
):
fields = [
"`tabStock Entry Detail`.`transfer_qty`",
"`tabStock Entry`.`is_return`",
f"`tabStock Entry Detail`.`{rm_detail_field}`",
"`tabStock Entry Detail`.`item_code`",
]
filters = [
["Stock Entry", "docstatus", "=", 1],
["Stock Entry", subcontract_order_field, "=", subcontract_order],
]
supplied_item_details = {}
for row in frappe.get_all("Stock Entry", fields=fields, filters=filters):
if not row.get(rm_detail_field):
continue
key = row.get(rm_detail_field)
if key not in supplied_item_details:
supplied_item_details.setdefault(
key, frappe._dict({"supplied_qty": 0, "returned_qty": 0, "total_supplied_qty": 0})
)
supplied_item = supplied_item_details[key]
if row.is_return:
supplied_item.returned_qty += row.transfer_qty
else:
supplied_item.supplied_qty += row.transfer_qty
supplied_item.total_supplied_qty = flt(supplied_item.supplied_qty) - flt(supplied_item.returned_qty)
return supplied_item_details

View File

@@ -909,6 +909,7 @@ class TestStockEntry(ERPNextTestSuite):
rm_cost += d.amount rm_cost += d.amount
fg_cost = next(filter(lambda x: x.item_code == "_Test FG Item", s.get("items"))).amount fg_cost = next(filter(lambda x: x.item_code == "_Test FG Item", s.get("items"))).amount
secondary_item_cost = next(filter(lambda x: x.type or x.is_legacy_scrap_item, s.get("items"))).amount secondary_item_cost = next(filter(lambda x: x.type or x.is_legacy_scrap_item, s.get("items"))).amount
self.assertEqual(fg_cost, flt(rm_cost - secondary_item_cost, 2)) self.assertEqual(fg_cost, flt(rm_cost - secondary_item_cost, 2))
# When Stock Entry has only FG + Scrap # When Stock Entry has only FG + Scrap
@@ -1783,7 +1784,7 @@ class TestStockEntry(ERPNextTestSuite):
def test_use_serial_and_batch_fields(self): def test_use_serial_and_batch_fields(self):
item = make_item( item = make_item(
"Test Use Serial and Batch Item SN Item", "Test Use Serial and Batch Item SN Item - A",
{"has_serial_no": 1, "is_stock_item": 1}, {"has_serial_no": 1, "is_stock_item": 1},
) )
@@ -2266,7 +2267,7 @@ class TestStockEntry(ERPNextTestSuite):
make_stock_entry(item_code=rm_item2, target=warehouse, qty=5, purpose="Material Receipt") make_stock_entry(item_code=rm_item2, target=warehouse, qty=5, purpose="Material Receipt")
bom_no = make_bom(item=fg_item, raw_materials=[rm_item1, rm_item2]).name bom_no = make_bom(item=fg_item, raw_materials=[rm_item1, rm_item2]).name
se = make_stock_entry(item_code=fg_item, qty=1, purpose="Manufacture", do_not_save=True) se = make_stock_entry(item_code=fg_item, qty=1, purpose="Repack", do_not_save=True)
se.from_bom = 1 se.from_bom = 1
se.use_multi_level_bom = 1 se.use_multi_level_bom = 1
se.bom_no = bom_no se.bom_no = bom_no
@@ -2304,7 +2305,6 @@ class TestStockEntry(ERPNextTestSuite):
se.to_warehouse = warehouse se.to_warehouse = warehouse
se.get_items() se.get_items()
# Verify FG as source (being consumed) # Verify FG as source (being consumed)
fg_items = [d for d in se.items if d.is_finished_item] fg_items = [d for d in se.items if d.is_finished_item]
self.assertEqual(len(fg_items), 1) self.assertEqual(len(fg_items), 1)
@@ -2331,7 +2331,9 @@ class TestStockEntry(ERPNextTestSuite):
"Stock Settings", {"sample_retention_warehouse": "_Test Warehouse 1 - _TC"} "Stock Settings", {"sample_retention_warehouse": "_Test Warehouse 1 - _TC"}
) )
def test_sample_retention_stock_entry(self): def test_sample_retention_stock_entry(self):
from erpnext.stock.doctype.stock_entry.stock_entry import move_sample_to_retention_warehouse from erpnext.stock.doctype.stock_entry.stock_entry_handler.manufacturing import (
move_sample_to_retention_warehouse,
)
warehouse = "_Test Warehouse - _TC" warehouse = "_Test Warehouse - _TC"
retain_sample_item = make_item( retain_sample_item = make_item(

View File

@@ -1,8 +1,24 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt # License: GNU General Public License v3. See license.txt
import frappe
from frappe import _, bold
from frappe.model.document import Document from frappe.model.document import Document
from frappe.utils import (
cint,
cstr,
flt,
format_time,
formatdate,
get_link_to_form,
getdate,
nowdate,
)
from erpnext.stock.doctype.stock_reconciliation.stock_reconciliation import (
OpeningEntryAccountError,
)
from erpnext.stock.stock_ledger import NegativeStockError, get_previous_sle, is_negative_stock_allowed
class StockEntryDetail(Document): class StockEntryDetail(Document):
@@ -73,4 +89,171 @@ class StockEntryDetail(Document):
valuation_rate: DF.Currency valuation_rate: DF.Currency
# end: auto-generated types # end: auto-generated types
pass def validate_batch(self):
if not self.batch_no:
return
disabled = frappe.db.get_value("Batch", self.batch_no, "disabled")
if disabled:
frappe.throw(_("Batch {0} of Item {1} is disabled.").format(self.batch_no, self.item_code))
return
expiry_date = frappe.db.get_value("Batch", self.batch_no, "expiry_date")
if expiry_date and getdate(self.parent_doc.posting_date) > getdate(expiry_date):
frappe.throw(_("Batch {0} of Item {1} has expired.").format(self.batch_no, self.item_code))
def validate_and_update_item_details(self, item_details, company, purpose):
if flt(self.qty) and flt(self.qty) < 0:
frappe.throw(
_("Row {0}: The item {1}, quantity must be positive number").format(
self.idx, bold(self.item_code)
)
)
if item_details.get("is_stock_item") != 1:
frappe.throw(_("{0} is not a stock Item").format(self.item_code))
reset_fields = ("stock_uom", "item_name")
for field in reset_fields:
self.set(field, item_details.get(field))
update_fields = (
"uom",
"description",
"expense_account",
"cost_center",
"conversion_factor",
"barcode",
)
for field in update_fields:
if not self.get(field):
self.set(field, item_details.get(field))
if field == "conversion_factor" and self.uom == item_details.get("stock_uom"):
self.set(field, item_details.get(field))
if not self.transfer_qty and self.qty:
self.transfer_qty = flt(
flt(self.qty) * flt(self.conversion_factor), self.precision("transfer_qty")
)
if purpose == "Subcontracting Delivery":
self.expense_account = frappe.get_value("Company", company, "default_expense_account")
def validate_expense_account(self, is_opening, purpose):
if not self.expense_account:
frappe.throw(
_(
"Please enter <b>Difference Account</b> or set default "
"<b>Stock Adjustment Account</b> for company {0}"
).format(bold(self.parent_doc.company))
)
acc_details = frappe.get_cached_value(
"Account",
self.expense_account,
["account_type", "report_type"],
as_dict=True,
)
if is_opening == "Yes" and acc_details.report_type == "Profit and Loss":
frappe.throw(
_(
"Difference Account must be a Asset/Liability type account "
"(Temporary Opening), since this Stock Entry is an Opening Entry"
),
OpeningEntryAccountError,
)
if acc_details.account_type == "Stock":
frappe.throw(
_("At row #{0}: the Difference Account must not be a Stock type account...").format(
self.idx, get_link_to_form("Account", self.expense_account)
),
title=_("Difference Account in Items Table"),
)
if (
purpose not in ["Material Issue", "Subcontracting Delivery"]
and acc_details.account_type == "Cost of Goods Sold"
):
frappe.msgprint(
_("At row #{0}: you have selected the Difference Account {1}...").format(
self.idx, bold(get_link_to_form("Account", self.expense_account))
),
indicator="orange",
alert=1,
)
def set_transfer_qty(self):
if not flt(self.conversion_factor):
frappe.throw(_("Row {0}: UOM Conversion Factor is mandatory").format(self.idx))
self.transfer_qty = flt(flt(self.qty) * flt(self.conversion_factor), self.precision("transfer_qty"))
if not flt(self.transfer_qty):
frappe.throw(
_("Row {0}: Qty in Stock UOM can not be zero.").format(self.idx), title=_("Zero quantity")
)
def set_actual_qty(self, posting_date, posting_time):
allow_negative_stock = is_negative_stock_allowed(item_code=self.item_code)
previous_sle = get_previous_sle(
{
"item_code": self.item_code,
"warehouse": self.s_warehouse or self.t_warehouse,
"posting_date": posting_date,
"posting_time": posting_time,
}
)
# get actual stock at source warehouse
self.actual_qty = previous_sle.get("qty_after_transaction") or 0
# validate qty during submit
if (
self.docstatus == 1
and self.s_warehouse
and not allow_negative_stock
and flt(self.actual_qty, self.precision("actual_qty"))
< flt(self.transfer_qty, self.precision("actual_qty"))
):
frappe.throw(
_(
"Row {0}: Quantity not available for {4} in warehouse {1} at posting time of the entry ({2} {3})"
).format(
self.idx,
bold(self.s_warehouse),
formatdate(posting_date),
format_time(posting_time),
bold(self.item_code),
)
+ "<br><br>"
+ _("Available quantity is {0}, you need {1}").format(
bold(flt(self.actual_qty, self.precision("actual_qty"))),
bold(self.transfer_qty),
),
NegativeStockError,
title=_("Insufficient Stock"),
)
def delink_asset_repair_sabb(self, asset_repair):
if not self.serial_and_batch_bundle:
return
voucher_detail_no = frappe.db.get_value(
"Asset Repair Consumed Item",
{"parent": asset_repair, "serial_and_batch_bundle": self.serial_and_batch_bundle},
"name",
)
if not voucher_detail_no:
return
doc = frappe.get_doc("Serial and Batch Bundle", self.serial_and_batch_bundle)
doc.db_set(
{
"voucher_type": "Asset Repair",
"voucher_no": asset_repair,
"voucher_detail_no": voucher_detail_no,
}
)

View File

@@ -122,6 +122,7 @@ class ManufactureEntry:
if backflush_based_on != "BOM": if backflush_based_on != "BOM":
available_serial_batches = self.get_transferred_serial_batches() available_serial_batches = self.get_transferred_serial_batches()
items_list = []
for item_code, _dict in item_dict.items(): for item_code, _dict in item_dict.items():
_dict.from_warehouse = self.source_wh.get(item_code) or self.wip_warehouse _dict.from_warehouse = self.source_wh.get(item_code) or self.wip_warehouse
_dict.to_warehouse = "" _dict.to_warehouse = ""
@@ -138,7 +139,9 @@ class ManufactureEntry:
_dict.qty = calculated_qty _dict.qty = calculated_qty
self.update_available_serial_batches(_dict, available_serial_batches) self.update_available_serial_batches(_dict, available_serial_batches)
self.stock_entry.add_to_stock_entry_detail(item_dict) items_list.append(_dict)
self.stock_entry.append("items", items_list)
def parse_available_serial_batches(self, item_dict, available_serial_batches): def parse_available_serial_batches(self, item_dict, available_serial_batches):
key = (item_dict.item_code, item_dict.from_warehouse) key = (item_dict.item_code, item_dict.from_warehouse)
@@ -320,4 +323,4 @@ class ManufactureEntry:
"is_finished_item": 1, "is_finished_item": 1,
} }
self.stock_entry.add_to_stock_entry_detail({self.production_item: args}, bom_no=self.bom_no) self.stock_entry.append("items", args)

View File

@@ -1898,3 +1898,32 @@ def update_serial_batch_delivered_qty(row, name, is_cancelled=False):
) )
query.run() query.run()
def get_reserved_materials(voucher_no):
doctype = frappe.qb.DocType("Stock Reservation Entry")
serial_batch_doc = frappe.qb.DocType("Serial and Batch Entry")
query = (
frappe.qb.from_(doctype)
.inner_join(serial_batch_doc)
.on(doctype.name == serial_batch_doc.parent)
.select(
serial_batch_doc.serial_no,
serial_batch_doc.batch_no,
serial_batch_doc.qty,
doctype.item_code,
doctype.warehouse,
doctype.name,
doctype.transferred_qty,
doctype.consumed_qty,
)
.where(
(doctype.docstatus == 1)
& (doctype.voucher_no == voucher_no)
& (serial_batch_doc.delivered_qty < serial_batch_doc.qty)
)
.orderby(serial_batch_doc.idx)
)
return query.run(as_dict=True)

View File

@@ -378,7 +378,7 @@ class SubcontractingInwardOrder(SubcontractingController):
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[rm_item.get("rm_item_code")])
if target_doc: if target_doc:
return stock_entry return stock_entry
@@ -419,7 +419,7 @@ class SubcontractingInwardOrder(SubcontractingController):
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[rm_item.get("rm_item_code")])
if target_doc: if target_doc:
return stock_entry return stock_entry
@@ -472,7 +472,7 @@ class SubcontractingInwardOrder(SubcontractingController):
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[fg_item.item_code])
if ( if (
frappe.get_single_value("Selling Settings", "deliver_secondary_items") frappe.get_single_value("Selling Settings", "deliver_secondary_items")
@@ -497,7 +497,7 @@ class SubcontractingInwardOrder(SubcontractingController):
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[secondary_item.item_code])
if target_doc: if target_doc:
return stock_entry return stock_entry
@@ -542,7 +542,7 @@ class SubcontractingInwardOrder(SubcontractingController):
} }
} }
stock_entry.add_to_stock_entry_detail(items_dict) stock_entry.append("items", items_dict[fg_item.item_code])
if target_doc: if target_doc:
return stock_entry return stock_entry

View File

@@ -240,6 +240,7 @@ class IntegrationTestSubcontractingInwardOrder(ERPNextTestSuite):
delivery = frappe.new_doc("Stock Entry").update(scio.make_subcontracting_delivery()) delivery = frappe.new_doc("Stock Entry").update(scio.make_subcontracting_delivery())
delivery.items[0].use_serial_batch_fields = 1 delivery.items[0].use_serial_batch_fields = 1
delivery.save() delivery.save()
delivery.submit()
delivery_serial_list, _ = get_serial_batch_list_from_item(delivery.items[0]) delivery_serial_list, _ = get_serial_batch_list_from_item(delivery.items[0])
self.assertEqual(sorted(serial_list), sorted(delivery_serial_list)) self.assertEqual(sorted(serial_list), sorted(delivery_serial_list))