279 lines
12 KiB
Python
279 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright 2018 Fabien Bourgeois <fabien@yaltik.com>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
""" Odoo Radicale Storage Plugin """
|
|
|
|
# # PLAN
|
|
# 1. OK : Implement readonly from Odoo only
|
|
# 2. OK : Implement contacts first (and so vcf)
|
|
# 3. OK : Implement unique events (.ics) and timezone
|
|
# 4. Implement notifications for events
|
|
# 5. Implement recurrent events
|
|
# 6. Begin write (two way) for contacts
|
|
# 7. Begin write (two way) for calendar
|
|
|
|
|
|
from contextlib import contextmanager
|
|
from time import strftime, strptime
|
|
from datetime import timedelta
|
|
import pytz
|
|
import vobject
|
|
from odoorpc import ODOO
|
|
from odoorpc.error import RPCError
|
|
from radicale import xmlutils
|
|
from radicale.storage import BaseCollection, Item, get_etag, get_uid_from_object
|
|
from radicale_odoo_auth import Auth
|
|
|
|
|
|
class Collection(BaseCollection):
    """BaseCollection implementation for Odoo Radicale Storage.

    Items are generated on the fly from Odoo records through the odoorpc
    session exposed by the ``radicale_odoo_auth`` plugin:

    * a path containing an ``odoo-contact`` segment is an address book
      backed by ``res.partner`` (items are ``.vcf`` vCards);
    * a path containing an ``odoo-calendar`` segment is a calendar backed
      by ``calendar.event`` (items are ``.ics`` iCalendar objects).

    Item hrefs have the form ``<model>:<database id><suffix>``, for example
    ``res.partner:42.vcf``.
    """

    # odoorpc session shared by all instances; refreshed from ``Auth.odoo``
    # every time a collection is instantiated.
    odoo = False

    def __init__(self, path):
        """Bind the collection to ``path`` and derive its kind from it.

        Raises:
            RuntimeError: when the auth plugin holds no Odoo session.
        """
        self.__class__.odoo = Auth.odoo
        if not self.__class__.odoo:
            self.logger.error('No auth Odoo found...')
            raise RuntimeError('No auth Odoo found')
        odoo_timezone = self.configuration.get('storage', 'odoo_timezone')
        self.__class__.server_timezone = pytz.timezone(odoo_timezone)

        attributes = path.strip('/').split('/')
        self.tag = None
        self.props = {}
        if 'odoo-contact' in attributes:
            self.tag = 'VADDRESSBOOK'
            self.odoo_model = 'res.partner'
            self.content_suffix = '.vcf'
            self.props.update({
                'tag': 'VADDRESSBOOK',
                'D:displayname': 'Odoo contacts',
                'CR:addressbook-description':
                    'Contacts from your Odoo account',
            })
        elif 'odoo-calendar' in attributes:
            self.tag = 'VCALENDAR'
            self.odoo_model = 'calendar.event'
            self.content_suffix = '.ics'
            self.props.update({
                'tag': 'VCALENDAR',
                'D:displayname': 'Odoo calendar',
                'C:calendar-description': 'Events from your Odoo calendar',
            })

        self.path = path.strip('/')
        self.owner = attributes[0]
        # NOTE(review): ``str.split`` with an explicit separator never
        # returns an empty list, so this expression is always False.
        # Kept as-is to preserve behaviour; confirm the intended rule.
        self.is_principal = len(attributes) == 0

    @classmethod
    def odoo_date_to_utc(cls, date_obj):
        """Transform a naive Odoo datetime into an aware UTC datetime.

        The value is first localized in ``cls.server_timezone`` (set by
        ``__init__`` from the ``storage/odoo_timezone`` option). With
        ``is_dst=None`` pytz raises instead of guessing when the local
        time is ambiguous or does not exist.
        """
        local_date = cls.server_timezone.localize(date_obj, is_dst=None)
        return local_date.astimezone(pytz.utc)

    @classmethod
    @contextmanager
    def acquire_lock(cls, mode, user=None):
        """Remember ``user`` on the class; no real locking is performed."""
        cls.user = user
        yield

    @classmethod
    def discover(cls, path, depth="0"):
        """Discover implementation.

        Yields the collection for ``path`` first, then:

        * for a one-segment path, the ``odoo-contact`` and
          ``odoo-calendar`` child collections;
        * for a two-segment path, every item of the collection.

        ``depth`` is only logged: children are yielded whatever its value.
        """
        attributes = path.strip('/').split('/')
        # ``user`` is normally set by ``acquire_lock``; do not crash with an
        # AttributeError when discover is reached without it.
        if path and not getattr(cls, 'user', None):
            cls.user = attributes[0]

        cls.logger.warning(
            'Discover : %s (path), %s (depth), %s (cls.user), '
            '%s (attributes)',
            path, depth, getattr(cls, 'user', None), attributes)
        yield cls(path)
        if len(attributes) == 1:  # Got all if root is needed
            # Join with an explicit slash so that both ``/user`` and
            # ``/user/`` give ``/user/odoo-contact``; plain concatenation
            # produced ``/userodoo-contact`` for the former.
            base_path = path.rstrip('/')
            yield cls('%s/odoo-contact' % base_path)
            yield cls('%s/odoo-calendar' % base_path)
        elif len(attributes) == 2:  # Then we need children
            collection = cls(path)
            for item in collection.list():
                yield collection.get(item)

    def get_meta(self, key=None):
        """Return the property named ``key``, or all properties if falsy."""
        return self.props.get(key) if key else self.props

    @classmethod
    def create_collection(cls, href, collection=None, props=None):
        """Create collection implementation: only logs an error ATM."""
        cls.logger.error('Attempt to create a new collection for %s', href)

    @classmethod
    def get_contacts_from_odoo(cls, login):
        """Return ``res.partner:<id>`` for every partner the session sees.

        ``login`` is only used for logging; the search itself runs with the
        shared ``cls.odoo`` session and an empty domain.
        """
        cls.logger.info('Get contacts for Odoo user %s', login)
        partner_ids = cls.odoo.env['res.partner'].search([])
        return ['res.partner:%s' % pid for pid in partner_ids]

    @classmethod
    def get_events_from_odoo(cls, login):
        """Return ``calendar.event:<id>`` for every event the session sees.

        ``login`` is only used for logging; the search itself runs with the
        shared ``cls.odoo`` session and an empty domain.
        """
        cls.logger.info('Get events for Odoo user %s', login)
        event_ids = cls.odoo.env['calendar.event'].search([])
        return ['calendar.event:%s' % eid for eid in event_ids]

    def sync(self, old_token=None):
        """Delegate to the parent ``sync`` and log its result (debug aid)."""
        token, ilist = super(Collection, self).sync(old_token)
        self.logger.debug('Sync token : %s', token)
        self.logger.debug('Sync list : %s', ilist)
        return token, ilist

    def list(self):
        """List collection items.

        Yields one href (``<model>:<id><suffix>``) per Odoo record.

        Raises:
            NotImplementedError: on first iteration, when the collection
                is neither an address book nor a calendar.
        """
        self.logger.debug('List collection %s', self.path)
        self.logger.debug('Collection tag %s', self.tag)
        if not self.tag:
            raise NotImplementedError
        if self.tag == 'VADDRESSBOOK':
            hrefs = self.get_contacts_from_odoo(self.owner)
        elif self.tag == 'VCALENDAR':
            hrefs = self.get_events_from_odoo(self.owner)
        else:
            hrefs = []
        for oid in hrefs:
            yield oid + self.content_suffix

    @classmethod
    def _generate_vcard_from_odoo(cls, partner):
        """Generate and return vCard object from Odoo res.partner record."""
        last_modified = str(partner.write_date)
        vobject_item = vobject.vCard()
        vobject_item.add('n')
        vobject_item.add('fn')
        vobject_item.fn.value = partner.name
        vobject_item.add('adr')
        state_name = partner.state_id.name if partner.state_id else ''
        country_name = partner.country_id.name if partner.country_id else ''
        # Join only the street lines that are set, so a missing line does
        # not leave a stray leading or trailing space.
        street = ' '.join(
            part for part in (partner.street, partner.street2) if part)
        vobject_item.adr.value = vobject.vcard.Address(
            street=street,
            code=partner.zip or '', city=partner.city or '',
            region=state_name, country=country_name)
        if partner.image:
            # The image is added as-is, only stripped of newlines.
            # NOTE(review): presumably base64 text from Odoo - confirm.
            photo = vobject_item.add('photo;encoding=b;type=jpeg')
            photo.value = partner.image.replace('\n', '')
        if partner.phone:
            vobject_item.add('tel').value = partner.phone
        if partner.email:
            vobject_item.add('email').value = partner.email
        if partner.website:
            vobject_item.add('url').value = partner.website
        if partner.function:
            vobject_item.add('role').value = partner.function
        if partner.comment:
            vobject_item.add('note').value = partner.comment
        if partner.category_id:
            categs = [categ.name for categ in partner.category_id]
            vobject_item.add('categories').value = categs
        if partner.parent_id:
            vobject_item.add('org').value = [partner.parent_id.name]
        vobject_item.add('rev').value = last_modified
        return vobject_item

    @classmethod
    def _generate_ics_from_odoo(cls, href, event):
        """Generate and return ICS object from Odoo calendar.event record.

        ``href`` is used as the UID of the generated VEVENT.
        """
        # TODO/IMP : attendees management (not handled directly by vobject)
        if event.allday:
            utc_dtstart = event.start_date
            # One day is added to the stop date.
            # NOTE(review): presumably because iCalendar DTEND is
            # exclusive for all-day events - confirm.
            utc_dtstop = event.stop_date + timedelta(days=1)
        else:
            utc_dtstart = cls.odoo_date_to_utc(event.start_datetime)
            utc_dtstop = cls.odoo_date_to_utc(event.stop_datetime)
        last_modified = str(cls.odoo_date_to_utc(event.write_date))
        cal = vobject.iCalendar()
        cal.add('vevent')
        vevent = cal.vevent
        vevent.add('summary').value = event.name
        vevent.add('location').value = event.location or ''
        vevent.add('description').value = event.description or ''
        vevent.add('dtstart').value = utc_dtstart
        vevent.add('dtend').value = utc_dtstop
        if event.categ_ids:
            categs = [categ.name for categ in event.categ_ids]
            vevent.add('categories').value = categs
        for alarm in event.alarm_ids:
            valarm = vevent.add('valarm')
            action = 'DISPLAY' if alarm.type == 'notification' else 'EMAIL'
            valarm.add('action').value = action
            # Negative offset: trigger that many minutes before the start.
            valarm.add('trigger').value = timedelta(
                minutes=-alarm.duration_minutes)
        vevent.add('uid').value = href
        vevent.add('rev').value = last_modified
        # ``_get_item_from_vobject`` reads ``rev`` on the top-level object.
        cal.add('rev').value = last_modified
        return cal

    def _get_item_from_vobject(self, href, vobject_item):
        """Return ``(Item, (tag, start, end))`` from Vobject and HREF.

        A ``uid`` equal to ``href`` is added to ``vobject_item`` before it
        is serialized; the etag is computed from the serialized text.
        """
        vobject_item.add('uid').value = href
        tag, start, end = xmlutils.find_tag_and_time_range(vobject_item)
        text = vobject_item.serialize()
        etag = get_etag(text)
        item = Item(
            self, href=href, last_modified=vobject_item.rev.value, etag=etag,
            text=text, item=vobject_item, uid=href,
            name=vobject_item.name, component_name=tag)
        return item, (tag, start, end)

    def _get_with_metadata(self, href):
        """Fetch a single item from Odoo database.

        Returns:
            ``(item, (tag, start, end))``, or ``(None, None)`` when ``href``
            is not of the form ``<model>:<integer id><content suffix>``.

        Raises:
            NotImplementedError: when the model is not handled.
        """
        model, _, raw_id = href.partition(':')
        suffix = self.content_suffix
        # Check the suffix before stripping it: blindly dropping the last
        # characters would turn ``res.partner:12345`` into record id 1.
        if not raw_id.endswith(suffix):
            self.logger.debug('Malformed item href %r in %s', href, self.path)
            return None, None
        try:
            database_id = int(raw_id[:-len(suffix)])
        except ValueError:
            self.logger.debug('Malformed item href %r in %s', href, self.path)
            return None, None

        record = self.odoo.env[model].browse([database_id])
        if model == 'res.partner':
            vobject_item = self._generate_vcard_from_odoo(record)
        elif model == 'calendar.event':
            vobject_item = self._generate_ics_from_odoo(href, record)
        else:
            raise NotImplementedError
        return self._get_item_from_vobject(href, vobject_item)

    def get(self, href, verify_href=True):
        """Return the item for ``href``, or None if the href is malformed.

        ``verify_href`` is accepted but not used.
        """
        item, _metadata = self._get_with_metadata(href)
        return item

    def delete(self, href=None):
        """Can not delete collection but item, yes (not implemented yet).

        Raises:
            ValueError: when asked to delete the collection itself.
            NotImplementedError: when asked to delete an item.
        """
        if href is None:
            # Delete the collection
            self.logger.error('Attempt to delete collection %s', self.path)
            raise ValueError('Can not delete collection')
        # Delete an item
        raise NotImplementedError

    @property
    def last_modified(self):
        """Return the most recent ``write_date`` of the model, as a string.

        The value is ``str()`` of the raw Odoo field; it is not formatted
        as an HTTP date.
        """
        last = self.odoo.env[self.odoo_model].search(
            [], limit=1, order='write_date desc')
        last_fields = self.odoo.execute(
            self.odoo_model, 'read', last, ['write_date'])[0]
        self.logger.debug(last_fields)
        return str(last_fields['write_date'])
|