Merge pull request #29296 from ankush/lifo_valuation

feat: LIFO valuation
This commit is contained in:
Ankush Menat
2022-02-08 18:15:56 +05:30
committed by GitHub
7 changed files with 367 additions and 66 deletions

View File

@@ -346,7 +346,7 @@
"fieldname": "valuation_method", "fieldname": "valuation_method",
"fieldtype": "Select", "fieldtype": "Select",
"label": "Valuation Method", "label": "Valuation Method",
"options": "\nFIFO\nMoving Average" "options": "\nFIFO\nMoving Average\nLIFO"
}, },
{ {
"depends_on": "is_stock_item", "depends_on": "is_stock_item",

View File

@@ -99,7 +99,7 @@
"fieldname": "valuation_method", "fieldname": "valuation_method",
"fieldtype": "Select", "fieldtype": "Select",
"label": "Default Valuation Method", "label": "Default Valuation Method",
"options": "FIFO\nMoving Average" "options": "FIFO\nMoving Average\nLIFO"
}, },
{ {
"description": "The percentage you are allowed to receive or deliver more against the quantity ordered. For example, if you have ordered 100 units, and your Allowance is 10%, then you are allowed to receive 110 units.", "description": "The percentage you are allowed to receive or deliver more against the quantity ordered. For example, if you have ordered 100 units, and your Allowance is 10%, then you are allowed to receive 110 units.",
@@ -346,7 +346,7 @@
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"issingle": 1, "issingle": 1,
"links": [], "links": [],
"modified": "2022-02-04 15:33:43.692736", "modified": "2022-02-05 15:33:43.692736",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Stock", "module": "Stock",
"name": "Stock Settings", "name": "Stock Settings",

View File

@@ -167,7 +167,7 @@ def get_columns():
{ {
"fieldname": "stock_queue", "fieldname": "stock_queue",
"fieldtype": "Data", "fieldtype": "Data",
"label": "FIFO Queue", "label": "FIFO/LIFO Queue",
}, },
{ {

View File

@@ -16,7 +16,7 @@ from erpnext.stock.utils import (
get_or_make_bin, get_or_make_bin,
get_valuation_method, get_valuation_method,
) )
from erpnext.stock.valuation import FIFOValuation from erpnext.stock.valuation import FIFOValuation, LIFOValuation
class NegativeStockError(frappe.ValidationError): pass class NegativeStockError(frappe.ValidationError): pass
@@ -461,7 +461,7 @@ class update_entries_after(object):
self.wh_data.qty_after_transaction += flt(sle.actual_qty) self.wh_data.qty_after_transaction += flt(sle.actual_qty)
self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(self.wh_data.valuation_rate) self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(self.wh_data.valuation_rate)
else: else:
self.update_fifo_values(sle) self.update_queue_values(sle)
self.wh_data.qty_after_transaction += flt(sle.actual_qty) self.wh_data.qty_after_transaction += flt(sle.actual_qty)
# rounding as per precision # rounding as per precision
@@ -701,14 +701,18 @@ class update_entries_after(object):
sle.voucher_type, sle.voucher_no, self.allow_zero_rate, sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
currency=erpnext.get_company_currency(sle.company), company=sle.company) currency=erpnext.get_company_currency(sle.company), company=sle.company)
def update_fifo_values(self, sle): def update_queue_values(self, sle):
incoming_rate = flt(sle.incoming_rate) incoming_rate = flt(sle.incoming_rate)
actual_qty = flt(sle.actual_qty) actual_qty = flt(sle.actual_qty)
outgoing_rate = flt(sle.outgoing_rate) outgoing_rate = flt(sle.outgoing_rate)
fifo_queue = FIFOValuation(self.wh_data.stock_queue) if self.valuation_method == "LIFO":
stock_queue = LIFOValuation(self.wh_data.stock_queue)
else:
stock_queue = FIFOValuation(self.wh_data.stock_queue)
if actual_qty > 0: if actual_qty > 0:
fifo_queue.add_stock(qty=actual_qty, rate=incoming_rate) stock_queue.add_stock(qty=actual_qty, rate=incoming_rate)
else: else:
def rate_generator() -> float: def rate_generator() -> float:
allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no) allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no)
@@ -719,11 +723,11 @@ class update_entries_after(object):
else: else:
return 0.0 return 0.0
fifo_queue.remove_stock(qty=abs(actual_qty), outgoing_rate=outgoing_rate, rate_generator=rate_generator) stock_queue.remove_stock(qty=abs(actual_qty), outgoing_rate=outgoing_rate, rate_generator=rate_generator)
stock_qty, stock_value = fifo_queue.get_total_stock_and_value() stock_qty, stock_value = stock_queue.get_total_stock_and_value()
self.wh_data.stock_queue = fifo_queue.get_state() self.wh_data.stock_queue = stock_queue.state
self.wh_data.stock_value = stock_value self.wh_data.stock_value = stock_value
if stock_qty: if stock_qty:
self.wh_data.valuation_rate = stock_value / stock_qty self.wh_data.valuation_rate = stock_value / stock_qty

View File

@@ -1,16 +1,21 @@
import json
import unittest import unittest
import frappe
from hypothesis import given from hypothesis import given
from hypothesis import strategies as st from hypothesis import strategies as st
from erpnext.stock.valuation import FIFOValuation, _round_off_if_near_zero from erpnext.stock.doctype.item.test_item import make_item
from erpnext.stock.doctype.stock_entry.stock_entry_utils import make_stock_entry
from erpnext.stock.valuation import FIFOValuation, LIFOValuation, _round_off_if_near_zero
from erpnext.tests.utils import ERPNextTestCase
qty_gen = st.floats(min_value=-1e6, max_value=1e6) qty_gen = st.floats(min_value=-1e6, max_value=1e6)
value_gen = st.floats(min_value=1, max_value=1e6) value_gen = st.floats(min_value=1, max_value=1e6)
stock_queue_generator = st.lists(st.tuples(qty_gen, value_gen), min_size=10) stock_queue_generator = st.lists(st.tuples(qty_gen, value_gen), min_size=10)
class TestFifoValuation(unittest.TestCase): class TestFIFOValuation(unittest.TestCase):
def setUp(self): def setUp(self):
self.queue = FIFOValuation([]) self.queue = FIFOValuation([])
@@ -164,3 +169,184 @@ class TestFifoValuation(unittest.TestCase):
total_value -= sum(q * r for q, r in consumed) total_value -= sum(q * r for q, r in consumed)
self.assertTotalQty(total_qty) self.assertTotalQty(total_qty)
self.assertTotalValue(total_value) self.assertTotalValue(total_value)
class TestLIFOValuation(unittest.TestCase):
def setUp(self):
self.stack = LIFOValuation([])
def tearDown(self):
qty, value = self.stack.get_total_stock_and_value()
self.assertTotalQty(qty)
self.assertTotalValue(value)
def assertTotalQty(self, qty):
self.assertAlmostEqual(sum(q for q, _ in self.stack), qty, msg=f"stack: {self.stack}", places=4)
def assertTotalValue(self, value):
self.assertAlmostEqual(sum(q * r for q, r in self.stack), value, msg=f"stack: {self.stack}", places=2)
def test_simple_addition(self):
self.stack.add_stock(1, 10)
self.assertTotalQty(1)
def test_merge_new_stock(self):
self.stack.add_stock(1, 10)
self.stack.add_stock(1, 10)
self.assertEqual(self.stack, [[2, 10]])
def test_simple_removal(self):
self.stack.add_stock(1, 10)
self.stack.remove_stock(1)
self.assertTotalQty(0)
def test_adding_negative_stock_keeps_rate(self):
self.stack = LIFOValuation([[-5.0, 100]])
self.stack.add_stock(1, 10)
self.assertEqual(self.stack, [[-4, 100]])
def test_adding_negative_stock_updates_rate(self):
self.stack = LIFOValuation([[-5.0, 100]])
self.stack.add_stock(6, 10)
self.assertEqual(self.stack, [[1, 10]])
def test_rounding_off(self):
self.stack.add_stock(1.0, 1.0)
self.stack.remove_stock(1.0 - 1e-9)
self.assertTotalQty(0)
def test_lifo_consumption(self):
self.stack.add_stock(10, 10)
self.stack.add_stock(10, 20)
consumed = self.stack.remove_stock(15)
self.assertEqual(consumed, [[10, 20], [5, 10]])
self.assertTotalQty(5)
def test_lifo_consumption_going_negative(self):
self.stack.add_stock(10, 10)
self.stack.add_stock(10, 20)
consumed = self.stack.remove_stock(25)
self.assertEqual(consumed, [[10, 20], [10, 10], [5, 10]])
self.assertTotalQty(-5)
def test_lifo_consumption_multiple(self):
self.stack.add_stock(1, 1)
self.stack.add_stock(2, 2)
consumed = self.stack.remove_stock(1)
self.assertEqual(consumed, [[1, 2]])
self.stack.add_stock(3, 3)
consumed = self.stack.remove_stock(4)
self.assertEqual(consumed, [[3, 3], [1, 2]])
self.stack.add_stock(4, 4)
consumed = self.stack.remove_stock(5)
self.assertEqual(consumed, [[4, 4], [1, 1]])
self.stack.add_stock(5, 5)
consumed = self.stack.remove_stock(5)
self.assertEqual(consumed, [[5, 5]])
@given(stock_queue_generator)
def test_lifo_qty_hypothesis(self, stock_stack):
self.stack = LIFOValuation([])
total_qty = 0
for qty, rate in stock_stack:
if qty == 0:
continue
if qty > 0:
self.stack.add_stock(qty, rate)
total_qty += qty
else:
qty = abs(qty)
consumed = self.stack.remove_stock(qty)
self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
total_qty -= qty
self.assertTotalQty(total_qty)
@given(stock_queue_generator)
def test_lifo_qty_value_nonneg_hypothesis(self, stock_stack):
self.stack = LIFOValuation([])
total_qty = 0.0
total_value = 0.0
for qty, rate in stock_stack:
# don't allow negative stock
if qty == 0 or total_qty + qty < 0 or abs(qty) < 0.1:
continue
if qty > 0:
self.stack.add_stock(qty, rate)
total_qty += qty
total_value += qty * rate
else:
qty = abs(qty)
consumed = self.stack.remove_stock(qty)
self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
total_qty -= qty
total_value -= sum(q * r for q, r in consumed)
self.assertTotalQty(total_qty)
self.assertTotalValue(total_value)
class TestLIFOValuationSLE(ERPNextTestCase):
ITEM_CODE = "_Test LIFO item"
WAREHOUSE = "_Test Warehouse - _TC"
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
make_item(cls.ITEM_CODE, {"valuation_method": "LIFO"})
def _make_stock_entry(self, qty, rate=None):
kwargs = {
"item_code": self.ITEM_CODE,
"from_warehouse" if qty < 0 else "to_warehouse": self.WAREHOUSE,
"rate": rate,
"qty": abs(qty),
}
return make_stock_entry(**kwargs)
def assertStockQueue(self, se, expected_queue):
sle_name = frappe.db.get_value("Stock Ledger Entry", {"voucher_no": se.name, "is_cancelled": 0, "voucher_type": "Stock Entry"})
sle = frappe.get_doc("Stock Ledger Entry", sle_name)
stock_queue = json.loads(sle.stock_queue)
total_qty, total_value = LIFOValuation(stock_queue).get_total_stock_and_value()
self.assertEqual(sle.qty_after_transaction, total_qty)
self.assertEqual(sle.stock_value, total_value)
if total_qty > 0:
self.assertEqual(stock_queue, expected_queue)
def test_lifo_values(self):
in1 = self._make_stock_entry(1, 1)
self.assertStockQueue(in1, [[1, 1]])
in2 = self._make_stock_entry(2, 2)
self.assertStockQueue(in2, [[1, 1], [2, 2]])
out1 = self._make_stock_entry(-1)
self.assertStockQueue(out1, [[1, 1], [1, 2]])
in3 = self._make_stock_entry(3, 3)
self.assertStockQueue(in3, [[1, 1], [1, 2], [3, 3]])
out2 = self._make_stock_entry(-4)
self.assertStockQueue(out2, [[1, 1]])
in4 = self._make_stock_entry(4, 4)
self.assertStockQueue(in4, [[1, 1], [4,4]])
out3 = self._make_stock_entry(-5)
self.assertStockQueue(out3, [])
in5 = self._make_stock_entry(5, 5)
self.assertStockQueue(in5, [[5, 5]])
out5 = self._make_stock_entry(-5)
self.assertStockQueue(out5, [])

View File

@@ -9,6 +9,7 @@ from frappe import _
from frappe.utils import cstr, flt, get_link_to_form, nowdate, nowtime from frappe.utils import cstr, flt, get_link_to_form, nowdate, nowtime
import erpnext import erpnext
from erpnext.stock.valuation import FIFOValuation, LIFOValuation
class InvalidWarehouseCompany(frappe.ValidationError): pass class InvalidWarehouseCompany(frappe.ValidationError): pass
@@ -228,10 +229,10 @@ def get_incoming_rate(args, raise_error_if_no_rate=True):
else: else:
valuation_method = get_valuation_method(args.get("item_code")) valuation_method = get_valuation_method(args.get("item_code"))
previous_sle = get_previous_sle(args) previous_sle = get_previous_sle(args)
if valuation_method == 'FIFO': if valuation_method in ('FIFO', 'LIFO'):
if previous_sle: if previous_sle:
previous_stock_queue = json.loads(previous_sle.get('stock_queue', '[]') or '[]') previous_stock_queue = json.loads(previous_sle.get('stock_queue', '[]') or '[]')
in_rate = get_fifo_rate(previous_stock_queue, args.get("qty") or 0) if previous_stock_queue else 0 in_rate = _get_fifo_lifo_rate(previous_stock_queue, args.get("qty") or 0, valuation_method) if previous_stock_queue else 0
elif valuation_method == 'Moving Average': elif valuation_method == 'Moving Average':
in_rate = previous_sle.get('valuation_rate') or 0 in_rate = previous_sle.get('valuation_rate') or 0
@@ -261,29 +262,25 @@ def get_valuation_method(item_code):
def get_fifo_rate(previous_stock_queue, qty): def get_fifo_rate(previous_stock_queue, qty):
"""get FIFO (average) Rate from Queue""" """get FIFO (average) Rate from Queue"""
if flt(qty) >= 0: return _get_fifo_lifo_rate(previous_stock_queue, qty, "FIFO")
total = sum(f[0] for f in previous_stock_queue)
return sum(flt(f[0]) * flt(f[1]) for f in previous_stock_queue) / flt(total) if total else 0.0
else:
available_qty_for_outgoing, outgoing_cost = 0, 0
qty_to_pop = abs(flt(qty))
while qty_to_pop and previous_stock_queue:
batch = previous_stock_queue[0]
if 0 < batch[0] <= qty_to_pop:
# if batch qty > 0
# not enough or exactly same qty in current batch, clear batch
available_qty_for_outgoing += flt(batch[0])
outgoing_cost += flt(batch[0]) * flt(batch[1])
qty_to_pop -= batch[0]
previous_stock_queue.pop(0)
else:
# all from current batch
available_qty_for_outgoing += flt(qty_to_pop)
outgoing_cost += flt(qty_to_pop) * flt(batch[1])
batch[0] -= qty_to_pop
qty_to_pop = 0
return outgoing_cost / available_qty_for_outgoing def get_lifo_rate(previous_stock_queue, qty):
"""get LIFO (average) Rate from Queue"""
return _get_fifo_lifo_rate(previous_stock_queue, qty, "LIFO")
def _get_fifo_lifo_rate(previous_stock_queue, qty, method):
ValuationKlass = LIFOValuation if method == "LIFO" else FIFOValuation
stock_queue = ValuationKlass(previous_stock_queue)
if flt(qty) >= 0:
total_qty, total_value = stock_queue.get_total_stock_and_value()
return total_value / total_qty if total_qty else 0.0
else:
popped_bins = stock_queue.remove_stock(abs(flt(qty)))
total_qty, total_value = ValuationKlass(popped_bins).get_total_stock_and_value()
return total_value / total_qty if total_qty else 0.0
def get_valid_serial_nos(sr_nos, qty=0, item_code=''): def get_valid_serial_nos(sr_nos, qty=0, item_code=''):
"""split serial nos, validate and return list of valid serial nos""" """split serial nos, validate and return list of valid serial nos"""

View File

@@ -1,15 +1,54 @@
from abc import ABC, abstractmethod, abstractproperty
from typing import Callable, List, NewType, Optional, Tuple from typing import Callable, List, NewType, Optional, Tuple
from frappe.utils import flt from frappe.utils import flt
FifoBin = NewType("FifoBin", List[float]) StockBin = NewType("StockBin", List[float]) # [[qty, rate], ...]
# Indexes of values inside FIFO bin 2-tuple # Indexes of values inside FIFO bin 2-tuple
QTY = 0 QTY = 0
RATE = 1 RATE = 1
class FIFOValuation: class BinWiseValuation(ABC):
@abstractmethod
def add_stock(self, qty: float, rate: float) -> None:
pass
@abstractmethod
def remove_stock(
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
) -> List[StockBin]:
pass
@abstractproperty
def state(self) -> List[StockBin]:
pass
def get_total_stock_and_value(self) -> Tuple[float, float]:
total_qty = 0.0
total_value = 0.0
for qty, rate in self.state:
total_qty += flt(qty)
total_value += flt(qty) * flt(rate)
return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value)
def __repr__(self):
return str(self.state)
def __iter__(self):
return iter(self.state)
def __eq__(self, other):
if isinstance(other, list):
return self.state == other
return type(self) == type(other) and self.state == other.state
class FIFOValuation(BinWiseValuation):
"""Valuation method where a queue of all the incoming stock is maintained. """Valuation method where a queue of all the incoming stock is maintained.
New stock is added at end of the queue. New stock is added at end of the queue.
@@ -24,34 +63,14 @@ class FIFOValuation:
# ref: https://docs.python.org/3/reference/datamodel.html#slots # ref: https://docs.python.org/3/reference/datamodel.html#slots
__slots__ = ["queue",] __slots__ = ["queue",]
def __init__(self, state: Optional[List[FifoBin]]): def __init__(self, state: Optional[List[StockBin]]):
self.queue: List[FifoBin] = state if state is not None else [] self.queue: List[StockBin] = state if state is not None else []
def __repr__(self): @property
return str(self.queue) def state(self) -> List[StockBin]:
def __iter__(self):
return iter(self.queue)
def __eq__(self, other):
if isinstance(other, list):
return self.queue == other
return self.queue == other.queue
def get_state(self) -> List[FifoBin]:
"""Get current state of queue.""" """Get current state of queue."""
return self.queue return self.queue
def get_total_stock_and_value(self) -> Tuple[float, float]:
total_qty = 0.0
total_value = 0.0
for qty, rate in self.queue:
total_qty += flt(qty)
total_value += flt(qty) * flt(rate)
return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value)
def add_stock(self, qty: float, rate: float) -> None: def add_stock(self, qty: float, rate: float) -> None:
"""Update fifo queue with new stock. """Update fifo queue with new stock.
@@ -78,7 +97,7 @@ class FIFOValuation:
def remove_stock( def remove_stock(
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
) -> List[FifoBin]: ) -> List[StockBin]:
"""Remove stock from the queue and return popped bins. """Remove stock from the queue and return popped bins.
args: args:
@@ -136,6 +155,101 @@ class FIFOValuation:
return consumed_bins return consumed_bins
class LIFOValuation(BinWiseValuation):
"""Valuation method where a *stack* of all the incoming stock is maintained.
New stock is added at top of the stack.
Qty consumption happens on Last In First Out basis.
Stack is implemented using "bins" of [qty, rate].
ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
Implementation detail: appends and pops both at end of list.
"""
# specifying the attributes to save resources
# ref: https://docs.python.org/3/reference/datamodel.html#slots
__slots__ = ["stack",]
def __init__(self, state: Optional[List[StockBin]]):
self.stack: List[StockBin] = state if state is not None else []
@property
def state(self) -> List[StockBin]:
"""Get current state of stack."""
return self.stack
def add_stock(self, qty: float, rate: float) -> None:
"""Update lifo stack with new stock.
args:
qty: new quantity to add
rate: incoming rate of new quantity.
Behaviour of this is same as FIFO valuation.
"""
if not len(self.stack):
self.stack.append([0, 0])
# last row has the same rate, merge new bin.
if self.stack[-1][RATE] == rate:
self.stack[-1][QTY] += qty
else:
# Item has a positive balance qty, add new entry
if self.stack[-1][QTY] > 0:
self.stack.append([qty, rate])
else: # negative balance qty
qty = self.stack[-1][QTY] + qty
if qty > 0: # new balance qty is positive
self.stack[-1] = [qty, rate]
else: # new balance qty is still negative, maintain same rate
self.stack[-1][QTY] = qty
def remove_stock(
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
) -> List[StockBin]:
"""Remove stock from the stack and return popped bins.
args:
qty: quantity to remove
rate: outgoing rate - ignored. Kept for backwards compatibility.
rate_generator: function to be called if stack is not found and rate is required.
"""
if not rate_generator:
rate_generator = lambda : 0.0 # noqa
consumed_bins = []
while qty:
if not len(self.stack):
# rely on rate generator.
self.stack.append([0, rate_generator()])
# start at the end.
index = -1
stock_bin = self.stack[index]
if qty >= stock_bin[QTY]:
# consume current bin
qty = _round_off_if_near_zero(qty - stock_bin[QTY])
to_consume = self.stack.pop(index)
consumed_bins.append(list(to_consume))
if not self.stack and qty:
# stock finished, qty still remains to be withdrawn
# negative stock, keep in as a negative bin
self.stack.append([-qty, outgoing_rate or stock_bin[RATE]])
consumed_bins.append([qty, outgoing_rate or stock_bin[RATE]])
break
else:
# qty found in current bin consume it and exit
stock_bin[QTY] = _round_off_if_near_zero(stock_bin[QTY] - qty)
consumed_bins.append([qty, stock_bin[RATE]])
qty = 0
return consumed_bins
def _round_off_if_near_zero(number: float, precision: int = 7) -> float: def _round_off_if_near_zero(number: float, precision: int = 7) -> float:
"""Rounds off the number to zero only if number is close to zero for decimal """Rounds off the number to zero only if number is close to zero for decimal
specified in precision. Precision defaults to 7. specified in precision. Precision defaults to 7.