[ADD][WIP]Odoo storage plugin : allow creation of partner / company

This commit is contained in:
Fabien BOURGEOIS 2020-04-09 15:07:29 +02:00
parent 2587cfa999
commit 60c6f10392
1 changed files with 172 additions and 36 deletions

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Fabien Bourgeois <fabien@yaltik.com>
# Copyright 2018-2020 Fabien Bourgeois <fabien@yaltik.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -41,6 +41,8 @@ from radicale import xmlutils
from radicale.storage import BaseCollection, Item, get_etag, get_uid_from_object
from radicale_odoo_auth import Auth
# Application.do_PUT = do_PUT
class Collection(BaseCollection):
""" BaseCollection implementation for Odoo Radicale Storage """
@ -101,28 +103,43 @@ class Collection(BaseCollection):
yield
@classmethod
def discover(cls, path, depth="0"):
def discover(cls, path, depth='0'):
"""Discover implementation """
attributes = path.strip('/').split('/') or []
if path and not cls.user:
cls.user = attributes[0]
depth = depth or '0' # Sometimes depth is '' ?
cls.logger.debug('Discover : %s (path), %s (depth), %s (cls.user), %s (attributes)' %
(path, depth, cls.user, attributes))
yield cls(path)
if len(attributes) == 1: # Got all if root is needed
contact_path = '%sodoo-contact' % path
calendar_own_path = '%sodoo-calendar-own' % path
calendar_in_path = '%sodoo-calendar-in' % path
calendar_all_path = '%sodoo-calendar-all' % path
yield cls(contact_path)
yield cls(calendar_own_path)
yield cls(calendar_in_path)
yield cls(calendar_all_path)
elif len(attributes) == 2: # Then we need children
collection = cls(path)
for item in collection.list():
yield collection.get(item)
if len(attributes) == 1: # Root collection (user)
if depth == '0':
yield cls(path) # Do not ?
else: # Need crafted children
contact_path = '%sodoo-contact' % path
calendar_own_path = '%sodoo-calendar-own' % path
calendar_in_path = '%sodoo-calendar-in' % path
calendar_all_path = '%sodoo-calendar-all' % path
yield cls(contact_path)
yield cls(calendar_own_path)
yield cls(calendar_in_path)
yield cls(calendar_all_path)
elif len(attributes) == 2: # Collection
if depth == '0':
yield cls(path)
else: # We need all items
collection = cls(path)
yield collection
for item in collection.list():
yield collection.get(item)
elif len(attributes) == 3:
if depth =='0': # Read / Update / Create
if attributes[-1] in cls.partners: # Read or Update
# collection = cls('/'.join(attributes[:-1]))
collection = cls(path)
yield collection.get(attributes[-1])
# FIXME : and depth 1 ?
def get_meta(self, key=None):
"""Get metadata value for collection """
@ -134,24 +151,38 @@ class Collection(BaseCollection):
@classmethod
def create_collection(cls, href, collection=None, props=None):
""" Create collection implementation : only warns ATM """
cls.logger.error('Attemmpt to create a new collection for %s' % href)
cls.logger.error('Attempt to create a new collection for %s' % href)
@classmethod
def _get_contacts_from_odoo(cls, extra_domain=False):
""" Gets all contacts available from one Odoo login """
fields = ['name', 'write_date', 'comment', 'street', 'street2', 'zip',
'city', 'phone', 'mobile', 'fax', 'email', 'website',
'function', 'comment', 'image', 'ref',
'category_id', 'parent_id', 'state_id', 'country_id']
domain = [('type', '=', 'contact')]
if extra_domain:
domain.append(extra_domain)
return cls.odoo.execute_kw('res.partner', 'search_read', [domain],
{'fields': fields})
@classmethod
def get_contacts_from_odoo(cls, login):
""" Gets all contacts available from one Odoo login """
cls.logger.info('Get contacts for Odoo user %s' % login)
fields = ['name', 'write_date', 'comment', 'street', 'street2', 'zip',
'city', 'phone', 'mobile', 'fax', 'email', 'website',
'function', 'comment', 'image',
'category_id', 'parent_id', 'state_id', 'country_id']
domain = [('type', '=', 'contact')]
partners = cls.odoo.execute_kw('res.partner', 'search_read', [domain],
{'fields': fields})
cls.partners = {p['id']: p for p in partners}
partners = cls._get_contacts_from_odoo()
cls.partners = {}
for p in partners:
if p.get('ref'):
cls.partners[p['ref']] = p
else:
cls.partners[str(p['id'])] = p
# cls.partners = {p['id']: p for p in partners}
categs = cls.odoo.execute_kw('res.partner.category', 'search_read', [[]],
{'fields': ['name']})
cls.categs = {c['id']: c['name'] for c in categs}
return ['res.partner:%s' % pid for pid in cls.partners.keys()]
cls.categs = {str(c['id']): c['name'] for c in categs}
return cls.partners.keys()
# return ['res.partner:%s' % pid for pid in cls.partners.keys()]
@classmethod
def get_events_from_odoo(cls, login, path):
@ -200,10 +231,10 @@ class Collection(BaseCollection):
if self.tag:
if self.tag == 'VADDRESSBOOK':
for oid in self.get_contacts_from_odoo(self.owner):
yield oid + self.content_suffix
yield oid
elif self.tag == 'VCALENDAR':
for oid in self.get_events_from_odoo(self.owner, self.path):
yield oid + self.content_suffix
yield oid
else:
raise NotImplementedError
@ -250,7 +281,7 @@ class Collection(BaseCollection):
if partner.get('comment'):
vobject_item.add('note').value = partner.get('comment')
if partner.get('category_id'):
categs = [cls.categs.get(cid) for cid in partner.get('category_id')]
categs = [cls.categs.get(str(cid)) for cid in partner.get('category_id')]
vobject_item.add('categories').value = categs
if partner.get('parent_id'):
vobject_item.add('org').value = [partner.get('parent_id')[1]]
@ -300,11 +331,10 @@ class Collection(BaseCollection):
def _get_item_from_vobject(self, href, vobject_item):
""" Return Item from Vobject and HREF """
vobject_item.add('uid').value = href
vobject_item.add('uid').value = str(href)
tag, start, end = xmlutils.find_tag_and_time_range(vobject_item)
text = vobject_item.serialize()
etag = get_etag(text)
# uid = get_uid_from_object(vobject_item)
return Item(
self, href=href, last_modified=vobject_item.rev.value, etag=etag,
text=text, item=vobject_item, uid=href,
@ -312,8 +342,8 @@ class Collection(BaseCollection):
def _get_with_metadata(self, href):
"""Fetch a single item from Odoo database"""
model, database_id = href.split(':')
database_id = int(database_id[:-len(self.content_suffix)])
model = self.odoo_model
database_id = href
if model == 'res.partner':
vobject_item = self._generate_vcard_from_odoo(database_id)
return self._get_item_from_vobject(href, vobject_item)
@ -324,18 +354,124 @@ class Collection(BaseCollection):
raise NotImplementedError
def get(self, href, verify_href=True):
item, metadata = self._get_with_metadata(href)
item, metadata = self._get_with_metadata(str(href))
item.href = str(item.href) # fix for etag radicale concatenation
return item
def upload(self, href, item):
""" Creation or update """
self.logger.info('Upload href %s , item %s' % (href, item))
# TODO : handle image
# Ref : important to keep track of creations outside of Odoo
contact_data = {'ref': item.uid.value}
# Name, remove excessive spaces
name = ' '.join(filter(bool, str(item.n.value).split(' ')))
if not name:
name = item.fn.value
contact_data['name'] = name
# Address
if item.contents.get('adr'): # First only
address = item.adr_list[0]
contact_data.update({
'street': address.value.street,
'zip': address.value.code,
'city': address.value.city
})
country = address.value.country
country_dmn = [('name', 'ilike', country)]
country_id = False
if country:
country_id = self.odoo.env['res.country'].search(country_dmn, limit=1)
if country_id:
country_id = country_id[0]
if not country_id:
country_id = self.odoo.env.ref('base.fr').id
contact_data['country_id'] = country_id
# Mail
if item.contents.get('email'): # First only
contact_data['email'] = item.email_list[0].value
# Website
if item.contents.get('url'): # First only
contact_data['website'] = item.url_list[0].value
# Phones
if item.contents.get('tel'):
for tel in item.tel_list:
tel_type = tel.params.get('TYPE', [])
if 'CELL' in tel_type:
contact_data['mobile'] = tel.value
elif 'FAX' in tel_type:
contact_data['fax'] = tel.value
else:
contact_data['phone'] = tel.value
# Function
if item.contents.get('role'):
contact_data['function'] = item.role.value
# Comment
if item.contents.get('note'):
contact_data['comment'] = item.note.value
# Categories
if item.contents.get('categories'):
categ_ids = []
for categ in item.categories.value:
cat_obj = self.odoo.env['res.partner.category']
dmn = [('name', '=ilike', categ)]
categ_id = cat_obj.search(dmn, limit=1)
if not categ_id:
categ_id = cat_obj.create({'name': categ})
self.categs[str(categ_id)] = categ
else:
categ_id = categ_id[0]
categ_ids.append(categ_id)
contact_data['category_id'] = [(6, False, categ_ids)]
# Parent company (get, not create)
partner_obj = self.odoo.env['res.partner']
if item.contents.get('org'):
dmn = [('name', '=ilike', item.org.value), ('is_company', '=', True)]
parent_id = partner_obj.search(dmn, limit=1)
if parent_id:
contact_data['parent_id'] = parent_id.id
contact_data['is_company'] = False
contact_data['company_type'] = 'person'
else:
contact_data['is_company'] = True
contact_data['company_type'] = 'company'
# Create or update
if href in self.partners.keys():
partner = self.partners[href]
partner_id = partner['id']
partner_obj.write([partner_id], contact_data)
else:
partner_id = partner_obj.create(contact_data)
partner = self._get_contacts_from_odoo(('id', '=', partner_id))[0]
self.partners[item.uid.value] = partner
res_item = self.get(item.uid.value)
# print('item.etag : %s // res_item.etag : %s' % (item.etag, res_item.etag))
return res_item
def delete(self, href=None):
""" Can not delete collection but item, yes """
if self.path and href is None:
href = self.path
if href is None:
# Delete the collection
self.logger.error('Attempt to delete collection %s' % self.path)
raise ValueError('Can not delete collection')
else:
# Delete an item
raise NotImplementedError
if self.odoo_model == 'res.partner':
obj = self.odoo.env[self.odoo_model]
record_id = href.split('/')[-1]
partner_id = self.partners.get(record_id)['id']
try:
obj.unlink([partner_id])
except RPCError: # if not removable, archive it
obj.write([partner_id], {'active': False})
del self.partners[record_id]
@property
def last_modified(self):