# -*- coding: utf-8 -*- import datetime import json import logging import random import select import threading import time import flectra from flectra import api, fields, models, SUPERUSER_ID from flectra.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT _logger = logging.getLogger(__name__) # longpolling timeout connection TIMEOUT = 50 #---------------------------------------------------------- # Bus #---------------------------------------------------------- def json_dump(v): return json.dumps(v, separators=(',', ':')) def hashable(key): if isinstance(key, list): key = tuple(key) return key class ImBus(models.Model): _name = 'bus.bus' create_date = fields.Datetime('Create date') channel = fields.Char('Channel') message = fields.Char('Message') @api.model def gc(self): timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT*2) domain = [('create_date', '<', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))] return self.sudo().search(domain).unlink() @api.model def sendmany(self, notifications): channels = set() for channel, message in notifications: channels.add(channel) values = { "channel": json_dump(channel), "message": json_dump(message) } self.sudo().create(values) if random.random() < 0.01: self.gc() if channels: # We have to wait until the notifications are commited in database. # When calling `NOTIFY imbus`, some concurrent threads will be # awakened and will fetch the notification in the bus table. If the # transaction is not commited yet, there will be nothing to fetch, # and the longpolling will return no notification. def notify(): with flectra.sql_db.db_connect('postgres').cursor() as cr: cr.execute("notify imbus, %s", (json_dump(list(channels)),)) self._cr.after('commit', notify) @api.model def sendone(self, channel, message): self.sendmany([[channel, message]]) @api.model def poll(self, channels, last=0, options=None, force_status=False): if options is None: options = {} # first poll return the notification in the 'buffer' if last == 0: timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT) domain = [('create_date', '>', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))] else: # else returns the unread notifications domain = [('id', '>', last)] channels = [json_dump(c) for c in channels] domain.append(('channel', 'in', channels)) notifications = self.sudo().search_read(domain) # list of notification to return result = [] for notif in notifications: result.append({ 'id': notif['id'], 'channel': json.loads(notif['channel']), 'message': json.loads(notif['message']), }) if result or force_status: partner_ids = options.get('bus_presence_partner_ids') if partner_ids: partners = self.env['res.partner'].browse(partner_ids) result += [{ 'id': -1, 'channel': (self._cr.dbname, 'bus.presence'), 'message': {'id': r.id, 'im_status': r.im_status}} for r in partners] return result #---------------------------------------------------------- # Dispatcher #---------------------------------------------------------- class ImDispatch(object): def __init__(self): self.channels = {} self.started = False def poll(self, dbname, channels, last, options=None, timeout=TIMEOUT): if options is None: options = {} # Dont hang ctrl-c for a poll request, we need to bypass private # attribute access because we dont know before starting the thread that # it will handle a longpolling request if not flectra.evented: current = threading.current_thread() current._Thread__daemonic = True # PY2 current._daemonic = True # PY3 # rename the thread to avoid tests waiting for a longpolling current.setName("flectra.longpolling.request.%s" % current.ident) registry = flectra.registry(dbname) # immediatly returns if past notifications exist with registry.cursor() as cr: env = api.Environment(cr, SUPERUSER_ID, {}) notifications = env['bus.bus'].poll(channels, last, options) # immediatly returns in peek mode if options.get('peek'): return dict(notifications=notifications, channels=channels) # or wait for future ones if not notifications: if not self.started: # Lazy start of events listener self.start() event = self.Event() for channel in channels: self.channels.setdefault(hashable(channel), []).append(event) try: event.wait(timeout=timeout) with registry.cursor() as cr: env = api.Environment(cr, SUPERUSER_ID, {}) notifications = env['bus.bus'].poll(channels, last, options, force_status=True) except Exception: # timeout pass return notifications def loop(self): """ Dispatch postgres notifications to the relevant polling threads/greenlets """ _logger.info("Bus.loop listen imbus on db postgres") with flectra.sql_db.db_connect('postgres').cursor() as cr: conn = cr._cnx cr.execute("listen imbus") cr.commit(); while True: if select.select([conn], [], [], TIMEOUT) == ([], [], []): pass else: conn.poll() channels = [] while conn.notifies: channels.extend(json.loads(conn.notifies.pop().payload)) # dispatch to local threads/greenlets events = set() for channel in channels: events.update(self.channels.pop(hashable(channel), [])) for event in events: event.set() def run(self): while True: try: self.loop() except Exception as e: _logger.exception("Bus.loop error, sleep and retry") time.sleep(TIMEOUT) def start(self): if flectra.evented: # gevent mode import gevent self.Event = gevent.event.Event gevent.spawn(self.run) else: # threaded mode self.Event = threading.Event t = threading.Thread(name="%s.Bus" % __name__, target=self.run) t.daemon = True t.start() self.started = True return self dispatch = None if not flectra.multi_process or flectra.evented: # We only use the event dispatcher in threaded and gevent mode dispatch = ImDispatch()