radicale_odoo/radicale_odoo_storage/__init__.py

491 lines
21 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2018-2020 Fabien Bourgeois <fabien@yaltik.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
""" Odoo Radicale Storage Plugin """
# # PLAN
# 1. OK : Implement readonly from Odoo only
# 2. OK : Implement contacts first (and so vcf)
# 3. OK : Implement unique events (.ics) and timezone
# 4. OK : Implement notifications for events
# 5. OK : Implement recurrent events
# 6. OK : Offer two (or more) calendars : own events, events I attend
# and all readable events
# 7. Offer one calendar per user (via login/email)
# 8. OK : Begin write (two way) for contacts
# 9. Begin write (two way) for calendar
from contextlib import contextmanager
from time import strftime, strptime
from datetime import timedelta, datetime
import pytz
import vobject
from odoorpc import ODOO
from odoorpc.error import RPCError
from radicale import xmlutils
from radicale.storage import BaseCollection, Item, get_etag, get_uid_from_object
from radicale_odoo_auth import Auth
class Collection(BaseCollection):
    """BaseCollection implementation for the Odoo Radicale storage.

    Odoo contacts (``res.partner``) are exposed as one address book and
    calendar events (``calendar.event``) as three calendars : own events,
    events the user is attending and all readable events. Contacts can be
    read and written; calendars are read only.

    The Odoo session and the record caches are stored on the class, so they
    are shared by every instance created in the process.
    """

    # Odoo RPC session, copied from the auth plugin when instantiating
    odoo = False
    # Login given to ``acquire_lock`` or taken from the requested path
    user = None
    # pytz timezone used to interpret the naive datetimes sent by Odoo
    server_timezone = None
    # Contact cache, keyed by partner ``ref`` or stringified id. Stays None
    # until the first load so that it can be fetched lazily.
    partners = None
    # Other caches, filled by the ``get_*_from_odoo`` loaders
    categs = {}
    events = {}
    event_types = {}
    event_alarms = {}

    # Virtual sub-collections crafted under each user root
    _CONTACT_FOLDER = 'odoo-contact'
    _CALENDAR_FOLDERS = ('odoo-calendar-own', 'odoo-calendar-in',
                         'odoo-calendar-all')
    # Odoo phone field -> vCard TEL TYPE, in emission order
    _TEL_TYPES = (('phone', 'VOICE'), ('mobile', 'CELL'), ('fax', 'FAX'))

    @classmethod
    def static_init(cls):
        """ Static init is useful mainly for threading lock, not needed for
        Odoo storage """

    def __init__(self, path):
        """ Bind the collection found at ``path`` to its Odoo model.

        The tag, Odoo model, file suffix and DAV properties are chosen from
        the virtual folder name found in the path. A path without a known
        folder gets no tag and no model.

        :param path: collection path, with or without surrounding slashes
        :raises RuntimeError: when the auth plugin holds no Odoo session
        """
        self.__class__.odoo = Auth.odoo
        if not self.__class__.odoo:
            self.logger.error('No auth Odoo found...')
            raise RuntimeError('No auth Odoo found')
        odoo_timezone = self.configuration.get('storage', 'odoo_timezone')
        self.__class__.server_timezone = pytz.timezone(odoo_timezone)

        self.path = path.strip('/')
        attributes = self.path.split('/')
        self.tag = None
        self.props = {}
        # Explicit defaults so that later model checks do not fail with
        # AttributeError on collections that have no Odoo model
        self.odoo_model = None
        self.content_suffix = None

        if self._CONTACT_FOLDER in attributes:
            self.tag = 'VADDRESSBOOK'
            self.odoo_model = 'res.partner'
            self.content_suffix = '.vcf'
            self.props.update({
                'tag': 'VADDRESSBOOK',
                'D:displayname': 'Odoo contacts',
                'CR:addressbook-description': 'Contacts form your Odoo account'})
        elif any(folder in attributes for folder in self._CALENDAR_FOLDERS):
            self.tag = 'VCALENDAR'
            self.odoo_model = 'calendar.event'
            self.content_suffix = '.ics'
            self.props.update({'tag': 'VCALENDAR'})
            if 'odoo-calendar-own' in attributes:
                self.props.update({
                    'D:displayname': 'Odoo calendar : own events',
                    'C:calendar-description': 'Own events, from your Odoo calendar'})
            elif 'odoo-calendar-in' in attributes:
                self.props.update({
                    'D:displayname': 'Odoo calendar : events I\'m in',
                    'C:calendar-description': 'Events you are '
                                              'attended from your Odoo calendar'})
            else:
                self.props.update({
                    'D:displayname': 'Odoo calendar : all events',
                    'C:calendar-description': 'All events from your Odoo calendar'})

        self.owner = attributes[0]
        # ``str.split`` never returns an empty list, so testing for a length
        # of 0 could never flag a principal : a principal is a non empty
        # path made of a single segment
        self.is_principal = bool(self.path) and '/' not in self.path

    @classmethod
    def odoo_date_to_utc(cls, date_str):
        """ Transform naive Odoo date string to an UTC aware datetime.

        The string is read as ``%Y-%m-%d %H:%M:%S`` in the configured server
        timezone. ``is_dst=None`` makes pytz raise on ambiguous or non
        existent local times instead of guessing.
        """
        date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
        local_date = cls.server_timezone.localize(date_obj, is_dst=None)
        return local_date.astimezone(pytz.utc)

    @classmethod
    @contextmanager
    def acquire_lock(cls, mode, user=None):
        """ No real lock is taken : only remember the current user """
        cls.user = user
        yield

    @staticmethod
    def _event_id_from_href(href):
        """ Return the integer id held by a ``calendar.event:<id>`` href,
        or None when the href does not end with an integer """
        try:
            return int(str(href).rsplit(':', 1)[-1])
        except ValueError:
            return None

    @classmethod
    def _contacts_cache(cls):
        """ Return the contact cache, loading it from Odoo on first use """
        if cls.partners is None:
            cls.get_contacts_from_odoo(cls.user)
        return cls.partners

    @classmethod
    def discover(cls, path, depth='0'):
        """ Discover implementation.

        * one path segment (user root) : the root itself for depth 0, else
          the four crafted child collections ;
        * two segments (collection) : the collection, followed by all of its
          items when depth is not 0 ;
        * three segments (item) : the item when it is known, depth 0 only.
        """
        attributes = path.strip('/').split('/')
        if path and not cls.user:
            cls.user = attributes[0]
        depth = depth or '0'  # Sometimes depth is '' ?
        cls.logger.debug('Discover : %s (path), %s (depth), %s (cls.user), %s (attributes)' %
                         (path, depth, cls.user, attributes))
        if len(attributes) == 1:  # Root collection (user)
            if depth == '0':
                yield cls(path)  # Do not ?
            else:  # Need crafted children
                # Join with an explicit separator : the incoming path does
                # not always end with a slash
                base_path = path.rstrip('/')
                for folder in (cls._CONTACT_FOLDER,) + cls._CALENDAR_FOLDERS:
                    yield cls('%s/%s' % (base_path, folder))
        elif len(attributes) == 2:  # Collection
            collection = cls(path)
            yield collection
            if depth != '0':  # We need all items
                for item in collection.list():
                    yield collection.get(item)
        elif len(attributes) == 3:
            if depth == '0':  # Read / Update / Create
                href = attributes[-1]
                # Instantiate first : it binds the Odoo session needed to
                # load the contact cache
                collection = cls(path)
                if collection.odoo_model == 'res.partner':
                    known = href in cls._contacts_cache()
                elif collection.odoo_model == 'calendar.event':
                    known = cls._event_id_from_href(href) in cls.events
                else:
                    known = False
                if known:  # Read or Update
                    yield collection.get(href)
            # FIXME : and depth 1 ?

    def get_meta(self, key=None):
        """ Get metadata value for collection : one property when ``key`` is
        given (None if unknown), else the whole properties dict """
        if key:
            return self.props.get(key)
        return self.props

    @classmethod
    def create_collection(cls, href, collection=None, props=None):
        """ Create collection implementation : only warns ATM """
        cls.logger.error('Attempt to create a new collection for %s' % href)

    @classmethod
    def _get_contacts_from_odoo(cls, extra_domain=False):
        """ Read the ``res.partner`` records of type contact.

        :param extra_domain: optional domain leaf, eg ``('id', '=', 3)``,
            added to the base domain
        :return: what ``search_read`` returns for the requested fields
        """
        fields = ['name', 'write_date', 'comment', 'street', 'street2', 'zip',
                  'city', 'phone', 'mobile', 'fax', 'email', 'website',
                  'function', 'image', 'ref',
                  'category_id', 'parent_id', 'state_id', 'country_id']
        domain = [('type', '=', 'contact')]
        if extra_domain:
            domain.append(extra_domain)
        return cls.odoo.execute_kw('res.partner', 'search_read', [domain],
                                   {'fields': fields})

    @classmethod
    def get_contacts_from_odoo(cls, login):
        """ Gets all contacts available from one Odoo login.

        Refreshes the partner and category caches. Partners are keyed by
        their ``ref`` when set, else by their stringified database id.

        :param login: only used for logging
        :return: the keys of the partner cache
        """
        cls.logger.info('Get contacts for Odoo user %s' % login)
        partners = cls._get_contacts_from_odoo()
        cls.partners = {(p.get('ref') or str(p['id'])): p for p in partners}
        categs = cls.odoo.execute_kw('res.partner.category', 'search_read', [[]],
                                     {'fields': ['name']})
        cls.categs = {str(c['id']): c['name'] for c in categs}
        return cls.partners.keys()

    @classmethod
    def get_events_from_odoo(cls, login, path):
        """ Gets all events available from one Odoo login.

        Refreshes the event, event type and alarm caches. The search domain
        depends on the calendar targeted by ``path`` : own events, events
        the user's partner takes part in, or no filter at all.

        :param login: only used for logging
        :param path: collection path, its ending selects the domain
        :return: list of ``calendar.event:<id>`` hrefs
        """
        cls.logger.info('Get events for Odoo user %s' % login)
        fields = ['allday', 'start_date', 'stop_date', 'start_datetime',
                  'stop_datetime', 'write_date', 'name', 'location',
                  'description', 'recurrency', 'rrule', 'categ_ids', 'alarm_ids']
        event_types = cls.odoo.execute_kw('calendar.event.type', 'search_read',
                                          [[]], {'fields': ['name']})
        cls.event_types = {et['id']: et['name'] for et in event_types}
        event_alarms = cls.odoo.execute_kw('calendar.alarm', 'search_read',
                                           [[]], {'fields': ['type',
                                                             'duration_minutes']})
        cls.event_alarms = {ea['id']: ea for ea in event_alarms}
        cls.odoo.env.context.update({'virtual_id': False})  # Only real events
        if path.endswith('odoo-calendar-own'):
            domain = [('user_id', '=', cls.odoo.env.uid)]
        elif path.endswith('odoo-calendar-in'):
            pid = cls.odoo.execute('res.users', 'read', [cls.odoo.env.uid],
                                   ['partner_id'])[0]['partner_id'][0]
            domain = [('partner_ids', '=', pid)]
        else:
            domain = []
        # Needed to ensure context
        event_ids = cls.odoo.env['calendar.event'].search(domain)
        events = cls.odoo.execute_kw('calendar.event', 'read',
                                     [event_ids], {'fields': fields})
        cls.events = {e['id']: e for e in events}
        # WARNING: Odoo does not remove from database deleted recurrent events...
        # Should be fixed on Odoo side and will be fixed via 2way here too
        return ['calendar.event:%s' % eid for eid in cls.events]

    def sync(self, old_token=None):
        """ Debug purpose : log what the parent implementation returns """
        token, ilist = super(Collection, self).sync(old_token)
        self.logger.debug('Sync token : %s' % token)
        self.logger.debug('Sync list : %s' % ilist)
        return token, ilist

    def list(self):
        """ List collection items : yields the hrefs of the contacts or of
        the events, nothing for a collection without tag """
        self.logger.debug('List collection %s' % self.path)
        self.logger.debug('Collection tag %s' % self.tag)
        if not self.tag:
            return
        if self.tag == 'VADDRESSBOOK':
            for oid in self.get_contacts_from_odoo(self.owner):
                yield oid
        elif self.tag == 'VCALENDAR':
            for oid in self.get_events_from_odoo(self.owner, self.path):
                yield oid
        else:
            raise NotImplementedError

    @classmethod
    def _generate_vcard_from_odoo(cls, database_id):
        """ Generate and return vCard object from Odoo res.partner record.

        :param database_id: key of the partner in the contact cache
        :raises KeyError: when the partner is not in the cache
        """
        partner = cls._contacts_cache()[database_id]
        last_modified = str(partner.get('write_date'))
        vobject_item = vobject.vCard()
        vobject_item.add('n')
        vobject_item.n.value = vobject.vcard.Name(family=partner.get('name'))
        vobject_item.add('fn')
        vobject_item.fn.value = partner.get('name')
        vobject_item.add('adr')
        state_name = partner.get('state_id')[1] if partner.get('state_id') else ''
        country_name = partner.get('country_id')[1] if partner.get('country_id') else ''
        # Empty parts are skipped to avoid a dangling space.
        # NOTE(review): upload() writes the joined value back to ``street``
        # only and leaves ``street2`` as is, so an edited contact may end up
        # with the second line twice - confirm the wanted mapping.
        street = ' '.join(filter(None, (partner.get('street'),
                                        partner.get('street2'))))
        vobject_item.adr.value = vobject.vcard.Address(
            street=street, code=partner.get('zip') or '',
            city=partner.get('city') or '',
            region=state_name, country=country_name)
        # Performance issue with large attached images...
        # if partner.get('image'):
        #     vobject_item.add('photo;encoding=b;type=jpeg').value = partner.get(
        #         'image').replace('\n', '')
        # Standard TEL types, so that upload() can map every number back to
        # the field it comes from
        for field, tel_type in cls._TEL_TYPES:
            number = partner.get(field)
            if number:
                tel = vobject_item.add('tel')
                tel.type_param = tel_type
                tel.value = number
        if partner.get('email'):
            vobject_item.add('email').value = partner.get('email')
        if partner.get('website'):
            vobject_item.add('url').value = partner.get('website')
        if partner.get('function'):
            vobject_item.add('role').value = partner.get('function')
        if partner.get('comment'):
            vobject_item.add('note').value = partner.get('comment')
        if partner.get('category_id'):
            categs = [cls.categs.get(str(cid)) for cid in partner.get('category_id')]
            vobject_item.add('categories').value = categs
        if partner.get('parent_id'):
            vobject_item.add('org').value = [partner.get('parent_id')[1]]
        vobject_item.add('rev').value = last_modified
        return vobject_item

    @classmethod
    def _generate_ics_from_odoo(cls, href, database_id):
        """ Generate and return ICS object from Odoo calendar.event record.

        :param href: value used as UID of the event
        :param database_id: integer id of the event in the event cache
        :raises KeyError: when the event is not in the cache
        """
        # TODO/IMP : attendees management (not handled directly by vobject)
        event = cls.events[database_id]
        if event.get('allday'):
            utc_dtstart = datetime.strptime(event.get('start_date'),
                                            '%Y-%m-%d').date()
            stop_date_obj = datetime.strptime(event.get('stop_date'),
                                              '%Y-%m-%d').date()
            # One day is added to the stop date sent by Odoo
            utc_dtstop = stop_date_obj + timedelta(days=1)
        else:
            utc_dtstart = cls.odoo_date_to_utc(event.get('start_datetime'))
            utc_dtstop = cls.odoo_date_to_utc(event.get('stop_datetime'))
        last_modified = str(cls.odoo_date_to_utc(event.get('write_date')))
        cal = vobject.iCalendar()
        cal.add('vevent')
        cal.vevent.add('summary').value = event.get('name')
        cal.vevent.add('location').value = event.get('location') or ''
        cal.vevent.add('description').value = event.get('description') or ''
        cal.vevent.add('dtstart').value = utc_dtstart
        cal.vevent.add('dtend').value = utc_dtstop
        # cal.vevent.add('duration').value = event.duration
        if event.get('categ_ids'):
            categs = [cls.event_types.get(etid) for etid in event.get('categ_ids')]
            cal.vevent.add('categories').value = categs
        for alarm_id in event.get('alarm_ids') or []:
            alarm = cls.event_alarms.get(alarm_id)
            valarm = cal.vevent.add('valarm')
            action = 'DISPLAY' if alarm.get('type') == 'notification' else 'EMAIL'
            valarm.add('action').value = action
            # Negative trigger : the alarm fires before the event start
            valarm.add('trigger').value = timedelta(
                minutes=-alarm.get('duration_minutes'))
        if event.get('recurrency'):
            cal.vevent.add('rrule').value = event.get('rrule')
        cal.vevent.add('uid').value = href
        cal.vevent.add('rev').value = last_modified
        # Read back as ``vobject_item.rev`` by _get_item_from_vobject
        cal.add('rev').value = last_modified
        return cal

    def _get_item_from_vobject(self, href, vobject_item):
        """ Return Item from Vobject and HREF, along with the
        ``(tag, start, end)`` tuple computed by xmlutils """
        vobject_item.add('uid').value = str(href)
        tag, start, end = xmlutils.find_tag_and_time_range(vobject_item)
        text = vobject_item.serialize()
        etag = get_etag(text)
        return Item(
            self, href=href, last_modified=vobject_item.rev.value, etag=etag,
            text=text, item=vobject_item, uid=href,
            name=vobject_item.name, component_name=tag), (tag, start, end)

    def _get_with_metadata(self, href):
        """ Fetch a single item from the Odoo caches.

        :param href: cache key of a contact, or ``calendar.event:<id>``
        :return: ``(item, (tag, start, end))``
        :raises NotImplementedError: for a collection without known model
        """
        model = self.odoo_model
        if model == 'res.partner':
            vobject_item = self._generate_vcard_from_odoo(href)
        elif model == 'calendar.event':
            # The event cache is keyed by integer id while hrefs look like
            # ``calendar.event:<id>`` : the id has to be extracted first
            database_id = self._event_id_from_href(href)
            vobject_item = self._generate_ics_from_odoo(href, database_id)
        else:
            raise NotImplementedError
        return self._get_item_from_vobject(href, vobject_item)

    def get(self, href, verify_href=True):
        """ Return the item behind ``href``. ``verify_href`` is not used. """
        item, metadata = self._get_with_metadata(str(href))
        item.href = str(item.href)  # fix for etag radicale concatenation
        return item

    @staticmethod
    def _first_org_name(item):
        """ Return the organisation name of a vCard. The ORG value is a list
        (this storage writes it that way), its first entry is used. """
        org_value = item.org.value
        if isinstance(org_value, (list, tuple)):
            return org_value[0] if org_value else ''
        return org_value

    def upload(self, href, item):
        """ Creation or update of a contact from an uploaded vCard.

        The vCard fields are mapped to ``res.partner`` values, then written
        on the cached partner matching ``href`` or used to create a new one.

        :param href: href targeted by the upload
        :param item: uploaded vCard object
        :return: the item read back from Odoo, without ``last_modified``
        :raises NotImplementedError: for non contact collections
        """
        self.logger.info('Upload href %s , item %s' % (href, item))
        if self.odoo_model != 'res.partner':
            # Write for calendars is still to be done (see PLAN, step 9)
            raise NotImplementedError
        # TODO : handle image ?
        # Ref : important to keep track of creations outside of Odoo
        contact_data = {'ref': item.uid.value}
        # Name, remove excessive spaces
        name = ' '.join(filter(bool, str(item.n.value).split(' ')))
        if not name:
            name = item.fn.value
        contact_data['name'] = name
        # Address
        if item.contents.get('adr'):  # First only
            address = item.adr_list[0]
            contact_data.update({
                'street': address.value.street,
                'zip': address.value.code,
                'city': address.value.city
            })
            country = address.value.country
            country_id = False
            if country:
                country_ids = self.odoo.env['res.country'].search(
                    [('name', 'ilike', country)], limit=1)
                if country_ids:
                    country_id = country_ids[0]
            if not country_id:
                # France is the fallback for a missing or unknown country
                country_id = self.odoo.env.ref('base.fr').id
            contact_data['country_id'] = country_id
        # Mail
        if item.contents.get('email'):  # First only
            contact_data['email'] = item.email_list[0].value
        # Website
        if item.contents.get('url'):  # First only
            contact_data['website'] = item.url_list[0].value
        # Phones
        if item.contents.get('tel'):
            for tel in item.tel_list:
                # Case insensitive match ; MOBILE is kept for cards emitted
                # with the former non standard type
                tel_types = {str(tel_type).upper()
                             for tel_type in tel.params.get('TYPE', [])}
                if tel_types & {'CELL', 'MOBILE'}:
                    contact_data['mobile'] = tel.value
                elif 'FAX' in tel_types:
                    contact_data['fax'] = tel.value
                else:
                    contact_data['phone'] = tel.value
        # Function
        if item.contents.get('role'):
            contact_data['function'] = item.role.value
        # Comment
        if item.contents.get('note'):
            contact_data['comment'] = item.note.value
        # Categories
        if item.contents.get('categories'):
            cat_obj = self.odoo.env['res.partner.category']
            categ_ids = []
            for categ in item.categories.value:
                found_ids = cat_obj.search([('name', '=ilike', categ)], limit=1)
                if found_ids:
                    categ_id = found_ids[0]
                else:
                    categ_id = cat_obj.create({'name': categ})
                    self.categs[str(categ_id)] = categ
                categ_ids.append(categ_id)
            # (6, _, ids) command : replace the whole category set
            contact_data['category_id'] = [(6, False, categ_ids)]
        # Parent company (get, not create)
        partner_obj = self.odoo.env['res.partner']
        if item.contents.get('org'):
            dmn = [('name', '=ilike', self._first_org_name(item)),
                   ('is_company', '=', True)]
            # search() gives a list of ids, as for countries and categories
            parent_ids = partner_obj.search(dmn, limit=1)
            # NOTE(review): nesting of this else was lost with the file
            # indentation ; it is bound here to the company lookup - confirm
            if parent_ids:
                contact_data['parent_id'] = parent_ids[0]
                contact_data['is_company'] = False
                contact_data['company_type'] = 'person'
            else:
                contact_data['is_company'] = True
                contact_data['company_type'] = 'company'
        # Create or update
        partners = self._contacts_cache()
        if href in partners:
            partner_id = partners[href]['id']
            # TODO: diff changes, do not write everything
            partner_obj.write([partner_id], contact_data)
        else:
            partner_id = partner_obj.create(contact_data)
        # Cache refresh from Odoo.
        # NOTE(review): assumed to run for both create and update - confirm
        partner = self._get_contacts_from_odoo(('id', '=', partner_id))[0]
        partners[item.uid.value] = partner
        res_item = self.get(item.uid.value)
        # Avoid bad mismatch for CardDAV client with live upload
        del res_item.last_modified
        return res_item

    def delete(self, href=None):
        """ Can not delete collection but item, yes.

        Without ``href`` the collection path is used as item href. Only
        contacts are handled : partners that Odoo refuses to unlink are
        archived instead. Nothing is done for other models.

        :raises ValueError: when there is neither href nor collection path
        """
        if self.path and href is None:
            href = self.path
        if href is None:
            # Delete the collection
            self.logger.error('Attempt to delete collection %s' % self.path)
            raise ValueError('Can not delete collection')
        # Delete an item
        if self.odoo_model == 'res.partner':
            obj = self.odoo.env[self.odoo_model]
            record_id = href.split('/')[-1]
            partners = self._contacts_cache()
            partner_id = partners[record_id]['id']
            try:
                obj.unlink([partner_id])
            except RPCError:  # if not removable, archive it
                obj.write([partner_id], {'active': False})
            del partners[record_id]

    @property
    def last_modified(self):
        """ Return last modified : the most recent ``write_date`` of the
        collection model, or the epoch when there is no model or record """
        if self.odoo_model:
            last = self.odoo.env[self.odoo_model].search(
                [], limit=1, order='write_date desc')
            if last:
                last_fields = self.odoo.execute(
                    self.odoo_model, 'read', last, ['write_date'])[0]
                return str(last_fields['write_date'])
        return '1970-01-01 00:00:00'