flectra/addons/anonymization/models/anonymization.py

149 lines
6.4 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
2018-01-16 02:34:37 -08:00
# Part of Odoo, Flectra. See LICENSE file for full copyright and licensing details.
from itertools import groupby
from operator import itemgetter
2018-01-16 02:34:37 -08:00
from flectra import api, fields, models, _
from flectra.exceptions import UserError
from flectra.tools import pycompat
FIELD_STATES = [('clear', 'Clear'), ('anonymized', 'Anonymized'), ('not_existing', 'Not Existing'), ('new', 'New')]
ANONYMIZATION_HISTORY_STATE = [('started', 'Started'), ('done', 'Done'), ('in_exception', 'Exception occured')]
ANONYMIZATION_DIRECTION = [('clear -> anonymized', 'clear -> anonymized'), ('anonymized -> clear', 'anonymized -> clear')]
def group(lst, cols):
if isinstance(cols, pycompat.string_types):
cols = [cols]
return dict((k, [v for v in itr]) for k, itr in groupby(sorted(lst, key=itemgetter(*cols)), itemgetter(*cols)))
class IrModelFieldsAnonymization(models.Model):
_name = 'ir.model.fields.anonymization'
_rec_name = 'field_id'
model_name = fields.Char('Object Name', required=True)
model_id = fields.Many2one('ir.model', string='Object', ondelete='set null')
field_name = fields.Char(required=True)
field_id = fields.Many2one('ir.model.fields', string='Field', ondelete='set null')
state = fields.Selection(selection=FIELD_STATES, string='Status', required=True, readonly=True, default='clear')
_sql_constraints = [
('model_id_field_id_uniq', 'unique (model_name, field_name)', "You cannot have two fields with the same name on the same object!"),
]
@api.model
def _get_global_state(self):
field_ids = self.search([('state', '!=', 'not_existing')])
if not field_ids or len(field_ids) == len(field_ids.filtered(lambda field: field.state == "clear")):
state = 'clear' # all fields are clear
elif len(field_ids) == len(field_ids.filtered(lambda field: field.state == "anonymized")):
state = 'anonymized' # all fields are anonymized
else:
state = 'unstable' # fields are mixed: this should be fixed
return state
@api.model
def _check_write(self):
"""check that the field is created from the menu and not from an database update
otherwise the database update can crash:"""
if self.env.context.get('manual'):
global_state = self._get_global_state()
if global_state == 'anonymized':
raise UserError(_("The database is currently anonymized, you cannot create, modify or delete fields."))
elif global_state == 'unstable':
raise UserError(_("The database anonymization is currently in an unstable state. Some fields are anonymized,"
" while some fields are not anonymized. You should try to solve this problem before trying to create, write or delete fields."))
return True
@api.model
def _get_model_and_field_ids(self, vals):
if vals.get('field_name') and vals.get('model_name'):
field = self.env['ir.model.fields']._get(vals['model_name'], vals['field_name'])
return (field.model_id.id, field.id)
return (False, False)
@api.model
def create(self, vals):
# check field state: all should be clear before we can add a new field to anonymize:
self._check_write()
if vals.get('field_name') and vals.get('model_name'):
vals['model_id'], vals['field_id'] = self._get_model_and_field_ids(vals)
# check not existing fields:
vals['state'] = self._get_global_state() if vals.get('field_id') else 'not_existing'
return super(IrModelFieldsAnonymization, self).create(vals)
@api.multi
def write(self, vals):
# check field state: all should be clear before we can modify a field:
if not len(vals) == 1 and vals.get('state') == 'clear':
self._check_write()
if vals.get('field_name') and vals.get('model_name'):
vals['model_id'], vals['field_id'] = self._get_model_and_field_ids(vals)
# check not existing fields:
if 'field_id' in vals:
if not vals['field_id']:
vals['state'] = 'not_existing'
else:
global_state = self._get_global_state()
if global_state != 'unstable':
vals['state'] = global_state
return super(IrModelFieldsAnonymization, self).write(vals)
@api.multi
def unlink(self):
# check field state: all should be clear before we can unlink a field:
self._check_write()
return super(IrModelFieldsAnonymization, self).unlink()
@api.onchange('model_id')
def _onchange_model_id(self):
self.field_name = False
self.field_id = False
self.model_name = self.model_id.model
@api.onchange('model_name')
def _onchange_model_name(self):
self.field_name = False
self.field_id = False
self.model_id = self.env['ir.model']._get(self.model_name)
@api.onchange('field_name')
def _onchange_field_name(self):
if self.field_name and self.model_name:
self.field_id = self.env['ir.model.fields']._get(self.model_name, self.field_name)
else:
self.field_id = False
@api.onchange('field_id')
def _onchange_field_id(self):
self.field_name = self.field_id.name
class IrModelFieldsAnonymizationHistory(models.Model):
_name = 'ir.model.fields.anonymization.history'
_order = "date desc"
date = fields.Datetime(required=True, readonly=True)
field_ids = fields.Many2many(
'ir.model.fields.anonymization', 'anonymized_field_to_history_rel',
'field_id', 'history_id', string='Fields', readonly=True
)
state = fields.Selection(selection=ANONYMIZATION_HISTORY_STATE, string='Status', required=True, readonly=True)
direction = fields.Selection(selection=ANONYMIZATION_DIRECTION, required=True, readonly=True)
msg = fields.Text('Message', readonly=True)
filepath = fields.Char('File path', readonly=True)
class IrModelFieldsAnonymizationMigrationFix(models.Model):
_name = 'ir.model.fields.anonymization.migration.fix'
_order = "sequence"
target_version = fields.Char('Target Version')
model_name = fields.Char('Model')
field_name = fields.Char('Field')
query = fields.Text()
query_type = fields.Selection(selection=[('sql', 'sql'), ('python', 'python')], string='Query')
sequence = fields.Integer()