# -*- coding: utf-8 -*- # Part of Odoo, Flectra. See LICENSE file for full copyright and licensing details. from psycopg2 import OperationalError, Error from flectra import api, fields, models, _ from flectra.exceptions import UserError, ValidationError from flectra.osv import expression from flectra.tools.float_utils import float_compare, float_is_zero import logging _logger = logging.getLogger(__name__) class StockQuant(models.Model): _name = 'stock.quant' _description = 'Quants' _rec_name = 'product_id' product_id = fields.Many2one( 'product.product', 'Product', ondelete='restrict', readonly=True, required=True) # so user can filter on template in webclient product_tmpl_id = fields.Many2one( 'product.template', string='Product Template', related='product_id.product_tmpl_id') product_uom_id = fields.Many2one( 'product.uom', 'Unit of Measure', readonly=True, related='product_id.uom_id') company_id = fields.Many2one(related='location_id.company_id', string='Company', store=True, readonly=True) location_id = fields.Many2one( 'stock.location', 'Location', auto_join=True, ondelete='restrict', readonly=True, required=True) branch_id = fields.Many2one(related='location_id.branch_id', string='Branch', ) lot_id = fields.Many2one( 'stock.production.lot', 'Lot/Serial Number', ondelete='restrict', readonly=True) package_id = fields.Many2one( 'stock.quant.package', 'Package', help='The package containing this quant', readonly=True, ondelete='restrict') owner_id = fields.Many2one( 'res.partner', 'Owner', help='This is the owner of the quant', readonly=True) quantity = fields.Float( 'Quantity', help='Quantity of products in this quant, in the default unit of measure of the product', readonly=True, required=True, oldname='qty') reserved_quantity = fields.Float( 'Reserved Quantity', default=0.0, help='Quantity of reserved products in this quant, in the default unit of measure of the product', readonly=True, required=True) in_date = fields.Datetime('Incoming Date', readonly=True) def action_view_stock_moves(self): self.ensure_one() action = self.env.ref('stock.stock_move_line_action').read()[0] action['domain'] = [ ('product_id', '=', self.product_id.id), '|', ('location_id', '=', self.location_id.id), ('location_dest_id', '=', self.location_id.id), ('lot_id', '=', self.lot_id.id), '|', ('package_id', '=', self.package_id.id), ('result_package_id', '=', self.package_id.id), ] return action @api.constrains('product_id') def check_product_id(self): if any(elem.product_id.type != 'product' for elem in self): raise ValidationError(_('Quants cannot be created for consumables or services.')) @api.constrains('quantity') def check_quantity(self): for quant in self: if float_compare(quant.quantity, 1, precision_rounding=quant.product_uom_id.rounding) > 0 and quant.lot_id and quant.product_id.tracking == 'serial': raise ValidationError(_('A serial number should only be linked to a single product.')) @api.constrains('location_id') def check_location_id(self): for quant in self: if quant.location_id.usage == 'view': raise ValidationError(_('You cannot take products from or deliver products to a location of type "view".')) @api.one def _compute_name(self): self.name = '%s: %s%s' % (self.lot_id.name or self.product_id.code or '', self.quantity, self.product_id.uom_id.name) @api.model def _get_removal_strategy(self, product_id, location_id): if product_id.categ_id.removal_strategy_id: return product_id.categ_id.removal_strategy_id.method loc = location_id while loc: if loc.removal_strategy_id: return loc.removal_strategy_id.method loc = loc.location_id return 'fifo' @api.model def _get_removal_strategy_order(self, removal_strategy): if removal_strategy == 'fifo': return 'in_date ASC NULLS FIRST, id' elif removal_strategy == 'lifo': return 'in_date DESC NULLS LAST, id desc' raise UserError(_('Removal strategy %s not implemented.') % (removal_strategy,)) def _gather(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False): removal_strategy = self._get_removal_strategy(product_id, location_id) removal_strategy_order = self._get_removal_strategy_order(removal_strategy) domain = [ ('product_id', '=', product_id.id), ] if not strict: if lot_id: domain = expression.AND([[('lot_id', '=', lot_id.id)], domain]) if package_id: domain = expression.AND([[('package_id', '=', package_id.id)], domain]) if owner_id: domain = expression.AND([[('owner_id', '=', owner_id.id)], domain]) domain = expression.AND([[('location_id', 'child_of', location_id.id)], domain]) else: domain = expression.AND([[('lot_id', '=', lot_id and lot_id.id or False)], domain]) domain = expression.AND([[('package_id', '=', package_id and package_id.id or False)], domain]) domain = expression.AND([[('owner_id', '=', owner_id and owner_id.id or False)], domain]) domain = expression.AND([[('location_id', '=', location_id.id)], domain]) # Copy code of _search for special NULLS FIRST/LAST order self.sudo(self._uid).check_access_rights('read') query = self._where_calc(domain) self._apply_ir_rules(query, 'read') from_clause, where_clause, where_clause_params = query.get_sql() where_str = where_clause and (" WHERE %s" % where_clause) or '' query_str = 'SELECT "%s".id FROM ' % self._table + from_clause + where_str + " ORDER BY "+ removal_strategy_order self._cr.execute(query_str, where_clause_params) res = self._cr.fetchall() # No uniquify list necessary as auto_join is not applied anyways... return self.browse([x[0] for x in res]) @api.model def _get_available_quantity(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False, allow_negative=False): """ Return the available quantity, i.e. the sum of `quantity` minus the sum of `reserved_quantity`, for the set of quants sharing the combination of `product_id, location_id` if `strict` is set to False or sharing the *exact same characteristics* otherwise. This method is called in the following usecases: - when a stock move checks its availability - when a stock move actually assign - when editing a move line, to check if the new value is forced or not - when validating a move line with some forced values and have to potentially unlink an equivalent move line in another picking In the two first usecases, `strict` should be set to `False`, as we don't know what exact quants we'll reserve, and the characteristics are meaningless in this context. In the last ones, `strict` should be set to `True`, as we work on a specific set of characteristics. :return: available quantity as a float """ self = self.sudo() quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict) rounding = product_id.uom_id.rounding if product_id.tracking == 'none': available_quantity = sum(quants.mapped('quantity')) - sum(quants.mapped('reserved_quantity')) if allow_negative: return available_quantity else: return available_quantity if float_compare(available_quantity, 0.0, precision_rounding=rounding) >= 0.0 else 0.0 else: availaible_quantities = {lot_id: 0.0 for lot_id in list(set(quants.mapped('lot_id'))) + ['untracked']} for quant in quants: if not quant.lot_id: availaible_quantities['untracked'] += quant.quantity - quant.reserved_quantity else: availaible_quantities[quant.lot_id] += quant.quantity - quant.reserved_quantity if allow_negative: return sum(availaible_quantities.values()) else: return sum([available_quantity for available_quantity in availaible_quantities.values() if float_compare(available_quantity, 0, precision_rounding=rounding) > 0]) @api.model def _update_available_quantity(self, product_id, location_id, quantity, lot_id=None, package_id=None, owner_id=None, in_date=None): """ Increase or decrease `reserved_quantity` of a set of quants for a given set of product_id/location_id/lot_id/package_id/owner_id. :param product_id: :param location_id: :param quantity: :param lot_id: :param package_id: :param owner_id: :param datetime in_date: Should only be passed when calls to this method are done in order to move a quant. When creating a tracked quant, the current datetime will be used. :return: tuple (available_quantity, in_date as a datetime) """ self = self.sudo() quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=True) rounding = product_id.uom_id.rounding incoming_dates = [d for d in quants.mapped('in_date') if d] incoming_dates = [fields.Datetime.from_string(incoming_date) for incoming_date in incoming_dates] if in_date: incoming_dates += [in_date] # If multiple incoming dates are available for a given lot_id/package_id/owner_id, we # consider only the oldest one as being relevant. if incoming_dates: in_date = fields.Datetime.to_string(min(incoming_dates)) else: in_date = fields.Datetime.now() for quant in quants: try: with self._cr.savepoint(): self._cr.execute("SELECT 1 FROM stock_quant WHERE id = %s FOR UPDATE NOWAIT", [quant.id], log_exceptions=False) quant.write({ 'quantity': quant.quantity + quantity, 'in_date': in_date, }) # cleanup empty quants if float_is_zero(quant.quantity, precision_rounding=rounding) and float_is_zero(quant.reserved_quantity, precision_rounding=rounding): quant.unlink() break except OperationalError as e: if e.pgcode == '55P03': # could not obtain the lock continue else: raise else: self.create({ 'product_id': product_id.id, 'location_id': location_id.id, 'quantity': quantity, 'lot_id': lot_id and lot_id.id, 'package_id': package_id and package_id.id, 'owner_id': owner_id and owner_id.id, 'in_date': in_date, }) return self._get_available_quantity(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=False, allow_negative=True), fields.Datetime.from_string(in_date) @api.model def _update_reserved_quantity(self, product_id, location_id, quantity, lot_id=None, package_id=None, owner_id=None, strict=False): """ Increase the reserved quantity, i.e. increase `reserved_quantity` for the set of quants sharing the combination of `product_id, location_id` if `strict` is set to False or sharing the *exact same characteristics* otherwise. Typically, this method is called when reserving a move or updating a reserved move line. When reserving a chained move, the strict flag should be enabled (to reserve exactly what was brought). When the move is MTS,it could take anything from the stock, so we disable the flag. When editing a move line, we naturally enable the flag, to reflect the reservation according to the edition. :return: a list of tuples (quant, quantity_reserved) showing on which quant the reservation was done and how much the system was able to reserve on it """ self = self.sudo() rounding = product_id.uom_id.rounding quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict) reserved_quants = [] if float_compare(quantity, 0, precision_rounding=rounding) > 0: # if we want to reserve available_quantity = self._get_available_quantity(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict) if float_compare(quantity, available_quantity, precision_rounding=rounding) > 0: raise UserError(_('It is not possible to reserve more products of %s than you have in stock.') % product_id.display_name) elif float_compare(quantity, 0, precision_rounding=rounding) < 0: # if we want to unreserve available_quantity = sum(quants.mapped('reserved_quantity')) if float_compare(abs(quantity), available_quantity, precision_rounding=rounding) > 0: raise UserError(_('It is not possible to unreserve more products of %s than you have in stock.') % product_id.display_name) else: return reserved_quants for quant in quants: if float_compare(quantity, 0, precision_rounding=rounding) > 0: max_quantity_on_quant = quant.quantity - quant.reserved_quantity if float_compare(max_quantity_on_quant, 0, precision_rounding=rounding) <= 0: continue max_quantity_on_quant = min(max_quantity_on_quant, quantity) quant.reserved_quantity += max_quantity_on_quant reserved_quants.append((quant, max_quantity_on_quant)) quantity -= max_quantity_on_quant available_quantity -= max_quantity_on_quant else: max_quantity_on_quant = min(quant.reserved_quantity, abs(quantity)) quant.reserved_quantity -= max_quantity_on_quant reserved_quants.append((quant, -max_quantity_on_quant)) quantity += max_quantity_on_quant available_quantity += max_quantity_on_quant if float_is_zero(quantity, precision_rounding=rounding) or float_is_zero(available_quantity, precision_rounding=rounding): break return reserved_quants @api.model def _merge_quants(self): """ In a situation where one transaction is updating a quant via `_update_available_quantity` and another concurrent one calls this function with the same argument, we’ll create a new quant in order for these transactions to not rollback. This method will find and deduplicate these quants. """ query = """WITH dupes AS ( SELECT min(id) as to_update_quant_id, (array_agg(id ORDER BY id))[2:array_length(array_agg(id), 1)] as to_delete_quant_ids, SUM(reserved_quantity) as reserved_quantity, SUM(quantity) as quantity FROM stock_quant GROUP BY product_id, company_id, location_id, lot_id, package_id, owner_id, in_date HAVING count(id) > 1 ), _up AS ( UPDATE stock_quant q SET quantity = d.quantity, reserved_quantity = d.reserved_quantity FROM dupes d WHERE d.to_update_quant_id = q.id ) DELETE FROM stock_quant WHERE id in (SELECT unnest(to_delete_quant_ids) from dupes) """ try: with self.env.cr.savepoint(): self.env.cr.execute(query) except Error as e: _logger.info('an error occured while merging quants: %s', e.pgerror) class QuantPackage(models.Model): """ Packages containing quants and/or other packages """ _name = "stock.quant.package" _description = "Physical Packages" _order = 'name' name = fields.Char( 'Package Reference', copy=False, index=True, default=lambda self: self.env['ir.sequence'].next_by_code('stock.quant.package') or _('Unknown Pack')) quant_ids = fields.One2many('stock.quant', 'package_id', 'Bulk Content', readonly=True) packaging_id = fields.Many2one( 'product.packaging', 'Package Type', index=True, help="This field should be completed only if everything inside the package share the same product, otherwise it doesn't really makes sense.") location_id = fields.Many2one( 'stock.location', 'Location', compute='_compute_package_info', search='_search_location', index=True, readonly=True) company_id = fields.Many2one( 'res.company', 'Company', compute='_compute_package_info', search='_search_company', index=True, readonly=True) owner_id = fields.Many2one( 'res.partner', 'Owner', compute='_compute_package_info', search='_search_owner', index=True, readonly=True) move_line_ids = fields.One2many('stock.move.line', 'result_package_id') current_picking_move_line_ids = fields.One2many('stock.move.line', compute="_compute_current_picking_info") current_picking_id = fields.Boolean(compute="_compute_current_picking_info") current_source_location_id = fields.Many2one('stock.location', compute="_compute_current_picking_info") current_destination_location_id = fields.Many2one('stock.location', compute="_compute_current_picking_info") is_processed = fields.Boolean(compute="_compute_current_picking_info") @api.depends('quant_ids.package_id', 'quant_ids.location_id', 'quant_ids.company_id', 'quant_ids.owner_id') def _compute_package_info(self): for package in self: values = {'location_id': False, 'company_id': self.env.user.company_id.id, 'owner_id': False} if package.quant_ids: values['location_id'] = package.quant_ids[0].location_id package.location_id = values['location_id'] package.company_id = values['company_id'] package.owner_id = values['owner_id'] def name_get(self): return list(self._compute_complete_name().items()) def _compute_complete_name(self): """ Forms complete name of location from parent location to child location. """ res = {} for package in self: name = package.name res[package.id] = name return res def _compute_current_picking_info(self): """ When a package is in displayed in picking, it gets the picking id trough the context, and this function populates the different fields used when we move entire packages in pickings. """ for package in self: picking_id = self.env.context.get('picking_id') if not picking_id: package.current_picking_move_line_ids = False package.current_picking_id = False package.is_processed = False package.current_source_location_id = False package.current_destination_location_id = False continue package.current_picking_move_line_ids = package.move_line_ids.filtered(lambda ml: ml.picking_id.id == picking_id) package.current_picking_id = True package.current_source_location_id = package.current_picking_move_line_ids[:1].location_id package.current_destination_location_id = package.current_picking_move_line_ids[:1].location_dest_id package.is_processed = not bool(package.current_picking_move_line_ids.filtered(lambda ml: ml.qty_done < ml.product_uom_qty)) def action_toggle_processed(self): """ This method set the quantity done to the reserved quantity of all move lines of a package or to 0 if the package is already processed""" picking_id = self.env.context.get('picking_id') if picking_id: self.ensure_one() move_lines = self.current_picking_move_line_ids if move_lines.filtered(lambda ml: ml.qty_done < ml.product_uom_qty): destination_location = self.env.context.get('destination_location') for ml in move_lines: vals = {'qty_done': ml.product_uom_qty} if destination_location: vals['location_dest_id'] = destination_location ml.write(vals) else: for ml in move_lines: ml.qty_done = 0 def _search_location(self, operator, value): if value: packs = self.search([('quant_ids.location_id', operator, value)]) else: packs = self.search([('quant_ids', operator, value)]) if packs: return [('id', 'in', packs.ids)] else: return [('id', '=', False)] def _search_company(self, operator, value): if value: packs = self.search([('quant_ids.company_id', operator, value)]) else: packs = self.search([('quant_ids', operator, value)]) if packs: return [('id', 'parent_of', packs.ids)] else: return [('id', '=', False)] def _search_owner(self, operator, value): if value: packs = self.search([('quant_ids.owner_id', operator, value)]) else: packs = self.search([('quant_ids', operator, value)]) if packs: return [('id', 'parent_of', packs.ids)] else: return [('id', '=', False)] def _check_location_constraint(self): '''checks that all quants in a package are stored in the same location. This function cannot be used as a constraint because it needs to be checked on pack operations (they may not call write on the package) ''' for pack in self: locations = pack.get_content().filtered(lambda quant: quant.qty > 0.0).mapped('location_id') if len(locations) != 1: raise UserError(_('Everything inside a package should be in the same location')) return True def unpack(self): for package in self: move_lines_to_remove = package.move_line_ids.filtered(lambda move_line: move_line.state != 'done') if move_lines_to_remove: move_lines_to_remove.write({'result_package_id': False}) else: move_line_to_modify = self.env['stock.move.line'].search([ ('package_id', '=', package.id), ('state', 'in', ('assigned', 'partially_available')), ('product_qty', '!=', 0), ]) move_line_to_modify.write({'package_id': False}) package.mapped('quant_ids').write({'package_id': False}) def action_view_picking(self): action = self.env.ref('stock.action_picking_tree_all').read()[0] domain = ['|', ('result_package_id', 'in', self.ids), ('package_id', 'in', self.ids)] pickings = self.env['stock.move.line'].search(domain).mapped('picking_id') action['domain'] = [('id', 'in', pickings.ids)] return action def view_content_package(self): action = self.env['ir.actions.act_window'].for_xml_id('stock', 'quantsact') action['domain'] = [('id', 'in', self._get_contained_quants().ids)] return action def _get_contained_quants(self): return self.env['stock.quant'].search([('package_id', 'child_of', self.ids)]) def _get_all_products_quantities(self): '''This function computes the different product quantities for the given package ''' # TDE CLEANME: probably to move somewhere else, like in pack op res = {} for quant in self._get_contained_quants(): if quant.product_id not in res: res[quant.product_id] = 0 res[quant.product_id] += quant.qty return res