test: fixed test cases

This commit is contained in:
Rohit Waghchaure
2026-05-19 23:31:14 +05:30
parent e4b5e6bd1e
commit 38eeb6994c
13 changed files with 132 additions and 78 deletions

View File

@@ -1403,16 +1403,18 @@ def make_rm_stock_entry(
items_dict = {
rm_item_code: {
rm_detail_field: rm_item.get("name"),
"item_code": rm_item_code,
"item_name": rm_item.get("item_name")
or item_wh.get(rm_item_code, {}).get("item_name", ""),
"description": item_wh.get(rm_item_code, {}).get("description", ""),
"qty": qty,
"from_warehouse": rm_item.get("warehouse")
"s_warehouse": rm_item.get("warehouse")
or rm_item.get("reserve_warehouse"),
"to_warehouse": source_doc.supplier_warehouse,
"t_warehouse": source_doc.supplier_warehouse,
"stock_uom": rm_item.get("stock_uom"),
"serial_and_batch_bundle": rm_item.get("serial_and_batch_bundle"),
"main_item_code": fg_item_code,
"subcontracted_item": fg_item_code,
"allow_alternative_item": item_wh.get(rm_item_code, {}).get(
"allow_alternative_item"
),

View File

@@ -2005,7 +2005,7 @@ def get_secondary_items_from_sub_assemblies(bom_no, company, qty, secondary_item
def get_backflush_based_on(bom_no=None):
backflush_based_on = None
if bom_no:
backflush_based_on = frappe.get_cached_value("BOM", bom_no, "backflush_based_on")
backflush_based_on = frappe.db.get_value("BOM", bom_no, "backflush_based_on")
if not backflush_based_on:
backflush_based_on = frappe.db.get_single_value(

View File

@@ -684,7 +684,10 @@ class TestWorkOrder(ERPNextTestSuite):
def test_cost_center_for_manufacture(self):
wo_order = make_wo_order_test_record()
ste = make_stock_entry(wo_order.name, "Material Transfer for Manufacture", wo_order.qty)
ste = frappe.get_doc(
make_stock_entry(wo_order.name, "Material Transfer for Manufacture", wo_order.qty)
)
ste.save()
self.assertEqual(ste.get("items")[0].get("cost_center"), "_Test Cost Center - _TC")
def test_operation_time_with_batch_size(self):
@@ -1320,7 +1323,6 @@ class TestWorkOrder(ERPNextTestSuite):
stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10))
stock_entry.set_work_order_details()
ManufactureStockEntry(stock_entry).set_serial_nos_for_finished_good()
for row in stock_entry.items:
if row.item_code == fg_item:
self.assertTrue(row.serial_and_batch_bundle)
@@ -1361,7 +1363,6 @@ class TestWorkOrder(ERPNextTestSuite):
stock_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 10))
stock_entry.set_work_order_details()
ManufactureStockEntry(stock_entry).set_serial_nos_for_finished_good()
for row in stock_entry.items:
if row.item_code == fg_item:
self.assertTrue(row.serial_and_batch_bundle)
@@ -4292,7 +4293,6 @@ class TestWorkOrder(ERPNextTestSuite):
)
material_transfer_entry.submit()
manufacture_entry = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 1))
manufacture_entry.save()

View File

@@ -2087,8 +2087,9 @@ class WorkOrder(Document):
additional_items = frappe._dict()
for row in stock_entry.items:
if row.item_code not in required_items:
additional_items.setdefault(row.item_code, []).append(row)
item_code = row.original_item if row.original_item else row.item_code
if item_code not in required_items:
additional_items.setdefault(item_code, []).append(row)
self.flags.ignore_validate_update_after_submit = True

View File

@@ -255,6 +255,8 @@ class StockEntry(StockController, SubcontractingInwardController):
if self.se_handler_class and hasattr(self.se_handler_class, "before_validate"):
self.se_handler_class(self).before_validate()
self.set_default_cost_center()
apply_rule = self.apply_putaway_rule and (self.purpose in ["Material Transfer", "Material Receipt"])
if self.get("items") and apply_rule:
@@ -268,6 +270,17 @@ class StockEntry(StockController, SubcontractingInwardController):
if not item.project:
item.project = self.project
def set_default_cost_center(self):
for row in self.items:
if not row.cost_center:
row.cost_center = get_default_cost_center(
row,
row,
get_item_group_defaults(row.item_code, self.company),
get_brand_defaults(row.item_code, self.company),
self.company,
)
def validate(self):
if self.se_handler_class:
self.se_handler_class(self).validate()
@@ -318,7 +331,6 @@ class StockEntry(StockController, SubcontractingInwardController):
self.se_handler_class(self).on_submit()
self.make_bundle_using_old_serial_batch_fields()
self.update_disassembled_order()
self.adjust_stock_reservation_entries_for_return()
self.update_stock_reservation_entries()
self.update_stock_ledger()
@@ -346,7 +358,6 @@ class StockEntry(StockController, SubcontractingInwardController):
if self.work_order and self.purpose == "Material Consumption for Manufacture":
self.validate_work_order_status()
self.update_disassembled_order()
self.cancel_stock_reservation_entries_for_inward()
self.update_stock_ledger()
@@ -1167,13 +1178,6 @@ class StockEntry(StockController, SubcontractingInwardController):
self._wo_doc = frappe.get_doc("Work Order", self.work_order)
return getattr(self, "_wo_doc", None)
def update_disassembled_order(self):
if not self.work_order:
return
if self.purpose == "Disassemble" and self.fg_completed_qty:
pro_doc = frappe.get_doc("Work Order", self.work_order)
pro_doc.run_method("update_disassembled_qty", self.fg_completed_qty, self._action == "cancel")
def make_stock_reserve_for_wip_and_fg(self):
if self.is_stock_reserve_for_work_order():
pro_doc = frappe.get_doc("Work Order", self.work_order)

View File

@@ -62,7 +62,6 @@ class DisassembleStockEntry:
"docstatus": 1,
},
pluck="name",
limit_page_length=2,
)
if len(manufacture_entries) == 1:
self.doc.source_stock_entry = manufacture_entries[0]
@@ -200,6 +199,8 @@ class DisassembleStockEntry:
item_args["bom_secondary_item"] = row.get("name")
row.qty = row.qty * self.doc.fg_completed_qty
if row.get("process_loss_per"):
row.qty -= flt(row.qty * row.get("process_loss_per") / 100)
item_args["qty"] = ceil_qty_if_uom_has_whole_number(row.qty, item_args["uom"])
self.doc.append("items", item_args)
@@ -284,6 +285,10 @@ class DisassembleStockEntry:
def on_submit(self):
self.set_serial_batch_for_disassembly()
self.update_disassembled_order()
def on_cancel(self):
self.update_disassembled_order()
def set_serial_batch_for_disassembly(self):
if self.doc.get("source_stock_entry"):
@@ -388,6 +393,16 @@ class DisassembleStockEntry:
row.serial_and_batch_bundle = bundle_doc.name
row.use_serial_batch_fields = 0
def update_disassembled_order(self):
if not self.doc.work_order:
return
if self.doc.fg_completed_qty:
pro_doc = frappe.get_doc("Work Order", self.doc.work_order)
pro_doc.run_method(
"update_disassembled_qty", self.doc.fg_completed_qty, self.doc._action == "cancel"
)
def get_available_materials(work_order, stock_entry_doc=None) -> dict:
data = get_stock_entry_data(work_order, stock_entry_doc=stock_entry_doc)

View File

@@ -97,10 +97,14 @@ class BaseManufactureStockEntry:
secondary_items = get_secondary_items(self.doc.bom_no, self.doc.work_order)
for row in secondary_items:
item_args = self.get_item_dict(row)
item_args["is_legacy_scrap_item"] = row.get("is_legacy") and row.type == "Scrap"
item_args["is_legacy_scrap_item"] = bool(row.get("is_legacy"))
item_args["type"] = row.type
item_args["bom_secondary_item"] = row.name
item_args["t_warehouse"] = self.doc.to_warehouse
if row.type == "Scrap" and self.wo_doc and self.wo_doc.get("scrap_warehouse"):
item_args["t_warehouse"] = self.wo_doc.scrap_warehouse
else:
item_args["t_warehouse"] = self.doc.to_warehouse
row.qty = row.qty * self.doc.fg_completed_qty
if row.get("process_loss_per"):
@@ -197,7 +201,7 @@ class BaseManufactureStockEntry:
row = frappe._dict({"serial_nos": serial_nos[0 : cint(item_details.qty)]})
_id = create_serial_and_batch_bundle(
self.se_doc,
self.doc,
row,
frappe._dict(
{
@@ -485,11 +489,13 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
return alternative_items
def set_alternative_item_details(self, row, alternative_item_details):
if self.doc.work_order:
row.allow_alternative_item = self.wo_doc.allow_alternative_item
if self.doc.work_order and row.get("allow_alternative_item") is None:
row["allow_alternative_item"] = self.wo_doc.allow_alternative_item
if row.allow_alternative_item:
if row["allow_alternative_item"]:
original_item = row["item_code"]
row.update(alternative_item_details)
row["original_item"] = original_item
def add_raw_materials_based_on_transfer(self):
self.prepare_available_materials_based_on_transfer()
@@ -504,18 +510,46 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
for row in self.available_materials:
row = self.available_materials[row]
item_args = self.get_item_dict(row)
qty = (flt(row.qty) * flt(self.doc.fg_completed_qty)) / pending_qty_to_mfg
if not self.doc.get("is_return"):
qty = (flt(row.qty) * flt(self.doc.fg_completed_qty)) / pending_qty_to_mfg
else:
qty = row.qty
item_args["qty"] = ceil_qty_if_uom_has_whole_number(qty, row.uom)
item_args["transfer_qty"] = item_args["qty"]
if row.serial_nos or (row.batches and len(row.batches) == 1):
item_args["serial_no"] = row.serial_nos[0 : cint(qty)]
item_args["batch_no"] = next(iter(row.batches.values()))
if not item_args["uom"]:
item_args["uom"] = row.stock_uom
if not self.doc.get("is_return"):
item_args["t_warehouse"] = None
item_args["s_warehouse"] = row.warehouse
else:
# In case of return, source and target warehouse will be swapped
item_args["s_warehouse"] = row.s_warehouse
item_args["t_warehouse"] = row.t_warehouse
if row.serial_nos or row.batches:
self.assign_serial_batches_to_materials(item_args, row, qty)
else:
self.doc.append("items", item_args)
elif row.batches:
self.split_items_based_on_batches(qty, item_args, row)
def assign_serial_batches_to_materials(self, item_args, row, qty):
if row.serial_nos:
if serial_nos := row.serial_nos[0 : cint(qty)]:
item_args["serial_no"] = "\n".join(serial_nos)
if not item_args["uom"]:
item_args["uom"] = row.stock_uom
item_args["use_serial_batch_fields"] = 1
self.doc.append("items", item_args)
elif row.batches and len(row.batches) == 1:
item_args["batch_no"] = next(iter(row.batches.keys()))
if not item_args["uom"]:
item_args["uom"] = row.stock_uom
item_args["use_serial_batch_fields"] = 1
self.doc.append("items", item_args)
elif row.batches:
self.split_items_based_on_batches(qty, item_args, row)
def split_items_based_on_batches(self, qty, item_args, row):
for batch_no, batch_qty in row.batches.items():
@@ -533,7 +567,9 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
if not item_args["uom"]:
item_args["uom"] = row.stock_uom
item_args["batch_no"] = batch_no
item_args["transfer_qty"] = item_args["qty"]
item_args["use_serial_batch_fields"] = 1
self.doc.append("items", item_args)
@@ -577,8 +613,8 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
key = (row.item_code, row.warehouse)
if key not in self.available_materials:
self.available_materials[key] = frappe._dict(row)
self.available_materials[key].qty += row.qty
else:
self.available_materials[key].qty += row.qty
if row.serial_and_batch_bundle:
self.available_materials[key].update(self.get_sabb_details(row.serial_and_batch_bundle))
@@ -835,9 +871,19 @@ def get_bom_items(bom_no, use_multi_level_bom=None, qty=None, fetch_secondary_it
doctype.conversion_factor,
)
elif table_name == "BOM Item":
query = query.select(doctype.allow_alternative_item, doctype.uom, doctype.conversion_factor)
query = query.select(
doctype.allow_alternative_item, doctype.uom, doctype.conversion_factor, doctype.bom_no
)
return query.run(as_dict=1)
items = query.run(as_dict=1)
item_dict = {}
for item in items:
if item.item_code in item_dict:
item_dict[item.item_code].qty += item.qty
else:
item_dict[item.item_code] = item
return list(item_dict.values())
def get_secondary_items(bom_no, work_order=None):
@@ -856,13 +902,12 @@ def get_secondary_items(bom_no, work_order=None):
def get_secondary_items_from_sub_assemblies(bom_no):
items = []
bom_items = get_bom_items(bom_no)
items.extend(bom_items)
for row in bom_items:
if not row.bom_no:
continue
items.extend(get_bom_items(row.bom_no, qty=row.qty, fetch_secondary_items=True))
get_secondary_items_from_sub_assemblies(row.bom_no)
items.extend(get_secondary_items_from_sub_assemblies(row.bom_no))
return items

View File

@@ -17,9 +17,9 @@ class MaterialReceiptStockEntry:
def set_default_warehouse(self):
for row in self.doc.items:
row.s_warehouse = None
if not row.t_warehouse and self.doc.to_warehouse:
row.t_warehouse = self.doc.to_warehouse
row.s_warehouse = None
def validate_warehouse(self):
for row in self.doc.items:
@@ -33,9 +33,9 @@ class BaseMaterialIssueStockEntry:
def set_default_warehouse(self):
for row in self.doc.items:
row.t_warehouse = None
if not row.s_warehouse and self.doc.from_warehouse:
row.s_warehouse = self.doc.from_warehouse
row.t_warehouse = None
def validate_warehouse(self):
for row in self.doc.items:

View File

@@ -131,6 +131,14 @@ class MaterialTransferForManufactureStockEntry(BaseMaterialTransferStockEntry):
title=_("Missing Item"),
)
def get_matched_items(self, item_code):
items = [item for item in self.doc.items if item.s_warehouse]
for row in items:
if row.item_code == item_code or row.original_item == item_code:
return row
return {}
def add_items(self):
item_dict = self.get_pending_raw_materials()
if self.doc.to_warehouse and self.wo_doc:
@@ -189,6 +197,10 @@ class MaterialTransferForManufactureStockEntry(BaseMaterialTransferStockEntry):
else:
item_dict[item]["qty"] = 0
item_dict[item]["transfer_qty"] = flt(item_dict[item]["qty"]) * flt(
item_dict[item].get("conversion_factor") or 1
)
# delete items with 0 qty
list_of_items = list(item_dict.keys())
for item in list_of_items:

View File

@@ -91,7 +91,7 @@ class SendToSubcontractorStockEntry:
)
def validate_subcontracting_order_for_transfer(self, child_row):
if not self.doc.subcontracted_item:
if not child_row.subcontracted_item:
frappe.throw(
_("Row {0}: Subcontracted Item is mandatory for the raw material {1}").format(
child_row.idx, bold(child_row.item_code)

View File

@@ -196,7 +196,6 @@ class StockEntryDetail(Document):
)
def set_actual_qty(self, posting_date, posting_time):
allow_negative_stock = is_negative_stock_allowed(item_code=self.item_code)
previous_sle = get_previous_sle(
{
"item_code": self.item_code,
@@ -209,33 +208,6 @@ class StockEntryDetail(Document):
# get actual stock at source warehouse
self.actual_qty = previous_sle.get("qty_after_transaction") or 0
# validate qty during submit
if (
self.docstatus == 1
and self.s_warehouse
and not allow_negative_stock
and flt(self.actual_qty, self.precision("actual_qty"))
< flt(self.transfer_qty, self.precision("actual_qty"))
):
frappe.throw(
_(
"Row {0}: Quantity not available for {4} in warehouse {1} at posting time of the entry ({2} {3})"
).format(
self.idx,
bold(self.s_warehouse),
formatdate(posting_date),
format_time(posting_time),
bold(self.item_code),
)
+ "<br><br>"
+ _("Available quantity is {0}, you need {1}").format(
bold(flt(self.actual_qty, self.precision("actual_qty"))),
bold(self.transfer_qty),
),
NegativeStockError,
title=_("Insufficient Stock"),
)
def delink_asset_repair_sabb(self, asset_repair):
if not self.serial_and_batch_bundle:
return

View File

@@ -122,10 +122,10 @@ class ManufactureEntry:
if backflush_based_on != "BOM":
available_serial_batches = self.get_transferred_serial_batches()
items_list = []
for item_code, _dict in item_dict.items():
_dict.from_warehouse = self.source_wh.get(item_code) or self.wip_warehouse
_dict.to_warehouse = ""
_dict.item_code = item_code
if backflush_based_on != "BOM" and not frappe.db.get_value(
"Job Card", self.job_card, "skip_material_transfer"
@@ -139,9 +139,7 @@ class ManufactureEntry:
_dict.qty = calculated_qty
self.update_available_serial_batches(_dict, available_serial_batches)
items_list.append(_dict)
self.stock_entry.append("items", items_list)
self.stock_entry.append("items", _dict)
def parse_available_serial_batches(self, item_dict, available_serial_batches):
key = (item_dict.item_code, item_dict.from_warehouse)

View File

@@ -372,8 +372,9 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = {
rm_item.get("rm_item_code"): {
"scio_detail": rm_item.get("name"),
"item_code": rm_item.get("rm_item_code"),
"qty": calculate_qty_as_per_bom(rm_item),
"to_warehouse": rm_item.get("warehouse"),
"t_warehouse": rm_item.get("warehouse"),
"stock_uom": rm_item.get("stock_uom"),
}
}
@@ -413,8 +414,9 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = {
rm_item.get("rm_item_code"): {
"scio_detail": rm_item.get("name"),
"item_code": rm_item.get("rm_item_code"),
"qty": rm_item.received_qty - rm_item.work_order_qty - rm_item.returned_qty,
"from_warehouse": rm_item.get("warehouse"),
"s_warehouse": rm_item.get("warehouse"),
"stock_uom": rm_item.get("stock_uom"),
}
}
@@ -465,7 +467,8 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = {
fg_item.item_code: {
"qty": qty,
"from_warehouse": fg_item.delivery_warehouse,
"item_code": fg_item.item_code,
"s_warehouse": fg_item.delivery_warehouse,
"stock_uom": fg_item.stock_uom,
"scio_detail": fg_item.name,
"is_finished_item": 1,
@@ -490,7 +493,8 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = {
secondary_item.item_code: {
"qty": secondary_item.produced_qty - secondary_item.delivered_qty,
"from_warehouse": secondary_item.warehouse,
"item_code": secondary_item.item_code,
"s_warehouse": secondary_item.warehouse,
"stock_uom": secondary_item.stock_uom,
"scio_detail": secondary_item.name,
"type": secondary_item.type,
@@ -536,6 +540,7 @@ class SubcontractingInwardOrder(SubcontractingController):
items_dict = {
fg_item.item_code: {
"qty": qty,
"item_code": fg_item.item_code,
"stock_uom": fg_item.stock_uom,
"scio_detail": fg_item.name,
"is_finished_item": 1,