flectra/addons/stock/models/stock_inventory.py

465 lines
21 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
2018-01-16 11:34:37 +01:00
# Part of Odoo, Flectra. See LICENSE file for full copyright and licensing details.
2018-01-16 11:34:37 +01:00
from flectra import api, fields, models, _
from flectra.addons import decimal_precision as dp
from flectra.exceptions import UserError
from flectra.tools import float_utils
class Inventory(models.Model):
_name = "stock.inventory"
_description = "Inventory"
_order = "date desc, id desc"
@api.model
def _default_location_id(self):
company_user = self.env.user.company_id
warehouse = self.env['stock.warehouse'].search([('company_id', '=', company_user.id)], limit=1)
if warehouse:
return warehouse.lot_stock_id.id
else:
raise UserError(_('You must define a warehouse for the company: %s.') % (company_user.name,))
name = fields.Char(
'Inventory Reference',
readonly=True, required=True,
states={'draft': [('readonly', False)]})
date = fields.Datetime(
'Inventory Date',
readonly=True, required=True,
default=fields.Datetime.now,
help="The date that will be used for the stock level check of the products and the validation of the stock move related to this inventory.")
line_ids = fields.One2many(
'stock.inventory.line', 'inventory_id', string='Inventories',
copy=True, readonly=False,
states={'done': [('readonly', True)]})
move_ids = fields.One2many(
'stock.move', 'inventory_id', string='Created Moves',
states={'done': [('readonly', True)]})
state = fields.Selection(string='Status', selection=[
('draft', 'Draft'),
('cancel', 'Cancelled'),
('confirm', 'In Progress'),
('done', 'Validated')],
copy=False, index=True, readonly=True,
default='draft')
company_id = fields.Many2one(
'res.company', 'Company',
readonly=True, index=True, required=True,
states={'draft': [('readonly', False)]},
default=lambda self: self.env['res.company']._company_default_get('stock.inventory'))
location_id = fields.Many2one(
'stock.location', 'Inventoried Location',
readonly=True, required=True,
states={'draft': [('readonly', False)]},
default=_default_location_id)
branch_id = fields.Many2one('res.branch', 'Branch',
related='location_id.branch_id', index=True, readonly=True,
store=True)
product_id = fields.Many2one(
'product.product', 'Inventoried Product',
readonly=True,
states={'draft': [('readonly', False)]},
help="Specify Product to focus your inventory on a particular Product.")
package_id = fields.Many2one(
'stock.quant.package', 'Inventoried Pack',
readonly=True,
states={'draft': [('readonly', False)]},
help="Specify Pack to focus your inventory on a particular Pack.")
partner_id = fields.Many2one(
'res.partner', 'Inventoried Owner',
readonly=True,
states={'draft': [('readonly', False)]},
help="Specify Owner to focus your inventory on a particular Owner.")
lot_id = fields.Many2one(
'stock.production.lot', 'Inventoried Lot/Serial Number',
copy=False, readonly=True,
states={'draft': [('readonly', False)]},
help="Specify Lot/Serial Number to focus your inventory on a particular Lot/Serial Number.")
filter = fields.Selection(
string='Inventory of', selection='_selection_filter',
required=True,
default='none',
help="If you do an entire inventory, you can choose 'All Products' and it will prefill the inventory with the current stock. If you only do some products "
"(e.g. Cycle Counting) you can choose 'Manual Selection of Products' and the system won't propose anything. You can also let the "
"system propose for a single product / lot /... ")
total_qty = fields.Float('Total Quantity', compute='_compute_total_qty')
category_id = fields.Many2one(
'product.category', 'Inventoried Category',
readonly=True, states={'draft': [('readonly', False)]},
help="Specify Product Category to focus your inventory on a particular Category.")
exhausted = fields.Boolean('Include Exhausted Products', readonly=True, states={'draft': [('readonly', False)]})
@api.one
@api.depends('product_id', 'line_ids.product_qty')
def _compute_total_qty(self):
""" For single product inventory, total quantity of the counted """
if self.product_id:
self.total_qty = sum(self.mapped('line_ids').mapped('product_qty'))
else:
self.total_qty = 0
@api.multi
def unlink(self):
for inventory in self:
if inventory.state == 'done':
raise UserError(_('You cannot delete a validated inventory adjustement.'))
return super(Inventory, self).unlink()
@api.model
def _selection_filter(self):
""" Get the list of filter allowed according to the options checked
in 'Settings\Warehouse'. """
res_filter = [
('none', _('All products')),
('category', _('One product category')),
('product', _('One product only')),
('partial', _('Select products manually'))]
if self.user_has_groups('stock.group_tracking_owner'):
res_filter += [('owner', _('One owner only')), ('product_owner', _('One product for a specific owner'))]
if self.user_has_groups('stock.group_production_lot'):
res_filter.append(('lot', _('One Lot/Serial Number')))
if self.user_has_groups('stock.group_tracking_lot'):
res_filter.append(('pack', _('A Pack')))
return res_filter
@api.onchange('filter')
def onchange_filter(self):
if self.filter not in ('product', 'product_owner'):
self.product_id = False
if self.filter != 'lot':
self.lot_id = False
if self.filter not in ('owner', 'product_owner'):
self.partner_id = False
if self.filter != 'pack':
self.package_id = False
if self.filter != 'category':
self.category_id = False
if self.filter == 'product':
self.exhausted = True
@api.onchange('location_id')
def onchange_location_id(self):
if self.location_id.company_id:
self.company_id = self.location_id.company_id
@api.one
@api.constrains('filter', 'product_id', 'lot_id', 'partner_id', 'package_id')
def _check_filter_product(self):
if self.filter == 'none' and self.product_id and self.location_id and self.lot_id:
return
if self.filter not in ('product', 'product_owner') and self.product_id:
raise UserError(_('The selected inventory options are not coherent.'))
if self.filter != 'lot' and self.lot_id:
raise UserError(_('The selected inventory options are not coherent.'))
if self.filter not in ('owner', 'product_owner') and self.partner_id:
raise UserError(_('The selected inventory options are not coherent.'))
if self.filter != 'pack' and self.package_id:
raise UserError(_('The selected inventory options are not coherent.'))
def action_reset_product_qty(self):
self.mapped('line_ids').write({'product_qty': 0})
return True
def action_done(self):
negative = next((line for line in self.mapped('line_ids') if line.product_qty < 0 and line.product_qty != line.theoretical_qty), False)
if negative:
raise UserError(_('You cannot set a negative product quantity in an inventory line:\n\t%s - qty: %s') % (negative.product_id.name, negative.product_qty))
self.action_check()
self.write({'state': 'done'})
self.post_inventory()
return True
def post_inventory(self):
# The inventory is posted as a single step which means quants cannot be moved from an internal location to another using an inventory
# as they will be moved to inventory loss, and other quants will be created to the encoded quant location. This is a normal behavior
# as quants cannot be reuse from inventory location (users can still manually move the products before/after the inventory if they want).
self.mapped('move_ids').filtered(lambda move: move.state != 'done')._action_done()
def action_check(self):
""" Checks the inventory and computes the stock move to do """
# tde todo: clean after _generate_moves
for inventory in self.filtered(lambda x: x.state not in ('done','cancel')):
# first remove the existing stock moves linked to this inventory
inventory.mapped('move_ids').unlink()
inventory.line_ids._generate_moves()
def action_cancel_draft(self):
self.mapped('move_ids')._action_cancel()
self.write({
'line_ids': [(5,)],
'state': 'draft'
})
def action_start(self):
for inventory in self.filtered(lambda x: x.state not in ('done','cancel')):
vals = {'state': 'confirm', 'date': fields.Datetime.now()}
if (inventory.filter != 'partial') and not inventory.line_ids:
vals.update({'line_ids': [(0, 0, line_values) for line_values in inventory._get_inventory_lines_values()]})
inventory.write(vals)
return True
def action_inventory_line_tree(self):
action = self.env.ref('stock.action_inventory_line_tree').read()[0]
action['context'] = {
'default_location_id': self.location_id.id,
'default_product_id': self.product_id.id,
'default_prod_lot_id': self.lot_id.id,
'default_package_id': self.package_id.id,
'default_partner_id': self.partner_id.id,
'default_inventory_id': self.id,
}
return action
def _get_inventory_lines_values(self):
# TDE CLEANME: is sql really necessary ? I don't think so
locations = self.env['stock.location'].search([('id', 'child_of', [self.location_id.id])])
domain = ' location_id in %s'
args = (tuple(locations.ids),)
vals = []
Product = self.env['product.product']
# Empty recordset of products available in stock_quants
quant_products = self.env['product.product']
# Empty recordset of products to filter
products_to_filter = self.env['product.product']
# case 0: Filter on company
if self.company_id:
domain += ' AND company_id = %s'
args += (self.company_id.id,)
#case 1: Filter on One owner only or One product for a specific owner
if self.partner_id:
domain += ' AND owner_id = %s'
args += (self.partner_id.id,)
#case 2: Filter on One Lot/Serial Number
if self.lot_id:
domain += ' AND lot_id = %s'
args += (self.lot_id.id,)
#case 3: Filter on One product
if self.product_id:
domain += ' AND product_id = %s'
args += (self.product_id.id,)
products_to_filter |= self.product_id
#case 4: Filter on A Pack
if self.package_id:
domain += ' AND package_id = %s'
args += (self.package_id.id,)
#case 5: Filter on One product category + Exahausted Products
if self.category_id:
categ_products = Product.search([('categ_id', '=', self.category_id.id)])
domain += ' AND product_id = ANY (%s)'
args += (categ_products.ids,)
products_to_filter |= categ_products
self.env.cr.execute("""SELECT product_id, sum(quantity) as product_qty, location_id, lot_id as prod_lot_id, package_id, owner_id as partner_id
FROM stock_quant
WHERE %s
GROUP BY product_id, location_id, lot_id, package_id, partner_id """ % domain, args)
for product_data in self.env.cr.dictfetchall():
# replace the None the dictionary by False, because falsy values are tested later on
for void_field in [item[0] for item in product_data.items() if item[1] is None]:
product_data[void_field] = False
product_data['theoretical_qty'] = product_data['product_qty']
if product_data['product_id']:
product_data['product_uom_id'] = Product.browse(product_data['product_id']).uom_id.id
quant_products |= Product.browse(product_data['product_id'])
vals.append(product_data)
if self.exhausted:
exhausted_vals = self._get_exhausted_inventory_line(products_to_filter, quant_products)
vals.extend(exhausted_vals)
return vals
def _get_exhausted_inventory_line(self, products, quant_products):
'''
This function return inventory lines for exausted products
:param products: products With Selected Filter.
:param quant_products: products available in stock_quants
'''
vals = []
exhausted_domain = [('type', 'not in', ('service', 'consu', 'digital'))]
if products:
exhausted_products = products - quant_products
exhausted_domain += [('id', 'in', exhausted_products.ids)]
else:
exhausted_domain += [('id', 'not in', quant_products.ids)]
exhausted_products = self.env['product.product'].search(exhausted_domain)
for product in exhausted_products:
vals.append({
'inventory_id': self.id,
'product_id': product.id,
'location_id': self.location_id.id,
})
return vals
class InventoryLine(models.Model):
_name = "stock.inventory.line"
_description = "Inventory Line"
_order = "product_name ,inventory_id, location_name, product_code, prodlot_name"
inventory_id = fields.Many2one(
'stock.inventory', 'Inventory',
index=True, ondelete='cascade')
branch_id = fields.Many2one('res.branch', 'Branch',
related='inventory_id.branch_id', index=True,
readonly=True, store=True)
partner_id = fields.Many2one('res.partner', 'Owner')
product_id = fields.Many2one(
'product.product', 'Product',
domain=[('type', '=', 'product')],
index=True, required=True)
product_name = fields.Char(
'Product Name', related='product_id.name', store=True, readonly=True)
product_code = fields.Char(
'Product Code', related='product_id.default_code', store=True)
product_uom_id = fields.Many2one(
'product.uom', 'Product Unit of Measure',
required=True,
default=lambda self: self.env.ref('product.product_uom_unit', raise_if_not_found=True))
product_qty = fields.Float(
'Checked Quantity',
digits=dp.get_precision('Product Unit of Measure'), default=0)
location_id = fields.Many2one(
'stock.location', 'Location',
index=True, required=True)
# TDE FIXME: necessary ? only in order -> replace by location_id
location_name = fields.Char(
'Location Name', related='location_id.complete_name', store=True)
package_id = fields.Many2one(
'stock.quant.package', 'Pack', index=True)
prod_lot_id = fields.Many2one(
'stock.production.lot', 'Lot/Serial Number',
domain="[('product_id','=',product_id)]")
# TDE FIXME: necessary ? -> replace by location_id
prodlot_name = fields.Char(
'Serial Number Name',
2018-07-06 14:58:06 +02:00
related='prod_lot_id.name', store=True, readonly=True)
company_id = fields.Many2one(
'res.company', 'Company', related='inventory_id.company_id',
index=True, readonly=True, store=True)
# TDE FIXME: necessary ? -> replace by location_id
state = fields.Selection(
'Status', related='inventory_id.state', readonly=True)
theoretical_qty = fields.Float(
'Theoretical Quantity', compute='_compute_theoretical_qty',
digits=dp.get_precision('Product Unit of Measure'), readonly=True, store=True)
inventory_location_id = fields.Many2one(
'stock.location', 'Location', related='inventory_id.location_id', related_sudo=False)
@api.one
@api.depends('location_id', 'product_id', 'package_id', 'product_uom_id', 'company_id', 'prod_lot_id', 'partner_id')
def _compute_theoretical_qty(self):
if not self.product_id:
self.theoretical_qty = 0
return
theoretical_qty = sum([x.quantity for x in self._get_quants()])
if theoretical_qty and self.product_uom_id and self.product_id.uom_id != self.product_uom_id:
theoretical_qty = self.product_id.uom_id._compute_quantity(theoretical_qty, self.product_uom_id)
self.theoretical_qty = theoretical_qty
@api.onchange('product_id')
def onchange_product(self):
res = {}
# If no UoM or incorrect UoM put default one from product
if self.product_id:
self.product_uom_id = self.product_id.uom_id
res['domain'] = {'product_uom_id': [('category_id', '=', self.product_id.uom_id.category_id.id)]}
return res
@api.onchange('product_id', 'location_id', 'product_uom_id', 'prod_lot_id', 'partner_id', 'package_id')
def onchange_quantity_context(self):
if self.product_id and self.location_id and self.product_id.uom_id.category_id == self.product_uom_id.category_id: # TDE FIXME: last part added because crash
self._compute_theoretical_qty()
self.product_qty = self.theoretical_qty
@api.multi
def write(self, values):
values.pop('product_name', False)
res = super(InventoryLine, self).write(values)
return res
@api.model
def create(self, values):
values.pop('product_name', False)
if 'product_id' in values and 'product_uom_id' not in values:
values['product_uom_id'] = self.env['product.product'].browse(values['product_id']).uom_id.id
existings = self.search([
('product_id', '=', values.get('product_id')),
('inventory_id.state', '=', 'confirm'),
('location_id', '=', values.get('location_id')),
('partner_id', '=', values.get('partner_id')),
('package_id', '=', values.get('package_id')),
('prod_lot_id', '=', values.get('prod_lot_id'))])
res = super(InventoryLine, self).create(values)
if existings:
raise UserError(_("You cannot have two inventory adjustements in state 'in Progress' with the same product "
"(%s), same location (%s), same package, same owner and same lot. Please first validate "
"the first inventory adjustement with this product before creating another one.") %
(res.product_id.display_name, res.location_id.display_name))
return res
@api.constrains('product_id')
def _check_product_id(self):
""" As no quants are created for consumable products, it should not be possible do adjust
their quantity.
"""
for line in self:
if line.product_id.type != 'product':
raise UserError(_("You can only adjust stockable products."))
def _get_quants(self):
return self.env['stock.quant'].search([
('company_id', '=', self.company_id.id),
('location_id', '=', self.location_id.id),
('lot_id', '=', self.prod_lot_id.id),
('product_id', '=', self.product_id.id),
('owner_id', '=', self.partner_id.id),
('package_id', '=', self.package_id.id)])
def _get_move_values(self, qty, location_id, location_dest_id, out):
self.ensure_one()
return {
'name': _('INV:') + (self.inventory_id.name or ''),
'product_id': self.product_id.id,
'product_uom': self.product_uom_id.id,
'product_uom_qty': qty,
'date': self.inventory_id.date,
'company_id': self.inventory_id.company_id.id,
'inventory_id': self.inventory_id.id,
'state': 'confirmed',
'restrict_partner_id': self.partner_id.id,
'location_id': location_id,
'location_dest_id': location_dest_id,
'move_line_ids': [(0, 0, {
'product_id': self.product_id.id,
'lot_id': self.prod_lot_id.id,
'product_uom_qty': 0, # bypass reservation here
'product_uom_id': self.product_uom_id.id,
'qty_done': qty,
'package_id': out and self.package_id.id or False,
'result_package_id': (not out) and self.package_id.id or False,
'location_id': location_id,
'location_dest_id': location_dest_id,
'owner_id': self.partner_id.id,
})]
}
def _generate_moves(self):
moves = self.env['stock.move']
for line in self:
if float_utils.float_compare(line.theoretical_qty, line.product_qty, precision_rounding=line.product_id.uom_id.rounding) == 0:
continue
diff = line.theoretical_qty - line.product_qty
if diff < 0: # found more than expected
vals = line._get_move_values(abs(diff), line.product_id.property_stock_inventory.id, line.location_id.id, False)
else:
vals = line._get_move_values(abs(diff), line.location_id.id, line.product_id.property_stock_inventory.id, True)
moves |= self.env['stock.move'].create(vals)
return moves