refactor: split large functions into smaller functions

This commit is contained in:
Rohit Waghchaure
2026-05-21 10:57:38 +05:30
parent 961cbc3625
commit 068f7b9a8d
6 changed files with 1093 additions and 1087 deletions

View File

@@ -177,11 +177,11 @@ class StockEntry(StockController, SubcontractingInwardController):
def __setattr__(self, name, value): def __setattr__(self, name, value):
super().__setattr__(name, value) super().__setattr__(name, value)
if name == "purpose": if name == "purpose":
self.initialize_class_object() self._configure_purpose_class()
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.initialize_class_object() self._configure_purpose_class()
if self.subcontracting_inward_order: if self.subcontracting_inward_order:
self.subcontract_data = frappe._dict( self.subcontract_data = frappe._dict(
@@ -202,7 +202,7 @@ class StockEntry(StockController, SubcontractingInwardController):
} }
) )
def initialize_class_object(self): def _configure_purpose_class(self):
purpose_map = { purpose_map = {
"Manufacture": ManufactureStockEntry, "Manufacture": ManufactureStockEntry,
"Repack": RepackStockEntry, "Repack": RepackStockEntry,
@@ -215,10 +215,10 @@ class StockEntry(StockController, SubcontractingInwardController):
"Material Receipt": MaterialReceiptStockEntry, "Material Receipt": MaterialReceiptStockEntry,
} }
self.se_handler_class = purpose_map.get(self.purpose) self.purpose_cls = purpose_map.get(self.purpose)
if self.purpose == "Material Transfer" and self.transfer_for_material_request(): if self.purpose == "Material Transfer" and self.transfer_for_material_request():
self.se_handler_class = MaterialRequestStockEntry self.purpose_cls = MaterialRequestStockEntry
def transfer_for_material_request(self): def transfer_for_material_request(self):
if self.outgoing_stock_entry and frappe.get_all( if self.outgoing_stock_entry and frappe.get_all(
@@ -252,8 +252,8 @@ class StockEntry(StockController, SubcontractingInwardController):
def before_validate(self): def before_validate(self):
from erpnext.stock.doctype.putaway_rule.putaway_rule import apply_putaway_rule from erpnext.stock.doctype.putaway_rule.putaway_rule import apply_putaway_rule
if self.se_handler_class and hasattr(self.se_handler_class, "before_validate"): if self.purpose_cls and hasattr(self.purpose_cls, "before_validate"):
self.se_handler_class(self).before_validate() self.purpose_cls(self).before_validate()
self.set_default_cost_center() self.set_default_cost_center()
@@ -282,8 +282,8 @@ class StockEntry(StockController, SubcontractingInwardController):
) )
def validate(self): def validate(self):
if self.se_handler_class: if self.purpose_cls:
self.se_handler_class(self).validate() self.purpose_cls(self).validate()
self.validate_duplicate_serial_and_batch_bundle("items") self.validate_duplicate_serial_and_batch_bundle("items")
self.validate_posting_time() self.validate_posting_time()
@@ -327,8 +327,8 @@ class StockEntry(StockController, SubcontractingInwardController):
StockEntrySABB(self).make_serial_and_batch_bundle_for_outward() StockEntrySABB(self).make_serial_and_batch_bundle_for_outward()
def on_submit(self): def on_submit(self):
if self.se_handler_class and hasattr(self.se_handler_class, "on_submit"): if self.purpose_cls and hasattr(self.purpose_cls, "on_submit"):
self.se_handler_class(self).on_submit() self.purpose_cls(self).on_submit()
self.make_bundle_using_old_serial_batch_fields() self.make_bundle_using_old_serial_batch_fields()
self.adjust_stock_reservation_entries_for_return() self.adjust_stock_reservation_entries_for_return()
@@ -347,8 +347,8 @@ class StockEntry(StockController, SubcontractingInwardController):
super().on_submit_subcontracting_inward() super().on_submit_subcontracting_inward()
def on_cancel(self): def on_cancel(self):
if self.se_handler_class and hasattr(self.se_handler_class, "on_cancel"): if self.purpose_cls and hasattr(self.purpose_cls, "on_cancel"):
self.se_handler_class(self).on_cancel() self.purpose_cls(self).on_cancel()
self.delink_asset_repair_sabb() self.delink_asset_repair_sabb()
self.validate_closed_subcontracting_order() self.validate_closed_subcontracting_order()
@@ -474,36 +474,37 @@ class StockEntry(StockController, SubcontractingInwardController):
def validate_fg_completed_qty(self): def validate_fg_completed_qty(self):
if self.purpose != "Manufacture" or not self.from_bom: if self.purpose != "Manufacture" or not self.from_bom:
return return
fg_qty = self._aggregate_fg_qty()
if fg_qty:
self._check_process_loss_qty(fg_qty)
def _aggregate_fg_qty(self):
fg_qty = defaultdict(float) fg_qty = defaultdict(float)
for d in self.items: for d in self.items:
if d.is_finished_item: if d.is_finished_item:
fg_qty[d.item_code] += flt(d.qty) fg_qty[d.item_code] += flt(d.qty)
return fg_qty
if not fg_qty: def _check_process_loss_qty(self, fg_qty):
return
precision = frappe.get_precision("Stock Entry Detail", "qty") precision = frappe.get_precision("Stock Entry Detail", "qty")
fg_item = next(iter(fg_qty.keys())) fg_item = next(iter(fg_qty.keys()))
fg_item_qty = flt(fg_qty[fg_item], precision) fg_item_qty = flt(fg_qty[fg_item], precision)
fg_completed_qty = flt(self.fg_completed_qty, precision) fg_completed_qty = flt(self.fg_completed_qty, precision)
for d in self.items: for d in self.items:
if not fg_qty.get(d.item_code): if fg_qty.get(d.item_code):
continue self._validate_fg_qty_with_process_loss(d, fg_item_qty, fg_completed_qty, precision)
if (fg_completed_qty - fg_item_qty) > 0: def _validate_fg_qty_with_process_loss(self, d, fg_item_qty, fg_completed_qty, precision):
self.process_loss_qty = fg_completed_qty - fg_item_qty if (fg_completed_qty - fg_item_qty) > 0:
self.process_loss_qty = fg_completed_qty - fg_item_qty
if not self.process_loss_qty: if not self.process_loss_qty:
continue return
if fg_completed_qty != (flt(fg_item_qty, precision) + flt(self.process_loss_qty, precision)):
if fg_completed_qty != (flt(fg_item_qty, precision) + flt(self.process_loss_qty, precision)): frappe.throw(
frappe.throw( _(
_( "Since there is a process loss of {0} units for the finished good {1}, you should reduce the quantity by {0} units for the finished good {1} in the Items Table."
"Since there is a process loss of {0} units for the finished good {1}, you should reduce the quantity by {0} units for the finished good {1} in the Items Table." ).format(frappe.bold(self.process_loss_qty), frappe.bold(d.item_code))
).format(frappe.bold(self.process_loss_qty), frappe.bold(d.item_code)) )
)
def validate_difference_account(self): def validate_difference_account(self):
if not cint(erpnext.is_perpetual_inventory_enabled(self.company)): if not cint(erpnext.is_perpetual_inventory_enabled(self.company)):
@@ -546,71 +547,66 @@ class StockEntry(StockController, SubcontractingInwardController):
self.set_total_amount() self.set_total_amount()
def set_basic_rate(self, reset_outgoing_rate=True, raise_error_if_no_rate=True): def set_basic_rate(self, reset_outgoing_rate=True, raise_error_if_no_rate=True):
""" """Set rate for outgoing, secondary and finished items."""
Set rate for outgoing, secondary and finished items
"""
# Set rate for outgoing items
outgoing_items_cost = self.set_rate_for_outgoing_items(reset_outgoing_rate, raise_error_if_no_rate) outgoing_items_cost = self.set_rate_for_outgoing_items(reset_outgoing_rate, raise_error_if_no_rate)
raise_error_if_no_rate = raise_error_if_no_rate and not self.is_new()
items = [] zero_valuation_items = []
# Set basic rate for incoming items
for d in self.get("items"): for d in self.get("items"):
if d.s_warehouse or d.set_basic_rate_manually: if d.s_warehouse or d.set_basic_rate_manually:
continue continue
self._set_incoming_item_rate(d, outgoing_items_cost, raise_error_if_no_rate, zero_valuation_items)
if d.allow_zero_valuation_rate and d.basic_rate and self.purpose != "Receive from Customer": if zero_valuation_items:
d.basic_rate = 0.0 self._notify_zero_valuation_rate(zero_valuation_items)
items.append(d.item_code)
elif d.is_finished_item:
if self.purpose == "Manufacture":
d.basic_rate = self.get_basic_rate_for_manufactured_item(
d.transfer_qty, outgoing_items_cost
)
elif self.purpose == "Repack":
d.basic_rate = self.get_basic_rate_for_repacked_items(d.transfer_qty, outgoing_items_cost)
if self.bom_no: def _set_incoming_item_rate(self, d, outgoing_items_cost, raise_error_if_no_rate, zero_valuation_items):
d.basic_rate *= frappe.get_value("BOM", self.bom_no, "cost_allocation_per") / 100 if d.allow_zero_valuation_rate and d.basic_rate and self.purpose != "Receive from Customer":
elif d.type and d.bom_secondary_item: d.basic_rate = 0.0
cost_allocation_per = frappe.get_value( zero_valuation_items.append(d.item_code)
"BOM Secondary Item", d.bom_secondary_item, "cost_allocation_per" elif d.is_finished_item:
) if self.purpose == "Manufacture":
d.basic_rate = (outgoing_items_cost * (cost_allocation_per / 100)) / d.transfer_qty d.basic_rate = self.get_basic_rate_for_manufactured_item(d.transfer_qty, outgoing_items_cost)
elif self.purpose == "Repack":
d.basic_rate = self.get_basic_rate_for_repacked_items(d.transfer_qty, outgoing_items_cost)
if not d.basic_rate and not d.allow_zero_valuation_rate: if self.bom_no:
if self.is_new(): d.basic_rate *= frappe.get_value("BOM", self.bom_no, "cost_allocation_per") / 100
raise_error_if_no_rate = False elif d.type and d.bom_secondary_item:
cost_allocation_per = frappe.get_value(
"BOM Secondary Item", d.bom_secondary_item, "cost_allocation_per"
)
d.basic_rate = (outgoing_items_cost * (cost_allocation_per / 100)) / d.transfer_qty
d.basic_rate = get_valuation_rate( if not d.basic_rate and not d.allow_zero_valuation_rate:
d.item_code, d.basic_rate = get_valuation_rate(
d.t_warehouse, d.item_code,
self.doctype, d.t_warehouse,
self.name, self.doctype,
d.allow_zero_valuation_rate, self.name,
currency=erpnext.get_company_currency(self.company), d.allow_zero_valuation_rate,
company=self.company, currency=erpnext.get_company_currency(self.company),
raise_error_if_no_rate=raise_error_if_no_rate, company=self.company,
batch_no=d.batch_no, raise_error_if_no_rate=raise_error_if_no_rate,
serial_and_batch_bundle=d.serial_and_batch_bundle, batch_no=d.batch_no,
) serial_and_batch_bundle=d.serial_and_batch_bundle,
)
# do not round off basic rate to avoid precision loss # do not round off basic rate to avoid precision loss
d.basic_rate = flt(d.basic_rate) d.basic_rate = flt(d.basic_rate)
d.basic_amount = flt(flt(d.transfer_qty) * flt(d.basic_rate), d.precision("basic_amount")) d.basic_amount = flt(flt(d.transfer_qty) * flt(d.basic_rate), d.precision("basic_amount"))
if items: def _notify_zero_valuation_rate(self, items):
message = "" if len(items) > 1:
message = _(
"Items rate has been updated to zero as Allow Zero Valuation Rate is checked for the following items: {0}"
).format(", ".join(frappe.bold(item) for item in items))
else:
message = _(
"Item rate has been updated to zero as Allow Zero Valuation Rate is checked for item {0}"
).format(frappe.bold(items[0]))
if len(items) > 1: frappe.msgprint(message, alert=True)
message = _(
"Items rate has been updated to zero as Allow Zero Valuation Rate is checked for the following items: {0}"
).format(", ".join(frappe.bold(item) for item in items))
else:
message = _(
"Item rate has been updated to zero as Allow Zero Valuation Rate is checked for item {0}"
).format(frappe.bold(items[0]))
frappe.msgprint(message, alert=True)
def set_rate_for_outgoing_items(self, reset_outgoing_rate=True, raise_error_if_no_rate=True): def set_rate_for_outgoing_items(self, reset_outgoing_rate=True, raise_error_if_no_rate=True):
outgoing_items_cost = 0.0 outgoing_items_cost = 0.0
@@ -662,68 +658,78 @@ class StockEntry(StockController, SubcontractingInwardController):
scrap_items_cost = sum([flt(d.basic_amount) for d in self.get("items") if d.is_legacy_scrap_item]) scrap_items_cost = sum([flt(d.basic_amount) for d in self.get("items") if d.is_legacy_scrap_item])
if settings.material_consumption: if settings.material_consumption:
if settings.get_rm_cost_from_consumption_entry and self.work_order: outgoing_items_cost = self._get_rm_cost_for_manufacture(
# Validate only if Material Consumption Entry exists for the Work Order. settings, finished_item_qty, outgoing_items_cost
if frappe.db.exists( )
"Stock Entry",
{
"docstatus": 1,
"work_order": self.work_order,
"purpose": "Material Consumption for Manufacture",
},
):
for item in self.items:
if not item.is_finished_item and not item.type and not item.is_legacy_scrap_item:
label = frappe.get_meta(settings.doctype).get_label(
"get_rm_cost_from_consumption_entry"
)
frappe.throw(
_(
"Row {0}: As {1} is enabled, raw materials cannot be added to {2} entry. Use {3} entry to consume raw materials."
).format(
item.idx,
frappe.bold(label),
frappe.bold(_("Manufacture")),
frappe.bold(_("Material Consumption for Manufacture")),
)
)
if frappe.db.exists(
"Stock Entry",
{
"docstatus": 1,
"work_order": self.work_order,
"purpose": "Manufacture",
"name": ("!=", self.name),
},
):
frappe.throw(
_("Only one {0} entry can be created against the Work Order {1}").format(
frappe.bold(_("Manufacture")), frappe.bold(self.work_order)
)
)
SE = frappe.qb.DocType("Stock Entry")
SE_ITEM = frappe.qb.DocType("Stock Entry Detail")
outgoing_items_cost = (
frappe.qb.from_(SE)
.left_join(SE_ITEM)
.on(SE.name == SE_ITEM.parent)
.select(Sum(SE_ITEM.valuation_rate * SE_ITEM.transfer_qty))
.where(
(SE.docstatus == 1)
& (SE.work_order == self.work_order)
& (SE.purpose == "Material Consumption for Manufacture")
)
).run()[0][0] or 0
elif not outgoing_items_cost:
bom_items = self.get_bom_raw_materials(finished_item_qty)
outgoing_items_cost = sum([flt(row.qty) * flt(row.rate) for row in bom_items.values()])
return flt((outgoing_items_cost - scrap_items_cost) / finished_item_qty) return flt((outgoing_items_cost - scrap_items_cost) / finished_item_qty)
def _get_rm_cost_for_manufacture(self, settings, finished_item_qty, outgoing_items_cost):
if settings.get_rm_cost_from_consumption_entry and self.work_order:
if frappe.db.exists(
"Stock Entry",
{
"docstatus": 1,
"work_order": self.work_order,
"purpose": "Material Consumption for Manufacture",
},
):
self._validate_no_raw_materials_in_manufacture_entry(settings)
self._validate_single_manufacture_entry()
return self._fetch_consumption_entry_cost()
elif not outgoing_items_cost:
bom_items = self.get_bom_raw_materials(finished_item_qty)
outgoing_items_cost = sum([flt(row.qty) * flt(row.rate) for row in bom_items.values()])
return outgoing_items_cost
def _validate_no_raw_materials_in_manufacture_entry(self, settings):
for item in self.items:
if not item.is_finished_item and not item.type and not item.is_legacy_scrap_item:
label = frappe.get_meta(settings.doctype).get_label("get_rm_cost_from_consumption_entry")
frappe.throw(
_(
"Row {0}: As {1} is enabled, raw materials cannot be added to {2} entry. Use {3} entry to consume raw materials."
).format(
item.idx,
frappe.bold(label),
frappe.bold(_("Manufacture")),
frappe.bold(_("Material Consumption for Manufacture")),
)
)
def _validate_single_manufacture_entry(self):
if frappe.db.exists(
"Stock Entry",
{
"docstatus": 1,
"work_order": self.work_order,
"purpose": "Manufacture",
"name": ("!=", self.name),
},
):
frappe.throw(
_("Only one {0} entry can be created against the Work Order {1}").format(
frappe.bold(_("Manufacture")), frappe.bold(self.work_order)
)
)
def _fetch_consumption_entry_cost(self):
SE = frappe.qb.DocType("Stock Entry")
SE_ITEM = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(SE)
.left_join(SE_ITEM)
.on(SE.name == SE_ITEM.parent)
.select(Sum(SE_ITEM.valuation_rate * SE_ITEM.transfer_qty))
.where(
(SE.docstatus == 1)
& (SE.work_order == self.work_order)
& (SE.purpose == "Material Consumption for Manufacture")
)
).run()[0][0] or 0
def distribute_additional_costs(self): def distribute_additional_costs(self):
# If no incoming items, set additional costs blank # If no incoming items, set additional costs blank
if not any(d.item_code for d in self.items if d.t_warehouse): if not any(d.item_code for d in self.items if d.t_warehouse):
@@ -1044,11 +1050,20 @@ class StockEntry(StockController, SubcontractingInwardController):
total_basic_amount = sum(flt(t.basic_amount) for t in self.get("items") if t.t_warehouse) total_basic_amount = sum(flt(t.basic_amount) for t in self.get("items") if t.t_warehouse)
divide_based_on = total_basic_amount divide_based_on = total_basic_amount
if self.get("additional_costs") and not total_basic_amount: if self.get("additional_costs") and not total_basic_amount:
# if total_basic_amount is 0, distribute additional charges based on qty divide_based_on = sum(item.qty for item in self.get("items"))
divide_based_on = sum(item.qty for item in list(self.get("items")))
item_account_wise_additional_cost = self._build_additional_cost_per_item_account(
total_basic_amount, divide_based_on
)
if item_account_wise_additional_cost:
self._append_additional_cost_gl_entries(gl_entries, item_account_wise_additional_cost)
self.set_gl_entries_for_landed_cost_voucher(gl_entries, inventory_account_map)
return process_gl_map(gl_entries, from_repost=frappe.flags.through_repost_item_valuation)
def _build_additional_cost_per_item_account(self, total_basic_amount, divide_based_on):
item_account_wise_additional_cost = {} item_account_wise_additional_cost = {}
for t in self.get("additional_costs"): for t in self.get("additional_costs"):
@@ -1064,56 +1079,44 @@ class StockEntry(StockController, SubcontractingInwardController):
) )
multiply_based_on = d.basic_amount if total_basic_amount else d.qty multiply_based_on = d.basic_amount if total_basic_amount else d.qty
entry = item_account_wise_additional_cost[(d.item_code, d.name)][t.expense_account]
entry["amount"] += flt(t.amount * multiply_based_on) / divide_based_on
entry["base_amount"] += flt(t.base_amount * multiply_based_on) / divide_based_on
item_account_wise_additional_cost[(d.item_code, d.name)][t.expense_account]["amount"] += ( return item_account_wise_additional_cost
flt(t.amount * multiply_based_on) / divide_based_on
def _append_additional_cost_gl_entries(self, gl_entries, item_account_wise_additional_cost):
for d in self.get("items"):
for account, amount in item_account_wise_additional_cost.get((d.item_code, d.name), {}).items():
if not amount:
continue
gl_entries.append(
self.get_gl_dict(
{
"account": account,
"against": d.expense_account,
"cost_center": d.cost_center,
"remarks": self.get("remarks") or _("Accounting Entry for Stock"),
"credit_in_account_currency": flt(amount["amount"]),
"credit": flt(amount["base_amount"]),
},
item=d,
)
) )
item_account_wise_additional_cost[(d.item_code, d.name)][t.expense_account][ gl_entries.append(
"base_amount" self.get_gl_dict(
] += flt(t.base_amount * multiply_based_on) / divide_based_on {
"account": d.expense_account,
if item_account_wise_additional_cost: "against": account,
for d in self.get("items"): "cost_center": d.cost_center,
for account, amount in item_account_wise_additional_cost.get( "remarks": self.get("remarks") or _("Accounting Entry for Stock"),
(d.item_code, d.name), {} "credit": -1 * amount["base_amount"], # negative credit instead of debit
).items(): },
if not amount: item=d,
continue
gl_entries.append(
self.get_gl_dict(
{
"account": account,
"against": d.expense_account,
"cost_center": d.cost_center,
"remarks": self.get("remarks") or _("Accounting Entry for Stock"),
"credit_in_account_currency": flt(amount["amount"]),
"credit": flt(amount["base_amount"]),
},
item=d,
)
) )
)
gl_entries.append(
self.get_gl_dict(
{
"account": d.expense_account,
"against": account,
"cost_center": d.cost_center,
"remarks": self.get("remarks") or _("Accounting Entry for Stock"),
"credit": -1
* amount[
"base_amount"
], # put it as negative credit instead of debit purposefully
},
item=d,
)
)
self.set_gl_entries_for_landed_cost_voucher(gl_entries, inventory_account_map)
return process_gl_map(gl_entries, from_repost=frappe.flags.through_repost_item_valuation)
def set_gl_entries_for_landed_cost_voucher(self, gl_entries, inventory_account_map): def set_gl_entries_for_landed_cost_voucher(self, gl_entries, inventory_account_map):
landed_cost_entries = self.get_item_account_wise_lcv_entries() landed_cost_entries = self.get_item_account_wise_lcv_entries()
@@ -1240,49 +1243,68 @@ class StockEntry(StockController, SubcontractingInwardController):
@frappe.whitelist() @frappe.whitelist()
def get_item_details(self, args: ItemDetailsCtx | None = None, for_update: bool = False): def get_item_details(self, args: ItemDetailsCtx | None = None, for_update: bool = False):
item = frappe.qb.DocType("Item") item = self._fetch_item_data(args)
item_group_defaults = get_item_group_defaults(item.name, self.company)
brand_defaults = get_brand_defaults(item.name, self.company)
ret = self._build_item_ret(args, item, item_group_defaults, brand_defaults, for_update)
self._apply_account_defaults(ret)
args["posting_date"] = self.posting_date
args["posting_time"] = self.posting_time
ret.update(get_warehouse_details(args) if args.get("warehouse") else {})
if self.purpose == "Send to Subcontractor":
self._resolve_subcontract_item(args, ret)
barcode_data = get_barcode_data(item_code=item.name)
if barcode_data and len(barcode_data.get(item.name)) == 1:
ret["barcode"] = barcode_data.get(item.name)[0]
return ret
def _fetch_item_data(self, args):
item_dt = frappe.qb.DocType("Item")
item_default = frappe.qb.DocType("Item Default") item_default = frappe.qb.DocType("Item Default")
query = ( result = (
frappe.qb.from_(item) frappe.qb.from_(item_dt)
.left_join(item_default) .left_join(item_default)
.on((item.name == item_default.parent) & (item_default.company == self.company)) .on((item_dt.name == item_default.parent) & (item_default.company == self.company))
.select( .select(
item.name, item_dt.name,
item.stock_uom, item_dt.stock_uom,
item.description, item_dt.description,
item.image, item_dt.image,
item.is_stock_item, item_dt.is_stock_item,
item.item_name, item_dt.item_name,
item.item_group, item_dt.item_group,
item.has_batch_no, item_dt.has_batch_no,
item.sample_quantity, item_dt.sample_quantity,
item.has_serial_no, item_dt.has_serial_no,
item.allow_alternative_item, item_dt.allow_alternative_item,
item_default.expense_account, item_default.expense_account,
item_default.buying_cost_center, item_default.buying_cost_center,
) )
.where( .where(
(item.name == args.get("item_code")) (item_dt.name == args.get("item_code"))
& (item.disabled == 0) & (item_dt.disabled == 0)
& ( & (
(item.end_of_life.isnull()) (item_dt.end_of_life.isnull())
| (item.end_of_life < "1900-01-01") | (item_dt.end_of_life < "1900-01-01")
| (item.end_of_life > nowdate()) | (item_dt.end_of_life > nowdate())
) )
) )
) ).run(as_dict=True)
item = query.run(as_dict=True)
if not item: if not result:
frappe.throw( frappe.throw(
_("Item {0} is not active or end of life has been reached").format(args.get("item_code")) _("Item {0} is not active or end of life has been reached").format(args.get("item_code"))
) )
item = item[0] return result[0]
item_group_defaults = get_item_group_defaults(item.name, self.company)
brand_defaults = get_brand_defaults(item.name, self.company)
def _build_item_ret(self, args, item, item_group_defaults, brand_defaults, for_update):
ret = frappe._dict( ret = frappe._dict(
{ {
"uom": item.stock_uom, "uom": item.stock_uom,
@@ -1309,13 +1331,15 @@ class StockEntry(StockController, SubcontractingInwardController):
if self.purpose == "Send to Subcontractor": if self.purpose == "Send to Subcontractor":
ret["allow_alternative_item"] = item.allow_alternative_item ret["allow_alternative_item"] = item.allow_alternative_item
# update uom
if args.get("uom") and for_update: if args.get("uom") and for_update:
ret.update(get_uom_details(args.get("item_code"), args.get("uom"), args.get("qty"))) ret.update(get_uom_details(args.get("item_code"), args.get("uom"), args.get("qty")))
if self.purpose == "Material Issue": if self.purpose == "Material Issue":
ret["expense_account"] = item.get("expense_account") or item_group_defaults.get("expense_account") ret["expense_account"] = item.get("expense_account") or item_group_defaults.get("expense_account")
return ret
def _apply_account_defaults(self, ret):
if not ret.get("expense_account"): if not ret.get("expense_account"):
ret["expense_account"] = frappe.get_cached_value( ret["expense_account"] = frappe.get_cached_value(
"Company", self.company, "stock_adjustment_account" "Company", self.company, "stock_adjustment_account"
@@ -1328,34 +1352,21 @@ class StockEntry(StockController, SubcontractingInwardController):
if not ret.get(field): if not ret.get(field):
ret[field] = frappe.get_cached_value("Company", self.company, company_field) ret[field] = frappe.get_cached_value("Company", self.company, company_field)
args["posting_date"] = self.posting_date def _resolve_subcontract_item(self, args, ret):
args["posting_time"] = self.posting_time if not (self.get(self.subcontract_data.order_field) and args.get("item_code")):
return
stock_and_rate = get_warehouse_details(args) if args.get("warehouse") else {} subcontract_items = frappe.get_all(
ret.update(stock_and_rate) self.subcontract_data.order_supplied_items_field,
{
"parent": self.get(self.subcontract_data.order_field),
"rm_item_code": args.get("item_code"),
},
"main_item_code",
)
if ( if subcontract_items and len(subcontract_items) == 1:
self.purpose == "Send to Subcontractor" ret["subcontracted_item"] = subcontract_items[0].main_item_code
and self.get(self.subcontract_data.order_field)
and args.get("item_code")
):
subcontract_items = frappe.get_all(
self.subcontract_data.order_supplied_items_field,
{
"parent": self.get(self.subcontract_data.order_field),
"rm_item_code": args.get("item_code"),
},
"main_item_code",
)
if subcontract_items and len(subcontract_items) == 1:
ret["subcontracted_item"] = subcontract_items[0].main_item_code
barcode_data = get_barcode_data(item_code=item.name)
if barcode_data and len(barcode_data.get(item.name)) == 1:
ret["barcode"] = barcode_data.get(item.name)[0]
return ret
@frappe.whitelist() @frappe.whitelist()
def set_items_for_stock_in(self): def set_items_for_stock_in(self):
@@ -1385,8 +1396,8 @@ class StockEntry(StockController, SubcontractingInwardController):
@frappe.whitelist() @frappe.whitelist()
def get_items(self): def get_items(self):
self.set("items", []) self.set("items", [])
if self.se_handler_class and hasattr(self.se_handler_class, "add_items"): if self.purpose_cls and hasattr(self.purpose_cls, "add_items"):
self.se_handler_class(self).add_items() self.purpose_cls(self).add_items()
self.set_serial_batch_from_reserved_entry() self.set_serial_batch_from_reserved_entry()
self.set_actual_qty() self.set_actual_qty()

View File

@@ -112,50 +112,44 @@ class DisassembleStockEntry(BaseStockEntry):
def _append_disassembly_row_from_source(self, disassemble_qty, scale_factor): def _append_disassembly_row_from_source(self, disassemble_qty, scale_factor):
for source_row in self.get_items_from_manufacture_stock_entry(): for source_row in self.get_items_from_manufacture_stock_entry():
if source_row.is_finished_item: self._append_disassembly_item(source_row, disassemble_qty, scale_factor)
qty = disassemble_qty
s_warehouse = self.doc.from_warehouse or source_row.t_warehouse
t_warehouse = ""
elif source_row.s_warehouse:
# RM: was consumed FROM s_warehouse -> return TO s_warehouse
qty = flt(source_row.qty * scale_factor)
s_warehouse = ""
t_warehouse = self.doc.to_warehouse or source_row.s_warehouse
else:
# Scrap/secondary: was produced TO t_warehouse -> take FROM t_warehouse
qty = flt(source_row.qty * scale_factor)
s_warehouse = source_row.t_warehouse
t_warehouse = ""
item = { def _get_disassembly_warehouses(self, source_row, disassemble_qty, scale_factor):
"item_code": source_row.item_code, if source_row.is_finished_item:
"item_name": source_row.item_name, return disassemble_qty, self.doc.from_warehouse or source_row.t_warehouse, ""
"description": source_row.description, elif source_row.s_warehouse:
"stock_uom": source_row.stock_uom, return flt(source_row.qty * scale_factor), "", self.doc.to_warehouse or source_row.s_warehouse
"uom": source_row.uom, else:
"conversion_factor": source_row.conversion_factor, return flt(source_row.qty * scale_factor), source_row.t_warehouse, ""
"basic_rate": source_row.basic_rate,
"qty": qty,
"s_warehouse": s_warehouse,
"t_warehouse": t_warehouse,
"is_finished_item": source_row.is_finished_item,
"type": source_row.type,
"is_legacy_scrap_item": source_row.is_legacy_scrap_item,
"bom_secondary_item": source_row.bom_secondary_item,
"bom_no": source_row.bom_no,
# batch and serial bundles built on submit
"use_serial_batch_fields": 1 if (source_row.batch_no or source_row.serial_no) else 0,
}
if self.doc.source_stock_entry: def _build_disassembly_item_dict(self, source_row, qty, s_warehouse, t_warehouse):
item.update( return {
{ "item_code": source_row.item_code,
"against_stock_entry": self.doc.source_stock_entry, "item_name": source_row.item_name,
"ste_detail": source_row.name, "description": source_row.description,
} "stock_uom": source_row.stock_uom,
) "uom": source_row.uom,
"conversion_factor": source_row.conversion_factor,
"basic_rate": source_row.basic_rate,
"qty": qty,
"s_warehouse": s_warehouse,
"t_warehouse": t_warehouse,
"is_finished_item": source_row.is_finished_item,
"type": source_row.type,
"is_legacy_scrap_item": source_row.is_legacy_scrap_item,
"bom_secondary_item": source_row.bom_secondary_item,
"bom_no": source_row.bom_no,
"use_serial_batch_fields": 1 if (source_row.batch_no or source_row.serial_no) else 0,
}
self.doc.append("items", item) def _append_disassembly_item(self, source_row, disassemble_qty, scale_factor):
qty, s_warehouse, t_warehouse = self._get_disassembly_warehouses(
source_row, disassemble_qty, scale_factor
)
item = self._build_disassembly_item_dict(source_row, qty, s_warehouse, t_warehouse)
if self.doc.source_stock_entry:
item.update({"against_stock_entry": self.doc.source_stock_entry, "ste_detail": source_row.name})
self.doc.append("items", item)
def _add_items_for_disassembly_from_bom(self): def _add_items_for_disassembly_from_bom(self):
if not self.doc.bom_no or not self.doc.fg_completed_qty: if not self.doc.bom_no or not self.doc.fg_completed_qty:
@@ -291,70 +285,71 @@ class DisassembleStockEntry(BaseStockEntry):
frappe.db.get_value("Stock Entry", self.doc.source_stock_entry, "fg_completed_qty") frappe.db.get_value("Stock Entry", self.doc.source_stock_entry, "fg_completed_qty")
) )
scale_factor = flt(self.doc.fg_completed_qty) / source_fg_qty if source_fg_qty else 0 scale_factor = flt(self.doc.fg_completed_qty) / source_fg_qty if source_fg_qty else 0
bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=[self.doc.source_stock_entry]) bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=[self.doc.source_stock_entry])
source_rows_by_name = {r.name: r for r in self.get_items_from_manufacture_stock_entry()} source_rows_by_name = {r.name: r for r in self.get_items_from_manufacture_stock_entry()}
for row in self.doc.items: for row in self.doc.items:
if not row.ste_detail: if not row.ste_detail:
continue continue
source_row = source_rows_by_name.get(row.ste_detail) source_row = source_rows_by_name.get(row.ste_detail)
if not source_row: if source_row:
continue self._apply_bundle_to_disassembly_row(row, source_row, bundle_data, scale_factor)
source_warehouse = source_row.s_warehouse or source_row.t_warehouse def _apply_bundle_to_disassembly_row(self, row, source_row, bundle_data, scale_factor):
key = (source_row.item_code, source_warehouse, self.doc.source_stock_entry) source_warehouse = source_row.s_warehouse or source_row.t_warehouse
source_bundle = bundle_data.get(key, {}) key = (source_row.item_code, source_warehouse, self.doc.source_stock_entry)
source_bundle = bundle_data.get(key, {})
batches = self._extract_batches(source_row, source_bundle, row, scale_factor)
serial_nos = self._extract_serial_nos(source_row, source_bundle, row)
self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches)
batches = defaultdict(float) def _extract_batches(self, source_row, source_bundle, row, scale_factor):
serial_nos = [] batches = defaultdict(float)
if source_bundle.get("batch_nos"):
self._allocate_batches(batches, source_bundle["batch_nos"], row.transfer_qty, scale_factor)
elif source_row.batch_no:
batches[source_row.batch_no] = row.transfer_qty
return batches
if source_bundle.get("batch_nos"): def _allocate_batches(self, batches, batch_nos, transfer_qty, scale_factor):
qty_remaining = row.transfer_qty qty_remaining = transfer_qty
for batch_no, batch_qty in source_bundle["batch_nos"].items(): for batch_no, batch_qty in batch_nos.items():
if qty_remaining <= 0: if qty_remaining <= 0:
break break
alloc = min(abs(flt(batch_qty)) * scale_factor, qty_remaining) alloc = min(abs(flt(batch_qty)) * scale_factor, qty_remaining)
batches[batch_no] = alloc batches[batch_no] = alloc
qty_remaining -= alloc qty_remaining -= alloc
elif source_row.batch_no:
batches[source_row.batch_no] = row.transfer_qty
if source_bundle.get("serial_nos"): def _extract_serial_nos(self, source_row, source_bundle, row):
serial_nos = get_serial_nos(source_bundle["serial_nos"])[: int(row.transfer_qty)] if source_bundle.get("serial_nos"):
elif source_row.serial_no: return get_serial_nos(source_bundle["serial_nos"])[: int(row.transfer_qty)]
serial_nos = get_serial_nos(source_row.serial_no)[: int(row.transfer_qty)] elif source_row.serial_no:
return get_serial_nos(source_row.serial_no)[: int(row.transfer_qty)]
self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches) return []
def _set_serial_batch_for_disassembly_from_available_materials(self): def _set_serial_batch_for_disassembly_from_available_materials(self):
available_materials = get_available_materials(self.doc.work_order, self.doc) available_materials = get_available_materials(self.doc.work_order, self.doc)
for row in self.doc.items: for row in self.doc.items:
warehouse = row.s_warehouse or row.t_warehouse warehouse = row.s_warehouse or row.t_warehouse
materials = available_materials.get((row.item_code, warehouse)) materials = available_materials.get((row.item_code, warehouse))
if not materials: if materials:
continue self._apply_available_material_bundle(row, materials)
batches = defaultdict(float) def _apply_available_material_bundle(self, row, materials):
serial_nos = [] batches = self._collect_available_batches(materials.batch_details, row.transfer_qty)
qty = row.transfer_qty serial_nos = materials.serial_nos[: int(row.transfer_qty)] if materials.serial_nos else []
for batch_no, batch_qty in materials.batch_details.items(): self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches)
if qty <= 0:
break
batch_qty = abs(batch_qty) def _collect_available_batches(self, batch_details, transfer_qty):
if batch_qty <= qty: batches, qty = defaultdict(float), transfer_qty
batches[batch_no] = batch_qty for batch_no, batch_qty in batch_details.items():
qty -= batch_qty if qty <= 0:
else: break
batches[batch_no] = qty batch_qty = abs(batch_qty)
qty = 0 if batch_qty <= qty:
batches[batch_no], qty = batch_qty, qty - batch_qty
if materials.serial_nos: else:
serial_nos = materials.serial_nos[: int(row.transfer_qty)] batches[batch_no], qty = qty, 0
return batches
self._set_serial_batch_bundle_for_disassembly_row(row, serial_nos, batches)
def _set_serial_batch_bundle_for_disassembly_row(self, row, serial_nos, batches): def _set_serial_batch_bundle_for_disassembly_row(self, row, serial_nos, batches):
if not serial_nos and not batches: if not serial_nos and not batches:
@@ -392,145 +387,143 @@ class DisassembleStockEntry(BaseStockEntry):
def get_available_materials(work_order, stock_entry_doc=None) -> dict: def get_available_materials(work_order, stock_entry_doc=None) -> dict:
data = get_stock_entry_data(work_order, stock_entry_doc=stock_entry_doc) data = get_stock_entry_data(work_order, stock_entry_doc=stock_entry_doc)
available_materials = {} available_materials = {}
for row in data: for row in data:
key = (row.item_code, row.warehouse) key = _get_material_key(row, stock_entry_doc)
if row.purpose != "Material Transfer for Manufacture":
key = (row.item_code, row.s_warehouse)
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
key = (row.item_code, row.s_warehouse or row.warehouse)
if key not in available_materials: if key not in available_materials:
available_materials.setdefault( available_materials[key] = frappe._dict(
key, {"item_details": row, "batch_details": defaultdict(float), "qty": 0, "serial_nos": []}
frappe._dict(
{"item_details": row, "batch_details": defaultdict(float), "qty": 0, "serial_nos": []}
),
) )
_update_material_qty(available_materials[key], row, stock_entry_doc)
item_data = available_materials[key]
if row.purpose == "Material Transfer for Manufacture" or (
stock_entry_doc and stock_entry_doc.purpose == "Disassemble" and row.purpose == "Manufacture"
):
item_data.qty += row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] += row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
if row.serial_no:
item_data.serial_nos.extend(get_serial_nos(row.serial_no))
item_data.serial_nos.sort()
elif row.serial_nos:
item_data.serial_nos.extend(get_serial_nos(row.serial_nos))
item_data.serial_nos.sort()
else:
# Consume raw material qty in case of 'Manufacture' or 'Material Consumption for Manufacture'
item_data.qty -= row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] -= row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
if row.serial_no:
for serial_no in get_serial_nos(row.serial_no):
if serial_no in item_data.serial_nos:
item_data.serial_nos.remove(serial_no)
elif row.serial_nos:
for serial_no in get_serial_nos(row.serial_nos):
if serial_no in item_data.serial_nos:
item_data.serial_nos.remove(serial_no)
return available_materials return available_materials
def _get_material_key(row, stock_entry_doc):
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
return (row.item_code, row.s_warehouse or row.warehouse)
if row.purpose != "Material Transfer for Manufacture":
return (row.item_code, row.s_warehouse)
return (row.item_code, row.warehouse)
def _update_material_qty(item_data, row, stock_entry_doc):
is_inward = row.purpose == "Material Transfer for Manufacture" or (
stock_entry_doc and stock_entry_doc.purpose == "Disassemble" and row.purpose == "Manufacture"
)
if is_inward:
_add_inward_material_qty(item_data, row)
else:
_deduct_consumed_material_qty(item_data, row)
def _add_inward_material_qty(item_data, row):
item_data.qty += row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] += row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
_extend_serial_nos_from_row(item_data, row)
def _extend_serial_nos_from_row(item_data, row):
sn = row.serial_no or row.serial_nos
if sn:
item_data.serial_nos.extend(get_serial_nos(sn))
item_data.serial_nos.sort()
def _deduct_consumed_material_qty(item_data, row):
item_data.qty -= row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] -= row.qty
elif row.batch_nos:
for batch_no, qty in row.batch_nos.items():
item_data.batch_details[batch_no] += qty
_remove_serial_nos_from_available(item_data, row)
def _remove_serial_nos_from_available(item_data, row):
sn = row.serial_no or row.serial_nos
if not sn:
return
for serial_no in get_serial_nos(sn):
if serial_no in item_data.serial_nos:
item_data.serial_nos.remove(serial_no)
def get_stock_entry_data(work_order, stock_entry_doc=None): def get_stock_entry_data(work_order, stock_entry_doc=None):
data = _run_stock_entry_query(work_order, stock_entry_doc)
if not data:
return []
_enrich_with_bundle_data(data, stock_entry_doc)
return data
def _run_stock_entry_query(work_order, stock_entry_doc):
se = frappe.qb.DocType("Stock Entry")
sed = frappe.qb.DocType("Stock Entry Detail")
query = _build_stock_entry_base_query(se, sed, work_order)
query = _apply_stock_entry_purpose_filter(query, se, sed, stock_entry_doc)
return query.run(as_dict=1)
def _build_stock_entry_base_query(se, sed, work_order):
return (
frappe.qb.from_(se)
.from_(sed)
.select(
sed.item_name,
sed.original_item,
sed.item_code,
sed.qty,
sed.t_warehouse.as_("warehouse"),
sed.s_warehouse.as_("s_warehouse"),
sed.description,
sed.stock_uom,
sed.expense_account,
sed.cost_center,
sed.serial_and_batch_bundle,
sed.batch_no,
sed.serial_no,
se.purpose,
se.name,
)
.where((se.name == sed.parent) & (se.work_order == work_order) & (se.docstatus == 1))
.orderby(se.creation, sed.item_code, sed.idx)
)
def _apply_stock_entry_purpose_filter(query, se, sed, stock_entry_doc):
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
query = query.where(se.purpose.isin(["Disassemble", "Manufacture"]))
return query.where(se.name != stock_entry_doc.name)
query = query.where(
se.purpose.isin(
["Manufacture", "Material Consumption for Manufacture", "Material Transfer for Manufacture"]
)
)
return query.where(sed.s_warehouse.isnotnull())
def _enrich_with_bundle_data(data, stock_entry_doc):
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import ( from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import (
get_voucher_wise_serial_batch_from_bundle, get_voucher_wise_serial_batch_from_bundle,
) )
stock_entry = frappe.qb.DocType("Stock Entry")
stock_entry_detail = frappe.qb.DocType("Stock Entry Detail")
data = (
frappe.qb.from_(stock_entry)
.from_(stock_entry_detail)
.select(
stock_entry_detail.item_name,
stock_entry_detail.original_item,
stock_entry_detail.item_code,
stock_entry_detail.qty,
(stock_entry_detail.t_warehouse).as_("warehouse"),
(stock_entry_detail.s_warehouse).as_("s_warehouse"),
stock_entry_detail.description,
stock_entry_detail.stock_uom,
stock_entry_detail.expense_account,
stock_entry_detail.cost_center,
stock_entry_detail.serial_and_batch_bundle,
stock_entry_detail.batch_no,
stock_entry_detail.serial_no,
stock_entry.purpose,
stock_entry.name,
)
.where(
(stock_entry.name == stock_entry_detail.parent)
& (stock_entry.work_order == work_order)
& (stock_entry.docstatus == 1)
)
.orderby(stock_entry.creation, stock_entry_detail.item_code, stock_entry_detail.idx)
)
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
data = data.where(
stock_entry.purpose.isin(
[
"Disassemble",
"Manufacture",
]
)
)
data = data.where(stock_entry.name != stock_entry_doc.name)
else:
data = data.where(
stock_entry.purpose.isin(
[
"Manufacture",
"Material Consumption for Manufacture",
"Material Transfer for Manufacture",
]
)
)
data = data.where(stock_entry_detail.s_warehouse.isnotnull())
data = data.run(as_dict=1)
if not data:
return []
voucher_nos = [row.get("name") for row in data if row.get("name")] voucher_nos = [row.get("name") for row in data if row.get("name")]
if voucher_nos: if not voucher_nos:
bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=voucher_nos) return
for row in data: bundle_data = get_voucher_wise_serial_batch_from_bundle(voucher_no=voucher_nos)
key = (row.item_code, row.warehouse, row.name) for row in data:
if row.purpose != "Material Transfer for Manufacture": key = _get_bundle_key(row, stock_entry_doc)
key = (row.item_code, row.s_warehouse, row.name) if bundle_data.get(key):
row.update(bundle_data.get(key))
if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
key = (row.item_code, row.s_warehouse or row.warehouse, row.name)
if bundle_data.get(key): def _get_bundle_key(row, stock_entry_doc):
row.update(bundle_data.get(key)) if stock_entry_doc and stock_entry_doc.purpose == "Disassemble":
return (row.item_code, row.s_warehouse or row.warehouse, row.name)
return data if row.purpose != "Material Transfer for Manufacture":
return (row.item_code, row.s_warehouse, row.name)
return (row.item_code, row.warehouse, row.name)

View File

@@ -7,12 +7,10 @@ from frappe.query_builder.functions import Sum
from frappe.utils import ceil, cint, flt, get_link_to_form from frappe.utils import ceil, cint, flt, get_link_to_form
from erpnext.manufacturing.doctype.bom.bom import add_additional_cost from erpnext.manufacturing.doctype.bom.bom import add_additional_cost
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.serial_batch_bundle import ( from erpnext.stock.serial_batch_bundle import (
SerialBatchCreation, SerialBatchCreation,
get_batch_nos, get_batch_nos,
get_empty_batches_based_work_order, get_empty_batches_based_work_order,
get_serial_or_batch_items,
) )
from .base import BaseStockEntry from .base import BaseStockEntry
@@ -212,52 +210,39 @@ class BaseManufactureStockEntry(BaseStockEntry):
def add_batchwise_finished_good(self, batches, item_details): def add_batchwise_finished_good(self, batches, item_details):
qty = flt(self.doc.fg_completed_qty) qty = flt(self.doc.fg_completed_qty)
row = frappe._dict({"batches_to_be_consume": defaultdict(float)}) row = frappe._dict({"batches_to_be_consume": defaultdict(float)})
self.update_batches_to_be_consume(batches, row, qty) self.update_batches_to_be_consume(batches, row, qty)
if row.batches_to_be_consume:
self._link_fg_bundle_and_append(item_details, row)
if not row.batches_to_be_consume: def _link_fg_bundle_and_append(self, item_details, row):
return
_id = create_serial_and_batch_bundle( _id = create_serial_and_batch_bundle(
self.doc, self.doc,
row, row,
frappe._dict( frappe._dict(
{ {"item_code": self.wo_doc.production_item, "warehouse": item_details.get("t_warehouse")}
"item_code": self.wo_doc.production_item,
"warehouse": item_details.get("t_warehouse"),
}
), ),
) )
item_details["serial_and_batch_bundle"] = _id item_details["serial_and_batch_bundle"] = _id
self.doc.append("items", item_details) self.doc.append("items", item_details)
def update_batches_to_be_consume(self, batches, row, qty): def update_batches_to_be_consume(self, batches, row, qty):
qty_to_be_consumed = qty qty_to_be_consumed = qty
batches = sorted(batches.items(), key=lambda x: x[0]) for batch_no, batch_qty in sorted(batches.items(), key=lambda x: x[0]):
for batch_no, batch_qty in batches:
if qty_to_be_consumed <= 0 or batch_qty <= 0: if qty_to_be_consumed <= 0 or batch_qty <= 0:
continue continue
batch_qty = min(batch_qty, qty_to_be_consumed)
if batch_qty > qty_to_be_consumed: self._consume_batch(row, batch_no, batch_qty)
batch_qty = qty_to_be_consumed
row.batches_to_be_consume[batch_no] += batch_qty
if batch_no and row.serial_nos:
serial_nos = self.get_serial_nos_based_on_transferred_batch(batch_no, row.serial_nos)
serial_nos = serial_nos[0 : cint(batch_qty)]
# remove consumed serial nos from list
for sn in serial_nos:
row.serial_nos.remove(sn)
if "batch_details" in row:
row.batch_details[batch_no] -= batch_qty
qty_to_be_consumed -= batch_qty qty_to_be_consumed -= batch_qty
def _consume_batch(self, row, batch_no, batch_qty):
row.batches_to_be_consume[batch_no] += batch_qty
if batch_no and row.serial_nos:
serial_nos = self.get_serial_nos_based_on_transferred_batch(batch_no, row.serial_nos)
for sn in serial_nos[: cint(batch_qty)]:
row.serial_nos.remove(sn)
if "batch_details" in row:
row.batch_details[batch_no] -= batch_qty
class ManufactureStockEntry(BaseManufactureStockEntry): class ManufactureStockEntry(BaseManufactureStockEntry):
def before_validate(self): def before_validate(self):
@@ -322,33 +307,30 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
wo = self.wo_doc wo = self.wo_doc
if not wo: if not wo:
return return
work_order_qty = flt(wo.material_transferred_for_manufacturing) or flt(wo.qty) work_order_qty = flt(wo.material_transferred_for_manufacturing) or flt(wo.qty)
wo_qty_to_produce = work_order_qty - flt(wo.produced_qty) wo_qty_to_produce = work_order_qty - flt(wo.produced_qty)
for item in wo.get("required_items"): for item in wo.get("required_items"):
wo_item_qty = flt(item.transferred_qty) or flt(item.required_qty) self._append_unconsumed_item(item, wo, wo_qty_to_produce)
wo_qty_unconsumed = wo_item_qty - flt(item.consumed_qty)
bom_qty_per_unit = flt(item.required_qty) / flt(wo.qty)
req_qty_each = wo_qty_unconsumed / (wo_qty_to_produce or 1) def _append_unconsumed_item(self, item, wo, wo_qty_to_produce):
req_qty_each = min(req_qty_each, bom_qty_per_unit) wo_item_qty = flt(item.transferred_qty) or flt(item.required_qty)
wo_qty_unconsumed = wo_item_qty - flt(item.consumed_qty)
qty = req_qty_each * flt(self.doc.fg_completed_qty) bom_qty_per_unit = flt(item.required_qty) / flt(wo.qty)
if qty <= 0: req_qty_each = min(wo_qty_unconsumed / (wo_qty_to_produce or 1), bom_qty_per_unit)
continue qty = req_qty_each * flt(self.doc.fg_completed_qty)
if qty <= 0:
item_args = self.get_item_dict(item) return
item_args.update( item_args = self.get_item_dict(item)
{ item_args.update(
"conversion_factor": 1, {
"s_warehouse": wo.wip_warehouse or item.source_warehouse, "conversion_factor": 1,
"uom": item.stock_uom, "s_warehouse": wo.wip_warehouse or item.source_warehouse,
"qty": ceil_qty_if_uom_has_whole_number(qty, item.stock_uom), "uom": item.stock_uom,
} "qty": ceil_qty_if_uom_has_whole_number(qty, item.stock_uom),
) }
item_args["transfer_qty"] = item_args["qty"] )
self.doc.append("items", item_args) item_args["transfer_qty"] = item_args["qty"]
self.doc.append("items", item_args)
def add_raw_materials_based_on_work_order(self): def add_raw_materials_based_on_work_order(self):
bom_items = ( bom_items = (
@@ -357,42 +339,47 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
else get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom) else get_bom_items(self.doc.bom_no, self.doc.use_multi_level_bom)
) )
alternative_items = self.get_alternative_items(bom_items) alternative_items = self.get_alternative_items(bom_items)
for row in bom_items: for row in bom_items:
item_args = self.get_item_dict(row) self._append_wo_raw_material(row, alternative_items)
warehouse = self.doc.from_warehouse
if not warehouse:
if self.wo_doc.from_wip_warehouse:
warehouse = self.wo_doc.wip_warehouse
else:
warehouse = row.get("source_warehouse")
item_args.update( def _append_wo_raw_material(self, row, alternative_items):
{ item_args = self.get_item_dict(row)
"conversion_factor": 1, item_args.update(
"item_group": row.get("item_group"), {
"s_warehouse": warehouse, "conversion_factor": 1,
"uom": row.stock_uom, "item_group": row.get("item_group"),
} "s_warehouse": self._resolve_rm_warehouse(row),
) "uom": row.stock_uom,
}
)
qty = (
(row.required_qty / self.wo_doc.qty) * self.doc.fg_completed_qty
if self.wo_doc
else flt(row.qty) * self.doc.fg_completed_qty
)
item_args["qty"] = ceil_qty_if_uom_has_whole_number(qty, row.stock_uom)
item_args["transfer_qty"] = item_args["qty"]
if alt := alternative_items.get(row.item_code):
self.set_alternative_item_details(item_args, alt)
self.doc.append("items", item_args)
if self.wo_doc: def _resolve_rm_warehouse(self, row):
qty = (row.required_qty / self.wo_doc.qty) * self.doc.fg_completed_qty if self.doc.from_warehouse:
else: return self.doc.from_warehouse
qty = flt(row.qty) * self.doc.fg_completed_qty if self.wo_doc.from_wip_warehouse:
return self.wo_doc.wip_warehouse
item_args["qty"] = ceil_qty_if_uom_has_whole_number(qty, row.stock_uom) return row.get("source_warehouse")
item_args["transfer_qty"] = item_args["qty"]
if alternative_item_details := alternative_items.get(row.item_code):
self.set_alternative_item_details(item_args, alternative_item_details)
self.doc.append("items", item_args)
def get_alternative_items(self, bom_items): def get_alternative_items(self, bom_items):
item_codes_in_bom = [row.item_code for row in bom_items]
data = self._query_alternative_items(item_codes_in_bom)
if not data:
return frappe._dict()
return self._index_alternative_items(data)
def _query_alternative_items(self, item_codes_in_bom):
doctype = frappe.qb.DocType("Stock Entry") doctype = frappe.qb.DocType("Stock Entry")
child_doc = frappe.qb.DocType("Stock Entry Detail") child_doc = frappe.qb.DocType("Stock Entry Detail")
query = ( query = (
frappe.qb.from_(child_doc) frappe.qb.from_(child_doc)
.inner_join(doctype) .inner_join(doctype)
@@ -413,20 +400,15 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
& (doctype.docstatus == 1) & (doctype.docstatus == 1)
) )
) )
item_codes_in_bom = [row.item_code for row in bom_items]
if item_codes_in_bom: if item_codes_in_bom:
query = query.where(child_doc.original_item.isin(item_codes_in_bom)) query = query.where(child_doc.original_item.isin(item_codes_in_bom))
return query.run(as_dict=1)
data = query.run(as_dict=1) def _index_alternative_items(self, data):
if not data:
return frappe._dict()
alternative_items = frappe._dict() alternative_items = frappe._dict()
for row in data: for row in data:
alternative_items[row.original_item] = row alternative_items[row.original_item] = row
alternative_items[row.original_item].original_item = None alternative_items[row.original_item].original_item = None
return alternative_items return alternative_items
def set_alternative_item_details(self, row, alternative_item_details): def set_alternative_item_details(self, row, alternative_item_details):
@@ -440,79 +422,72 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
def add_raw_materials_based_on_transfer(self): def add_raw_materials_based_on_transfer(self):
self.prepare_available_materials_based_on_transfer() self.prepare_available_materials_based_on_transfer()
pending_qty_to_mfg = flt(self.wo_doc.material_transferred_for_manufacturing) - flt( pending_qty_to_mfg = flt(self.wo_doc.material_transferred_for_manufacturing) - flt(
self.wo_doc.produced_qty self.wo_doc.produced_qty
) )
if pending_qty_to_mfg <= 0 and not self.doc.get("is_return"): if pending_qty_to_mfg <= 0 and not self.doc.get("is_return"):
return return
for key in self.available_materials:
self._append_transfer_based_rm(self.available_materials[key], pending_qty_to_mfg)
for row in self.available_materials: def _append_transfer_based_rm(self, row, pending_qty_to_mfg):
row = self.available_materials[row] item_args = self.get_item_dict(row)
item_args = self.get_item_dict(row) is_return = self.doc.get("is_return")
if not self.doc.get("is_return"): qty = row.qty if is_return else (flt(row.qty) * flt(self.doc.fg_completed_qty)) / pending_qty_to_mfg
qty = (flt(row.qty) * flt(self.doc.fg_completed_qty)) / pending_qty_to_mfg item_args["qty"] = ceil_qty_if_uom_has_whole_number(qty, row.uom)
else: item_args["transfer_qty"] = item_args["qty"]
qty = row.qty if is_return:
item_args["s_warehouse"], item_args["t_warehouse"] = row.s_warehouse, row.t_warehouse
item_args["qty"] = ceil_qty_if_uom_has_whole_number(qty, row.uom) else:
item_args["transfer_qty"] = item_args["qty"] item_args["t_warehouse"], item_args["s_warehouse"] = None, row.warehouse
if row.serial_nos or row.batches:
if not self.doc.get("is_return"): self.assign_serial_batches_to_materials(item_args, row, qty)
item_args["t_warehouse"] = None else:
item_args["s_warehouse"] = row.warehouse self.doc.append("items", item_args)
else:
# In case of return, source and target warehouse will be swapped
item_args["s_warehouse"] = row.s_warehouse
item_args["t_warehouse"] = row.t_warehouse
if row.serial_nos or row.batches:
self.assign_serial_batches_to_materials(item_args, row, qty)
else:
self.doc.append("items", item_args)
def assign_serial_batches_to_materials(self, item_args, row, qty): def assign_serial_batches_to_materials(self, item_args, row, qty):
if row.serial_nos: if row.serial_nos:
if serial_nos := row.serial_nos[0 : cint(qty)]: self._append_with_serial_nos(item_args, row, qty)
item_args["serial_no"] = "\n".join(serial_nos) elif len(row.batches) == 1:
self._append_with_single_batch(item_args, row)
if not item_args["uom"]:
item_args["uom"] = row.stock_uom
item_args["use_serial_batch_fields"] = 1
self.doc.append("items", item_args)
elif row.batches and len(row.batches) == 1:
item_args["batch_no"] = next(iter(row.batches.keys()))
if not item_args["uom"]:
item_args["uom"] = row.stock_uom
item_args["use_serial_batch_fields"] = 1
self.doc.append("items", item_args)
elif row.batches: elif row.batches:
self.split_items_based_on_batches(qty, item_args, row) self.split_items_based_on_batches(qty, item_args, row)
def _append_with_serial_nos(self, item_args, row, qty):
if serial_nos := row.serial_nos[: cint(qty)]:
item_args["serial_no"] = "\n".join(serial_nos)
if not item_args.get("uom"):
item_args["uom"] = row.stock_uom
item_args["use_serial_batch_fields"] = 1
self.doc.append("items", item_args)
def _append_with_single_batch(self, item_args, row):
item_args["batch_no"] = next(iter(row.batches.keys()))
if not item_args.get("uom"):
item_args["uom"] = row.stock_uom
item_args["use_serial_batch_fields"] = 1
self.doc.append("items", item_args)
def split_items_based_on_batches(self, qty, item_args, row): def split_items_based_on_batches(self, qty, item_args, row):
for batch_no, batch_qty in row.batches.items(): for batch_no, batch_qty in row.batches.items():
if qty <= 0: if qty <= 0:
return return
qty = self._append_batch_split_item(item_args, row, batch_no, batch_qty, qty)
if batch_qty >= qty: def _append_batch_split_item(self, item_args, row, batch_no, batch_qty, qty):
item_args["qty"] = qty if batch_qty >= qty:
qty = 0 item_args["qty"], qty = qty, 0
else: else:
item_args["qty"] = batch_qty item_args["qty"] = batch_qty
qty -= batch_qty qty -= batch_qty
row.batches[batch_no] -= batch_qty
row.batches[batch_no] -= batch_qty if not item_args.get("uom"):
if not item_args["uom"]: item_args["uom"] = row.stock_uom
item_args["uom"] = row.stock_uom item_args["batch_no"] = batch_no
item_args["transfer_qty"] = item_args["qty"]
item_args["batch_no"] = batch_no item_args["use_serial_batch_fields"] = 1
item_args["transfer_qty"] = item_args["qty"] self.doc.append("items", item_args)
item_args["use_serial_batch_fields"] = 1 return qty
self.doc.append("items", item_args)
def prepare_available_materials_based_on_transfer(self): def prepare_available_materials_based_on_transfer(self):
self.available_materials = frappe._dict() self.available_materials = frappe._dict()
@@ -584,14 +559,17 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
key = (row.item_code, row.warehouse) key = (row.item_code, row.warehouse)
self.available_materials[key].qty -= row.qty self.available_materials[key].qty -= row.qty
if row.serial_and_batch_bundle: if row.serial_and_batch_bundle:
_details = self.get_sabb_details(row.serial_and_batch_bundle) self._deduct_consumed_serial_batch(key, row.serial_and_batch_bundle)
if _details.serial_nos:
for sn in _details.serial_nos: def _deduct_consumed_serial_batch(self, key, sabb_name):
self.available_materials[key].serial_nos.remove(sn) _details = self.get_sabb_details(sabb_name)
elif _details.batches: if _details.serial_nos:
# Qty is in negative therefore added insted of subtraction for sn in _details.serial_nos:
for batch_no, qty in _details.batches.items(): self.available_materials[key].serial_nos.remove(sn)
self.available_materials[key].batches[batch_no] += qty elif _details.batches:
for batch_no, qty in _details.batches.items():
# qty is negative, so add instead of subtract
self.available_materials[key].batches[batch_no] += qty
def add_additional_cost(self): def add_additional_cost(self):
if not self.wo_doc: if not self.wo_doc:
@@ -618,46 +596,47 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
def get_secondary_items_from_job_card(self): def get_secondary_items_from_job_card(self):
if not self.wo_doc.operations: if not self.wo_doc.operations:
return [] return []
secondary_items = get_secondary_items_from_job_card(self.doc.work_order, self.doc.job_card) secondary_items = get_secondary_items_from_job_card(self.doc.work_order, self.doc.job_card)
if self.doc.job_card: pending_qty = self._get_pending_secondary_qty()
pending_qty = flt(self.doc.fg_completed_qty)
else:
pending_qty = flt(self.get_completed_job_card_qty()) - flt(self.wo_doc.produced_qty)
used_secondary_items = self.get_used_secondary_items() used_secondary_items = self.get_used_secondary_items()
self._adjust_secondary_item_qtys(secondary_items, used_secondary_items, pending_qty)
return secondary_items
def _get_pending_secondary_qty(self):
if self.doc.job_card:
return flt(self.doc.fg_completed_qty)
return flt(self.get_completed_job_card_qty()) - flt(self.wo_doc.produced_qty)
def _adjust_secondary_item_qtys(self, secondary_items, used_secondary_items, pending_qty):
for row in secondary_items: for row in secondary_items:
row.stock_qty -= flt(used_secondary_items.get(row.item_code)) row.stock_qty -= flt(used_secondary_items.get(row.item_code))
row.stock_qty = (row.stock_qty) * flt(self.doc.fg_completed_qty) / flt(pending_qty) row.stock_qty = row.stock_qty * flt(self.doc.fg_completed_qty) / flt(pending_qty)
if used_secondary_items.get(row.item_code): if used_secondary_items.get(row.item_code):
used_secondary_items[row.item_code] -= row.stock_qty used_secondary_items[row.item_code] -= row.stock_qty
return secondary_items
def get_used_secondary_items(self): def get_used_secondary_items(self):
data = self._query_used_secondary_items()
used_secondary_items = defaultdict(float) used_secondary_items = defaultdict(float)
StockEntry = frappe.qb.DocType("Stock Entry")
StockEntryDetail = frappe.qb.DocType("Stock Entry Detail")
data = (
frappe.qb.from_(StockEntry)
.inner_join(StockEntryDetail)
.on(StockEntryDetail.parent == StockEntry.name)
.select(StockEntryDetail.item_code, StockEntryDetail.qty)
.where(
(StockEntry.work_order == self.doc.work_order)
& ((StockEntryDetail.type.isnotnull()) | (StockEntryDetail.is_legacy_scrap_item == 1))
& (StockEntry.docstatus == 1)
& (StockEntry.purpose.isin(["Repack", "Manufacture"]))
)
).run(as_dict=1)
for row in data: for row in data:
used_secondary_items[row.item_code] += row.qty used_secondary_items[row.item_code] += row.qty
return used_secondary_items return used_secondary_items
def _query_used_secondary_items(self):
se = frappe.qb.DocType("Stock Entry")
sed = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(se)
.inner_join(sed)
.on(sed.parent == se.name)
.select(sed.item_code, sed.qty)
.where(
(se.work_order == self.doc.work_order)
& ((sed.type.isnotnull()) | (sed.is_legacy_scrap_item == 1))
& (se.docstatus == 1)
& (se.purpose.isin(["Repack", "Manufacture"]))
)
).run(as_dict=1)
def get_completed_job_card_qty(self): def get_completed_job_card_qty(self):
return flt(min([d.completed_qty for d in self.wo_doc.operations])) return flt(min([d.completed_qty for d in self.wo_doc.operations]))
@@ -688,21 +667,24 @@ class ManufactureStockEntry(BaseManufactureStockEntry):
def update_job_card_and_work_order(self): def update_job_card_and_work_order(self):
if self.doc.job_card: if self.doc.job_card:
job_doc = frappe.get_doc("Job Card", self.doc.job_card) self._update_job_card_on_manufacture()
job_doc.set_consumed_qty_in_job_card_item(self.doc)
job_doc.set_manufactured_qty()
job_doc.update_work_order()
if self.doc.work_order: if self.doc.work_order:
self._validate_work_order() self._update_work_order_on_manufacture()
if self.doc.fg_completed_qty: def _update_job_card_on_manufacture(self):
self.wo_doc.run_method("update_work_order_qty") job_doc = frappe.get_doc("Job Card", self.doc.job_card)
self.wo_doc.run_method("update_planned_qty") job_doc.set_consumed_qty_in_job_card_item(self.doc)
job_doc.set_manufactured_qty()
job_doc.update_work_order()
self.wo_doc.run_method("update_status") def _update_work_order_on_manufacture(self):
if not self.wo_doc.operations: self._validate_work_order()
self.wo_doc.set_actual_dates() if self.doc.fg_completed_qty:
self.wo_doc.run_method("update_work_order_qty")
self.wo_doc.run_method("update_planned_qty")
self.wo_doc.run_method("update_status")
if not self.wo_doc.operations:
self.wo_doc.set_actual_dates()
class RepackStockEntry(BaseManufactureStockEntry): class RepackStockEntry(BaseManufactureStockEntry):
@@ -809,20 +791,20 @@ def _check_bom_component_qty(doc, bom_items):
def get_bom_items(bom_no, use_multi_level_bom=None, qty=None, fetch_secondary_items=False): def get_bom_items(bom_no, use_multi_level_bom=None, qty=None, fetch_secondary_items=False):
if use_multi_level_bom is None: if use_multi_level_bom is None:
use_multi_level_bom = frappe.get_cached_value("BOM", bom_no, "use_multi_level_bom") use_multi_level_bom = frappe.get_cached_value("BOM", bom_no, "use_multi_level_bom")
qty = qty or 1
if qty is None:
qty = 1
table_name = "BOM Item"
if use_multi_level_bom:
table_name = "BOM Explosion Item"
if fetch_secondary_items: if fetch_secondary_items:
table_name = "BOM Secondary Item" table_name = "BOM Secondary Item"
else:
table_name = "BOM Explosion Item" if use_multi_level_bom else "BOM Item"
items = _run_bom_items_query(bom_no, table_name, qty)
return _deduplicate_bom_items(items)
def _run_bom_items_query(bom_no, table_name, qty):
bom_doc = frappe.qb.DocType("BOM") bom_doc = frappe.qb.DocType("BOM")
doctype = frappe.qb.DocType(table_name) doctype = frappe.qb.DocType(table_name)
query = ( query = (
frappe.qb.from_(doctype) frappe.qb.from_(doctype)
.inner_join(bom_doc) .inner_join(bom_doc)
@@ -838,9 +820,12 @@ def get_bom_items(bom_no, use_multi_level_bom=None, qty=None, fetch_secondary_it
.where((bom_doc.name == bom_no) & (bom_doc.docstatus == 1)) .where((bom_doc.name == bom_no) & (bom_doc.docstatus == 1))
.orderby(doctype.idx) .orderby(doctype.idx)
) )
return _add_bom_table_specific_fields(query, doctype, table_name).run(as_dict=1)
def _add_bom_table_specific_fields(query, doctype, table_name):
if table_name == "BOM Secondary Item": if table_name == "BOM Secondary Item":
query = query.select( return query.select(
doctype.name, doctype.name,
doctype.cost_allocation_per, doctype.cost_allocation_per,
doctype.uom, doctype.uom,
@@ -849,19 +834,20 @@ def get_bom_items(bom_no, use_multi_level_bom=None, qty=None, fetch_secondary_it
doctype.is_legacy, doctype.is_legacy,
doctype.conversion_factor, doctype.conversion_factor,
) )
elif table_name == "BOM Item": if table_name == "BOM Item":
query = query.select( return query.select(
doctype.allow_alternative_item, doctype.uom, doctype.conversion_factor, doctype.bom_no doctype.allow_alternative_item, doctype.uom, doctype.conversion_factor, doctype.bom_no
) )
return query
items = query.run(as_dict=1)
def _deduplicate_bom_items(items):
item_dict = {} item_dict = {}
for item in items: for item in items:
if item.item_code in item_dict: if item.item_code in item_dict:
item_dict[item.item_code].qty += item.qty item_dict[item.item_code].qty += item.qty
else: else:
item_dict[item.item_code] = item item_dict[item.item_code] = item
return list(item_dict.values()) return list(item_dict.values())
@@ -940,72 +926,92 @@ def move_sample_to_retention_warehouse(company: str, items: str | list):
stock_entry.company = company stock_entry.company = company
stock_entry.purpose = "Material Transfer" stock_entry.purpose = "Material Transfer"
stock_entry.set_stock_entry_type() stock_entry.set_stock_entry_type()
for item in items: for item in items:
if item.get("sample_quantity") and item.get("serial_and_batch_bundle"): if item.get("sample_quantity") and item.get("serial_and_batch_bundle"):
warehouse = item.get("t_warehouse") or item.get("warehouse") _process_sample_item(stock_entry, item, retention_warehouse)
total_qty = 0
cls_obj = SerialBatchCreation(
{
"type_of_transaction": "Outward",
"serial_and_batch_bundle": item.get("serial_and_batch_bundle"),
"item_code": item.get("item_code"),
"warehouse": warehouse,
"do_not_save": True,
}
)
sabb = cls_obj.duplicate_package()
batches = get_batch_nos(item.get("serial_and_batch_bundle"))
sabe_list = []
for batch_no in batches.keys():
sample_quantity = validate_sample_quantity(
item.get("item_code"),
item.get("sample_quantity"),
item.get("transfer_qty") or item.get("qty"),
batch_no,
)
sabe = next(item for item in sabb.entries if item.batch_no == batch_no)
if sample_quantity:
if sabb.has_serial_no:
new_sabe = [
entry
for entry in sabb.entries
if entry.batch_no == batch_no
and frappe.db.exists(
"Serial No", {"name": entry.serial_no, "warehouse": warehouse}
)
][: int(sample_quantity)]
sabe_list.extend(new_sabe)
total_qty += len(new_sabe)
else:
total_qty += sample_quantity
sabe.qty = sample_quantity
else:
sabb.entries.remove(sabe)
if total_qty:
if sabe_list:
sabb.entries = sabe_list
sabb.save()
stock_entry.append(
"items",
{
"item_code": item.get("item_code"),
"s_warehouse": warehouse,
"t_warehouse": retention_warehouse,
"qty": total_qty,
"basic_rate": item.get("valuation_rate"),
"uom": item.get("uom"),
"stock_uom": item.get("stock_uom"),
"conversion_factor": item.get("conversion_factor") or 1.0,
"serial_and_batch_bundle": sabb.name,
},
)
if stock_entry.get("items"): if stock_entry.get("items"):
return stock_entry.as_dict() return stock_entry.as_dict()
def _process_sample_item(stock_entry, item, retention_warehouse):
warehouse = item.get("t_warehouse") or item.get("warehouse")
sabb = _duplicate_sample_bundle(item, warehouse)
total_qty, sabe_list = _collect_sample_batches(sabb, item, warehouse)
if total_qty:
_append_sample_entry(stock_entry, sabb, item, warehouse, retention_warehouse, total_qty, sabe_list)
def _duplicate_sample_bundle(item, warehouse):
return SerialBatchCreation(
{
"type_of_transaction": "Outward",
"serial_and_batch_bundle": item.get("serial_and_batch_bundle"),
"item_code": item.get("item_code"),
"warehouse": warehouse,
"do_not_save": True,
}
).duplicate_package()
def _collect_sample_batches(sabb, item, warehouse):
batches = get_batch_nos(item.get("serial_and_batch_bundle"))
sabe_list, total_qty = [], 0
for batch_no in batches.keys():
qty, entries = _process_sample_batch(sabb, item, warehouse, batch_no)
total_qty += qty
sabe_list.extend(entries)
return total_qty, sabe_list
def _process_sample_batch(sabb, item, warehouse, batch_no):
sample_quantity = validate_sample_quantity(
item.get("item_code"),
item.get("sample_quantity"),
item.get("transfer_qty") or item.get("qty"),
batch_no,
)
sabe = next(entry for entry in sabb.entries if entry.batch_no == batch_no)
if not sample_quantity:
sabb.entries.remove(sabe)
return 0, []
return _apply_sample_quantity(sabb, sabe, warehouse, batch_no, sample_quantity)
def _apply_sample_quantity(sabb, sabe, warehouse, batch_no, sample_quantity):
if sabb.has_serial_no:
entries = [
e
for e in sabb.entries
if e.batch_no == batch_no
and frappe.db.exists("Serial No", {"name": e.serial_no, "warehouse": warehouse})
][: int(sample_quantity)]
return len(entries), entries
sabe.qty = sample_quantity
return sample_quantity, []
def _append_sample_entry(stock_entry, sabb, item, warehouse, retention_warehouse, total_qty, sabe_list):
if sabe_list:
sabb.entries = sabe_list
sabb.save()
stock_entry.append(
"items",
{
"item_code": item.get("item_code"),
"s_warehouse": warehouse,
"t_warehouse": retention_warehouse,
"qty": total_qty,
"basic_rate": item.get("valuation_rate"),
"uom": item.get("uom"),
"stock_uom": item.get("stock_uom"),
"conversion_factor": item.get("conversion_factor") or 1.0,
"serial_and_batch_bundle": sabb.name,
},
)
@frappe.whitelist() @frappe.whitelist()
def validate_sample_quantity(item_code: str, sample_quantity: int, qty: float, batch_no: str | None = None): def validate_sample_quantity(item_code: str, sample_quantity: int, qty: float, batch_no: str | None = None):
from erpnext.stock.doctype.batch.batch import get_batch_qty from erpnext.stock.doctype.batch.batch import get_batch_qty
@@ -1014,19 +1020,29 @@ def validate_sample_quantity(item_code: str, sample_quantity: int, qty: float, b
frappe.throw( frappe.throw(
_("Sample quantity {0} cannot be more than received quantity {1}").format(sample_quantity, qty) _("Sample quantity {0} cannot be more than received quantity {1}").format(sample_quantity, qty)
) )
return _adjust_sample_quantity(item_code, sample_quantity, batch_no, get_batch_qty)
def _adjust_sample_quantity(item_code, sample_quantity, batch_no, get_batch_qty):
retention_warehouse = frappe.get_single_value("Stock Settings", "sample_retention_warehouse") retention_warehouse = frappe.get_single_value("Stock Settings", "sample_retention_warehouse")
retainted_qty = 0 retainted_qty = get_batch_qty(batch_no, retention_warehouse, item_code) if batch_no else 0
if batch_no:
retainted_qty = get_batch_qty(batch_no, retention_warehouse, item_code)
max_retain_qty = frappe.get_value("Item", item_code, "sample_quantity") max_retain_qty = frappe.get_value("Item", item_code, "sample_quantity")
if retainted_qty >= max_retain_qty: if retainted_qty >= max_retain_qty:
frappe.msgprint( _warn_max_retained(retainted_qty, batch_no, item_code)
_( return 0
"Maximum Samples - {0} have already been retained for Batch {1} and Item {2} in Batch {3}." return _cap_sample_quantity(sample_quantity, max_retain_qty, retainted_qty, batch_no, item_code)
).format(retainted_qty, batch_no, item_code, batch_no),
alert=True,
) def _warn_max_retained(retainted_qty, batch_no, item_code):
sample_quantity = 0 frappe.msgprint(
_("Maximum Samples - {0} have already been retained for Batch {1} and Item {2} in Batch {3}.").format(
retainted_qty, batch_no, item_code, batch_no
),
alert=True,
)
def _cap_sample_quantity(sample_quantity, max_retain_qty, retainted_qty, batch_no, item_code):
qty_diff = max_retain_qty - retainted_qty qty_diff = max_retain_qty - retainted_qty
if cint(sample_quantity) > cint(qty_diff): if cint(sample_quantity) > cint(qty_diff):
frappe.msgprint( frappe.msgprint(
@@ -1035,6 +1051,5 @@ def validate_sample_quantity(item_code: str, sample_quantity: int, qty: float, b
), ),
alert=True, alert=True,
) )
sample_quantity = qty_diff return qty_diff
return sample_quantity return sample_quantity

View File

@@ -119,131 +119,105 @@ class MaterialTransferForManufactureStockEntry(BaseMaterialTransferStockEntry):
self.doc.append("items", item_dict[item_code]) self.doc.append("items", item_dict[item_code])
def get_pending_raw_materials(self): def get_pending_raw_materials(self):
""" """Return pending raw material qty to transfer, capped at what's still needed."""
issue (item quantity) that is pending to issue or desire to transfer,
whichever is less
"""
item_dict = self.get_work_order_required_items() item_dict = self.get_work_order_required_items()
max_qty = flt(self.wo_doc.qty) max_qty = flt(self.wo_doc.qty)
allow_overproduction = self._is_overproduction_allowed(max_qty)
allow_overproduction = False
overproduction_percentage = flt(
frappe.db.get_single_value("Manufacturing Settings", "overproduction_percentage_for_work_order")
)
transfer_extra_materials_percentage = flt(
frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage")
)
to_transfer_qty = flt(self.wo_doc.material_transferred_for_manufacturing) + flt(
self.doc.fg_completed_qty
)
transfer_limit_qty = max_qty + ((max_qty * overproduction_percentage) / 100)
if transfer_extra_materials_percentage:
transfer_limit_qty = max_qty + ((max_qty * transfer_extra_materials_percentage) / 100)
if transfer_limit_qty >= to_transfer_qty:
allow_overproduction = True
for item, item_details in item_dict.items(): for item, item_details in item_dict.items():
pending_to_issue = flt(item_details.required_qty) - flt(item_details.transferred_qty) item_dict[item]["qty"] = self._calculate_item_transfer_qty(
desire_to_transfer = flt(self.doc.fg_completed_qty) * flt(item_details.required_qty) / max_qty item_details, allow_overproduction, max_qty
)
if (
desire_to_transfer <= pending_to_issue
or (
desire_to_transfer > 0
and self.backflush_based_on == "Material Transferred for Manufacture"
)
or allow_overproduction
):
# "No need for transfer but qty still pending to transfer" case can occur
# when transferring multiple RM in different Stock Entries
item_dict[item]["qty"] = desire_to_transfer if (desire_to_transfer > 0) else pending_to_issue
elif pending_to_issue > 0:
item_dict[item]["qty"] = pending_to_issue
else:
item_dict[item]["qty"] = 0
item_dict[item]["transfer_qty"] = flt(item_dict[item]["qty"]) * flt( item_dict[item]["transfer_qty"] = flt(item_dict[item]["qty"]) * flt(
item_dict[item].get("conversion_factor") or 1 item_dict[item].get("conversion_factor") or 1
) )
# delete items with 0 qty item_dict = {k: v for k, v in item_dict.items() if v["qty"]}
list_of_items = list(item_dict.keys())
for item in list_of_items:
if not item_dict[item]["qty"]:
del item_dict[item]
# show some message if not item_dict:
if not len(item_dict): frappe.msgprint(_("All items have already been transferred for this Work Order."))
frappe.msgprint(_("""All items have already been transferred for this Work Order."""))
return item_dict return item_dict
def get_work_order_required_items(self): def _is_overproduction_allowed(self, max_qty):
""" overproduction_pct = flt(
Gets Work Order Required Items only if Stock Entry purpose is **Material Transferred for Manufacture**. frappe.db.get_single_value("Manufacturing Settings", "overproduction_percentage_for_work_order")
""" )
item_dict, job_card_items = frappe._dict(), [] extra_materials_pct = flt(
work_order = self.wo_doc
consider_job_card = work_order.transfer_material_against == "Job Card" and self.doc.get("job_card")
if consider_job_card:
job_card_items = self.get_job_card_item_codes()
if not frappe.db.get_value("Warehouse", work_order.wip_warehouse, "is_group"):
wip_warehouse = work_order.wip_warehouse
else:
wip_warehouse = None
transfer_extra_materials_percentage = flt(
frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage") frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage")
) )
to_transfer_qty = flt(self.wo_doc.material_transferred_for_manufacturing) + flt(
self.doc.fg_completed_qty
)
limit_pct = extra_materials_pct or overproduction_pct
transfer_limit_qty = max_qty + (max_qty * limit_pct / 100)
return transfer_limit_qty >= to_transfer_qty
def _calculate_item_transfer_qty(self, item_details, allow_overproduction, max_qty):
pending_to_issue = flt(item_details.required_qty) - flt(item_details.transferred_qty)
desire_to_transfer = flt(self.doc.fg_completed_qty) * flt(item_details.required_qty) / max_qty
can_transfer = (
desire_to_transfer <= pending_to_issue
or (desire_to_transfer > 0 and self.backflush_based_on == "Material Transferred for Manufacture")
or allow_overproduction
)
return _resolve_transfer_qty(desire_to_transfer, pending_to_issue, can_transfer)
def get_work_order_required_items(self):
"""Gets Work Order Required Items for Material Transfer for Manufacture."""
work_order = self.wo_doc
consider_job_card = work_order.transfer_material_against == "Job Card" and self.doc.get("job_card")
job_card_items = self.get_job_card_item_codes() if consider_job_card else []
wip_warehouse = self._resolve_wip_warehouse(work_order)
extra_pct = flt(
frappe.db.get_single_value("Manufacturing Settings", "transfer_extra_materials_percentage")
)
item_dict = frappe._dict()
for d in work_order.get("required_items"): for d in work_order.get("required_items"):
if consider_job_card and (d.item_code not in job_card_items): self._add_required_item(
continue item_dict, d, consider_job_card, job_card_items, wip_warehouse, extra_pct, work_order
additional_qty = 0.0
if transfer_extra_materials_percentage:
additional_qty = transfer_extra_materials_percentage * flt(d.required_qty) / 100
transfer_pending = flt(d.required_qty) > flt(d.transferred_qty)
if additional_qty:
transfer_pending = (flt(d.required_qty) + additional_qty) > flt(d.transferred_qty)
can_transfer = transfer_pending or (
self.backflush_based_on == "Material Transferred for Manufacture"
) )
if not can_transfer:
continue
if d.include_item_in_manufacturing:
item_row = d.as_dict()
item_row["idx"] = len(item_dict) + 1
if consider_job_card:
job_card_item = frappe.db.get_value(
"Job Card Item", {"item_code": d.item_code, "parent": self.doc.get("job_card")}
)
item_row["job_card_item"] = job_card_item or None
if d.source_warehouse and not frappe.db.get_value(
"Warehouse", d.source_warehouse, "is_group"
):
item_row["from_warehouse"] = d.source_warehouse
item_row["to_warehouse"] = wip_warehouse
if item_row["allow_alternative_item"]:
item_row["allow_alternative_item"] = work_order.allow_alternative_item
item_dict.setdefault(d.item_code, item_row)
return item_dict return item_dict
def _resolve_wip_warehouse(self, work_order):
if not frappe.db.get_value("Warehouse", work_order.wip_warehouse, "is_group"):
return work_order.wip_warehouse
return None
def _add_required_item(
self, item_dict, d, consider_job_card, job_card_items, wip_warehouse, extra_pct, work_order
):
if consider_job_card and d.item_code not in job_card_items:
return
additional_qty = extra_pct * flt(d.required_qty) / 100 if extra_pct else 0.0
transfer_pending = (
(flt(d.required_qty) + additional_qty) > flt(d.transferred_qty)
if additional_qty
else flt(d.required_qty) > flt(d.transferred_qty)
)
can_transfer = transfer_pending or self.backflush_based_on == "Material Transferred for Manufacture"
if not can_transfer or not d.include_item_in_manufacturing:
return
self._build_required_item_row(item_dict, d, consider_job_card, wip_warehouse, work_order)
def _build_required_item_row(self, item_dict, d, consider_job_card, wip_warehouse, work_order):
item_row = d.as_dict()
item_row["idx"] = len(item_dict) + 1
if consider_job_card:
item_row["job_card_item"] = self._get_job_card_item(d.item_code)
if d.source_warehouse and not frappe.db.get_value("Warehouse", d.source_warehouse, "is_group"):
item_row["from_warehouse"] = d.source_warehouse
item_row["to_warehouse"] = wip_warehouse
if item_row["allow_alternative_item"]:
item_row["allow_alternative_item"] = work_order.allow_alternative_item
item_dict.setdefault(d.item_code, item_row)
def _get_job_card_item(self, item_code):
return (
frappe.db.get_value("Job Card Item", {"item_code": item_code, "parent": self.doc.get("job_card")})
or None
)
def get_job_card_item_codes(self): def get_job_card_item_codes(self):
if not self.doc.get("job_card"): if not self.doc.get("job_card"):
return [] return []
@@ -389,68 +363,73 @@ class MaterialRequestStockEntry(BaseMaterialTransferStockEntry):
return stock_entries, child_list return stock_entries, child_list
def _bulk_update_transferred_qty(self, stock_entries, child_list): def _bulk_update_transferred_qty(self, stock_entries, child_list):
from pypika import Case
sed = frappe.qb.DocType("Stock Entry Detail") sed = frappe.qb.DocType("Stock Entry Detail")
case_expr = Case() case_expr = self._build_case_expr(sed, stock_entries)
for (parent, name), qty in stock_entries.items():
case_expr = case_expr.when((sed.parent == parent) & (sed.name == name), qty)
( (
frappe.qb.update(sed) frappe.qb.update(sed)
.set(sed.transferred_qty, case_expr.else_(sed.transferred_qty)) .set(sed.transferred_qty, case_expr.else_(sed.transferred_qty))
.where(sed.name.isin(child_list)) .where(sed.name.isin(child_list))
).run() ).run()
def _build_case_expr(self, sed, stock_entries):
from pypika import Case
case_expr = Case()
for (parent, name), qty in stock_entries.items():
case_expr = case_expr.when((sed.parent == parent) & (sed.name == name), qty)
return case_expr
def _update_per_transferred_field(self): def _update_per_transferred_field(self):
self.doc._update_percent_field_in_targets( self.doc._update_percent_field_in_targets(self._get_per_transferred_config(), update_modified=True)
{
"source_dt": "Stock Entry Detail", def _get_per_transferred_config(self):
"target_field": "transferred_qty", return {
"target_ref_field": "qty", "source_dt": "Stock Entry Detail",
"target_dt": "Stock Entry Detail", "target_field": "transferred_qty",
"join_field": "ste_detail", "target_ref_field": "qty",
"target_parent_dt": "Stock Entry", "target_dt": "Stock Entry Detail",
"target_parent_field": "per_transferred", "join_field": "ste_detail",
"source_field": "qty", "target_parent_dt": "Stock Entry",
"percent_join_field": "against_stock_entry", "target_parent_field": "per_transferred",
}, "source_field": "qty",
update_modified=True, "percent_join_field": "against_stock_entry",
) }
def set_material_request_transfer_status(self, status): def set_material_request_transfer_status(self, status):
material_requests = [] material_requests = []
parent_se = None parent_se = (
if self.doc.outgoing_stock_entry: frappe.get_value("Stock Entry", self.doc.outgoing_stock_entry, "add_to_transit")
parent_se = frappe.get_value("Stock Entry", self.doc.outgoing_stock_entry, "add_to_transit") if self.doc.outgoing_stock_entry
else None
)
for item in self.doc.items: for item in self.doc.items:
material_request = item.get("material_request") mr = item.get("material_request")
if material_request not in material_requests: if mr not in material_requests and self.doc.outgoing_stock_entry and parent_se:
if self.doc.outgoing_stock_entry and parent_se: mr = frappe.get_value("Stock Entry Detail", item.ste_detail, "material_request")
material_request = frappe.get_value( if mr and mr not in material_requests:
"Stock Entry Detail", item.ste_detail, "material_request" status = self._update_mr_transfer_status(mr, status, material_requests)
)
if material_request and material_request not in material_requests: def _update_mr_transfer_status(self, material_request, status, material_requests):
material_requests.append(material_request) material_requests.append(material_request)
if status == "Completed": if status == "Completed":
qty = get_transferred_qty(material_request) qty = get_transferred_qty(material_request)
if qty.get("transfer_qty") > qty.get("transferred_qty"): if qty.get("transfer_qty") > qty.get("transferred_qty"):
status = "In Transit" status = "In Transit"
frappe.db.set_value("Material Request", material_request, "transfer_status", status)
return status
frappe.db.set_value("Material Request", material_request, "transfer_status", status)
def _resolve_transfer_qty(desire_to_transfer, pending_to_issue, can_transfer):
# "No need for transfer but qty still pending" can occur when transferring multiple RM in different Stock Entries
if can_transfer:
return desire_to_transfer if desire_to_transfer > 0 else pending_to_issue
return pending_to_issue if pending_to_issue > 0 else 0
def get_transferred_qty(material_request): def get_transferred_qty(material_request):
sed = frappe.qb.DocType("Stock Entry Detail") sed = frappe.qb.DocType("Stock Entry Detail")
return (
query = (
frappe.qb.from_(sed) frappe.qb.from_(sed)
.select( .select(Sum(sed.transfer_qty).as_("transfer_qty"), Sum(sed.transferred_qty).as_("transferred_qty"))
Sum(sed.transfer_qty).as_("transfer_qty"),
Sum(sed.transferred_qty).as_("transferred_qty"),
)
.where((sed.material_request == material_request) & (sed.docstatus == 1)) .where((sed.material_request == material_request) & (sed.docstatus == 1))
).run(as_dict=True) ).run(as_dict=True)[0]
return query[0]

View File

@@ -21,64 +21,62 @@ class StockEntrySABB(BaseStockEntry):
already_picked_serial_nos = [] already_picked_serial_nos = []
for row in self.doc.items: for row in self.doc.items:
if row.use_serial_batch_fields: if row.use_serial_batch_fields or not row.s_warehouse:
continue continue
if not row.s_warehouse:
continue
if row.item_code not in serial_or_batch_items: if row.item_code not in serial_or_batch_items:
continue continue
bundle_doc = None bundle_doc = self._create_or_update_bundle_for_row(
if row.serial_and_batch_bundle and abs(row.transfer_qty) != abs( row, serial_nos, batch_nos, already_picked_serial_nos
frappe.get_cached_value("Serial and Batch Bundle", row.serial_and_batch_bundle, "total_qty") )
):
bundle_doc = SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"serial_and_batch_bundle": row.serial_and_batch_bundle,
"type_of_transaction": "Outward",
"ignore_serial_nos": already_picked_serial_nos,
"qty": row.transfer_qty * -1,
}
).update_serial_and_batch_entries(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
elif not row.serial_and_batch_bundle and frappe.get_single_value(
"Stock Settings", "auto_create_serial_and_batch_bundle_for_outward"
):
bundle_doc = SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"posting_datetime": get_combine_datetime(
self.doc.posting_date, self.doc.posting_time
),
"voucher_type": self.doc.doctype,
"voucher_detail_no": row.name,
"qty": row.transfer_qty * -1,
"ignore_serial_nos": already_picked_serial_nos,
"type_of_transaction": "Outward",
"company": self.doc.company,
"do_not_submit": True,
}
).make_serial_and_batch_bundle(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
if not bundle_doc: if not bundle_doc:
continue continue
for entry in bundle_doc.entries: for entry in bundle_doc.entries:
if not entry.serial_no: if entry.serial_no:
continue already_picked_serial_nos.append(entry.serial_no)
already_picked_serial_nos.append(entry.serial_no)
row.serial_and_batch_bundle = bundle_doc.name row.serial_and_batch_bundle = bundle_doc.name
def _create_or_update_bundle_for_row(self, row, serial_nos, batch_nos, already_picked_serial_nos):
if row.serial_and_batch_bundle and abs(row.transfer_qty) != abs(
frappe.get_cached_value("Serial and Batch Bundle", row.serial_and_batch_bundle, "total_qty")
):
return SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"serial_and_batch_bundle": row.serial_and_batch_bundle,
"type_of_transaction": "Outward",
"ignore_serial_nos": already_picked_serial_nos,
"qty": row.transfer_qty * -1,
}
).update_serial_and_batch_entries(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
if not row.serial_and_batch_bundle and frappe.get_single_value(
"Stock Settings", "auto_create_serial_and_batch_bundle_for_outward"
):
return SerialBatchCreation(
{
"item_code": row.item_code,
"warehouse": row.s_warehouse,
"posting_datetime": get_combine_datetime(self.doc.posting_date, self.doc.posting_time),
"voucher_type": self.doc.doctype,
"voucher_detail_no": row.name,
"qty": row.transfer_qty * -1,
"ignore_serial_nos": already_picked_serial_nos,
"type_of_transaction": "Outward",
"company": self.doc.company,
"do_not_submit": True,
}
).make_serial_and_batch_bundle(
serial_nos=serial_nos.get(row.name), batch_nos=batch_nos.get(row.name)
)
return None
def get_serial_nos_and_batches_from_sres(self, scio_detail, only_pending=True): def get_serial_nos_and_batches_from_sres(self, scio_detail, only_pending=True):
serial_nos, batch_nos = [], frappe._dict() serial_nos, batch_nos = [], frappe._dict()
@@ -189,64 +187,67 @@ class StockEntrySABB(BaseStockEntry):
key = (d.item_code, d.s_warehouse) key = (d.item_code, d.s_warehouse)
if details := reservation_entries.get(key): if details := reservation_entries.get(key):
original_qty = d.qty self._apply_batch_reservation_to_item(d, details, new_items_to_add)
if batches := details.get("batch_no"):
for batch_no, qty in batches.items():
if original_qty <= 0:
break
if qty <= 0:
continue
if d.batch_no and original_qty > 0:
new_row = frappe.copy_doc(d)
new_row.name = None
new_row.batch_no = batch_no
new_row.qty = qty
new_row.idx = d.idx + 1
if new_row.batch_no and details.get("batchwise_sn"):
new_row.serial_no = "\n".join(
details.get("batchwise_sn")[new_row.batch_no][: cint(new_row.qty)]
)
new_items_to_add.append(new_row)
original_qty -= qty
batches[batch_no] -= qty
if qty >= d.qty and not d.batch_no:
d.batch_no = batch_no
batches[batch_no] -= d.qty
if d.batch_no and details.get("batchwise_sn"):
d.serial_no = "\n".join(
details.get("batchwise_sn")[d.batch_no][: cint(d.qty)]
)
elif not d.batch_no:
d.batch_no = batch_no
d.qty = qty
original_qty -= qty
batches[batch_no] = 0
if d.batch_no and details.get("batchwise_sn"):
d.serial_no = "\n".join(
details.get("batchwise_sn")[d.batch_no][: cint(d.qty)]
)
if details.get("serial_no"):
d.serial_no = "\n".join(details.get("serial_no")[: cint(d.qty)])
d.use_serial_batch_fields = 1 d.use_serial_batch_fields = 1
for new_row in new_items_to_add: for new_row in new_items_to_add:
self.doc.append("items", new_row) self.doc.append("items", new_row)
self._sort_and_reindex_items()
def _apply_batch_reservation_to_item(self, d, details, new_items_to_add):
original_qty = d.qty
if batches := details.get("batch_no"):
original_qty = self._distribute_batches_to_item(
d, batches, details, new_items_to_add, original_qty
)
if details.get("serial_no"):
d.serial_no = "\n".join(details.get("serial_no")[: cint(d.qty)])
def _distribute_batches_to_item(self, d, batches, details, new_items_to_add, original_qty):
for batch_no, qty in batches.items():
if original_qty <= 0:
break
if qty <= 0:
continue
if d.batch_no:
original_qty, _ = self._make_overflow_batch_row(
d, batches, details, new_items_to_add, batch_no, qty, original_qty
)
else:
self._assign_batch_to_item(d, batches, details, batch_no, qty)
return original_qty
def _make_overflow_batch_row(self, d, batches, details, new_items_to_add, batch_no, qty, original_qty):
new_row = frappe.copy_doc(d)
new_row.name = None
new_row.batch_no = batch_no
new_row.qty = qty
new_row.idx = d.idx + 1
if new_row.batch_no and details.get("batchwise_sn"):
new_row.serial_no = "\n".join(details.get("batchwise_sn")[new_row.batch_no][: cint(new_row.qty)])
new_items_to_add.append(new_row)
batches[batch_no] -= qty
return original_qty - qty, new_row
def _assign_batch_to_item(self, d, batches, details, batch_no, qty):
if qty >= d.qty:
d.batch_no = batch_no
batches[batch_no] -= d.qty
else:
d.batch_no = batch_no
d.qty = qty
batches[batch_no] = 0
if d.batch_no and details.get("batchwise_sn"):
d.serial_no = "\n".join(details.get("batchwise_sn")[d.batch_no][: cint(d.qty)])
def _sort_and_reindex_items(self):
sorted_items = sorted(self.doc.items, key=lambda x: x.item_code) sorted_items = sorted(self.doc.items, key=lambda x: x.item_code)
if self.doc.purpose == "Manufacture": if self.doc.purpose == "Manufacture":
# ensure finished item at last # ensure finished item at last
sorted_items = sorted(sorted_items, key=lambda x: cstr(x.t_warehouse)) sorted_items = sorted(sorted_items, key=lambda x: cstr(x.t_warehouse))
idx = 0 for idx, row in enumerate(sorted_items, start=1):
for row in sorted_items:
idx += 1
row.idx = idx row.idx = idx
self.doc.set("items", sorted_items) self.doc.set("items", sorted_items)
@@ -256,14 +257,17 @@ def create_serial_and_batch_bundle(parent_doc, row, child, type_of_transaction=N
item_details = frappe.get_cached_value( item_details = frappe.get_cached_value(
"Item", child.item_code, ["has_serial_no", "has_batch_no"], as_dict=1 "Item", child.item_code, ["has_serial_no", "has_batch_no"], as_dict=1
) )
if not (item_details.has_serial_no or item_details.has_batch_no): if not (item_details.has_serial_no or item_details.has_batch_no):
return return
doc = _make_bundle_doc(parent_doc, child, type_of_transaction or "Inward")
_populate_bundle_entries(doc, row, child)
if not doc.entries:
return None
return doc.insert(ignore_permissions=True).name
if not type_of_transaction:
type_of_transaction = "Inward"
doc = frappe.get_doc( def _make_bundle_doc(parent_doc, child, type_of_transaction):
return frappe.get_doc(
{ {
"doctype": "Serial and Batch Bundle", "doctype": "Serial and Batch Bundle",
"voucher_type": "Stock Entry", "voucher_type": "Stock Entry",
@@ -275,41 +279,45 @@ def create_serial_and_batch_bundle(parent_doc, row, child, type_of_transaction=N
} }
) )
def _populate_bundle_entries(doc, row, child):
precision = frappe.get_precision("Stock Entry Detail", "qty") precision = frappe.get_precision("Stock Entry Detail", "qty")
if row.serial_nos and row.batches_to_be_consume: if row.serial_nos and row.batches_to_be_consume:
doc.has_serial_no = 1 _append_serial_batch_entries(doc, row, child, precision)
doc.has_batch_no = 1
batchwise_serial_nos = get_batchwise_serial_nos(child.item_code, row)
for batch_no, qty in row.batches_to_be_consume.items():
while flt(qty, precision) > 0:
qty -= 1
doc.append(
"entries",
{
"batch_no": batch_no,
"serial_no": batchwise_serial_nos.get(batch_no).pop(0),
"warehouse": row.warehouse,
"qty": -1,
},
)
elif row.serial_nos: elif row.serial_nos:
doc.has_serial_no = 1 doc.has_serial_no = 1
for serial_no in row.serial_nos: for serial_no in row.serial_nos:
doc.append("entries", {"serial_no": serial_no, "warehouse": row.warehouse, "qty": -1}) doc.append("entries", {"serial_no": serial_no, "warehouse": row.warehouse, "qty": -1})
elif row.batches_to_be_consume: elif row.batches_to_be_consume:
precision = frappe.get_precision("Serial and Batch Entry", "qty") _append_batch_entries(doc, row)
doc.has_batch_no = 1
for batch_no, qty in row.batches_to_be_consume.items():
if flt(qty, precision) > 0:
qty = flt(qty, precision)
doc.append("entries", {"batch_no": batch_no, "warehouse": row.warehouse, "qty": qty * -1})
if not doc.entries:
return None
return doc.insert(ignore_permissions=True).name def _append_serial_batch_entries(doc, row, child, precision):
doc.has_serial_no = 1
doc.has_batch_no = 1
batchwise_serial_nos = get_batchwise_serial_nos(child.item_code, row)
for batch_no, qty in row.batches_to_be_consume.items():
while flt(qty, precision) > 0:
qty -= 1
doc.append(
"entries",
{
"batch_no": batch_no,
"serial_no": batchwise_serial_nos.get(batch_no).pop(0),
"warehouse": row.warehouse,
"qty": -1,
},
)
def _append_batch_entries(doc, row):
precision = frappe.get_precision("Serial and Batch Entry", "qty")
doc.has_batch_no = 1
for batch_no, qty in row.batches_to_be_consume.items():
if flt(qty, precision) > 0:
doc.append(
"entries", {"batch_no": batch_no, "warehouse": row.warehouse, "qty": flt(qty, precision) * -1}
)
def get_batchwise_serial_nos(item_code, row): def get_batchwise_serial_nos(item_code, row):
@@ -329,24 +337,20 @@ def get_batchwise_serial_nos(item_code, row):
@frappe.whitelist() @frappe.whitelist()
def get_expired_batch_items(): def get_expired_batch_items():
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import get_auto_batch_nos
expired_batches = get_expired_batches() expired_batches = get_expired_batches()
if not expired_batches: if not expired_batches:
return [] return []
return _enrich_expired_batches_with_stock(expired_batches)
def _enrich_expired_batches_with_stock(expired_batches):
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import get_auto_batch_nos
expired_batches_stock = get_auto_batch_nos( expired_batches_stock = get_auto_batch_nos(
frappe._dict( frappe._dict({"batch_no": list(expired_batches.keys()), "for_stock_levels": True})
{
"batch_no": list(expired_batches.keys()),
"for_stock_levels": True,
}
)
) )
for row in expired_batches_stock: for row in expired_batches_stock:
row.update(expired_batches.get(row.batch_no)) row.update(expired_batches.get(row.batch_no))
return expired_batches_stock return expired_batches_stock

View File

@@ -34,21 +34,26 @@ class SendToSubcontractorStockEntry(BaseStockEntry):
self.validate_subcontracting_order_for_transfer(row) self.validate_subcontracting_order_for_transfer(row)
def validate_subcontracting_order_for_bom(self, child_row, subcontract_order): def validate_subcontracting_order_for_bom(self, child_row, subcontract_order):
def get_required_qty(item_code):
return sum(
flt(d.required_qty) for d in subcontract_order.supplied_items if d.rm_item_code == item_code
)
qty_allowance = flt(frappe.db.get_single_value("Buying Settings", "over_transfer_allowance"))
item_code = child_row.original_item or child_row.item_code item_code = child_row.original_item or child_row.item_code
required_qty = get_required_qty(item_code) required_qty = self._get_required_qty_for_bom(item_code, child_row, subcontract_order)
qty_allowance = flt(frappe.db.get_single_value("Buying Settings", "over_transfer_allowance"))
total_allowed = required_qty + (required_qty * qty_allowance / 100)
self._validate_transfer_qty(child_row, item_code, total_allowed)
self._link_rm_detail_if_missing(child_row, item_code)
def _get_required_qty_for_bom(self, item_code, child_row, subcontract_order):
required_qty = sum(
flt(d.required_qty) for d in subcontract_order.supplied_items if d.rm_item_code == item_code
)
if not required_qty and child_row.allow_alternative_item: if not required_qty and child_row.allow_alternative_item:
original_item_code = frappe.get_value( original_item_code = frappe.get_value(
"Item Alternative", {"alternative_item_code": item_code}, "item_code" "Item Alternative", {"alternative_item_code": item_code}, "item_code"
) )
required_qty = get_required_qty(original_item_code) required_qty = sum(
flt(d.required_qty)
for d in subcontract_order.supplied_items
if d.rm_item_code == original_item_code
)
if not required_qty: if not required_qty:
frappe.throw( frappe.throw(
_("Item {0} not found in 'Raw Materials Supplied' table in {1} {2}").format( _("Item {0} not found in 'Raw Materials Supplied' table in {1} {2}").format(
@@ -57,14 +62,15 @@ class SendToSubcontractorStockEntry(BaseStockEntry):
self.doc.get(self.doc.subcontract_data.order_field), self.doc.get(self.doc.subcontract_data.order_field),
) )
) )
return required_qty
total_allowed = required_qty + (required_qty * (qty_allowance / 100)) def _validate_transfer_qty(self, child_row, item_code, total_allowed):
total_supplied = self.get_total_supplied_qty(child_row) total_supplied = self.get_total_supplied_qty(child_row)
total_returned = (
total_returned = 0 self.get_total_returned_qty(child_row)
if self.doc.subcontract_data.order_doctype == "Subcontracting Order": if self.doc.subcontract_data.order_doctype == "Subcontracting Order"
total_returned = self.get_total_returned_qty(child_row) else 0
)
if flt( if flt(
total_supplied + child_row.transfer_qty - total_returned, child_row.precision("transfer_qty") total_supplied + child_row.transfer_qty - total_returned, child_row.precision("transfer_qty")
) > flt(total_allowed, child_row.precision("transfer_qty")): ) > flt(total_allowed, child_row.precision("transfer_qty")):
@@ -77,20 +83,21 @@ class SendToSubcontractorStockEntry(BaseStockEntry):
self.doc.get(self.doc.subcontract_data.order_field), self.doc.get(self.doc.subcontract_data.order_field),
) )
) )
elif not child_row.get(self.doc.subcontract_data.rm_detail_field):
def _link_rm_detail_if_missing(self, child_row, item_code):
if not child_row.get(self.doc.subcontract_data.rm_detail_field):
order_rm_detail = self.get_order_rm_detail(child_row) order_rm_detail = self.get_order_rm_detail(child_row)
if order_rm_detail: if order_rm_detail:
child_row.db_set(self.doc.subcontract_data.rm_detail_field, order_rm_detail) child_row.db_set(self.doc.subcontract_data.rm_detail_field, order_rm_detail)
else: elif not child_row.allow_alternative_item:
if not child_row.allow_alternative_item: frappe.throw(
frappe.throw( _("Row {0}# Item {1} not found in 'Raw Materials Supplied' table in {2} {3}").format(
_("Row {0}# Item {1} not found in 'Raw Materials Supplied' table in {2} {3}").format( child_row.idx,
child_row.idx, item_code,
item_code, self.doc.subcontract_data.order_doctype,
self.doc.subcontract_data.order_doctype, self.doc.get(self.doc.subcontract_data.order_field),
self.doc.get(self.doc.subcontract_data.order_field),
)
) )
)
def validate_subcontracting_order_for_transfer(self, child_row): def validate_subcontracting_order_for_transfer(self, child_row):
if not child_row.subcontracted_item: if not child_row.subcontracted_item:
@@ -106,46 +113,42 @@ class SendToSubcontractorStockEntry(BaseStockEntry):
def get_total_supplied_qty(self, child_row): def get_total_supplied_qty(self, child_row):
se = frappe.qb.DocType("Stock Entry") se = frappe.qb.DocType("Stock Entry")
se_detail = frappe.qb.DocType("Stock Entry Detail") sed = frappe.qb.DocType("Stock Entry Detail")
order_filter = self._get_supplied_qty_order_filter(se, sed, child_row)
return ( return (
frappe.qb.from_(se) frappe.qb.from_(se)
.inner_join(se_detail) .inner_join(sed)
.on(se.name == se_detail.parent) .on(se.name == sed.parent)
.select(Sum(se_detail.transfer_qty)) .select(Sum(sed.transfer_qty))
.where( .where(
(se.purpose == "Send to Subcontractor") (se.purpose == "Send to Subcontractor")
& (se.docstatus == 1) & (se.docstatus == 1)
& (se_detail.item_code == child_row.item_code) & (sed.item_code == child_row.item_code)
& ( & order_filter
(
(se.purchase_order == self.doc.purchase_order)
& (se_detail.po_detail == self.doc.po_detail)
)
if self.doc.subcontract_data.order_doctype == "Purchase Order"
else (
(se.subcontracting_order == self.doc.subcontracting_order)
& (se_detail.sco_rm_detail == child_row.sco_rm_detail)
)
)
) )
).run()[0][0] or 0 ).run()[0][0] or 0
def _get_supplied_qty_order_filter(self, se, sed, child_row):
if self.doc.subcontract_data.order_doctype == "Purchase Order":
return (se.purchase_order == self.doc.purchase_order) & (sed.po_detail == self.doc.po_detail)
return (se.subcontracting_order == self.doc.subcontracting_order) & (
sed.sco_rm_detail == child_row.sco_rm_detail
)
def get_total_returned_qty(self, child_row): def get_total_returned_qty(self, child_row):
se = frappe.qb.DocType("Stock Entry") se = frappe.qb.DocType("Stock Entry")
se_detail = frappe.qb.DocType("Stock Entry Detail") sed = frappe.qb.DocType("Stock Entry Detail")
return ( return (
frappe.qb.from_(se) frappe.qb.from_(se)
.inner_join(se_detail) .inner_join(sed)
.on(se.name == se_detail.parent) .on(se.name == sed.parent)
.select(Sum(se_detail.transfer_qty)) .select(Sum(sed.transfer_qty))
.where( .where(
(se.purpose == "Material Transfer") (se.purpose == "Material Transfer")
& (se.docstatus == 1) & (se.docstatus == 1)
& (se.is_return == 1) & (se.is_return == 1)
& (se_detail.item_code == child_row.item_code) & (sed.item_code == child_row.item_code)
& (se_detail.sco_rm_detail == child_row.sco_rm_detail) & (sed.sco_rm_detail == child_row.sco_rm_detail)
& (se.subcontracting_order == self.doc.subcontracting_order) & (se.subcontracting_order == self.doc.subcontracting_order)
) )
).run()[0][0] or 0 ).run()[0][0] or 0
@@ -169,36 +172,37 @@ class SendToSubcontractorStockEntry(BaseStockEntry):
def update_subcontract_order_supplied_items(self): def update_subcontract_order_supplied_items(self):
if not self.doc.get(self.doc.subcontract_data.order_field): if not self.doc.get(self.doc.subcontract_data.order_field):
return return
order_supplied_items = self._get_order_supplied_items()
supplied_items = self._get_supplied_items_details()
self._update_supplied_items_in_order(order_supplied_items, supplied_items)
self._update_reserved_qty_for_subcontracting(order_supplied_items)
# Get Subcontract Order Supplied Items Details def _get_order_supplied_items(self):
order_supplied_items = frappe.db.get_all( return frappe.db.get_all(
self.doc.subcontract_data.order_supplied_items_field, self.doc.subcontract_data.order_supplied_items_field,
filters={"parent": self.doc.get(self.doc.subcontract_data.order_field)}, filters={"parent": self.doc.get(self.doc.subcontract_data.order_field)},
fields=["name", "rm_item_code", "reserve_warehouse"], fields=["name", "rm_item_code", "reserve_warehouse"],
) )
# Get Items Supplied in Stock Entries against Subcontract Order def _get_supplied_items_details(self):
supplied_items = get_supplied_items( return get_supplied_items(
self.doc.get(self.doc.subcontract_data.order_field), self.doc.get(self.doc.subcontract_data.order_field),
self.doc.subcontract_data.rm_detail_field, self.doc.subcontract_data.rm_detail_field,
self.doc.subcontract_data.order_field, self.doc.subcontract_data.order_field,
) )
def _update_supplied_items_in_order(self, order_supplied_items, supplied_items):
for row in order_supplied_items: for row in order_supplied_items:
key, item = row.name, {} item = supplied_items.get(row.name) or {
if not supplied_items.get(key): "supplied_qty": 0,
# no stock transferred against Subcontract Order Supplied Items row "returned_qty": 0,
item = {"supplied_qty": 0, "returned_qty": 0, "total_supplied_qty": 0} "total_supplied_qty": 0,
else: }
item = supplied_items.get(key)
frappe.db.set_value(self.doc.subcontract_data.order_supplied_items_field, row.name, item) frappe.db.set_value(self.doc.subcontract_data.order_supplied_items_field, row.name, item)
# RM Item-Reserve Warehouse Dict def _update_reserved_qty_for_subcontracting(self, order_supplied_items):
item_wh = {x.get("rm_item_code"): x.get("reserve_warehouse") for x in order_supplied_items} item_wh = {x.get("rm_item_code"): x.get("reserve_warehouse") for x in order_supplied_items}
for d in self.doc.get("items"): for d in self.doc.get("items"):
# Update reserved sub contracted quantity in bin based on Supplied Item Details and
item_code = d.get("original_item") or d.get("item_code") item_code = d.get("original_item") or d.get("item_code")
reserve_warehouse = item_wh.get(item_code) reserve_warehouse = item_wh.get(item_code)
if not (reserve_warehouse and item_code): if not (reserve_warehouse and item_code):