491 lines
21 KiB
Python
491 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright 2018-2020 Fabien Bourgeois <fabien@yaltik.com>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
""" Odoo Radicale Storage Plugin """
|
|
|
|
# # PLAN
|
|
# 1. OK : Implement readonly from Odoo only
|
|
# 2. OK : Implement contacts first (and so vcf)
|
|
# 3. OK : Implement unique events (.ics) and timezone
|
|
# 4. OK : Implement notifications for events
|
|
# 5. OK : Implement recurrent events
|
|
# 6. OK : Offer two (or more) calendars : own events, events I attend
# and all readable events
|
|
# 7. Offer one calendar per user (via login/email)
|
|
# 8. OK : Begin write (two way) for contacts
|
|
# 9. Begin write (two way) for calendar
|
|
|
|
|
|
from contextlib import contextmanager
|
|
from time import strftime, strptime
|
|
from datetime import timedelta, datetime
|
|
import pytz
|
|
import vobject
|
|
from odoorpc import ODOO
|
|
from odoorpc.error import RPCError
|
|
from radicale import xmlutils
|
|
from radicale.storage import BaseCollection, Item, get_etag, get_uid_from_object
|
|
from radicale_odoo_auth import Auth
|
|
|
|
|
|
class Collection(BaseCollection):
    """Radicale storage collection backed by an Odoo server.

    Odoo contacts (``res.partner``) are exposed as one address book and Odoo
    events (``calendar.event``) as three calendars (own events, events the
    user takes part in, all readable events). ``upload`` and ``delete`` only
    handle contacts.

    The Odoo session and the record caches (``partners``, ``categs``,
    ``events``, ``event_types``, ``event_alarms``) are stored on the class
    and are therefore shared by every instance.
    """

    # odoorpc session; refreshed from ``Auth.odoo`` by every ``__init__``.
    odoo = False

    # Names of the collections crafted under each user root, in the order
    # they are yielded by ``discover``.
    _CHILD_COLLECTIONS = ('odoo-contact', 'odoo-calendar-own',
                          'odoo-calendar-in', 'odoo-calendar-all')
    _CALENDAR_COLLECTIONS = _CHILD_COLLECTIONS[1:]

    @classmethod
    def static_init(cls):
        """Do nothing.

        Static init is mainly useful for threading locks, which the Odoo
        storage does not need.
        """

    def __init__(self, path):
        """Build the collection matching ``path``.

        The collection kind (address book or calendar) is deduced from the
        path segments; any other path gets no tag and empty properties.

        Raises:
            RuntimeError: when ``Auth.odoo`` holds no Odoo session.
        """
        self.__class__.odoo = Auth.odoo
        if not self.__class__.odoo:
            self.logger.error('No auth Odoo found...')
            raise RuntimeError('No auth Odoo found')
        odoo_timezone = self.configuration.get('storage', 'odoo_timezone')
        self.__class__.server_timezone = pytz.timezone(odoo_timezone)

        attributes = path.strip('/').split('/')
        self.tag = None
        self.props = {}
        if 'odoo-contact' in attributes:
            self.tag = 'VADDRESSBOOK'
            self.odoo_model = 'res.partner'
            self.content_suffix = '.vcf'
            self.props.update({
                'tag': 'VADDRESSBOOK',
                'D:displayname': 'Odoo contacts',
                'CR:addressbook-description': 'Contacts form your Odoo account',
            })
        elif any(name in attributes for name in self._CALENDAR_COLLECTIONS):
            self.tag = 'VCALENDAR'
            self.odoo_model = 'calendar.event'
            self.content_suffix = '.ics'
            self.props.update({'tag': 'VCALENDAR'})
            if 'odoo-calendar-own' in attributes:
                self.props.update({
                    'D:displayname': 'Odoo calendar : own events',
                    'C:calendar-description': 'Own events, from your Odoo calendar',
                })
            elif 'odoo-calendar-in' in attributes:
                self.props.update({
                    'D:displayname': 'Odoo calendar : events I\'m in',
                    'C:calendar-description': 'Events you are '
                                              'attended from your Odoo calendar',
                })
            else:
                self.props.update({
                    'D:displayname': 'Odoo calendar : all events',
                    'C:calendar-description': 'All events from your Odoo calendar',
                })

        self.path = path.strip('/')
        self.owner = attributes[0]
        # NOTE(review): ``str.split`` never returns an empty list, so this is
        # always False. Left unchanged on purpose; confirm what Radicale
        # expects before altering it.
        self.is_principal = len(attributes) == 0

    @classmethod
    def odoo_date_to_utc(cls, date_str):
        """Convert a naive Odoo datetime string to an aware UTC datetime.

        ``date_str`` must look like ``YYYY-MM-DD HH:MM:SS``; it is read as a
        local time of the configured ``odoo_timezone``. ``is_dst=None`` makes
        pytz raise on ambiguous or non-existent local times.
        """
        date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
        local_date = cls.server_timezone.localize(date_obj, is_dst=None)
        return local_date.astimezone(pytz.utc)

    @classmethod
    @contextmanager
    def acquire_lock(cls, mode, user=None):
        """Record the current user; no actual locking is performed."""
        cls.user = user
        yield

    @classmethod
    def _partner_cache(cls):
        """Return the contact cache, loading it from Odoo on first use.

        Needs ``cls.odoo`` to be set, i.e. at least one instance created.
        """
        if getattr(cls, 'partners', None) is None:
            cls.get_contacts_from_odoo(getattr(cls, 'user', None))
        return cls.partners

    @classmethod
    def discover(cls, path, depth='0'):
        """Yield the collections and items found at ``path``.

        * one path segment (user root): the root itself at depth ``'0'``,
          otherwise the four crafted child collections;
        * two segments (a collection): the collection, followed by all its
          items when depth is not ``'0'``;
        * three segments (an item): the contact, at depth ``'0'`` and only
          if it is known; nothing otherwise.
        """
        attributes = path.strip('/').split('/')
        if path and not getattr(cls, 'user', None):
            cls.user = attributes[0]

        depth = depth or '0'  # Sometimes depth is '' ?
        cls.logger.debug(
            'Discover : %s (path), %s (depth), %s (cls.user), %s (attributes)',
            path, depth, getattr(cls, 'user', None), attributes)

        if len(attributes) == 1:  # Root collection (user)
            if depth == '0':
                yield cls(path)  # Do not ?
            else:  # Need crafted children
                # Do not rely on the caller providing a trailing slash
                base = path if path.endswith('/') else path + '/'
                for child in cls._CHILD_COLLECTIONS:
                    yield cls(base + child)
        elif len(attributes) == 2:  # Collection
            collection = cls(path)
            yield collection
            if depth != '0':  # We need all items
                for item in collection.list():
                    yield collection.get(item)
        elif len(attributes) == 3:
            if depth == '0':  # Read / Update / Create
                collection = cls(path)
                # Only contacts can be addressed individually for now
                if (collection.tag == 'VADDRESSBOOK' and
                        attributes[-1] in cls._partner_cache()):  # Read or Update
                    yield collection.get(attributes[-1])
            # FIXME : and depth 1 ?

    def get_meta(self, key=None):
        """Return the property named ``key``, or all properties without key."""
        return self.props.get(key) if key else self.props

    @classmethod
    def create_collection(cls, href, collection=None, props=None):
        """Create collection implementation : only logs an error ATM."""
        cls.logger.error('Attempt to create a new collection for %s', href)

    @classmethod
    def _get_contacts_from_odoo(cls, extra_domain=False):
        """Read the contacts visible to the current Odoo session.

        Args:
            extra_domain: optional single domain leaf, e.g.
                ``('id', '=', 42)``, added to the ``type = contact`` filter.

        Returns:
            The list of dicts returned by ``search_read``.
        """
        fields = ['name', 'write_date', 'comment', 'street', 'street2', 'zip',
                  'city', 'phone', 'mobile', 'fax', 'email', 'website',
                  'function', 'image', 'ref',
                  'category_id', 'parent_id', 'state_id', 'country_id']
        domain = [('type', '=', 'contact')]
        if extra_domain:
            domain.append(extra_domain)
        return cls.odoo.execute_kw('res.partner', 'search_read', [domain],
                                   {'fields': fields})

    @classmethod
    def get_contacts_from_odoo(cls, login):
        """Reload the contact and category caches and return contact hrefs.

        A contact is keyed by its ``ref`` when it has one (contacts created
        through CardDAV store their UID there), else by its database id as a
        string. ``login`` is only used for logging.
        """
        cls.logger.info('Get contacts for Odoo user %s', login)
        partners = cls._get_contacts_from_odoo()
        cls.partners = {p.get('ref') or str(p['id']): p for p in partners}
        categs = cls.odoo.execute_kw('res.partner.category', 'search_read',
                                     [[]], {'fields': ['name']})
        cls.categs = {str(c['id']): c['name'] for c in categs}
        return cls.partners.keys()

    @classmethod
    def get_events_from_odoo(cls, login, path):
        """Reload the event caches and return the event hrefs.

        The suffix of ``path`` selects the events: own events, events the
        user's partner takes part in, or every readable event. ``login`` is
        only used for logging.

        Returns:
            A list of hrefs shaped like ``calendar.event:<id>``.
        """
        cls.logger.info('Get events for Odoo user %s', login)
        fields = ['allday', 'start_date', 'stop_date', 'start_datetime',
                  'stop_datetime', 'write_date', 'name', 'location',
                  'description', 'recurrency', 'rrule', 'categ_ids',
                  'alarm_ids']
        event_types = cls.odoo.execute_kw('calendar.event.type', 'search_read',
                                          [[]], {'fields': ['name']})
        cls.event_types = {et['id']: et['name'] for et in event_types}
        event_alarms = cls.odoo.execute_kw(
            'calendar.alarm', 'search_read', [[]],
            {'fields': ['type', 'duration_minutes']})
        cls.event_alarms = {ea['id']: ea for ea in event_alarms}

        cls.odoo.env.context.update({'virtual_id': False})  # Only real events
        if path.endswith('odoo-calendar-own'):
            domain = [('user_id', '=', cls.odoo.env.uid)]
        elif path.endswith('odoo-calendar-in'):
            pid = cls.odoo.execute('res.users', 'read', [cls.odoo.env.uid],
                                   ['partner_id'])[0]['partner_id'][0]
            domain = [('partner_ids', '=', pid)]
        else:
            domain = []
        # Needed to ensure context
        event_ids = cls.odoo.env['calendar.event'].search(domain)
        events = cls.odoo.execute_kw('calendar.event', 'read',
                                     [event_ids], {'fields': fields})
        cls.events = {e['id']: e for e in events}
        # WARNING: Odoo does not remove from database deleted recurrent events...
        # Should be fixed on Odoo side and will be fixed via 2way here too
        return ['calendar.event:%s' % eid for eid in cls.events]

    def sync(self, old_token=None):
        """Delegate to the parent ``sync`` and log its result (debug aid)."""
        token, ilist = super(Collection, self).sync(old_token)
        self.logger.debug('Sync token : %s', token)
        self.logger.debug('Sync list : %s', ilist)
        return token, ilist

    def list(self):
        """Yield the href of every item of the collection.

        Raises:
            NotImplementedError: for a collection without tag. As this is a
                generator, it is raised on first iteration.
        """
        self.logger.debug('List collection %s', self.path)
        self.logger.debug('Collection tag %s', self.tag)
        if self.tag:
            if self.tag == 'VADDRESSBOOK':
                for oid in self.get_contacts_from_odoo(self.owner):
                    yield oid
            elif self.tag == 'VCALENDAR':
                for oid in self.get_events_from_odoo(self.owner, self.path):
                    yield oid
        else:
            raise NotImplementedError

    @classmethod
    def _generate_vcard_from_odoo(cls, database_id):
        """Build a vCard from the cached ``res.partner`` record.

        ``database_id`` is the key of the record in ``cls.partners`` (the
        contact href).
        """
        partner = cls.partners.get(database_id)
        last_modified = str(partner.get('write_date'))
        vobject_item = vobject.vCard()
        vobject_item.add('n')
        vobject_item.n.value = vobject.vcard.Name(family=partner.get('name'))
        vobject_item.add('fn')
        vobject_item.fn.value = partner.get('name')
        vobject_item.add('adr')
        # Many2one values are read as [id, display_name]
        state_name = partner.get('state_id')[1] if partner.get('state_id') else ''
        country_name = partner.get('country_id')[1] if partner.get('country_id') else ''
        vobject_item.adr.value = vobject.vcard.Address(
            street=' '.join([partner.get('street') or '',
                             partner.get('street2') or '']),
            code=partner.get('zip') or '', city=partner.get('city') or '',
            region=state_name, country=country_name)
        # Images are not exported : performance issue with large attachments
        for field, tel_type in (('phone', 'Tel'), ('mobile', 'Mobile'),
                                ('fax', 'Fax')):
            if partner.get(field):
                tel = vobject_item.add('tel')
                tel.type_param = tel_type
                tel.value = partner.get(field)
        if partner.get('email'):
            vobject_item.add('email').value = partner.get('email')
        if partner.get('website'):
            vobject_item.add('url').value = partner.get('website')
        if partner.get('function'):
            vobject_item.add('role').value = partner.get('function')
        if partner.get('comment'):
            vobject_item.add('note').value = partner.get('comment')
        if partner.get('category_id'):
            categs = [cls.categs.get(str(cid))
                      for cid in partner.get('category_id')]
            vobject_item.add('categories').value = categs
        if partner.get('parent_id'):
            vobject_item.add('org').value = [partner.get('parent_id')[1]]
        vobject_item.add('rev').value = last_modified
        return vobject_item

    @classmethod
    def _generate_ics_from_odoo(cls, href, database_id):
        """Build an iCalendar object from the cached ``calendar.event`` record.

        Args:
            href: item href, used as the event UID.
            database_id: key of the record in ``cls.events``.
        """
        # TODO/IMP : attendees management (not handled directly by vobject)
        event = cls.events.get(database_id)
        if event.get('allday'):
            utc_dtstart = datetime.strptime(event.get('start_date'),
                                            '%Y-%m-%d').date()
            stop_date_obj = datetime.strptime(event.get('stop_date'),
                                              '%Y-%m-%d').date()
            # One day is added to Odoo's stop date for all-day events
            utc_dtstop = stop_date_obj + timedelta(days=1)
        else:
            utc_dtstart = cls.odoo_date_to_utc(event.get('start_datetime'))
            utc_dtstop = cls.odoo_date_to_utc(event.get('stop_datetime'))
        last_modified = str(cls.odoo_date_to_utc(event.get('write_date')))
        cal = vobject.iCalendar()
        cal.add('vevent')
        cal.vevent.add('summary').value = event.get('name')
        cal.vevent.add('location').value = event.get('location') or ''
        cal.vevent.add('description').value = event.get('description') or ''
        cal.vevent.add('dtstart').value = utc_dtstart
        cal.vevent.add('dtend').value = utc_dtstop
        if event.get('categ_ids'):
            categs = [cls.event_types.get(etid)
                      for etid in event.get('categ_ids')]
            cal.vevent.add('categories').value = categs
        if event.get('alarm_ids'):
            for alarm_id in event.get('alarm_ids'):
                alarm = cls.event_alarms.get(alarm_id)
                valarm = cal.vevent.add('valarm')
                action = 'DISPLAY' if alarm.get('type') == 'notification' else 'EMAIL'
                valarm.add('action').value = action
                # Negative offset : the alarm fires before the event start
                valarm.add('trigger').value = timedelta(
                    minutes=-alarm.get('duration_minutes'))
        if event.get('recurrency'):
            cal.vevent.add('rrule').value = event.get('rrule')
        cal.vevent.add('uid').value = href
        cal.vevent.add('rev').value = last_modified
        # ``_get_item_from_vobject`` reads ``rev`` on the top-level object
        cal.add('rev').value = last_modified
        return cal

    def _get_item_from_vobject(self, href, vobject_item):
        """Wrap ``vobject_item`` into a Radicale ``Item``.

        Returns:
            A ``(item, (tag, start, end))`` tuple, the second member coming
            from ``xmlutils.find_tag_and_time_range``.
        """
        vobject_item.add('uid').value = str(href)
        tag, start, end = xmlutils.find_tag_and_time_range(vobject_item)
        text = vobject_item.serialize()
        etag = get_etag(text)
        return Item(
            self, href=href, last_modified=vobject_item.rev.value, etag=etag,
            text=text, item=vobject_item, uid=href,
            name=vobject_item.name, component_name=tag), (tag, start, end)

    def _get_with_metadata(self, href):
        """Build one item, with its metadata, from the cached Odoo records.

        Raises:
            NotImplementedError: when the collection model is not handled.
        """
        model = self.odoo_model
        if model == 'res.partner':
            vobject_item = self._generate_vcard_from_odoo(href)
        elif model == 'calendar.event':
            # Event hrefs look like 'calendar.event:<id>' whereas the event
            # cache is keyed by the integer database id.
            database_id = int(href.rsplit(':', 1)[-1])
            vobject_item = self._generate_ics_from_odoo(href, database_id)
        else:
            raise NotImplementedError
        return self._get_item_from_vobject(href, vobject_item)

    def get(self, href, verify_href=True):
        """Return the item stored under ``href`` (``verify_href`` is unused)."""
        item, _metadata = self._get_with_metadata(str(href))
        item.href = str(item.href)  # fix for etag radicale concatenation
        return item

    def upload(self, href, item):
        """Create or update the Odoo contact described by the vCard ``item``.

        The contact is updated when ``href`` is a known contact, created
        otherwise. The vCard UID is stored in the partner ``ref`` to keep
        track of contacts created outside of Odoo.

        Returns:
            The stored item, rebuilt from the data read back from Odoo, with
            its ``last_modified`` attribute removed.
        """
        self.logger.info('Upload href %s , item %s', href, item)
        # TODO : handle image ?

        # Ref : important to keep track of creations outside of Odoo
        contact_data = {'ref': item.uid.value}
        # Name, remove excessive spaces
        name = ' '.join(filter(bool, str(item.n.value).split(' ')))
        if not name:
            name = item.fn.value
        contact_data['name'] = name
        # Address
        if item.contents.get('adr'):  # First only
            address = item.adr_list[0]
            contact_data.update({
                'street': address.value.street,
                'zip': address.value.code,
                'city': address.value.city,
            })
            country = address.value.country
            country_id = False
            if country:
                country_ids = self.odoo.env['res.country'].search(
                    [('name', 'ilike', country)], limit=1)
                if country_ids:
                    country_id = country_ids[0]
            if not country_id:  # Unknown or missing country : France
                country_id = self.odoo.env.ref('base.fr').id
            contact_data['country_id'] = country_id

        # Mail
        if item.contents.get('email'):  # First only
            contact_data['email'] = item.email_list[0].value
        # Website
        if item.contents.get('url'):  # First only
            contact_data['website'] = item.url_list[0].value
        # Phones
        if item.contents.get('tel'):
            for tel in item.tel_list:
                # TYPE values are compared case-insensitively; 'MOBILE' is
                # accepted because our own vCard export emits 'Mobile'.
                tel_types = [str(t).upper() for t in tel.params.get('TYPE', [])]
                if 'CELL' in tel_types or 'MOBILE' in tel_types:
                    contact_data['mobile'] = tel.value
                elif 'FAX' in tel_types:
                    contact_data['fax'] = tel.value
                else:
                    contact_data['phone'] = tel.value
        # Function
        if item.contents.get('role'):
            contact_data['function'] = item.role.value
        # Comment
        if item.contents.get('note'):
            contact_data['comment'] = item.note.value
        # Categories (get or create)
        if item.contents.get('categories'):
            cat_obj = self.odoo.env['res.partner.category']
            categ_ids = []
            for categ in item.categories.value:
                found = cat_obj.search([('name', '=ilike', categ)], limit=1)
                if found:
                    categ_id = found[0]
                else:
                    categ_id = cat_obj.create({'name': categ})
                    self.categs[str(categ_id)] = categ
                categ_ids.append(categ_id)
            # (6, False, ids) replaces the whole set of categories
            contact_data['category_id'] = [(6, False, categ_ids)]
        # Parent company (get, not create)
        partner_obj = self.odoo.env['res.partner']
        if item.contents.get('org'):
            # vobject gives ORG as a list of components : keep the first one
            org_name = item.org.value
            if isinstance(org_name, (list, tuple)):
                org_name = org_name[0] if org_name else ''
            dmn = [('name', '=ilike', org_name), ('is_company', '=', True)]
            # ``search`` returns a list of ids, not a record
            parent_ids = partner_obj.search(dmn, limit=1)
            # NOTE(review): the source indentation was lost; the else branch
            # is assumed to pair with this test - confirm.
            if parent_ids:
                contact_data['parent_id'] = parent_ids[0]
                contact_data['is_company'] = False
                contact_data['company_type'] = 'person'
            else:
                contact_data['is_company'] = True
                contact_data['company_type'] = 'company'
        # Create or update
        partners = self._partner_cache()
        if href in partners:
            partner_id = partners[href]['id']
            # TODO: diff changes, do not write everything
            partner_obj.write([partner_id], contact_data)
        else:
            partner_id = partner_obj.create(contact_data)
        # Read the record back in both cases so that the returned item is
        # built from what Odoo stored rather than from a stale cache entry.
        fresh = self._get_contacts_from_odoo(('id', '=', partner_id))
        if fresh:
            partners[item.uid.value] = fresh[0]
        elif href in partners:
            partners[item.uid.value] = partners[href]

        res_item = self.get(item.uid.value)
        # Avoid bad mismatch for CardDAV client with live upload
        del res_item.last_modified
        return res_item

    def delete(self, href=None):
        """Delete one contact; collections can not be deleted.

        Without ``href`` the collection path is used as the item path. A
        contact Odoo refuses to remove is archived instead. Items of other
        models are left untouched.

        Raises:
            ValueError: when there is nothing but the collection to delete,
                or when the contact is unknown.
        """
        if self.path and href is None:
            href = self.path
        if href is None:
            # Delete the collection
            self.logger.error('Attempt to delete collection %s', self.path)
            raise ValueError('Can not delete collection')
        # Delete an item
        if self.odoo_model != 'res.partner':
            return
        obj = self.odoo.env[self.odoo_model]
        record_id = href.split('/')[-1]
        partners = self._partner_cache()
        partner = partners.get(record_id)
        if partner is None:
            self.logger.error('Attempt to delete unknown contact %s', record_id)
            raise ValueError('Can not delete unknown contact %s' % record_id)
        partner_id = partner['id']
        try:
            obj.unlink([partner_id])
        except RPCError:  # if not removable, archive it
            obj.write([partner_id], {'active': False})
        del partners[record_id]

    @property
    def last_modified(self):
        """Return the latest ``write_date`` of the collection model.

        Returns ``'1970-01-01 00:00:00'`` when the model has no record.
        """
        last = self.odoo.env[self.odoo_model].search(
            [], limit=1, order='write_date desc')
        if last:
            last_fields = self.odoo.execute(self.odoo_model, 'read', last,
                                            ['write_date'])[0]
            return str(last_fields['write_date'])
        return '1970-01-01 00:00:00'