From 200e016ab89e8bb3025e70013442273b90afca44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20D=C3=ADaz?= Date: Mon, 18 Nov 2019 11:46:59 +0100 Subject: [PATCH] [IMP] mail_tracking: black, isort --- mail_tracking/__manifest__.py | 14 +- mail_tracking/controllers/main.py | 84 ++-- mail_tracking/hooks.py | 34 +- mail_tracking/models/ir_mail_server.py | 107 +++-- mail_tracking/models/mail_bounced_mixin.py | 26 +- mail_tracking/models/mail_mail.py | 24 +- mail_tracking/models/mail_message.py | 218 +++++----- mail_tracking/models/mail_resend_message.py | 56 +-- mail_tracking/models/mail_thread.py | 110 ++--- mail_tracking/models/mail_tracking_email.py | 370 ++++++++-------- mail_tracking/models/mail_tracking_event.py | 163 +++---- mail_tracking/models/res_partner.py | 19 +- mail_tracking/tests/test_mail_tracking.py | 456 ++++++++++---------- 13 files changed, 894 insertions(+), 787 deletions(-) diff --git a/mail_tracking/__manifest__.py b/mail_tracking/__manifest__.py index 14b7ea7..5914214 100644 --- a/mail_tracking/__manifest__.py +++ b/mail_tracking/__manifest__.py @@ -10,15 +10,11 @@ "version": "12.0.2.0.1", "category": "Social Network", "website": "http://github.com/OCA/social", - "author": "Tecnativa, " - "Odoo Community Association (OCA)", + "author": ("Tecnativa, " "Odoo Community Association (OCA)"), "license": "AGPL-3", "application": False, - 'installable': True, - "depends": [ - "decimal_precision", - "mail", - ], + "installable": True, + "depends": ["decimal_precision", "mail"], "data": [ "data/tracking_data.xml", "security/mail_tracking_email_security.xml", @@ -35,8 +31,6 @@ "static/src/xml/failed_message/thread.xml", "static/src/xml/failed_message/discuss.xml", ], - 'demo': [ - 'demo/demo.xml', - ], + "demo": ["demo/demo.xml"], "pre_init_hook": "pre_init_hook", } diff --git a/mail_tracking/controllers/main.py b/mail_tracking/controllers/main.py index c4a5e0e..cf111c3 100644 --- a/mail_tracking/controllers/main.py +++ b/mail_tracking/controllers/main.py @@ -1,17 +1,20 @@ # Copyright 2016 Antonio Espinosa - # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -import werkzeug -import odoo +import base64 +import logging from contextlib import contextmanager -from odoo import api, http, SUPERUSER_ID + +import werkzeug + +import odoo +from odoo import SUPERUSER_ID, api, http from odoo.addons.mail.controllers.main import MailController -import logging -import base64 + _logger = logging.getLogger(__name__) -BLANK = 'R0lGODlhAQABAIAAANvf7wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==' +BLANK = "R0lGODlhAQABAIAAANvf7wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==" @contextmanager @@ -28,60 +31,74 @@ def db_env(dbname): class MailTrackingController(MailController): - def _request_metadata(self): """Prepare remote info metadata""" request = http.request.httprequest return { - 'ip': request.remote_addr or False, - 'user_agent': request.user_agent or False, - 'os_family': request.user_agent.platform or False, - 'ua_family': request.user_agent.browser or False, + "ip": request.remote_addr or False, + "user_agent": request.user_agent or False, + "os_family": request.user_agent.platform or False, + "ua_family": request.user_agent.browser or False, } - @http.route(['/mail/tracking/all/', - '/mail/tracking/event//'], - type='http', auth='none', csrf=False) + @http.route( + [ + "/mail/tracking/all/", + "/mail/tracking/event//", + ], + type="http", + auth="none", + csrf=False, + ) def mail_tracking_event(self, db, event_type=None, **kw): """Route used by external mail service""" metadata = self._request_metadata() res = None with db_env(db) as env: try: - res = env['mail.tracking.email'].event_process( - http.request, kw, metadata, event_type=event_type) + res = env["mail.tracking.email"].event_process( + http.request, kw, metadata, event_type=event_type + ) except Exception: pass - if not res or res == 'NOT FOUND': + if not res or res == "NOT FOUND": return werkzeug.exceptions.NotAcceptable() return res - @http.route(['/mail/tracking/open/' - '//blank.gif', - '/mail/tracking/open/' - '///blank.gif'], - type='http', auth='none', methods=['GET']) + @http.route( + [ + "/mail/tracking/open/" "//blank.gif", + "/mail/tracking/open/" + "///blank.gif", + ], + type="http", + auth="none", + methods=["GET"], + ) def mail_tracking_open(self, db, tracking_email_id, token=False, **kw): """Route used to track mail openned (With & Without Token)""" metadata = self._request_metadata() with db_env(db) as env: try: - tracking_email = env['mail.tracking.email'].search([ - ('id', '=', tracking_email_id), - ('state', 'in', ['sent', 'delivered']), - ('token', '=', token), - ]) + tracking_email = env["mail.tracking.email"].search( + [ + ("id", "=", tracking_email_id), + ("state", "in", ["sent", "delivered"]), + ("token", "=", token), + ] + ) if tracking_email: - tracking_email.event_create('open', metadata) + tracking_email.event_create("open", metadata) else: _logger.warning( - "MailTracking email '%s' not found", tracking_email_id) + "MailTracking email '%s' not found", tracking_email_id + ) except Exception: pass # Always return GIF blank image response = werkzeug.wrappers.Response() - response.mimetype = 'image/gif' + response.mimetype = "image/gif" response.data = base64.b64decode(BLANK) return response @@ -89,8 +106,7 @@ class MailTrackingController(MailController): def mail_init_messaging(self): """Route used to initial values of Discuss app""" values = super().mail_init_messaging() - values.update({ - 'failed_counter': - http.request.env['mail.message'].get_failed_count(), - }) + values.update( + {"failed_counter": http.request.env["mail.message"].get_failed_count()} + ) return values diff --git a/mail_tracking/hooks.py b/mail_tracking/hooks.py index afe6e65..5693ebc 100644 --- a/mail_tracking/hooks.py +++ b/mail_tracking/hooks.py @@ -2,34 +2,40 @@ # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). import logging + from psycopg2.extensions import AsIs _logger = logging.getLogger(__name__) def column_exists(cr, table, column): - cr.execute(""" + cr.execute( + """ SELECT column_name FROM information_schema.columns - WHERE table_name = %s AND column_name = %s""", (table, column)) + WHERE table_name = %s AND column_name = %s""", + (table, column), + ) return bool(cr.fetchall()) def column_add_with_value(cr, table, column, field_type, value): if not column_exists(cr, table, column): - cr.execute(""" + cr.execute( + """ ALTER TABLE %s - ADD COLUMN %s %s""", (AsIs(table), AsIs(column), AsIs(field_type))) - cr.execute(""" - UPDATE %s SET %s = %s""", (AsIs(table), AsIs(column), value)) + ADD COLUMN %s %s""", + (AsIs(table), AsIs(column), AsIs(field_type)), + ) + cr.execute( + """ + UPDATE %s SET %s = %s""", + (AsIs(table), AsIs(column), value), + ) def pre_init_hook(cr): - _logger.info("Creating res.partner.tracking_emails_count column " - "with value 0") - column_add_with_value( - cr, "res_partner", "tracking_emails_count", "integer", 0) - _logger.info("Creating res.partner.email_score column " - "with value 50.0") - column_add_with_value( - cr, "res_partner", "email_score", "double precision", 50.0) + _logger.info("Creating res.partner.tracking_emails_count column " "with value 0") + column_add_with_value(cr, "res_partner", "tracking_emails_count", "integer", 0) + _logger.info("Creating res.partner.email_score column " "with value 50.0") + column_add_with_value(cr, "res_partner", "email_score", "double precision", 50.0) diff --git a/mail_tracking/models/ir_mail_server.py b/mail_tracking/models/ir_mail_server.py index 6aaf465..8bf1404 100644 --- a/mail_tracking/models/ir_mail_server.py +++ b/mail_tracking/models/ir_mail_server.py @@ -3,7 +3,8 @@ import re import threading -from odoo import models, api, tools + +from odoo import api, models, tools class IrMailServer(models.Model): @@ -12,40 +13,63 @@ class IrMailServer(models.Model): def _tracking_headers_add(self, tracking_email_id, headers): """Allow other addons to add its own tracking SMTP headers""" headers = headers or {} - headers['X-Odoo-Database'] = getattr( - threading.currentThread(), 'dbname', None), - headers['X-Odoo-Tracking-ID'] = tracking_email_id + headers["X-Odoo-Database"] = ( + getattr(threading.currentThread(), "dbname", None), + ) + headers["X-Odoo-Tracking-ID"] = tracking_email_id return headers def _tracking_email_id_body_get(self, body): - body = body or '' + body = body or "" # https://regex101.com/r/lW4cB1/2 - match = re.search( - r']*data-odoo-tracking-email=["\']([0-9]*)["\']', body) + match = re.search(r']*data-odoo-tracking-email=["\']([0-9]*)["\']', body) return int(match.group(1)) if match.group(1) else False - def build_email(self, email_from, email_to, subject, body, email_cc=None, - email_bcc=None, reply_to=False, attachments=None, - message_id=None, references=None, object_id=False, - subtype='plain', headers=None, body_alternative=None, - subtype_alternative='plain'): + def build_email( + self, + email_from, + email_to, + subject, + body, + email_cc=None, + email_bcc=None, + reply_to=False, + attachments=None, + message_id=None, + references=None, + object_id=False, + subtype="plain", + headers=None, + body_alternative=None, + subtype_alternative="plain", + ): tracking_email_id = self._tracking_email_id_body_get(body) if tracking_email_id: headers = self._tracking_headers_add(tracking_email_id, headers) msg = super(IrMailServer, self).build_email( - email_from, email_to, subject, body, email_cc=email_cc, - email_bcc=email_bcc, reply_to=reply_to, attachments=attachments, - message_id=message_id, references=references, object_id=object_id, - subtype=subtype, headers=headers, + email_from, + email_to, + subject, + body, + email_cc=email_cc, + email_bcc=email_bcc, + reply_to=reply_to, + attachments=attachments, + message_id=message_id, + references=references, + object_id=object_id, + subtype=subtype, + headers=headers, body_alternative=body_alternative, - subtype_alternative=subtype_alternative) + subtype_alternative=subtype_alternative, + ) return msg def _tracking_email_get(self, message): tracking_email_id = False - if message.get('X-Odoo-Tracking-ID', '').isdigit(): - tracking_email_id = int(message['X-Odoo-Tracking-ID']) - return self.env['mail.tracking.email'].browse(tracking_email_id) + if message.get("X-Odoo-Tracking-ID", "").isdigit(): + tracking_email_id = int(message["X-Odoo-Tracking-ID"]) + return self.env["mail.tracking.email"].browse(tracking_email_id) def _smtp_server_get(self, mail_server_id, smtp_server): smtp_server_used = False @@ -53,36 +77,49 @@ class IrMailServer(models.Model): if mail_server_id: mail_server = self.browse(mail_server_id) elif not smtp_server: - mail_server_ids = self.search([], order='sequence', limit=1) + mail_server_ids = self.search([], order="sequence", limit=1) mail_server = mail_server_ids[0] if mail_server_ids else None if mail_server: smtp_server_used = mail_server.smtp_host else: - smtp_server_used = smtp_server or tools.config.get('smtp_server') + smtp_server_used = smtp_server or tools.config.get("smtp_server") return smtp_server_used @api.model - def send_email(self, message, mail_server_id=None, smtp_server=None, - smtp_port=None, smtp_user=None, smtp_password=None, - smtp_encryption=None, smtp_debug=False, smtp_session=None): + def send_email( + self, + message, + mail_server_id=None, + smtp_server=None, + smtp_port=None, + smtp_user=None, + smtp_password=None, + smtp_encryption=None, + smtp_debug=False, + smtp_session=None, + ): message_id = False tracking_email = self._tracking_email_get(message) - smtp_server_used = self.sudo()._smtp_server_get( - mail_server_id, smtp_server, - ) + smtp_server_used = self.sudo()._smtp_server_get(mail_server_id, smtp_server) try: message_id = super(IrMailServer, self).send_email( - message, mail_server_id=mail_server_id, - smtp_server=smtp_server, smtp_port=smtp_port, - smtp_user=smtp_user, smtp_password=smtp_password, - smtp_encryption=smtp_encryption, smtp_debug=smtp_debug, - smtp_session=smtp_session) + message, + mail_server_id=mail_server_id, + smtp_server=smtp_server, + smtp_port=smtp_port, + smtp_user=smtp_user, + smtp_password=smtp_password, + smtp_encryption=smtp_encryption, + smtp_debug=smtp_debug, + smtp_session=smtp_session, + ) except Exception as e: if tracking_email: tracking_email.smtp_error(self, smtp_server_used, e) if message_id and tracking_email: vals = tracking_email._tracking_sent_prepare( - self, smtp_server_used, message, message_id) + self, smtp_server_used, message, message_id + ) if vals: - self.env['mail.tracking.event'].sudo().create(vals) + self.env["mail.tracking.event"].sudo().create(vals) return message_id diff --git a/mail_tracking/models/mail_bounced_mixin.py b/mail_tracking/models/mail_bounced_mixin.py index 91737d4..d63b445 100644 --- a/mail_tracking/models/mail_bounced_mixin.py +++ b/mail_tracking/models/mail_bounced_mixin.py @@ -1,7 +1,7 @@ # Copyright 2018 Tecnativa - Ernesto Tejeda # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -from odoo import api, fields, models +from odoo import fields, models class MailBouncedMixin(models.AbstractModel): @@ -10,9 +10,9 @@ class MailBouncedMixin(models.AbstractModel): the mixin and must contain the email field of the model. """ - _name = 'mail.bounced.mixin' - _description = 'Mail bounced mixin' - _primary_email = ['email'] + _name = "mail.bounced.mixin" + _description = "Mail bounced mixin" + _primary_email = ["email"] email_bounced = fields.Boolean(index=True) @@ -20,25 +20,25 @@ class MailBouncedMixin(models.AbstractModel): def email_bounced_set(self, tracking_emails, reason, event=None): """Inherit this method to make any other actions to the model that inherit the mixin""" - if self.env.context.get('write_loop'): + if self.env.context.get("write_loop"): # We avoid with the context an infinite recursion calling write # method from other write method. return True partners = self.filtered(lambda r: not r.email_bounced) - return partners.write({'email_bounced': True}) + return partners.write({"email_bounced": True}) def write(self, vals): [email_field] = self._primary_email if email_field not in vals: return super().write(vals) email = vals[email_field].lower() if vals[email_field] else False - mte_obj = self.env['mail.tracking.email'] - vals['email_bounced'] = mte_obj.email_is_bounced(email) - if vals['email_bounced']: + mte_obj = self.env["mail.tracking.email"] + vals["email_bounced"] = mte_obj.email_is_bounced(email) + if vals["email_bounced"]: res = mte_obj._email_last_tracking_state(email) - tracking = mte_obj.browse(res[0].get('id')) + tracking = mte_obj.browse(res[0].get("id")) event = tracking.tracking_event_ids[:1] - self.with_context( - write_loop=True, - ).email_bounced_set(tracking, event.error_details, event) + self.with_context(write_loop=True).email_bounced_set( + tracking, event.error_details, event + ) return super().write(vals) diff --git a/mail_tracking/models/mail_mail.py b/mail_tracking/models/mail_mail.py index 1d4f42b..4a5a430 100644 --- a/mail_tracking/models/mail_mail.py +++ b/mail_tracking/models/mail_mail.py @@ -5,27 +5,27 @@ import time from datetime import datetime from email.utils import COMMASPACE -from odoo import models, fields +from odoo import fields, models class MailMail(models.Model): - _inherit = 'mail.mail' + _inherit = "mail.mail" def _tracking_email_prepare(self, partner, email): """Prepare email.tracking.email record values""" ts = time.time() dt = datetime.utcfromtimestamp(ts) - email_to_list = email.get('email_to', []) + email_to_list = email.get("email_to", []) email_to = COMMASPACE.join(email_to_list) return { - 'name': self.subject, - 'timestamp': '%.6f' % ts, - 'time': fields.Datetime.to_string(dt), - 'mail_id': self.id, - 'mail_message_id': self.mail_message_id.id, - 'partner_id': partner.id if partner else False, - 'recipient': email_to, - 'sender': self.email_from, + "name": self.subject, + "timestamp": "%.6f" % ts, + "time": fields.Datetime.to_string(dt), + "mail_id": self.id, + "mail_message_id": self.mail_message_id.id, + "partner_id": partner.id if partner else False, + "recipient": email_to, + "sender": self.email_from, } def _send_prepare_values(self, partner=None): @@ -33,5 +33,5 @@ class MailMail(models.Model): to the email""" email = super()._send_prepare_values(partner=partner) vals = self._tracking_email_prepare(partner, email) - tracking_email = self.env['mail.tracking.email'].sudo().create(vals) + tracking_email = self.env["mail.tracking.email"].sudo().create(vals) return tracking_email.tracking_img_add(email) diff --git a/mail_tracking/models/mail_message.py b/mail_tracking/models/mail_message.py index 6f407c2..f709c33 100644 --- a/mail_tracking/models/mail_message.py +++ b/mail_tracking/models/mail_message.py @@ -2,7 +2,7 @@ # Copyright 2019 Alexandre Díaz # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -from odoo import _, models, api, fields +from odoo import _, api, fields, models from odoo.tools import email_split @@ -10,16 +10,16 @@ class MailMessage(models.Model): _inherit = "mail.message" # Recipients - email_cc = fields.Char("Cc", help='Additional recipients that receive a ' - '"Carbon Copy" of the e-mail') + email_cc = fields.Char( + "Cc", help="Additional recipients that receive a " '"Carbon Copy" of the e-mail' + ) mail_tracking_ids = fields.One2many( - comodel_name='mail.tracking.email', - inverse_name='mail_message_id', + comodel_name="mail.tracking.email", + inverse_name="mail_message_id", string="Mail Trackings", ) mail_tracking_needs_action = fields.Boolean( - help="The message tracking will be considered" - " to filter tracking issues", + help="The message tracking will be considered" " to filter tracking issues", default=False, ) is_failed_message = fields.Boolean(compute="_compute_is_failed_message") @@ -27,95 +27,105 @@ class MailMessage(models.Model): @api.model def get_failed_states(self): """The 'failed' states of the message""" - return {'error', 'rejected', 'spam', 'bounced', 'soft-bounced'} + return {"error", "rejected", "spam", "bounced", "soft-bounced"} - @api.depends('mail_tracking_needs_action', 'author_id', 'partner_ids', - 'mail_tracking_ids.state') + @api.depends( + "mail_tracking_needs_action", + "author_id", + "partner_ids", + "mail_tracking_ids.state", + ) def _compute_is_failed_message(self): """Compute 'is_failed_message' field for the active user""" failed_states = self.get_failed_states() for message in self: needs_action = message.mail_tracking_needs_action involves_me = self.env.user.partner_id in ( - message.author_id | message.partner_ids) + message.author_id | message.partner_ids + ) has_failed_trackings = failed_states.intersection( - message.mapped("mail_tracking_ids.state")) + message.mapped("mail_tracking_ids.state") + ) message.is_failed_message = bool( - needs_action and involves_me and has_failed_trackings) + needs_action and involves_me and has_failed_trackings + ) def _tracking_status_map_get(self): """Map tracking states to be used in chatter""" return { - 'False': 'waiting', - 'error': 'error', - 'deferred': 'sent', - 'sent': 'sent', - 'delivered': 'delivered', - 'opened': 'opened', - 'rejected': 'error', - 'spam': 'error', - 'unsub': 'opened', - 'bounced': 'error', - 'soft-bounced': 'error', + "False": "waiting", + "error": "error", + "deferred": "sent", + "sent": "sent", + "delivered": "delivered", + "opened": "opened", + "rejected": "error", + "spam": "error", + "unsub": "opened", + "bounced": "error", + "soft-bounced": "error", } def _partner_tracking_status_get(self, tracking_email): """Determine tracking status""" tracking_status_map = self._tracking_status_map_get() - status = 'unknown' + status = "unknown" if tracking_email: tracking_email_status = str(tracking_email.state) - status = tracking_status_map.get(tracking_email_status, 'unknown') + status = tracking_status_map.get(tracking_email_status, "unknown") return status def _partner_tracking_status_human_get(self, status): """Translations for tracking statuses to be used on qweb""" - statuses = {'waiting': _('Waiting'), 'error': _('Error'), - 'sent': _('Sent'), 'delivered': _('Delivered'), - 'opened': _('Opened'), 'unknown': _('Unknown')} + statuses = { + "waiting": _("Waiting"), + "error": _("Error"), + "sent": _("Sent"), + "delivered": _("Delivered"), + "opened": _("Opened"), + "unknown": _("Unknown"), + } return _("Status: %s") % statuses[status] @api.model def _get_error_description(self, tracking): """Translations for error descriptions to be used on qweb""" - descriptions = { - 'no_recipient': _("The partner doesn't have a defined email"), - } - return descriptions.get(tracking.error_type, - tracking.error_description) + descriptions = {"no_recipient": _("The partner doesn't have a defined email")} + return descriptions.get(tracking.error_type, tracking.error_description) def tracking_status(self): """Generates a complete status tracking of the messages by partner""" res = {} for message in self: partner_trackings = [] - partners_already = self.env['res.partner'] - partners = self.env['res.partner'] - trackings = self.env['mail.tracking.email'].sudo().search([ - ('mail_message_id', '=', message.id), - ]) + partners_already = self.env["res.partner"] + partners = self.env["res.partner"] + trackings = ( + self.env["mail.tracking.email"] + .sudo() + .search([("mail_message_id", "=", message.id)]) + ) # Get Cc recipients email_cc_list = email_split(message.email_cc) if any(email_cc_list): - partners |= partners.search([('email', 'in', email_cc_list)]) + partners |= partners.search([("email", "in", email_cc_list)]) email_cc_list = set(email_cc_list) # Search all trackings for this message for tracking in trackings: status = self._partner_tracking_status_get(tracking) - recipient = ( - tracking.partner_id.name or tracking.recipient) - partner_trackings.append({ - 'status': status, - 'status_human': - self._partner_tracking_status_human_get(status), - 'error_type': tracking.error_type, - 'error_description': - self._get_error_description(tracking), - 'tracking_id': tracking.id, - 'recipient': recipient, - 'partner_id': tracking.partner_id.id, - 'isCc': False, - }) + recipient = tracking.partner_id.name or tracking.recipient + partner_trackings.append( + { + "status": status, + "status_human": self._partner_tracking_status_human_get(status), + "error_type": tracking.error_type, + "error_description": self._get_error_description(tracking), + "tracking_id": tracking.id, + "recipient": recipient, + "partner_id": tracking.partner_id.id, + "isCc": False, + } + ) if tracking.partner_id: email_cc_list.discard(tracking.partner_id.email) partners_already |= tracking.partner_id @@ -127,12 +137,11 @@ class MailMessage(models.Model): # Remove recipients already included partners -= partners_already tracking_unkown_values = { - 'status': 'unknown', - 'status_human': self._partner_tracking_status_human_get( - 'unknown'), - 'error_type': False, - 'error_description': False, - 'tracking_id': False, + "status": "unknown", + "status_human": self._partner_tracking_status_human_get("unknown"), + "error_type": False, + "error_description": False, + "tracking_id": False, } for partner in partners: # If there is partners not included, then status is 'unknown' @@ -141,36 +150,31 @@ class MailMessage(models.Model): if partner.email in email_cc_list: email_cc_list.discard(partner.email) isCc = True - tracking_unkown_values.update({ - 'recipient': partner.name, - 'partner_id': partner.id, - 'isCc': isCc, - }) + tracking_unkown_values.update( + {"recipient": partner.name, "partner_id": partner.id, "isCc": isCc} + ) partner_trackings.append(tracking_unkown_values.copy()) for email in email_cc_list: # If there is Cc without partner - tracking_unkown_values.update({ - 'recipient': email, - 'partner_id': False, - 'isCc': True, - }) + tracking_unkown_values.update( + {"recipient": email, "partner_id": False, "isCc": True} + ) partner_trackings.append(tracking_unkown_values.copy()) res[message.id] = { - 'partner_trackings': partner_trackings, - 'is_failed_message': message.is_failed_message, + "partner_trackings": partner_trackings, + "is_failed_message": message.is_failed_message, } return res @api.model def _message_read_dict_postprocess(self, messages, message_tree): """Preare values to be used by the chatter widget""" - res = super()._message_read_dict_postprocess( - messages, message_tree) - mail_message_ids = {m.get('id') for m in messages if m.get('id')} + res = super()._message_read_dict_postprocess(messages, message_tree) + mail_message_ids = {m.get("id") for m in messages if m.get("id")} mail_messages = self.browse(mail_message_ids) tracking_statuses = mail_messages.tracking_status() for message_dict in messages: - mail_message_id = message_dict.get('id', False) + mail_message_id = message_dict.get("id", False) if mail_message_id: message_dict.update(tracking_statuses[mail_message_id]) return res @@ -180,27 +184,30 @@ class MailMessage(models.Model): """Preare values to be used by the chatter widget""" self.ensure_one() failed_trackings = self.mail_tracking_ids.filtered( - lambda x: x.state in self.get_failed_states()) - failed_partners = failed_trackings.mapped('partner_id') + lambda x: x.state in self.get_failed_states() + ) + failed_partners = failed_trackings.mapped("partner_id") failed_recipients = failed_partners.name_get() if self.author_id: author = self.author_id.name_get()[0] else: - author = (-1, _('-Unknown Author-')) + author = (-1, _("-Unknown Author-")) return { - 'id': self.id, - 'date': self.date, - 'author': author, - 'body': self.body, - 'failed_recipients': failed_recipients, + "id": self.id, + "date": self.date, + "author": author, + "body": self.body, + "failed_recipients": failed_recipients, } @api.multi def get_failed_messages(self): """Returns the list of failed messages to be used by the failed_messages widget""" - return [msg._prepare_dict_failed_message() - for msg in self.sorted('date', reverse=True)] + return [ + msg._prepare_dict_failed_message() + for msg in self.sorted("date", reverse=True) + ] @api.multi def set_need_action_done(self): @@ -208,23 +215,23 @@ class MailMessage(models.Model): This will mark them to be ignored in the tracking issues filter. """ - self.check_access_rule('read') - self.write({'mail_tracking_needs_action': False}) + self.check_access_rule("read") + self.write({"mail_tracking_needs_action": False}) notification = { - 'type': 'toggle_tracking_status', - 'message_ids': self.ids, - 'needs_actions': False + "type": "toggle_tracking_status", + "message_ids": self.ids, + "needs_actions": False, } - self.env['bus.bus'].sendone( - (self._cr.dbname, 'res.partner', self.env.user.partner_id.id), - notification) + self.env["bus.bus"].sendone( + (self._cr.dbname, "res.partner", self.env.user.partner_id.id), notification + ) def _get_failed_message_domain(self): - domain = self.env['mail.thread']._get_failed_message_domain() + domain = self.env["mail.thread"]._get_failed_message_domain() domain += [ - '|', - ('partner_ids', 'in', [self.env.user.partner_id.id]), - ('author_id', '=', self.env.user.partner_id.id), + "|", + ("partner_ids", "in", [self.env.user.partner_id.id]), + ("author_id", "=", self.env.user.partner_id.id), ] return domain @@ -240,17 +247,16 @@ class MailMessage(models.Model): Used by Discuss """ unreviewed_messages = self.search(self._get_failed_message_domain()) - unreviewed_messages.write({'mail_tracking_needs_action': False}) + unreviewed_messages.write({"mail_tracking_needs_action": False}) ids = unreviewed_messages.ids - self.env['bus.bus'].sendone( - (self._cr.dbname, 'res.partner', - self.env.user.partner_id.id), + self.env["bus.bus"].sendone( + (self._cr.dbname, "res.partner", self.env.user.partner_id.id), { - 'type': 'toggle_tracking_status', - 'message_ids': ids, - 'needs_actions': False, - } + "type": "toggle_tracking_status", + "message_ids": ids, + "needs_actions": False, + }, ) return ids diff --git a/mail_tracking/models/mail_resend_message.py b/mail_tracking/models/mail_resend_message.py index 8758f97..1ba8c19 100644 --- a/mail_tracking/models/mail_resend_message.py +++ b/mail_tracking/models/mail_resend_message.py @@ -1,7 +1,7 @@ # Copyright 2019 Alexandre Díaz # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -from odoo import models, api +from odoo import api, models class MailResendMessage(models.TransientModel): @@ -10,45 +10,53 @@ class MailResendMessage(models.TransientModel): @api.model def default_get(self, fields): rec = super().default_get(fields) - message_id = self._context.get('mail_message_to_resend') + message_id = self._context.get("mail_message_to_resend") if message_id: - MailMessageObj = self.env['mail.message'] + MailMessageObj = self.env["mail.message"] mail_message_id = MailMessageObj.browse(message_id) failed_states = MailMessageObj.get_failed_states() tracking_ids = mail_message_id.mail_tracking_ids.filtered( - lambda x: x.state in failed_states) + lambda x: x.state in failed_states + ) if any(tracking_ids): - partner_ids = [(0, 0, { - "partner_id": tracking.partner_id.id, - "name": tracking.partner_id.name, - "email": tracking.partner_id.email, - "resend": True, - "message": tracking.error_description, - }) for tracking in tracking_ids] - rec['partner_ids'].extend(partner_ids) + partner_ids = [ + ( + 0, + 0, + { + "partner_id": tracking.partner_id.id, + "name": tracking.partner_id.name, + "email": tracking.partner_id.email, + "resend": True, + "message": tracking.error_description, + }, + ) + for tracking in tracking_ids + ] + rec["partner_ids"].extend(partner_ids) return rec @api.multi def resend_mail_action(self): for wizard in self: - to_send = wizard.partner_ids.filtered("resend").mapped( - "partner_id") + to_send = wizard.partner_ids.filtered("resend").mapped("partner_id") if to_send: # Set as reviewed wizard.mail_message_id.mail_tracking_needs_action = False # Reset mail.tracking.email state - tracking_ids = wizard.mail_message_id.mail_tracking_ids\ - .filtered(lambda x: x.partner_id in to_send) - tracking_ids.write({'state': False}) + tracking_ids = wizard.mail_message_id.mail_tracking_ids.filtered( + lambda x: x.partner_id in to_send + ) + tracking_ids.write({"state": False}) # Send bus notifications to update Discuss and # mail_failed_messages widget notification = { - 'type': 'toggle_tracking_status', - 'message_ids': [self.mail_message_id.id], - 'needs_actions': False + "type": "toggle_tracking_status", + "message_ids": [self.mail_message_id.id], + "needs_actions": False, } - self.env['bus.bus'].sendone( - (self._cr.dbname, 'res.partner', - self.env.user.partner_id.id), - notification) + self.env["bus.bus"].sendone( + (self._cr.dbname, "res.partner", self.env.user.partner_id.id), + notification, + ) super().resend_mail_action() diff --git a/mail_tracking/models/mail_thread.py b/mail_tracking/models/mail_thread.py index 4a385b4..3f6525f 100644 --- a/mail_tracking/models/mail_thread.py +++ b/mail_tracking/models/mail_thread.py @@ -1,32 +1,36 @@ # Copyright 2019 Alexandre Díaz # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -from odoo import fields, models, api, _ from email.utils import getaddresses -from odoo.tools import email_split_and_format + from lxml import etree +from odoo import _, api, fields, models +from odoo.tools import email_split_and_format + class MailThread(models.AbstractModel): _inherit = "mail.thread" failed_message_ids = fields.One2many( - 'mail.message', 'res_id', string='Failed Messages', - domain=lambda self: - [('model', '=', self._name)] - + self._get_failed_message_domain()) + "mail.message", + "res_id", + string="Failed Messages", + domain=lambda self: [("model", "=", self._name)] + + self._get_failed_message_domain(), + ) def _get_failed_message_domain(self): """Domain used to display failed messages on the 'failed_messages' widget""" - failed_states = self.env['mail.message'].get_failed_states() + failed_states = self.env["mail.message"].get_failed_states() return [ - ('mail_tracking_needs_action', '=', True), - ('mail_tracking_ids.state', 'in', list(failed_states)), + ("mail_tracking_needs_action", "=", True), + ("mail_tracking_ids.state", "in", list(failed_states)), ] @api.multi - @api.returns('self', lambda value: value.id) + @api.returns("self", lambda value: value.id) def message_post(self, *args, **kwargs): """Adds CC recipient to the message. @@ -34,11 +38,9 @@ class MailThread(models.AbstractModel): this information its written into the mail.message record. """ new_message = super().message_post(*args, **kwargs) - email_cc = kwargs.get('cc') + email_cc = kwargs.get("cc") if email_cc: - new_message.sudo().write({ - 'email_cc': email_cc, - }) + new_message.sudo().write({"email_cc": email_cc}) return new_message @api.multi @@ -48,71 +50,79 @@ class MailThread(models.AbstractModel): If the recipient has a res.partner, use it. """ res = super().message_get_suggested_recipients() - ResPartnerObj = self.env['res.partner'] + ResPartnerObj = self.env["res.partner"] email_cc_formated_list = [] for record in self: - emails_cc = record.message_ids.mapped('email_cc') + emails_cc = record.message_ids.mapped("email_cc") for email in emails_cc: email_cc_formated_list.extend(email_split_and_format(email)) email_cc_formated_list = set(email_cc_formated_list) for cc in email_cc_formated_list: email_parts = getaddresses([cc])[0] - partner_id = record.message_partner_info_from_emails( - [email_parts[1]])[0].get('partner_id') + partner_id = record._message_partner_info_from_emails([email_parts[1]])[ + 0 + ].get("partner_id") if not partner_id: - record._message_add_suggested_recipient( - res, email=cc, reason=_('Cc')) + record._message_add_suggested_recipient(res, email=cc, reason=_("Cc")) else: partner = ResPartnerObj.browse(partner_id, self._prefetch) record._message_add_suggested_recipient( - res, partner=partner, reason=_('Cc')) + res, partner=partner, reason=_("Cc") + ) return res @api.model - def fields_view_get(self, view_id=None, view_type='form', toolbar=False, - submenu=False): + def fields_view_get( + self, view_id=None, view_type="form", toolbar=False, submenu=False + ): """Add filters for failed messages. These filters will show up on any form or search views of any model inheriting from ``mail.thread``. """ res = super().fields_view_get( - view_id=view_id, view_type=view_type, toolbar=toolbar, - submenu=submenu) - if view_type not in {'search', 'form'}: + view_id=view_id, view_type=view_type, toolbar=toolbar, submenu=submenu + ) + if view_type not in {"search", "form"}: return res - doc = etree.XML(res['arch']) - if view_type == 'search': + doc = etree.XML(res["arch"]) + if view_type == "search": # Modify view to add new filter element nodes = doc.xpath("//search") if nodes: # Create filter element new_filter = etree.Element( - 'filter', { - 'string': _('Failed sent messages'), - 'name': "failed_message_ids", - 'domain': str([ - ['failed_message_ids.mail_tracking_ids.state', - 'in', - list( - self.env['mail.message'].get_failed_states() - )], - ['failed_message_ids.mail_tracking_needs_action', - '=', True] - ]) - }) - nodes[0].append(etree.Element('separator')) + "filter", + { + "string": _("Failed sent messages"), + "name": "failed_message_ids", + "domain": str( + [ + [ + "failed_message_ids.mail_tracking_ids.state", + "in", + list(self.env["mail.message"].get_failed_states()), + ], + [ + "failed_message_ids.mail_tracking_needs_action", + "=", + True, + ], + ] + ), + }, + ) + nodes[0].append(etree.Element("separator")) nodes[0].append(new_filter) - elif view_type == 'form': + elif view_type == "form": # Modify view to add new field element - nodes = doc.xpath( - "//field[@name='message_ids' and @widget='mail_thread']") + nodes = doc.xpath("//field[@name='message_ids' and @widget='mail_thread']") if nodes: # Create field - field_failed_messages = etree.Element('field', { - 'name': 'failed_message_ids', - 'widget': 'mail_failed_message', - }) + field_failed_messages = etree.Element( + "field", + {"name": "failed_message_ids", "widget": "mail_failed_message"}, + ) nodes[0].addprevious(field_failed_messages) - res['arch'] = etree.tostring(doc, encoding='unicode') + res["arch"] = etree.tostring(doc, encoding="unicode") return res diff --git a/mail_tracking/models/mail_tracking_email.py b/mail_tracking/models/mail_tracking_email.py index c7e84f0..37eda13 100644 --- a/mail_tracking/models/mail_tracking_email.py +++ b/mail_tracking/models/mail_tracking_email.py @@ -2,9 +2,9 @@ # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). import logging -import urllib.parse -import time import re +import time +import urllib.parse import uuid from datetime import datetime @@ -19,9 +19,9 @@ EVENT_CLICK_DELTA = 5 # seconds class MailTrackingEmail(models.Model): _name = "mail.tracking.email" - _order = 'time desc' - _rec_name = 'display_name' - _description = 'MailTracking email' + _order = "time desc" + _rec_name = "display_name" + _description = "MailTracking email" # This table is going to grow fast and to infinite, so we index: # - name: Search in tree view @@ -30,127 +30,153 @@ class MailTrackingEmail(models.Model): # - state: Search and group_by in tree view name = fields.Char(string="Subject", readonly=True, index=True) display_name = fields.Char( - string="Display name", readonly=True, store=True, - compute="_compute_tracking_display_name") + string="Display name", + readonly=True, + store=True, + compute="_compute_tracking_display_name", + ) timestamp = fields.Float( string='UTC timestamp', readonly=True, digits=dp.get_precision('MailTracking Timestamp')) time = fields.Datetime(string="Time", readonly=True, index=True) date = fields.Date( - string="Date", readonly=True, compute="_compute_date", store=True) + string="Date", readonly=True, compute="_compute_date", store=True + ) mail_message_id = fields.Many2one( - string="Message", comodel_name='mail.message', readonly=True, - index=True) - mail_id = fields.Many2one( - string="Email", comodel_name='mail.mail', readonly=True) + string="Message", comodel_name="mail.message", readonly=True, index=True + ) + mail_id = fields.Many2one(string="Email", comodel_name="mail.mail", readonly=True) partner_id = fields.Many2one( - string="Partner", comodel_name='res.partner', readonly=True) - recipient = fields.Char(string='Recipient email', readonly=True) + string="Partner", comodel_name="res.partner", readonly=True + ) + recipient = fields.Char(string="Recipient email", readonly=True) recipient_address = fields.Char( - string='Recipient email address', readonly=True, store=True, - compute='_compute_recipient_address', index=True) - sender = fields.Char(string='Sender email', readonly=True) - state = fields.Selection([ - ('error', 'Error'), - ('deferred', 'Deferred'), - ('sent', 'Sent'), - ('delivered', 'Delivered'), - ('opened', 'Opened'), - ('rejected', 'Rejected'), - ('spam', 'Spam'), - ('unsub', 'Unsubscribed'), - ('bounced', 'Bounced'), - ('soft-bounced', 'Soft bounced'), - ], string='State', index=True, readonly=True, default=False, + string="Recipient email address", + readonly=True, + store=True, + compute="_compute_recipient_address", + index=True, + ) + sender = fields.Char(string="Sender email", readonly=True) + state = fields.Selection( + [ + ("error", "Error"), + ("deferred", "Deferred"), + ("sent", "Sent"), + ("delivered", "Delivered"), + ("opened", "Opened"), + ("rejected", "Rejected"), + ("spam", "Spam"), + ("unsub", "Unsubscribed"), + ("bounced", "Bounced"), + ("soft-bounced", "Soft bounced"), + ], + string="State", + index=True, + readonly=True, + default=False, help=" * The 'Error' status indicates that there was an error " - "when trying to sent the email, for example, " - "'No valid recipient'\n" - " * The 'Sent' status indicates that message was succesfully " - "sent via outgoing email server (SMTP).\n" - " * The 'Delivered' status indicates that message was " - "succesfully delivered to recipient Mail Exchange (MX) server.\n" - " * The 'Opened' status indicates that message was opened or " - "clicked by recipient.\n" - " * The 'Rejected' status indicates that recipient email " - "address is blacklisted by outgoing email server (SMTP). " - "It is recomended to delete this email address.\n" - " * The 'Spam' status indicates that outgoing email " - "server (SMTP) consider this message as spam.\n" - " * The 'Unsubscribed' status indicates that recipient has " - "requested to be unsubscribed from this message.\n" - " * The 'Bounced' status indicates that message was bounced " - "by recipient Mail Exchange (MX) server.\n" - " * The 'Soft bounced' status indicates that message was soft " - "bounced by recipient Mail Exchange (MX) server.\n") - error_smtp_server = fields.Char(string='Error SMTP server', readonly=True) - error_type = fields.Char(string='Error type', readonly=True) - error_description = fields.Char( - string='Error description', readonly=True) - bounce_type = fields.Char(string='Bounce type', readonly=True) - bounce_description = fields.Char( - string='Bounce description', readonly=True) + "when trying to sent the email, for example, " + "'No valid recipient'\n" + " * The 'Sent' status indicates that message was succesfully " + "sent via outgoing email server (SMTP).\n" + " * The 'Delivered' status indicates that message was " + "succesfully delivered to recipient Mail Exchange (MX) server.\n" + " * The 'Opened' status indicates that message was opened or " + "clicked by recipient.\n" + " * The 'Rejected' status indicates that recipient email " + "address is blacklisted by outgoing email server (SMTP). " + "It is recomended to delete this email address.\n" + " * The 'Spam' status indicates that outgoing email " + "server (SMTP) consider this message as spam.\n" + " * The 'Unsubscribed' status indicates that recipient has " + "requested to be unsubscribed from this message.\n" + " * The 'Bounced' status indicates that message was bounced " + "by recipient Mail Exchange (MX) server.\n" + " * The 'Soft bounced' status indicates that message was soft " + "bounced by recipient Mail Exchange (MX) server.\n", + ) + error_smtp_server = fields.Char(string="Error SMTP server", readonly=True) + error_type = fields.Char(string="Error type", readonly=True) + error_description = fields.Char(string="Error description", readonly=True) + bounce_type = fields.Char(string="Bounce type", readonly=True) + bounce_description = fields.Char(string="Bounce description", readonly=True) tracking_event_ids = fields.One2many( - string="Tracking events", comodel_name='mail.tracking.event', - inverse_name='tracking_email_id', readonly=True) + string="Tracking events", + comodel_name="mail.tracking.event", + inverse_name="tracking_email_id", + readonly=True, + ) # Token isn't generated here to have compatibility with older trackings. # New trackings have token and older not - token = fields.Char(string="Security Token", readonly=True, - default=lambda s: uuid.uuid4().hex, - groups="base.group_system") + token = fields.Char( + string="Security Token", + readonly=True, + default=lambda s: uuid.uuid4().hex, + groups="base.group_system", + ) @api.model_create_multi def create(self, vals_list): records = super().create(vals_list) - failed_states = self.env['mail.message'].get_failed_states() - records \ - .filtered(lambda one: one.state in failed_states) \ - .mapped("mail_message_id") \ - .write({'mail_tracking_needs_action': True}) + failed_states = self.env["mail.message"].get_failed_states() + records.filtered(lambda one: one.state in failed_states).mapped( + "mail_message_id" + ).write({"mail_tracking_needs_action": True}) return records @api.multi def write(self, vals): super().write(vals) - state = vals.get('state') - if state and state in self.env['mail.message'].get_failed_states(): - self.mapped('mail_message_id').write({ - 'mail_tracking_needs_action': True, - }) + state = vals.get("state") + if state and state in self.env["mail.message"].get_failed_states(): + self.mapped("mail_message_id").write({"mail_tracking_needs_action": True}) @api.model def email_is_bounced(self, email): if not email: return False res = self._email_last_tracking_state(email) - return res and res[0].get('state', '') in {'rejected', 'error', - 'spam', 'bounced'} + return res and res[0].get("state", "") in { + "rejected", + "error", + "spam", + "bounced", + } @api.model def _email_last_tracking_state(self, email): - return self.search_read([('recipient_address', '=', email.lower())], - ['state'], limit=1, order='time DESC') + return self.search_read( + [("recipient_address", "=", email.lower())], + ["state"], + limit=1, + order="time DESC", + ) @api.model def email_score_from_email(self, email): if not email: - return 0. - data = self.read_group([('recipient_address', '=', email.lower())], - ['recipient_address', 'state'], ['state']) - mapped_data = {state['state']: state['state_count'] for state in data} + return 0.0 + data = self.read_group( + [("recipient_address", "=", email.lower())], + ["recipient_address", "state"], + ["state"], + ) + mapped_data = {state["state"]: state["state_count"] for state in data} return self.with_context(mt_states=mapped_data).email_score() @api.model def _email_score_weights(self): """Default email score weights. Ready to be inherited""" return { - 'error': -50.0, - 'rejected': -25.0, - 'spam': -25.0, - 'bounced': -25.0, - 'soft-bounced': -10.0, - 'unsub': -10.0, - 'delivered': 1.0, - 'opened': 5.0, + "error": -50.0, + "rejected": -25.0, + "spam": -25.0, + "bounced": -25.0, + "soft-bounced": -10.0, + "unsub": -10.0, + "delivered": 1.0, + "opened": 5.0, } def email_score(self): @@ -163,7 +189,7 @@ class MailTrackingEmail(models.Model): """ weights = self._email_score_weights() score = 50.0 - states = self.env.context.get('mt_states', False) + states = self.env.context.get("mt_states", False) if states: for state in states.keys(): score += weights.get(state, 0.0) * states[state] @@ -176,13 +202,12 @@ class MailTrackingEmail(models.Model): score = 0.0 return score - @api.depends('recipient') + @api.depends("recipient") def _compute_recipient_address(self): for email in self: - is_empty_recipient = (not email.recipient - or '' in email.recipient) + is_empty_recipient = not email.recipient or "" in email.recipient if not is_empty_recipient: - matches = re.search(r'<(.*@.*)>', email.recipient) + matches = re.search(r"<(.*@.*)>", email.recipient) if matches: email.recipient_address = matches.group(1).lower() else: @@ -190,46 +215,45 @@ class MailTrackingEmail(models.Model): else: email.recipient_address = False - @api.depends('name', 'recipient') + @api.depends("name", "recipient") def _compute_tracking_display_name(self): for email in self: - parts = [email.name or ''] + parts = [email.name or ""] if email.recipient: parts.append(email.recipient) - email.display_name = ' - '.join(parts) + email.display_name = " - ".join(parts) - @api.depends('time') + @api.depends("time") def _compute_date(self): for email in self: - email.date = fields.Date.to_string( - fields.Date.from_string(email.time)) + email.date = fields.Date.to_string(fields.Date.from_string(email.time)) def _get_mail_tracking_img(self): - m_config = self.env['ir.config_parameter'] - base_url = (m_config.get_param('mail_tracking.base.url') - or m_config.get_param('web.base.url')) + m_config = self.env["ir.config_parameter"] + base_url = m_config.get_param("mail_tracking.base.url") or m_config.get_param( + "web.base.url" + ) if self.token: path_url = ( - 'mail/tracking/open/%(db)s/%(tracking_email_id)s/%(token)s/' - 'blank.gif' % { - 'db': self.env.cr.dbname, - 'tracking_email_id': self.id, - 'token': self.token, - }) + "mail/tracking/open/%(db)s/%(tracking_email_id)s/%(token)s/" + "blank.gif" + % { + "db": self.env.cr.dbname, + "tracking_email_id": self.id, + "token": self.token, + } + ) else: # This is here for compatibility with older records - path_url = ( - 'mail/tracking/open/%(db)s/%(tracking_email_id)s/blank.gif' % { - 'db': self.env.cr.dbname, - 'tracking_email_id': self.id, - }) + path_url = "mail/tracking/open/{db}/{tracking_email_id}/blank.gif".format( + db=self.env.cr.dbname, tracking_email_id=self.id + ) track_url = urllib.parse.urljoin(base_url, path_url) return ( '' % { - 'url': track_url, - 'tracking_email_id': self.id, - }) + 'data-odoo-tracking-email="%(tracking_email_id)s"/>' + % {"url": track_url, "tracking_email_id": self.id} + ) @api.multi def _partners_email_bounced_set(self, reason, event=None): @@ -237,32 +261,35 @@ class MailTrackingEmail(models.Model): if event and event.recipient_address: recipients.append(event.recipient_address) else: - recipients = [x for x in self.mapped('recipient_address') if x] + recipients = [x for x in self.mapped("recipient_address") if x] for recipient in recipients: - self.env['res.partner'].search([ - ('email', '=ilike', recipient) - ]).email_bounced_set(self, reason, event=event) + self.env["res.partner"].search( + [("email", "=ilike", recipient)] + ).email_bounced_set(self, reason, event=event) @api.multi def smtp_error(self, mail_server, smtp_server, exception): - values = { - 'state': 'error', - } - IrMailServer = self.env['ir.mail_server'] - if str(exception) == IrMailServer.NO_VALID_RECIPIENT \ - and not self.recipient_address: - values.update({ - 'error_type': 'no_recipient', - 'error_description': - "The partner doesn't have a defined email", - }) + values = {"state": "error"} + IrMailServer = self.env["ir.mail_server"] + if ( + str(exception) == IrMailServer.NO_VALID_RECIPIENT + and not self.recipient_address + ): + values.update( + { + "error_type": "no_recipient", + "error_description": "The partner doesn't have a defined email", + } + ) else: - values.update({ - 'error_smtp_server': tools.ustr(smtp_server), - 'error_type': exception.__class__.__name__, - 'error_description': tools.ustr(exception), - }) - self.sudo()._partners_email_bounced_set('error') + values.update( + { + "error_smtp_server": tools.ustr(smtp_server), + "error_type": exception.__class__.__name__, + "error_description": tools.ustr(exception), + } + ) + self.sudo()._partners_email_bounced_set("error") self.sudo().write(values) @api.multi @@ -270,14 +297,14 @@ class MailTrackingEmail(models.Model): self.ensure_one() tracking_url = self._get_mail_tracking_img() if tracking_url: - content = email.get('body', '') + content = email.get("body", "") content = re.sub( - r']*data-odoo-tracking-email=["\'][0-9]*["\'][^>]*>', - '', content) + r']*data-odoo-tracking-email=["\'][0-9]*["\'][^>]*>', "", content + ) body = tools.append_content_to_html( - content, tracking_url, plaintext=False, - container_tag='div') - email['body'] = body + content, tracking_url, plaintext=False, container_tag="div" + ) + email["body"] = body return email def _message_partners_check(self, message, message_id): @@ -294,70 +321,67 @@ class MailTrackingEmail(models.Model): 'needaction_partner_ids': [(4, self.partner_id.id)], }) else: - mail_message.sudo().write({ - 'partner_ids': [(4, self.partner_id.id)], - }) + mail_message.sudo().write({"partner_ids": [(4, self.partner_id.id)]}) return True @api.multi - def _tracking_sent_prepare(self, mail_server, smtp_server, message, - message_id): + def _tracking_sent_prepare(self, mail_server, smtp_server, message, message_id): self.ensure_one() ts = time.time() dt = datetime.utcfromtimestamp(ts) self._message_partners_check(message, message_id) - self.sudo().write({'state': 'sent'}) + self.sudo().write({"state": "sent"}) return { - 'recipient': message['To'], - 'timestamp': '%.6f' % ts, - 'time': fields.Datetime.to_string(dt), - 'tracking_email_id': self.id, - 'event_type': 'sent', - 'smtp_server': smtp_server, + "recipient": message["To"], + "timestamp": "%.6f" % ts, + "time": fields.Datetime.to_string(dt), + "tracking_email_id": self.id, + "event_type": "sent", + "smtp_server": smtp_server, } def _event_prepare(self, event_type, metadata): self.ensure_one() - m_event = self.env['mail.tracking.event'] - method = getattr(m_event, 'process_' + event_type, None) - if method and hasattr(method, '__call__'): + m_event = self.env["mail.tracking.event"] + method = getattr(m_event, "process_" + event_type, None) + if method and callable(method): return method(self, metadata) else: # pragma: no cover - _logger.info('Unknown event type: %s' % event_type) + _logger.info("Unknown event type: %s" % event_type) return False def _concurrent_events(self, event_type, metadata): - m_event = self.env['mail.tracking.event'] + m_event = self.env["mail.tracking.event"] self.ensure_one() concurrent_event_ids = False - if event_type in {'open', 'click'}: - ts = metadata.get('timestamp', time.time()) - delta = EVENT_OPEN_DELTA if event_type == 'open' \ - else EVENT_CLICK_DELTA + if event_type in {"open", "click"}: + ts = metadata.get("timestamp", time.time()) + delta = EVENT_OPEN_DELTA if event_type == "open" else EVENT_CLICK_DELTA domain = [ - ('timestamp', '>=', ts - delta), - ('timestamp', '<=', ts + delta), - ('tracking_email_id', '=', self.id), - ('event_type', '=', event_type), + ("timestamp", ">=", ts - delta), + ("timestamp", "<=", ts + delta), + ("tracking_email_id", "=", self.id), + ("event_type", "=", event_type), ] - if event_type == 'click': - domain.append(('url', '=', metadata.get('url', False))) + if event_type == "click": + domain.append(("url", "=", metadata.get("url", False))) concurrent_event_ids = m_event.search(domain) return concurrent_event_ids @api.multi def event_create(self, event_type, metadata): - event_ids = self.env['mail.tracking.event'] + event_ids = self.env["mail.tracking.event"] for tracking_email in self: other_ids = tracking_email._concurrent_events(event_type, metadata) if not other_ids: vals = tracking_email._event_prepare(event_type, metadata) if vals: events = event_ids.sudo().create(vals) - if event_type in {'hard_bounce', 'spam', 'reject'}: + if event_type in {"hard_bounce", "spam", "reject"}: for event in events: self.sudo()._partners_email_bounced_set( - event_type, event=event) + event_type, event=event + ) event_ids += events else: _logger.debug("Concurrent event '%s' discarded", event_type) @@ -369,4 +393,4 @@ class MailTrackingEmail(models.Model): # - return 'OK' if processed # - return 'NONE' if this request is not for you # - return 'ERROR' if any error - return 'NONE' # pragma: no cover + return "NONE" # pragma: no cover diff --git a/mail_tracking/models/mail_tracking_event.py b/mail_tracking/models/mail_tracking_event.py index e3deab8..b7f91df 100644 --- a/mail_tracking/models/mail_tracking_event.py +++ b/mail_tracking/models/mail_tracking_event.py @@ -11,55 +11,69 @@ import odoo.addons.decimal_precision as dp class MailTrackingEvent(models.Model): _name = "mail.tracking.event" - _order = 'timestamp desc' - _rec_name = 'event_type' - _description = 'MailTracking event' + _order = "timestamp desc" + _rec_name = "event_type" + _description = "MailTracking event" recipient = fields.Char(string="Recipient", readonly=True) recipient_address = fields.Char( - string='Recipient email address', readonly=True, store=True, - compute='_compute_recipient_address', index=True) + string="Recipient email address", + readonly=True, + store=True, + compute="_compute_recipient_address", + index=True, + ) timestamp = fields.Float( string='UTC timestamp', readonly=True, digits=dp.get_precision('MailTracking Timestamp')) time = fields.Datetime(string="Time", readonly=True) date = fields.Date( - string="Date", readonly=True, compute="_compute_date", store=True) + string="Date", readonly=True, compute="_compute_date", store=True + ) tracking_email_id = fields.Many2one( - string='Message', readonly=True, required=True, ondelete='cascade', - comodel_name='mail.tracking.email', index=True) - event_type = fields.Selection(string='Event type', selection=[ - ('sent', 'Sent'), - ('delivered', 'Delivered'), - ('deferral', 'Deferral'), - ('hard_bounce', 'Hard bounce'), - ('soft_bounce', 'Soft bounce'), - ('open', 'Open'), - ('click', 'Clicked'), - ('spam', 'Spam'), - ('unsub', 'Unsubscribed'), - ('reject', 'Rejected'), - ], readonly=True) - smtp_server = fields.Char(string='SMTP server', readonly=True) - url = fields.Char(string='Clicked URL', readonly=True) - ip = fields.Char(string='User IP', readonly=True) - user_agent = fields.Char(string='User agent', readonly=True) - mobile = fields.Boolean(string='Is mobile?', readonly=True) - os_family = fields.Char(string='Operating system family', readonly=True) - ua_family = fields.Char(string='User agent family', readonly=True) - ua_type = fields.Char(string='User agent type', readonly=True) - user_country_id = fields.Many2one(string='User country', readonly=True, - comodel_name='res.country') - error_type = fields.Char(string='Error type', readonly=True) - error_description = fields.Char(string='Error description', readonly=True) - error_details = fields.Text(string='Error details', readonly=True) + string="Message", + readonly=True, + required=True, + ondelete="cascade", + comodel_name="mail.tracking.email", + index=True, + ) + event_type = fields.Selection( + string="Event type", + selection=[ + ("sent", "Sent"), + ("delivered", "Delivered"), + ("deferral", "Deferral"), + ("hard_bounce", "Hard bounce"), + ("soft_bounce", "Soft bounce"), + ("open", "Open"), + ("click", "Clicked"), + ("spam", "Spam"), + ("unsub", "Unsubscribed"), + ("reject", "Rejected"), + ], + readonly=True, + ) + smtp_server = fields.Char(string="SMTP server", readonly=True) + url = fields.Char(string="Clicked URL", readonly=True) + ip = fields.Char(string="User IP", readonly=True) + user_agent = fields.Char(string="User agent", readonly=True) + mobile = fields.Boolean(string="Is mobile?", readonly=True) + os_family = fields.Char(string="Operating system family", readonly=True) + ua_family = fields.Char(string="User agent family", readonly=True) + ua_type = fields.Char(string="User agent type", readonly=True) + user_country_id = fields.Many2one( + string="User country", readonly=True, comodel_name="res.country" + ) + error_type = fields.Char(string="Error type", readonly=True) + error_description = fields.Char(string="Error description", readonly=True) + error_details = fields.Text(string="Error details", readonly=True) - @api.multi - @api.depends('recipient') + @api.depends("recipient") def _compute_recipient_address(self): for email in self: if email.recipient: - matches = re.search(r'<(.*@.*)>', email.recipient) + matches = re.search(r"<(.*@.*)>", email.recipient) if matches: email.recipient_address = matches.group(1).lower() else: @@ -68,85 +82,82 @@ class MailTrackingEvent(models.Model): email.recipient_address = False @api.multi - @api.depends('time') + @api.depends("time") def _compute_date(self): for email in self: - email.date = fields.Date.to_string( - fields.Date.from_string(email.time)) + email.date = fields.Date.to_string(fields.Date.from_string(email.time)) def _process_data(self, tracking_email, metadata, event_type, state): ts = time.time() dt = datetime.utcfromtimestamp(ts) return { - 'recipient': metadata.get('recipient', tracking_email.recipient), - 'timestamp': metadata.get('timestamp', ts), - 'time': metadata.get('time', fields.Datetime.to_string(dt)), - 'date': metadata.get('date', fields.Date.to_string(dt)), - 'tracking_email_id': tracking_email.id, - 'event_type': event_type, - 'ip': metadata.get('ip', False), - 'url': metadata.get('url', False), - 'user_agent': metadata.get('user_agent', False), - 'mobile': metadata.get('mobile', False), - 'os_family': metadata.get('os_family', False), - 'ua_family': metadata.get('ua_family', False), - 'ua_type': metadata.get('ua_type', False), - 'user_country_id': metadata.get('user_country_id', False), - 'error_type': metadata.get('error_type', False), - 'error_description': metadata.get('error_description', False), - 'error_details': metadata.get('error_details', False), + "recipient": metadata.get("recipient", tracking_email.recipient), + "timestamp": metadata.get("timestamp", ts), + "time": metadata.get("time", fields.Datetime.to_string(dt)), + "date": metadata.get("date", fields.Date.to_string(dt)), + "tracking_email_id": tracking_email.id, + "event_type": event_type, + "ip": metadata.get("ip", False), + "url": metadata.get("url", False), + "user_agent": metadata.get("user_agent", False), + "mobile": metadata.get("mobile", False), + "os_family": metadata.get("os_family", False), + "ua_family": metadata.get("ua_family", False), + "ua_type": metadata.get("ua_type", False), + "user_country_id": metadata.get("user_country_id", False), + "error_type": metadata.get("error_type", False), + "error_description": metadata.get("error_description", False), + "error_details": metadata.get("error_details", False), } def _process_status(self, tracking_email, metadata, event_type, state): - tracking_email.sudo().write({'state': state}) + tracking_email.sudo().write({"state": state}) return self._process_data(tracking_email, metadata, event_type, state) def _process_bounce(self, tracking_email, metadata, event_type, state): - tracking_email.sudo().write({ - 'state': state, - 'bounce_type': metadata.get('bounce_type', False), - 'bounce_description': metadata.get('bounce_description', False), - }) + tracking_email.sudo().write( + { + "state": state, + "bounce_type": metadata.get("bounce_type", False), + "bounce_description": metadata.get("bounce_description", False), + } + ) return self._process_data(tracking_email, metadata, event_type, state) @api.model def process_delivered(self, tracking_email, metadata): - return self._process_status( - tracking_email, metadata, 'delivered', 'delivered') + return self._process_status(tracking_email, metadata, "delivered", "delivered") @api.model def process_deferral(self, tracking_email, metadata): - return self._process_status( - tracking_email, metadata, 'deferral', 'deferred') + return self._process_status(tracking_email, metadata, "deferral", "deferred") @api.model def process_hard_bounce(self, tracking_email, metadata): - return self._process_bounce( - tracking_email, metadata, 'hard_bounce', 'bounced') + return self._process_bounce(tracking_email, metadata, "hard_bounce", "bounced") @api.model def process_soft_bounce(self, tracking_email, metadata): return self._process_bounce( - tracking_email, metadata, 'soft_bounce', 'soft-bounced') + tracking_email, metadata, "soft_bounce", "soft-bounced" + ) @api.model def process_open(self, tracking_email, metadata): - return self._process_status(tracking_email, metadata, 'open', 'opened') + return self._process_status(tracking_email, metadata, "open", "opened") @api.model def process_click(self, tracking_email, metadata): - return self._process_status( - tracking_email, metadata, 'click', 'opened') + return self._process_status(tracking_email, metadata, "click", "opened") @api.model def process_spam(self, tracking_email, metadata): - return self._process_status(tracking_email, metadata, 'spam', 'spam') + return self._process_status(tracking_email, metadata, "spam", "spam") @api.model def process_unsub(self, tracking_email, metadata): - return self._process_status(tracking_email, metadata, 'unsub', 'unsub') + return self._process_status(tracking_email, metadata, "unsub", "unsub") @api.model def process_reject(self, tracking_email, metadata): - return self._process_status( - tracking_email, metadata, 'reject', 'rejected') + return self._process_status(tracking_email, metadata, "reject", "rejected") diff --git a/mail_tracking/models/res_partner.py b/mail_tracking/models/res_partner.py index 7dbee8f..9a72af8 100644 --- a/mail_tracking/models/res_partner.py +++ b/mail_tracking/models/res_partner.py @@ -1,20 +1,21 @@ # Copyright 2016 Antonio Espinosa - # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -from odoo import models, api, fields +from odoo import api, fields, models class ResPartner(models.Model): - _name = 'res.partner' - _inherit = ['res.partner', 'mail.bounced.mixin'] + _name = "res.partner" + _inherit = ["res.partner", "mail.bounced.mixin"] # tracking_emails_count and email_score are non-store fields in order # to improve performance tracking_emails_count = fields.Integer( - compute='_compute_tracking_emails_count', readonly=True) - email_score = fields.Float(compute='_compute_email_score', readonly=True) + compute="_compute_tracking_emails_count", readonly=True + ) + email_score = fields.Float(compute="_compute_email_score", readonly=True) - @api.depends('email') + @api.depends("email") def _compute_email_score(self): for partner in self.filtered('email'): partner.email_score = self.env['mail.tracking.email'].\ @@ -26,7 +27,7 @@ class ResPartner(models.Model): for partner in self: count = 0 if partner.email: - count = self.env['mail.tracking.email'].search_count([ - ('recipient_address', '=', partner.email.lower()) - ]) + count = self.env["mail.tracking.email"].search_count( + [("recipient_address", "=", partner.email.lower())] + ) partner.tracking_emails_count = count diff --git a/mail_tracking/tests/test_mail_tracking.py b/mail_tracking/tests/test_mail_tracking.py index 733e1bb..208f4cd 100644 --- a/mail_tracking/tests/test_mail_tracking.py +++ b/mail_tracking/tests/test_mail_tracking.py @@ -1,77 +1,78 @@ # Copyright 2016 Antonio Espinosa - # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -import mock -from odoo.tools import mute_logger -import time import base64 +import time + +import mock import psycopg2 import psycopg2.errorcodes -from odoo import http -from odoo.tests.common import TransactionCase -from ..controllers.main import MailTrackingController, BLANK from lxml import etree -mock_send_email = ('odoo.addons.base.models.ir_mail_server.' - 'IrMailServer.send_email') +from odoo import _, http +from odoo.tests.common import TransactionCase +from odoo.tools import mute_logger + +from ..controllers.main import BLANK, MailTrackingController + +mock_send_email = "odoo.addons.base.models.ir_mail_server." "IrMailServer.send_email" class FakeUserAgent(object): - browser = 'Test browser' - platform = 'Test platform' + browser = "Test browser" + platform = "Test platform" def __str__(self): """Return name""" - return 'Test suite' + return "Test suite" class TestMailTracking(TransactionCase): def setUp(self, *args, **kwargs): super(TestMailTracking, self).setUp(*args, **kwargs) - self.sender = self.env['res.partner'].create({ - 'name': 'Test sender', - 'email': 'sender@example.com', - }) - self.recipient = self.env['res.partner'].create({ - 'name': 'Test recipient', - 'email': 'recipient@example.com', - }) + self.sender = self.env["res.partner"].create( + {"name": "Test sender", "email": "sender@example.com"} + ) + self.recipient = self.env["res.partner"].create( + {"name": "Test recipient", "email": "recipient@example.com"} + ) self.last_request = http.request - http.request = type('obj', (object,), { - 'env': self.env, - 'cr': self.env.cr, - 'db': self.env.cr.dbname, - 'endpoint': type('obj', (object,), { - 'routing': [], - }), - 'httprequest': type('obj', (object,), { - 'remote_addr': '123.123.123.123', - 'user_agent': FakeUserAgent(), - }), - }) + http.request = type( + "obj", + (object,), + { + "env": self.env, + "cr": self.env.cr, + "db": self.env.cr.dbname, + "endpoint": type("obj", (object,), {"routing": []}), + "httprequest": type( + "obj", + (object,), + {"remote_addr": "123.123.123.123", "user_agent": FakeUserAgent()}, + ), + }, + ) def tearDown(self, *args, **kwargs): http.request = self.last_request return super(TestMailTracking, self).tearDown(*args, **kwargs) def test_empty_email(self): - self.recipient.write({'email_bounced': True}) - self.recipient.write({'email': False}) + self.recipient.write({"email_bounced": True}) + self.recipient.write({"email": False}) self.assertEqual(False, self.recipient.email) self.assertEqual(False, self.recipient.email_bounced) - self.recipient.write({'email_bounced': True}) - self.recipient.write({'email': ''}) + self.recipient.write({"email_bounced": True}) + self.recipient.write({"email": ""}) self.assertEqual(False, self.recipient.email_bounced) + self.assertEqual(False, self.env["mail.tracking.email"].email_is_bounced(False)) self.assertEqual( - False, - self.env['mail.tracking.email'].email_is_bounced(False)) - self.assertEqual( - 0., - self.env['mail.tracking.email'].email_score_from_email(False)) + 0.0, self.env["mail.tracking.email"].email_score_from_email(False) + ) def test_recipient_address_compute(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.write({'recipient': False}) + tracking.write({"recipient": False}) self.assertEqual(False, tracking.recipient_address) def test_message_post(self): @@ -88,154 +89,164 @@ class TestMailTracking(TransactionCase): }) message._notify(message, {}, force_send=True) # Search tracking created - tracking_email = self.env['mail.tracking.email'].search([ - ('mail_message_id', '=', message.id), - ('partner_id', '=', self.recipient.id), - ]) + tracking_email = self.env["mail.tracking.email"].search( + [ + ("mail_message_id", "=", message.id), + ("partner_id", "=", self.recipient.id), + ] + ) # The tracking email must be sent self.assertTrue(tracking_email) - self.assertEqual(tracking_email.state, 'sent') + self.assertEqual(tracking_email.state, "sent") # message_dict read by web interface message_dict = message.message_format()[0] - self.assertTrue(len(message_dict['partner_ids']) > 0) + self.assertTrue(len(message_dict["partner_ids"]) > 0) # First partner is recipient - partner_id = message_dict['partner_ids'][0][0] + partner_id = message_dict["partner_ids"][0][0] self.assertEqual(partner_id, self.recipient.id) - status = message_dict['partner_trackings'][0] + status = message_dict["partner_trackings"][0] # Tracking status must be sent and # mail tracking must be the one search before - self.assertEqual(status['status'], 'sent') - self.assertEqual(status['tracking_id'], tracking_email.id) - self.assertEqual(status['recipient'], self.recipient.display_name) - self.assertEqual(status['partner_id'], self.recipient.id) - self.assertEqual(status['isCc'], False) + self.assertEqual(status["status"], "sent") + self.assertEqual(status["tracking_id"], tracking_email.id) + self.assertEqual(status["recipient"], self.recipient.display_name) + self.assertEqual(status["partner_id"], self.recipient.id) + self.assertEqual(status["isCc"], False) # And now open the email metadata = { - 'ip': '127.0.0.1', - 'user_agent': 'Odoo Test/1.0', - 'os_family': 'linux', - 'ua_family': 'odoo', + "ip": "127.0.0.1", + "user_agent": "Odoo Test/1.0", + "os_family": "linux", + "ua_family": "odoo", } - tracking_email.event_create('open', metadata) - self.assertEqual(tracking_email.state, 'opened') + tracking_email.event_create("open", metadata) + self.assertEqual(tracking_email.state, "opened") def test_message_post_partner_no_email(self): # Create message with recipient without defined email - self.recipient.write({'email': False}) - message = self.env['mail.message'].create({ - 'subject': 'Message test', - 'author_id': self.sender.id, - 'email_from': self.sender.email, - 'message_type': 'comment', - 'model': 'res.partner', - 'res_id': self.recipient.id, - 'partner_ids': [(4, self.recipient.id)], - 'body': '

This is a test message

', - }) + self.recipient.write({"email": False}) + message = self.env["mail.message"].create( + { + "subject": "Message test", + "author_id": self.sender.id, + "email_from": self.sender.email, + "message_type": "comment", + "model": "res.partner", + "res_id": self.recipient.id, + "partner_ids": [(4, self.recipient.id)], + "body": "

This is a test message

", + } + ) message._notify(message, {}, force_send=True) # Search tracking created - tracking_email = self.env['mail.tracking.email'].search([ - ('mail_message_id', '=', message.id), - ('partner_id', '=', self.recipient.id), - ]) + tracking_email = self.env["mail.tracking.email"].search( + [ + ("mail_message_id", "=", message.id), + ("partner_id", "=", self.recipient.id), + ] + ) # No email should generate a error state: no_recipient - self.assertEqual(tracking_email.state, 'error') - self.assertEqual(tracking_email.error_type, 'no_recipient') + self.assertEqual(tracking_email.state, "error") + self.assertEqual(tracking_email.error_type, "no_recipient") self.assertFalse(self.recipient.email_bounced) def _check_partner_trackings(self, message): message_dict = message.message_format()[0] - self.assertEqual(len(message_dict['partner_trackings']), 3) + self.assertEqual(len(message_dict["partner_trackings"]), 3) # mail cc foundPartner = False foundNoPartner = False - for tracking in message_dict['partner_trackings']: - if tracking['partner_id'] == self.sender.id: + for tracking in message_dict["partner_trackings"]: + if tracking["partner_id"] == self.sender.id: foundPartner = True - self.assertTrue(tracking['isCc']) - elif tracking['recipient'] == 'unnamed@test.com': + self.assertTrue(tracking["isCc"]) + elif tracking["recipient"] == "unnamed@test.com": foundNoPartner = True - self.assertFalse(tracking['partner_id']) - self.assertTrue(tracking['isCc']) - elif tracking['partner_id'] == self.recipient.id: - self.assertFalse(tracking['isCc']) + self.assertFalse(tracking["partner_id"]) + self.assertTrue(tracking["isCc"]) + elif tracking["partner_id"] == self.recipient.id: + self.assertFalse(tracking["isCc"]) self.assertTrue(foundPartner) self.assertTrue(foundNoPartner) def test_email_cc(self): - sender_user = self.env['res.users'].create({ - 'name': 'Sender User Test', - 'partner_id': self.sender.id, - 'login': 'sender-test', - }) - message = self.recipient.sudo(user=sender_user).message_post( - body='

This is a test message

', - cc='unnamed@test.com, sender@example.com' + sender_user = self.env["res.users"].create( + { + "name": "Sender User Test", + "partner_id": self.sender.id, + "login": "sender-test", + } + ) + message = self.recipient.with_user(sender_user).message_post( + body=_("

This is a test message

"), + cc="unnamed@test.com, sender@example.com", ) # suggested recipients - recipients = self.recipient.message_get_suggested_recipients() - suggested_mails = { - email[1] for email in recipients[self.recipient.id] - } - self.assertTrue('unnamed@test.com' in suggested_mails) + recipients = self.recipient._message_get_suggested_recipients() + suggested_mails = {email[1] for email in recipients[self.recipient.id]} + self.assertTrue("unnamed@test.com" in suggested_mails) self.assertEqual(len(recipients[self.recipient.id][0]), 3) # Repeated Cc recipients - message = self.env['mail.message'].create({ - 'subject': 'Message test', - 'author_id': self.sender.id, - 'email_from': self.sender.email, - 'message_type': 'comment', - 'model': 'res.partner', - 'res_id': self.recipient.id, - 'partner_ids': [(4, self.recipient.id)], - 'email_cc': 'unnamed@test.com, sender@example.com' - ', recipient@example.com', - 'body': '

This is another test message

', - }) + message = self.env["mail.message"].create( + { + "subject": "Message test", + "author_id": self.sender.id, + "email_from": self.sender.email, + "message_type": "comment", + "model": "res.partner", + "res_id": self.recipient.id, + "partner_ids": [(4, self.recipient.id)], + "email_cc": "unnamed@test.com, sender@example.com" + ", recipient@example.com", + "body": "

This is another test message

", + } + ) message._notify(message, {}, force_send=True) - recipients = self.recipient.message_get_suggested_recipients() + recipients = self.recipient._message_get_suggested_recipients() self.assertEqual(len(recipients[self.recipient.id][0]), 3) self._check_partner_trackings(message) def test_failed_message(self): - MailMessageObj = self.env['mail.message'] + MailMessageObj = self.env["mail.message"] # Create message mail, tracking = self.mail_send(self.recipient.email) self.assertFalse(tracking.mail_message_id.mail_tracking_needs_action) # Force error state - tracking.state = 'error' + tracking.state = "error" self.assertTrue(tracking.mail_message_id.mail_tracking_needs_action) failed_count = MailMessageObj.get_failed_count() self.assertTrue(failed_count > 0) values = tracking.mail_message_id.get_failed_messages() - self.assertEqual(values[0]['id'], tracking.mail_message_id.id) + self.assertEqual(values[0]["id"], tracking.mail_message_id.id) messages = MailMessageObj.search([]) messages_failed = MailMessageObj.search( - MailMessageObj._get_failed_message_domain()) + MailMessageObj._get_failed_message_domain() + ) self.assertTrue(messages) self.assertTrue(messages_failed) self.assertTrue(len(messages) > len(messages_failed)) tracking.mail_message_id.set_need_action_done() self.assertFalse(tracking.mail_message_id.mail_tracking_needs_action) - self.assertTrue( - MailMessageObj.get_failed_count() < failed_count) + self.assertTrue(MailMessageObj.get_failed_count() < failed_count) # No author_id tracking.mail_message_id.author_id = False values = tracking.mail_message_id.get_failed_messages()[0] - self.assertEqual(values['author'][0], -1) + self.assertEqual(values["author"][0], -1) def mail_send(self, recipient): - mail = self.env['mail.mail'].create({ - 'subject': 'Test subject', - 'email_from': 'from@domain.com', - 'email_to': recipient, - 'body_html': '

This is a test message

', - }) + mail = self.env["mail.mail"].create( + { + "subject": "Test subject", + "email_from": "from@domain.com", + "email_to": recipient, + "body_html": "

This is a test message

", + } + ) mail.send() # Search tracking created - tracking_email = self.env['mail.tracking.email'].search([ - ('mail_id', '=', mail.id), - ]) + tracking_email = self.env["mail.tracking.email"].search( + [("mail_id", "=", mail.id)] + ) return mail, tracking_email def test_mail_send(self): @@ -245,10 +256,9 @@ class TestMailTracking(TransactionCase): mail, tracking = self.mail_send(self.recipient.email) self.assertEqual(mail.email_to, tracking.recipient) self.assertEqual(mail.email_from, tracking.sender) - with mock.patch('odoo.http.db_filter') as mock_client: + with mock.patch("odoo.http.db_filter") as mock_client: mock_client.return_value = True - res = controller.mail_tracking_open( - db, tracking.id, tracking.token) + res = controller.mail_tracking_open(db, tracking.id, tracking.token) self.assertEqual(image, res.response[0]) # Two events: sent and open self.assertEqual(2, len(tracking.tracking_event_ids)) @@ -261,30 +271,30 @@ class TestMailTracking(TransactionCase): def test_mail_tracking_open(self): controller = MailTrackingController() db = self.env.cr.dbname - with mock.patch('odoo.http.db_filter') as mock_client: + with mock.patch("odoo.http.db_filter") as mock_client: mock_client.return_value = True mail, tracking = self.mail_send(self.recipient.email) # Tracking is in sent or delivered state. But no token give. # Don't generates tracking event controller.mail_tracking_open(db, tracking.id) self.assertEqual(1, len(tracking.tracking_event_ids)) - tracking.write({'state': 'opened'}) + tracking.write({"state": "opened"}) # Tracking isn't in sent or delivered state. # Don't generates tracking event controller.mail_tracking_open(db, tracking.id, tracking.token) self.assertEqual(1, len(tracking.tracking_event_ids)) - tracking.write({'state': 'sent'}) + tracking.write({"state": "sent"}) # Tracking is in sent or delivered state and a token is given. # Generates tracking event controller.mail_tracking_open(db, tracking.id, tracking.token) self.assertEqual(2, len(tracking.tracking_event_ids)) # Generate new email due concurrent event filter mail, tracking = self.mail_send(self.recipient.email) - tracking.write({'token': False}) + tracking.write({"token": False}) # Tracking is in sent or delivered state but a token is given for a # record that doesn't have a token. # Don't generates tracking event - controller.mail_tracking_open(db, tracking.id, 'tokentest') + controller.mail_tracking_open(db, tracking.id, "tokentest") self.assertEqual(1, len(tracking.tracking_event_ids)) # Tracking is in sent or delivered state and not token is given for # a record that doesn't have a token. @@ -296,90 +306,76 @@ class TestMailTracking(TransactionCase): mail, tracking = self.mail_send(self.recipient.email) ts = time.time() metadata = { - 'ip': '127.0.0.1', - 'user_agent': 'Odoo Test/1.0', - 'os_family': 'linux', - 'ua_family': 'odoo', - 'timestamp': ts, + "ip": "127.0.0.1", + "user_agent": "Odoo Test/1.0", + "os_family": "linux", + "ua_family": "odoo", + "timestamp": ts, } # First open event - tracking.event_create('open', metadata) - opens = tracking.tracking_event_ids.filtered( - lambda r: r.event_type == 'open' - ) + tracking.event_create("open", metadata) + opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "open") self.assertEqual(len(opens), 1) # Concurrent open event - metadata['timestamp'] = ts + 2 - tracking.event_create('open', metadata) - opens = tracking.tracking_event_ids.filtered( - lambda r: r.event_type == 'open' - ) + metadata["timestamp"] = ts + 2 + tracking.event_create("open", metadata) + opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "open") self.assertEqual(len(opens), 1) # Second open event - metadata['timestamp'] = ts + 350 - tracking.event_create('open', metadata) - opens = tracking.tracking_event_ids.filtered( - lambda r: r.event_type == 'open' - ) + metadata["timestamp"] = ts + 350 + tracking.event_create("open", metadata) + opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "open") self.assertEqual(len(opens), 2) def test_concurrent_click(self): mail, tracking = self.mail_send(self.recipient.email) ts = time.time() metadata = { - 'ip': '127.0.0.1', - 'user_agent': 'Odoo Test/1.0', - 'os_family': 'linux', - 'ua_family': 'odoo', - 'timestamp': ts, - 'url': 'https://www.example.com/route/1', + "ip": "127.0.0.1", + "user_agent": "Odoo Test/1.0", + "os_family": "linux", + "ua_family": "odoo", + "timestamp": ts, + "url": "https://www.example.com/route/1", } # First click event (URL 1) - tracking.event_create('click', metadata) - opens = tracking.tracking_event_ids.filtered( - lambda r: r.event_type == 'click' - ) + tracking.event_create("click", metadata) + opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click") self.assertEqual(len(opens), 1) # Concurrent click event (URL 1) - metadata['timestamp'] = ts + 2 - tracking.event_create('click', metadata) - opens = tracking.tracking_event_ids.filtered( - lambda r: r.event_type == 'click' - ) + metadata["timestamp"] = ts + 2 + tracking.event_create("click", metadata) + opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click") self.assertEqual(len(opens), 1) # Second click event (URL 1) - metadata['timestamp'] = ts + 350 - tracking.event_create('click', metadata) - opens = tracking.tracking_event_ids.filtered( - lambda r: r.event_type == 'click' - ) + metadata["timestamp"] = ts + 350 + tracking.event_create("click", metadata) + opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click") self.assertEqual(len(opens), 2) # Concurrent click event (URL 2) - metadata['timestamp'] = ts + 2 - metadata['url'] = 'https://www.example.com/route/2' - tracking.event_create('click', metadata) - opens = tracking.tracking_event_ids.filtered( - lambda r: r.event_type == 'click' - ) + metadata["timestamp"] = ts + 2 + metadata["url"] = "https://www.example.com/route/2" + tracking.event_create("click", metadata) + opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click") self.assertEqual(len(opens), 3) - @mute_logger('odoo.addons.mail.models.mail_mail') + @mute_logger("odoo.addons.mail.models.mail_mail") def test_smtp_error(self): with mock.patch(mock_send_email) as mock_func: - mock_func.side_effect = Warning('Test error') + mock_func.side_effect = Warning("Test error") mail, tracking = self.mail_send(self.recipient.email) - self.assertEqual('error', tracking.state) - self.assertEqual('Warning', tracking.error_type) - self.assertEqual('Test error', tracking.error_description) + self.assertEqual("error", tracking.state) + self.assertEqual("Warning", tracking.error_type) + self.assertEqual("Test error", tracking.error_description) self.assertTrue(self.recipient.email_bounced) def test_partner_email_change(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('open', {}) + tracking.event_create("open", {}) orig_score = self.recipient.email_score orig_count = self.recipient.tracking_emails_count orig_email = self.recipient.email - self.recipient.email = orig_email + '2' + self.recipient.email = orig_email + "2" self.assertEqual(50.0, self.recipient.email_score) self.assertEqual(0, self.recipient.tracking_emails_count) self.recipient.email = orig_email @@ -388,108 +384,106 @@ class TestMailTracking(TransactionCase): def test_process_hard_bounce(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('hard_bounce', {}) - self.assertEqual('bounced', tracking.state) + tracking.event_create("hard_bounce", {}) + self.assertEqual("bounced", tracking.state) self.assertTrue(self.recipient.email_score < 50.0) def test_process_soft_bounce(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('soft_bounce', {}) - self.assertEqual('soft-bounced', tracking.state) + tracking.event_create("soft_bounce", {}) + self.assertEqual("soft-bounced", tracking.state) self.assertTrue(self.recipient.email_score < 50.0) def test_process_delivered(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('delivered', {}) - self.assertEqual('delivered', tracking.state) + tracking.event_create("delivered", {}) + self.assertEqual("delivered", tracking.state) self.assertTrue(self.recipient.email_score > 50.0) def test_process_deferral(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('deferral', {}) - self.assertEqual('deferred', tracking.state) + tracking.event_create("deferral", {}) + self.assertEqual("deferred", tracking.state) def test_process_spam(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('spam', {}) - self.assertEqual('spam', tracking.state) + tracking.event_create("spam", {}) + self.assertEqual("spam", tracking.state) self.assertTrue(self.recipient.email_score < 50.0) def test_process_unsub(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('unsub', {}) - self.assertEqual('unsub', tracking.state) + tracking.event_create("unsub", {}) + self.assertEqual("unsub", tracking.state) self.assertTrue(self.recipient.email_score < 50.0) def test_process_reject(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('reject', {}) - self.assertEqual('rejected', tracking.state) + tracking.event_create("reject", {}) + self.assertEqual("rejected", tracking.state) self.assertTrue(self.recipient.email_score < 50.0) def test_process_open(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('open', {}) - self.assertEqual('opened', tracking.state) + tracking.event_create("open", {}) + self.assertEqual("opened", tracking.state) self.assertTrue(self.recipient.email_score > 50.0) def test_process_click(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('click', {}) - self.assertEqual('opened', tracking.state) + tracking.event_create("click", {}) + self.assertEqual("opened", tracking.state) self.assertTrue(self.recipient.email_score > 50.0) def test_process_several_bounce(self): - for i in range(1, 10): + for _i in range(1, 10): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('hard_bounce', {}) - self.assertEqual('bounced', tracking.state) + tracking.event_create("hard_bounce", {}) + self.assertEqual("bounced", tracking.state) self.assertEqual(0.0, self.recipient.email_score) def test_bounce_new_partner(self): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('hard_bounce', {}) - new_partner = self.env['res.partner'].create({ - 'name': 'Test New Partner', - }) + tracking.event_create("hard_bounce", {}) + new_partner = self.env["res.partner"].create({"name": "Test New Partner"}) new_partner.email = self.recipient.email self.assertTrue(new_partner.email_bounced) def test_recordset_email_score(self): """For backwords compatibility sake""" - trackings = self.env['mail.tracking.email'] - for i in range(11): + trackings = self.env["mail.tracking.email"] + for _i in range(11): mail, tracking = self.mail_send(self.recipient.email) - tracking.event_create('click', {}) + tracking.event_create("click", {}) trackings |= tracking self.assertEqual(100.0, trackings.email_score()) def test_db(self): db = self.env.cr.dbname controller = MailTrackingController() - with mock.patch('odoo.http.db_filter') as mock_client: + with mock.patch("odoo.http.db_filter") as mock_client: mock_client.return_value = True with self.assertRaises(psycopg2.OperationalError): - controller.mail_tracking_event('not_found_db') + controller.mail_tracking_event("not_found_db") none = controller.mail_tracking_event(db) - self.assertEqual(b'NONE', none.response[0]) - none = controller.mail_tracking_event(db, 'open') - self.assertEqual(b'NONE', none.response[0]) + self.assertEqual(b"NONE", none.response[0]) + none = controller.mail_tracking_event(db, "open") + self.assertEqual(b"NONE", none.response[0]) class TestMailTrackingViews(TransactionCase): def test_fields_view_get(self): - result = self.env['res.partner'].fields_view_get( - view_id=self.env.ref('base.view_partner_form').id, - view_type='form') - doc = etree.XML(result['arch']) + result = self.env["res.partner"].fields_view_get( + view_id=self.env.ref("base.view_partner_form").id, view_type="form" + ) + doc = etree.XML(result["arch"]) nodes = doc.xpath( - "//field[@name='failed_message_ids'" - " and @widget='mail_failed_message']") + "//field[@name='failed_message_ids'" " and @widget='mail_failed_message']" + ) self.assertTrue(nodes) - result = self.env['res.partner'].fields_view_get( - view_id=self.env.ref('base.view_res_partner_filter').id, - view_type='search') - doc = etree.XML(result['arch']) + result = self.env["res.partner"].fields_view_get( + view_id=self.env.ref("base.view_res_partner_filter").id, view_type="search" + ) + doc = etree.XML(result["arch"]) nodes = doc.xpath("//filter[@name='failed_message_ids']") self.assertTrue(nodes)