# -*- coding: utf-8 -*- # Copyright 2018 Fabien Bourgeois # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """ Odoo Radicale Storage Plugin """ # # PLAN # 1. OK : Implement readonly from Odoo only # 2. OK : Implement contacts first (and so vcf) # 3. OK : Implement unique events (.ics) and timezone # 4. OK : Implement notifications for events # 5. OK : Implement recurrent events # 6. OK : Offer two (or more) calendar : own events, events where I'm attended # and all readable events # 7. Offer one calendar per user (via login/email) # 8. Begin write (two way) for contacts # 9. Begin write (two way) for calendar from contextlib import contextmanager from time import strftime, strptime from datetime import timedelta, datetime import pytz import vobject from odoorpc import ODOO from odoorpc.error import RPCError from radicale import xmlutils from radicale.storage import BaseCollection, Item, get_etag, get_uid_from_object from radicale_odoo_auth import Auth class Collection(BaseCollection): """ BaseCollection implementation for Odoo Radicale Storage """ odoo = False def __init__(self, path): """ Init function """ self.__class__.odoo = Auth.odoo if not self.__class__.odoo: self.logger.error('No auth Odoo found...') raise RuntimeError('No auth Odoo found') odoo_timezone = self.configuration.get('storage', 'odoo_timezone') self.__class__.server_timezone = pytz.timezone(odoo_timezone) attributes = path.strip('/').split('/') self.tag = None self.props = {} if 'odoo-contact' in attributes: self.tag = 'VADDRESSBOOK' self.odoo_model = 'res.partner' self.content_suffix = '.vcf' self.props.update({'tag': 'VADDRESSBOOK', 'D:displayname': 'Odoo contacts', 'CR:addressbook-description': 'Contacts form your Odoo account'}) elif ('odoo-calendar-own' in attributes or 'odoo-calendar-in' in attributes or 'odoo-calendar-all' in attributes): self.tag = 'VCALENDAR' self.odoo_model = 'calendar.event' self.content_suffix = '.ics' self.props.update({'tag': 'VCALENDAR'}) if 'odoo-calendar-own' in attributes: self.props.update({'D:displayname': 'Odoo calendar : own events', 'C:calendar-description': 'Own events, from your Odoo calendar'}) elif 'odoo-calendar-in' in attributes: self.props.update({'D:displayname': 'Odoo calendar : events I\'m in', 'C:calendar-description': 'Events you are ' \ 'attended from your Odoo calendar'}) else: self.props.update({'D:displayname': 'Odoo calendar : all events', 'C:calendar-description': 'All events from your Odoo calendar'}) self.path = path.strip('/') self.owner = attributes[0] self.is_principal = len(attributes) == 0 @classmethod def odoo_date_to_utc(cls, date_str): """ Transform naive Odoo date object to UTC TZ """ date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') local_date = cls.server_timezone.localize(date_obj, is_dst=None) return local_date.astimezone(pytz.utc) @classmethod @contextmanager def acquire_lock(cls, mode, user=None): cls.user = user yield @classmethod def discover(cls, path, depth="0"): """Discover implementation """ attributes = path.strip('/').split('/') or [] if path and not cls.user: cls.user = attributes[0] cls.logger.debug('Discover : %s (path), %s (depth), %s (cls.user), %s (attributes)' % (path, depth, cls.user, attributes)) yield cls(path) if len(attributes) == 1: # Got all if root is needed contact_path = '%sodoo-contact' % path calendar_own_path = '%sodoo-calendar-own' % path calendar_in_path = '%sodoo-calendar-in' % path calendar_all_path = '%sodoo-calendar-all' % path yield cls(contact_path) yield cls(calendar_own_path) yield cls(calendar_in_path) yield cls(calendar_all_path) elif len(attributes) == 2: # Then we need children collection = cls(path) for item in collection.list(): yield collection.get(item) def get_meta(self, key=None): """Get metadata value for collection """ if key: return self.props.get(key) else: return self.props @classmethod def create_collection(cls, href, collection=None, props=None): """ Create collection implementation : only warns ATM """ cls.logger.error('Attemmpt to create a new collection for %s' % href) @classmethod def get_contacts_from_odoo(cls, login): """ Gets all contacts available from one Odoo login """ cls.logger.info('Get contacts for Odoo user %s' % login) fields = ['name', 'write_date', 'comment', 'street', 'street2', 'zip', 'city', 'phone', 'mobile', 'fax', 'email', 'website', 'function', 'comment', 'image', 'category_id', 'parent_id', 'state_id', 'country_id'] partners = cls.odoo.execute_kw('res.partner', 'search_read', [[]], {'fields': fields}) cls.partners = {p['id']: p for p in partners} categs = cls.odoo.execute_kw('res.partner.category', 'search_read', [[]], {'fields': ['name']}) cls.categs = {c['id']: c['name'] for c in categs} return ['res.partner:%s' % pid for pid in cls.partners.keys()] @classmethod def get_events_from_odoo(cls, login, path): """ Gets all events available from one Odoo login """ cls.logger.info('Get events for Odoo user %s' % login) fields = ['allday', 'start_date', 'stop_date', 'start_datetime', 'stop_datetime', 'write_date', 'name', 'location', 'description', 'recurrency', 'rrule', 'categ_ids', 'alarm_ids'] event_types = cls.odoo.execute_kw('calendar.event.type', 'search_read', [[]], {'fields': ['name']}) cls.event_types = {et['id']: et['name'] for et in event_types} event_alarms = cls.odoo.execute_kw('calendar.alarm', 'search_read', [[]], {'fields': ['type', 'duration_minutes']}) cls.event_alarms = {ea['id']: ea for ea in event_alarms} cls.odoo.env.context.update({'virtual_id': False}) # Only real events if path.endswith('odoo-calendar-own'): domain = [('user_id', '=', cls.odoo.env.uid)] elif path.endswith('odoo-calendar-in'): pid = cls.odoo.execute('res.users', 'read', [cls.odoo.env.uid], ['partner_id'])[0]['partner_id'][0] domain = [('partner_ids', '=', pid)] else: domain = [] # Needed to ensure context event_ids = cls.odoo.env['calendar.event'].search(domain) events = cls.odoo.execute_kw('calendar.event', 'read', [event_ids], {'fields': fields}) cls.events = {e['id']: e for e in events} # WARNING: Odoo does not remove from database deleted recurrent events... # Should be fixed on Odoo side and will be fixed via 2way here too return ['calendar.event:%s' % eid for eid in cls.events.keys()] def sync(self, old_token=None): """ Debug purpose """ token, ilist = super(Collection, self).sync(old_token) self.logger.debug('Sync token : %s' % token) self.logger.debug('Sync list : %s' % ilist) return token, ilist def list(self): """List collection items.""" self.logger.debug('List collection %s' % self.path) self.logger.debug('Collection tag %s' % self.tag) if self.tag: if self.tag == 'VADDRESSBOOK': for oid in self.get_contacts_from_odoo(self.owner): yield oid + self.content_suffix elif self.tag == 'VCALENDAR': for oid in self.get_events_from_odoo(self.owner, self.path): yield oid + self.content_suffix else: raise NotImplementedError @classmethod def _generate_vcard_from_odoo(cls, database_id): """ Generate and return vCard object from Odoo res.partner record """ # last_modified = strftime("%a, %d %b %Y %H:%M:%S GMT", # strptime(data.get('write_date'), '%Y-%m-%d %H:%M:%S')) partner = cls.partners.get(database_id) last_modified = str(partner.get('write_date')) vobject_item = vobject.vCard() vobject_item.add('n') vobject_item.add('fn') vobject_item.fn.value = partner.get('name') vobject_item.add('adr') state_name = partner.get('state_id')[1] if partner.get('state_id') else '' country_name = partner.get('country_id')[1] if partner.get('country_id') else '' vobject_item.adr.value = vobject.vcard.Address( street=' '.join([partner.get('street') or '', partner.get('street2') or '']), code=partner.get('zip') or '', city=partner.get('city') or '', region=state_name, country=country_name) if partner.get('image'): vobject_item.add('photo;encoding=b;type=jpeg').value = partner.get( 'image').replace('\n', '') if partner.get('phone'): tel = vobject_item.add('tel') tel.type_param = 'Tel' tel.value = partner.get('phone') if partner.get('mobile'): tel = vobject_item.add('tel') tel.type_param = 'Mobile' tel.value = partner.get('mobile') if partner.get('fax'): tel = vobject_item.add('tel') tel.type_param = 'Fax' tel.value = partner.get('fax') if partner.get('email'): vobject_item.add('email').value = partner.get('email') if partner.get('website'): vobject_item.add('url').value = partner.get('website') if partner.get('function'): vobject_item.add('role').value = partner.get('function') if partner.get('comment'): vobject_item.add('note').value = partner.get('comment') if partner.get('category_id'): categs = [cls.categs.get(cid) for cid in partner.get('category_id')] vobject_item.add('categories').value = categs if partner.get('parent_id'): vobject_item.add('org').value = [partner.get('parent_id')[1]] vobject_item.add('rev').value = last_modified return vobject_item @classmethod def _generate_ics_from_odoo(cls, href, database_id): """ Generate and return UCS object from Odoo calendar.event record """ # TODO/IMP : attendees management (not handled directly by vobject) event = cls.events.get(database_id) if event.get('allday'): utc_dtstart = datetime.strptime(event.get('start_date'), '%Y-%m-%d').date() stop_date_obj = datetime.strptime(event.get('stop_date'), '%Y-%m-%d').date() utc_dtstop = stop_date_obj + timedelta(days=1) else: utc_dtstart = cls.odoo_date_to_utc(event.get('start_datetime')) utc_dtstop = cls.odoo_date_to_utc(event.get('stop_datetime')) last_modified = str(cls.odoo_date_to_utc(event.get('write_date'))) cal = vobject.iCalendar() cal.add('vevent') cal.vevent.add('summary').value = event.get('name') cal.vevent.add('location').value = event.get('location') or '' cal.vevent.add('description').value = event.get('description') or '' cal.vevent.add('dtstart').value = utc_dtstart cal.vevent.add('dtend').value = utc_dtstop # cal.vevent.add('duration').value = event.duration if event.get('categ_ids'): categs = [cls.event_types.get(etid) for etid in event.get('categ_ids')] cal.vevent.add('categories').value = categs if event.get('alarm_ids'): for alarm_id in event.get('alarm_ids'): alarm = cls.event_alarms.get(alarm_id) valarm = cal.vevent.add('valarm') action = 'DISPLAY' if alarm.get('type') == 'notification' else 'EMAIL' valarm.add('action').value = action valarm.add('trigger').value = timedelta( minutes=-alarm.get('duration_minutes')) if event.get('recurrency'): cal.vevent.add('rrule').value = event.get('rrule') cal.vevent.add('uid').value = href cal.vevent.add('rev').value = last_modified cal.add('rev').value = last_modified return cal def _get_item_from_vobject(self, href, vobject_item): """ Return Item from Vobject and HREF """ vobject_item.add('uid').value = href tag, start, end = xmlutils.find_tag_and_time_range(vobject_item) text = vobject_item.serialize() etag = get_etag(text) # uid = get_uid_from_object(vobject_item) return Item( self, href=href, last_modified=vobject_item.rev.value, etag=etag, text=text, item=vobject_item, uid=href, name=vobject_item.name, component_name=tag), (tag, start, end) def _get_with_metadata(self, href): """Fetch a single item from Odoo database""" model, database_id = href.split(':') database_id = int(database_id[:-len(self.content_suffix)]) if model == 'res.partner': vobject_item = self._generate_vcard_from_odoo(database_id) return self._get_item_from_vobject(href, vobject_item) elif model == 'calendar.event': vobject_item = self._generate_ics_from_odoo(href, database_id) return self._get_item_from_vobject(href, vobject_item) else: raise NotImplementedError def get(self, href, verify_href=True): item, metadata = self._get_with_metadata(href) return item def delete(self, href=None): """ Can not delete collection but item, yes """ if href is None: # Delete the collection self.logger.error('Attempt to delete collection %s' % self.path) raise ValueError('Can not delete collection') else: # Delete an item raise NotImplementedError @property def last_modified(self): """ Return last modified """ last = self.odoo.env[self.odoo_model].search([], limit=1, order='write_date desc') if last: last_fields = self.odoo.execute(self.odoo_model, 'read', last, ['write_date'])[0] return str(last_fields['write_date']) return '1970-01-01 00:00:00'