[IMP] mail_tracking: black, isort

This commit is contained in:
Alexandre Díaz 2019-11-18 11:46:59 +01:00 committed by Jasmin Solanki
parent 9d4c4d3533
commit 200e016ab8
13 changed files with 894 additions and 787 deletions

View File

@ -10,15 +10,11 @@
"version": "12.0.2.0.1",
"category": "Social Network",
"website": "http://github.com/OCA/social",
"author": "Tecnativa, "
"Odoo Community Association (OCA)",
"author": ("Tecnativa, " "Odoo Community Association (OCA)"),
"license": "AGPL-3",
"application": False,
'installable': True,
"depends": [
"decimal_precision",
"mail",
],
"installable": True,
"depends": ["decimal_precision", "mail"],
"data": [
"data/tracking_data.xml",
"security/mail_tracking_email_security.xml",
@ -35,8 +31,6 @@
"static/src/xml/failed_message/thread.xml",
"static/src/xml/failed_message/discuss.xml",
],
'demo': [
'demo/demo.xml',
],
"demo": ["demo/demo.xml"],
"pre_init_hook": "pre_init_hook",
}

View File

@ -1,17 +1,20 @@
# Copyright 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import werkzeug
import odoo
import base64
import logging
from contextlib import contextmanager
from odoo import api, http, SUPERUSER_ID
import werkzeug
import odoo
from odoo import SUPERUSER_ID, api, http
from odoo.addons.mail.controllers.main import MailController
import logging
import base64
_logger = logging.getLogger(__name__)
BLANK = 'R0lGODlhAQABAIAAANvf7wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw=='
BLANK = "R0lGODlhAQABAIAAANvf7wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw=="
@contextmanager
@ -28,60 +31,74 @@ def db_env(dbname):
class MailTrackingController(MailController):
def _request_metadata(self):
"""Prepare remote info metadata"""
request = http.request.httprequest
return {
'ip': request.remote_addr or False,
'user_agent': request.user_agent or False,
'os_family': request.user_agent.platform or False,
'ua_family': request.user_agent.browser or False,
"ip": request.remote_addr or False,
"user_agent": request.user_agent or False,
"os_family": request.user_agent.platform or False,
"ua_family": request.user_agent.browser or False,
}
@http.route(['/mail/tracking/all/<string:db>',
'/mail/tracking/event/<string:db>/<string:event_type>'],
type='http', auth='none', csrf=False)
@http.route(
[
"/mail/tracking/all/<string:db>",
"/mail/tracking/event/<string:db>/<string:event_type>",
],
type="http",
auth="none",
csrf=False,
)
def mail_tracking_event(self, db, event_type=None, **kw):
"""Route used by external mail service"""
metadata = self._request_metadata()
res = None
with db_env(db) as env:
try:
res = env['mail.tracking.email'].event_process(
http.request, kw, metadata, event_type=event_type)
res = env["mail.tracking.email"].event_process(
http.request, kw, metadata, event_type=event_type
)
except Exception:
pass
if not res or res == 'NOT FOUND':
if not res or res == "NOT FOUND":
return werkzeug.exceptions.NotAcceptable()
return res
@http.route(['/mail/tracking/open/<string:db>'
'/<int:tracking_email_id>/blank.gif',
'/mail/tracking/open/<string:db>'
'/<int:tracking_email_id>/<string:token>/blank.gif'],
type='http', auth='none', methods=['GET'])
@http.route(
[
"/mail/tracking/open/<string:db>" "/<int:tracking_email_id>/blank.gif",
"/mail/tracking/open/<string:db>"
"/<int:tracking_email_id>/<string:token>/blank.gif",
],
type="http",
auth="none",
methods=["GET"],
)
def mail_tracking_open(self, db, tracking_email_id, token=False, **kw):
"""Route used to track mail openned (With & Without Token)"""
metadata = self._request_metadata()
with db_env(db) as env:
try:
tracking_email = env['mail.tracking.email'].search([
('id', '=', tracking_email_id),
('state', 'in', ['sent', 'delivered']),
('token', '=', token),
])
tracking_email = env["mail.tracking.email"].search(
[
("id", "=", tracking_email_id),
("state", "in", ["sent", "delivered"]),
("token", "=", token),
]
)
if tracking_email:
tracking_email.event_create('open', metadata)
tracking_email.event_create("open", metadata)
else:
_logger.warning(
"MailTracking email '%s' not found", tracking_email_id)
"MailTracking email '%s' not found", tracking_email_id
)
except Exception:
pass
# Always return GIF blank image
response = werkzeug.wrappers.Response()
response.mimetype = 'image/gif'
response.mimetype = "image/gif"
response.data = base64.b64decode(BLANK)
return response
@ -89,8 +106,7 @@ class MailTrackingController(MailController):
def mail_init_messaging(self):
"""Route used to initial values of Discuss app"""
values = super().mail_init_messaging()
values.update({
'failed_counter':
http.request.env['mail.message'].get_failed_count(),
})
values.update(
{"failed_counter": http.request.env["mail.message"].get_failed_count()}
)
return values

View File

@ -2,34 +2,40 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from psycopg2.extensions import AsIs
_logger = logging.getLogger(__name__)
def column_exists(cr, table, column):
cr.execute("""
cr.execute(
"""
SELECT column_name
FROM information_schema.columns
WHERE table_name = %s AND column_name = %s""", (table, column))
WHERE table_name = %s AND column_name = %s""",
(table, column),
)
return bool(cr.fetchall())
def column_add_with_value(cr, table, column, field_type, value):
if not column_exists(cr, table, column):
cr.execute("""
cr.execute(
"""
ALTER TABLE %s
ADD COLUMN %s %s""", (AsIs(table), AsIs(column), AsIs(field_type)))
cr.execute("""
UPDATE %s SET %s = %s""", (AsIs(table), AsIs(column), value))
ADD COLUMN %s %s""",
(AsIs(table), AsIs(column), AsIs(field_type)),
)
cr.execute(
"""
UPDATE %s SET %s = %s""",
(AsIs(table), AsIs(column), value),
)
def pre_init_hook(cr):
_logger.info("Creating res.partner.tracking_emails_count column "
"with value 0")
column_add_with_value(
cr, "res_partner", "tracking_emails_count", "integer", 0)
_logger.info("Creating res.partner.email_score column "
"with value 50.0")
column_add_with_value(
cr, "res_partner", "email_score", "double precision", 50.0)
_logger.info("Creating res.partner.tracking_emails_count column " "with value 0")
column_add_with_value(cr, "res_partner", "tracking_emails_count", "integer", 0)
_logger.info("Creating res.partner.email_score column " "with value 50.0")
column_add_with_value(cr, "res_partner", "email_score", "double precision", 50.0)

View File

@ -3,7 +3,8 @@
import re
import threading
from odoo import models, api, tools
from odoo import api, models, tools
class IrMailServer(models.Model):
@ -12,40 +13,63 @@ class IrMailServer(models.Model):
def _tracking_headers_add(self, tracking_email_id, headers):
"""Allow other addons to add its own tracking SMTP headers"""
headers = headers or {}
headers['X-Odoo-Database'] = getattr(
threading.currentThread(), 'dbname', None),
headers['X-Odoo-Tracking-ID'] = tracking_email_id
headers["X-Odoo-Database"] = (
getattr(threading.currentThread(), "dbname", None),
)
headers["X-Odoo-Tracking-ID"] = tracking_email_id
return headers
def _tracking_email_id_body_get(self, body):
body = body or ''
body = body or ""
# https://regex101.com/r/lW4cB1/2
match = re.search(
r'<img[^>]*data-odoo-tracking-email=["\']([0-9]*)["\']', body)
match = re.search(r'<img[^>]*data-odoo-tracking-email=["\']([0-9]*)["\']', body)
return int(match.group(1)) if match.group(1) else False
def build_email(self, email_from, email_to, subject, body, email_cc=None,
email_bcc=None, reply_to=False, attachments=None,
message_id=None, references=None, object_id=False,
subtype='plain', headers=None, body_alternative=None,
subtype_alternative='plain'):
def build_email(
self,
email_from,
email_to,
subject,
body,
email_cc=None,
email_bcc=None,
reply_to=False,
attachments=None,
message_id=None,
references=None,
object_id=False,
subtype="plain",
headers=None,
body_alternative=None,
subtype_alternative="plain",
):
tracking_email_id = self._tracking_email_id_body_get(body)
if tracking_email_id:
headers = self._tracking_headers_add(tracking_email_id, headers)
msg = super(IrMailServer, self).build_email(
email_from, email_to, subject, body, email_cc=email_cc,
email_bcc=email_bcc, reply_to=reply_to, attachments=attachments,
message_id=message_id, references=references, object_id=object_id,
subtype=subtype, headers=headers,
email_from,
email_to,
subject,
body,
email_cc=email_cc,
email_bcc=email_bcc,
reply_to=reply_to,
attachments=attachments,
message_id=message_id,
references=references,
object_id=object_id,
subtype=subtype,
headers=headers,
body_alternative=body_alternative,
subtype_alternative=subtype_alternative)
subtype_alternative=subtype_alternative,
)
return msg
def _tracking_email_get(self, message):
tracking_email_id = False
if message.get('X-Odoo-Tracking-ID', '').isdigit():
tracking_email_id = int(message['X-Odoo-Tracking-ID'])
return self.env['mail.tracking.email'].browse(tracking_email_id)
if message.get("X-Odoo-Tracking-ID", "").isdigit():
tracking_email_id = int(message["X-Odoo-Tracking-ID"])
return self.env["mail.tracking.email"].browse(tracking_email_id)
def _smtp_server_get(self, mail_server_id, smtp_server):
smtp_server_used = False
@ -53,36 +77,49 @@ class IrMailServer(models.Model):
if mail_server_id:
mail_server = self.browse(mail_server_id)
elif not smtp_server:
mail_server_ids = self.search([], order='sequence', limit=1)
mail_server_ids = self.search([], order="sequence", limit=1)
mail_server = mail_server_ids[0] if mail_server_ids else None
if mail_server:
smtp_server_used = mail_server.smtp_host
else:
smtp_server_used = smtp_server or tools.config.get('smtp_server')
smtp_server_used = smtp_server or tools.config.get("smtp_server")
return smtp_server_used
@api.model
def send_email(self, message, mail_server_id=None, smtp_server=None,
smtp_port=None, smtp_user=None, smtp_password=None,
smtp_encryption=None, smtp_debug=False, smtp_session=None):
def send_email(
self,
message,
mail_server_id=None,
smtp_server=None,
smtp_port=None,
smtp_user=None,
smtp_password=None,
smtp_encryption=None,
smtp_debug=False,
smtp_session=None,
):
message_id = False
tracking_email = self._tracking_email_get(message)
smtp_server_used = self.sudo()._smtp_server_get(
mail_server_id, smtp_server,
)
smtp_server_used = self.sudo()._smtp_server_get(mail_server_id, smtp_server)
try:
message_id = super(IrMailServer, self).send_email(
message, mail_server_id=mail_server_id,
smtp_server=smtp_server, smtp_port=smtp_port,
smtp_user=smtp_user, smtp_password=smtp_password,
smtp_encryption=smtp_encryption, smtp_debug=smtp_debug,
smtp_session=smtp_session)
message,
mail_server_id=mail_server_id,
smtp_server=smtp_server,
smtp_port=smtp_port,
smtp_user=smtp_user,
smtp_password=smtp_password,
smtp_encryption=smtp_encryption,
smtp_debug=smtp_debug,
smtp_session=smtp_session,
)
except Exception as e:
if tracking_email:
tracking_email.smtp_error(self, smtp_server_used, e)
if message_id and tracking_email:
vals = tracking_email._tracking_sent_prepare(
self, smtp_server_used, message, message_id)
self, smtp_server_used, message, message_id
)
if vals:
self.env['mail.tracking.event'].sudo().create(vals)
self.env["mail.tracking.event"].sudo().create(vals)
return message_id

View File

@ -1,7 +1,7 @@
# Copyright 2018 Tecnativa - Ernesto Tejeda
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo import api, fields, models
from odoo import fields, models
class MailBouncedMixin(models.AbstractModel):
@ -10,9 +10,9 @@ class MailBouncedMixin(models.AbstractModel):
the mixin and must contain the email field of the model.
"""
_name = 'mail.bounced.mixin'
_description = 'Mail bounced mixin'
_primary_email = ['email']
_name = "mail.bounced.mixin"
_description = "Mail bounced mixin"
_primary_email = ["email"]
email_bounced = fields.Boolean(index=True)
@ -20,25 +20,25 @@ class MailBouncedMixin(models.AbstractModel):
def email_bounced_set(self, tracking_emails, reason, event=None):
"""Inherit this method to make any other actions to the model that
inherit the mixin"""
if self.env.context.get('write_loop'):
if self.env.context.get("write_loop"):
# We avoid with the context an infinite recursion calling write
# method from other write method.
return True
partners = self.filtered(lambda r: not r.email_bounced)
return partners.write({'email_bounced': True})
return partners.write({"email_bounced": True})
def write(self, vals):
[email_field] = self._primary_email
if email_field not in vals:
return super().write(vals)
email = vals[email_field].lower() if vals[email_field] else False
mte_obj = self.env['mail.tracking.email']
vals['email_bounced'] = mte_obj.email_is_bounced(email)
if vals['email_bounced']:
mte_obj = self.env["mail.tracking.email"]
vals["email_bounced"] = mte_obj.email_is_bounced(email)
if vals["email_bounced"]:
res = mte_obj._email_last_tracking_state(email)
tracking = mte_obj.browse(res[0].get('id'))
tracking = mte_obj.browse(res[0].get("id"))
event = tracking.tracking_event_ids[:1]
self.with_context(
write_loop=True,
).email_bounced_set(tracking, event.error_details, event)
self.with_context(write_loop=True).email_bounced_set(
tracking, event.error_details, event
)
return super().write(vals)

View File

@ -5,27 +5,27 @@ import time
from datetime import datetime
from email.utils import COMMASPACE
from odoo import models, fields
from odoo import fields, models
class MailMail(models.Model):
_inherit = 'mail.mail'
_inherit = "mail.mail"
def _tracking_email_prepare(self, partner, email):
"""Prepare email.tracking.email record values"""
ts = time.time()
dt = datetime.utcfromtimestamp(ts)
email_to_list = email.get('email_to', [])
email_to_list = email.get("email_to", [])
email_to = COMMASPACE.join(email_to_list)
return {
'name': self.subject,
'timestamp': '%.6f' % ts,
'time': fields.Datetime.to_string(dt),
'mail_id': self.id,
'mail_message_id': self.mail_message_id.id,
'partner_id': partner.id if partner else False,
'recipient': email_to,
'sender': self.email_from,
"name": self.subject,
"timestamp": "%.6f" % ts,
"time": fields.Datetime.to_string(dt),
"mail_id": self.id,
"mail_message_id": self.mail_message_id.id,
"partner_id": partner.id if partner else False,
"recipient": email_to,
"sender": self.email_from,
}
def _send_prepare_values(self, partner=None):
@ -33,5 +33,5 @@ class MailMail(models.Model):
to the email"""
email = super()._send_prepare_values(partner=partner)
vals = self._tracking_email_prepare(partner, email)
tracking_email = self.env['mail.tracking.email'].sudo().create(vals)
tracking_email = self.env["mail.tracking.email"].sudo().create(vals)
return tracking_email.tracking_img_add(email)

View File

@ -2,7 +2,7 @@
# Copyright 2019 Alexandre Díaz
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo import _, models, api, fields
from odoo import _, api, fields, models
from odoo.tools import email_split
@ -10,16 +10,16 @@ class MailMessage(models.Model):
_inherit = "mail.message"
# Recipients
email_cc = fields.Char("Cc", help='Additional recipients that receive a '
'"Carbon Copy" of the e-mail')
email_cc = fields.Char(
"Cc", help="Additional recipients that receive a " '"Carbon Copy" of the e-mail'
)
mail_tracking_ids = fields.One2many(
comodel_name='mail.tracking.email',
inverse_name='mail_message_id',
comodel_name="mail.tracking.email",
inverse_name="mail_message_id",
string="Mail Trackings",
)
mail_tracking_needs_action = fields.Boolean(
help="The message tracking will be considered"
" to filter tracking issues",
help="The message tracking will be considered" " to filter tracking issues",
default=False,
)
is_failed_message = fields.Boolean(compute="_compute_is_failed_message")
@ -27,95 +27,105 @@ class MailMessage(models.Model):
@api.model
def get_failed_states(self):
"""The 'failed' states of the message"""
return {'error', 'rejected', 'spam', 'bounced', 'soft-bounced'}
return {"error", "rejected", "spam", "bounced", "soft-bounced"}
@api.depends('mail_tracking_needs_action', 'author_id', 'partner_ids',
'mail_tracking_ids.state')
@api.depends(
"mail_tracking_needs_action",
"author_id",
"partner_ids",
"mail_tracking_ids.state",
)
def _compute_is_failed_message(self):
"""Compute 'is_failed_message' field for the active user"""
failed_states = self.get_failed_states()
for message in self:
needs_action = message.mail_tracking_needs_action
involves_me = self.env.user.partner_id in (
message.author_id | message.partner_ids)
message.author_id | message.partner_ids
)
has_failed_trackings = failed_states.intersection(
message.mapped("mail_tracking_ids.state"))
message.mapped("mail_tracking_ids.state")
)
message.is_failed_message = bool(
needs_action and involves_me and has_failed_trackings)
needs_action and involves_me and has_failed_trackings
)
def _tracking_status_map_get(self):
"""Map tracking states to be used in chatter"""
return {
'False': 'waiting',
'error': 'error',
'deferred': 'sent',
'sent': 'sent',
'delivered': 'delivered',
'opened': 'opened',
'rejected': 'error',
'spam': 'error',
'unsub': 'opened',
'bounced': 'error',
'soft-bounced': 'error',
"False": "waiting",
"error": "error",
"deferred": "sent",
"sent": "sent",
"delivered": "delivered",
"opened": "opened",
"rejected": "error",
"spam": "error",
"unsub": "opened",
"bounced": "error",
"soft-bounced": "error",
}
def _partner_tracking_status_get(self, tracking_email):
"""Determine tracking status"""
tracking_status_map = self._tracking_status_map_get()
status = 'unknown'
status = "unknown"
if tracking_email:
tracking_email_status = str(tracking_email.state)
status = tracking_status_map.get(tracking_email_status, 'unknown')
status = tracking_status_map.get(tracking_email_status, "unknown")
return status
def _partner_tracking_status_human_get(self, status):
"""Translations for tracking statuses to be used on qweb"""
statuses = {'waiting': _('Waiting'), 'error': _('Error'),
'sent': _('Sent'), 'delivered': _('Delivered'),
'opened': _('Opened'), 'unknown': _('Unknown')}
statuses = {
"waiting": _("Waiting"),
"error": _("Error"),
"sent": _("Sent"),
"delivered": _("Delivered"),
"opened": _("Opened"),
"unknown": _("Unknown"),
}
return _("Status: %s") % statuses[status]
@api.model
def _get_error_description(self, tracking):
"""Translations for error descriptions to be used on qweb"""
descriptions = {
'no_recipient': _("The partner doesn't have a defined email"),
}
return descriptions.get(tracking.error_type,
tracking.error_description)
descriptions = {"no_recipient": _("The partner doesn't have a defined email")}
return descriptions.get(tracking.error_type, tracking.error_description)
def tracking_status(self):
"""Generates a complete status tracking of the messages by partner"""
res = {}
for message in self:
partner_trackings = []
partners_already = self.env['res.partner']
partners = self.env['res.partner']
trackings = self.env['mail.tracking.email'].sudo().search([
('mail_message_id', '=', message.id),
])
partners_already = self.env["res.partner"]
partners = self.env["res.partner"]
trackings = (
self.env["mail.tracking.email"]
.sudo()
.search([("mail_message_id", "=", message.id)])
)
# Get Cc recipients
email_cc_list = email_split(message.email_cc)
if any(email_cc_list):
partners |= partners.search([('email', 'in', email_cc_list)])
partners |= partners.search([("email", "in", email_cc_list)])
email_cc_list = set(email_cc_list)
# Search all trackings for this message
for tracking in trackings:
status = self._partner_tracking_status_get(tracking)
recipient = (
tracking.partner_id.name or tracking.recipient)
partner_trackings.append({
'status': status,
'status_human':
self._partner_tracking_status_human_get(status),
'error_type': tracking.error_type,
'error_description':
self._get_error_description(tracking),
'tracking_id': tracking.id,
'recipient': recipient,
'partner_id': tracking.partner_id.id,
'isCc': False,
})
recipient = tracking.partner_id.name or tracking.recipient
partner_trackings.append(
{
"status": status,
"status_human": self._partner_tracking_status_human_get(status),
"error_type": tracking.error_type,
"error_description": self._get_error_description(tracking),
"tracking_id": tracking.id,
"recipient": recipient,
"partner_id": tracking.partner_id.id,
"isCc": False,
}
)
if tracking.partner_id:
email_cc_list.discard(tracking.partner_id.email)
partners_already |= tracking.partner_id
@ -127,12 +137,11 @@ class MailMessage(models.Model):
# Remove recipients already included
partners -= partners_already
tracking_unkown_values = {
'status': 'unknown',
'status_human': self._partner_tracking_status_human_get(
'unknown'),
'error_type': False,
'error_description': False,
'tracking_id': False,
"status": "unknown",
"status_human": self._partner_tracking_status_human_get("unknown"),
"error_type": False,
"error_description": False,
"tracking_id": False,
}
for partner in partners:
# If there is partners not included, then status is 'unknown'
@ -141,36 +150,31 @@ class MailMessage(models.Model):
if partner.email in email_cc_list:
email_cc_list.discard(partner.email)
isCc = True
tracking_unkown_values.update({
'recipient': partner.name,
'partner_id': partner.id,
'isCc': isCc,
})
tracking_unkown_values.update(
{"recipient": partner.name, "partner_id": partner.id, "isCc": isCc}
)
partner_trackings.append(tracking_unkown_values.copy())
for email in email_cc_list:
# If there is Cc without partner
tracking_unkown_values.update({
'recipient': email,
'partner_id': False,
'isCc': True,
})
tracking_unkown_values.update(
{"recipient": email, "partner_id": False, "isCc": True}
)
partner_trackings.append(tracking_unkown_values.copy())
res[message.id] = {
'partner_trackings': partner_trackings,
'is_failed_message': message.is_failed_message,
"partner_trackings": partner_trackings,
"is_failed_message": message.is_failed_message,
}
return res
@api.model
def _message_read_dict_postprocess(self, messages, message_tree):
"""Preare values to be used by the chatter widget"""
res = super()._message_read_dict_postprocess(
messages, message_tree)
mail_message_ids = {m.get('id') for m in messages if m.get('id')}
res = super()._message_read_dict_postprocess(messages, message_tree)
mail_message_ids = {m.get("id") for m in messages if m.get("id")}
mail_messages = self.browse(mail_message_ids)
tracking_statuses = mail_messages.tracking_status()
for message_dict in messages:
mail_message_id = message_dict.get('id', False)
mail_message_id = message_dict.get("id", False)
if mail_message_id:
message_dict.update(tracking_statuses[mail_message_id])
return res
@ -180,27 +184,30 @@ class MailMessage(models.Model):
"""Preare values to be used by the chatter widget"""
self.ensure_one()
failed_trackings = self.mail_tracking_ids.filtered(
lambda x: x.state in self.get_failed_states())
failed_partners = failed_trackings.mapped('partner_id')
lambda x: x.state in self.get_failed_states()
)
failed_partners = failed_trackings.mapped("partner_id")
failed_recipients = failed_partners.name_get()
if self.author_id:
author = self.author_id.name_get()[0]
else:
author = (-1, _('-Unknown Author-'))
author = (-1, _("-Unknown Author-"))
return {
'id': self.id,
'date': self.date,
'author': author,
'body': self.body,
'failed_recipients': failed_recipients,
"id": self.id,
"date": self.date,
"author": author,
"body": self.body,
"failed_recipients": failed_recipients,
}
@api.multi
def get_failed_messages(self):
"""Returns the list of failed messages to be used by the
failed_messages widget"""
return [msg._prepare_dict_failed_message()
for msg in self.sorted('date', reverse=True)]
return [
msg._prepare_dict_failed_message()
for msg in self.sorted("date", reverse=True)
]
@api.multi
def set_need_action_done(self):
@ -208,23 +215,23 @@ class MailMessage(models.Model):
This will mark them to be ignored in the tracking issues filter.
"""
self.check_access_rule('read')
self.write({'mail_tracking_needs_action': False})
self.check_access_rule("read")
self.write({"mail_tracking_needs_action": False})
notification = {
'type': 'toggle_tracking_status',
'message_ids': self.ids,
'needs_actions': False
"type": "toggle_tracking_status",
"message_ids": self.ids,
"needs_actions": False,
}
self.env['bus.bus'].sendone(
(self._cr.dbname, 'res.partner', self.env.user.partner_id.id),
notification)
self.env["bus.bus"].sendone(
(self._cr.dbname, "res.partner", self.env.user.partner_id.id), notification
)
def _get_failed_message_domain(self):
domain = self.env['mail.thread']._get_failed_message_domain()
domain = self.env["mail.thread"]._get_failed_message_domain()
domain += [
'|',
('partner_ids', 'in', [self.env.user.partner_id.id]),
('author_id', '=', self.env.user.partner_id.id),
"|",
("partner_ids", "in", [self.env.user.partner_id.id]),
("author_id", "=", self.env.user.partner_id.id),
]
return domain
@ -240,17 +247,16 @@ class MailMessage(models.Model):
Used by Discuss """
unreviewed_messages = self.search(self._get_failed_message_domain())
unreviewed_messages.write({'mail_tracking_needs_action': False})
unreviewed_messages.write({"mail_tracking_needs_action": False})
ids = unreviewed_messages.ids
self.env['bus.bus'].sendone(
(self._cr.dbname, 'res.partner',
self.env.user.partner_id.id),
self.env["bus.bus"].sendone(
(self._cr.dbname, "res.partner", self.env.user.partner_id.id),
{
'type': 'toggle_tracking_status',
'message_ids': ids,
'needs_actions': False,
}
"type": "toggle_tracking_status",
"message_ids": ids,
"needs_actions": False,
},
)
return ids

View File

@ -1,7 +1,7 @@
# Copyright 2019 Alexandre Díaz
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo import models, api
from odoo import api, models
class MailResendMessage(models.TransientModel):
@ -10,45 +10,53 @@ class MailResendMessage(models.TransientModel):
@api.model
def default_get(self, fields):
rec = super().default_get(fields)
message_id = self._context.get('mail_message_to_resend')
message_id = self._context.get("mail_message_to_resend")
if message_id:
MailMessageObj = self.env['mail.message']
MailMessageObj = self.env["mail.message"]
mail_message_id = MailMessageObj.browse(message_id)
failed_states = MailMessageObj.get_failed_states()
tracking_ids = mail_message_id.mail_tracking_ids.filtered(
lambda x: x.state in failed_states)
lambda x: x.state in failed_states
)
if any(tracking_ids):
partner_ids = [(0, 0, {
partner_ids = [
(
0,
0,
{
"partner_id": tracking.partner_id.id,
"name": tracking.partner_id.name,
"email": tracking.partner_id.email,
"resend": True,
"message": tracking.error_description,
}) for tracking in tracking_ids]
rec['partner_ids'].extend(partner_ids)
},
)
for tracking in tracking_ids
]
rec["partner_ids"].extend(partner_ids)
return rec
@api.multi
def resend_mail_action(self):
for wizard in self:
to_send = wizard.partner_ids.filtered("resend").mapped(
"partner_id")
to_send = wizard.partner_ids.filtered("resend").mapped("partner_id")
if to_send:
# Set as reviewed
wizard.mail_message_id.mail_tracking_needs_action = False
# Reset mail.tracking.email state
tracking_ids = wizard.mail_message_id.mail_tracking_ids\
.filtered(lambda x: x.partner_id in to_send)
tracking_ids.write({'state': False})
tracking_ids = wizard.mail_message_id.mail_tracking_ids.filtered(
lambda x: x.partner_id in to_send
)
tracking_ids.write({"state": False})
# Send bus notifications to update Discuss and
# mail_failed_messages widget
notification = {
'type': 'toggle_tracking_status',
'message_ids': [self.mail_message_id.id],
'needs_actions': False
"type": "toggle_tracking_status",
"message_ids": [self.mail_message_id.id],
"needs_actions": False,
}
self.env['bus.bus'].sendone(
(self._cr.dbname, 'res.partner',
self.env.user.partner_id.id),
notification)
self.env["bus.bus"].sendone(
(self._cr.dbname, "res.partner", self.env.user.partner_id.id),
notification,
)
super().resend_mail_action()

View File

@ -1,32 +1,36 @@
# Copyright 2019 Alexandre Díaz
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo import fields, models, api, _
from email.utils import getaddresses
from odoo.tools import email_split_and_format
from lxml import etree
from odoo import _, api, fields, models
from odoo.tools import email_split_and_format
class MailThread(models.AbstractModel):
_inherit = "mail.thread"
failed_message_ids = fields.One2many(
'mail.message', 'res_id', string='Failed Messages',
domain=lambda self:
[('model', '=', self._name)]
+ self._get_failed_message_domain())
"mail.message",
"res_id",
string="Failed Messages",
domain=lambda self: [("model", "=", self._name)]
+ self._get_failed_message_domain(),
)
def _get_failed_message_domain(self):
"""Domain used to display failed messages on the 'failed_messages'
widget"""
failed_states = self.env['mail.message'].get_failed_states()
failed_states = self.env["mail.message"].get_failed_states()
return [
('mail_tracking_needs_action', '=', True),
('mail_tracking_ids.state', 'in', list(failed_states)),
("mail_tracking_needs_action", "=", True),
("mail_tracking_ids.state", "in", list(failed_states)),
]
@api.multi
@api.returns('self', lambda value: value.id)
@api.returns("self", lambda value: value.id)
def message_post(self, *args, **kwargs):
"""Adds CC recipient to the message.
@ -34,11 +38,9 @@ class MailThread(models.AbstractModel):
this information its written into the mail.message record.
"""
new_message = super().message_post(*args, **kwargs)
email_cc = kwargs.get('cc')
email_cc = kwargs.get("cc")
if email_cc:
new_message.sudo().write({
'email_cc': email_cc,
})
new_message.sudo().write({"email_cc": email_cc})
return new_message
@api.multi
@ -48,71 +50,79 @@ class MailThread(models.AbstractModel):
If the recipient has a res.partner, use it.
"""
res = super().message_get_suggested_recipients()
ResPartnerObj = self.env['res.partner']
ResPartnerObj = self.env["res.partner"]
email_cc_formated_list = []
for record in self:
emails_cc = record.message_ids.mapped('email_cc')
emails_cc = record.message_ids.mapped("email_cc")
for email in emails_cc:
email_cc_formated_list.extend(email_split_and_format(email))
email_cc_formated_list = set(email_cc_formated_list)
for cc in email_cc_formated_list:
email_parts = getaddresses([cc])[0]
partner_id = record.message_partner_info_from_emails(
[email_parts[1]])[0].get('partner_id')
partner_id = record._message_partner_info_from_emails([email_parts[1]])[
0
].get("partner_id")
if not partner_id:
record._message_add_suggested_recipient(
res, email=cc, reason=_('Cc'))
record._message_add_suggested_recipient(res, email=cc, reason=_("Cc"))
else:
partner = ResPartnerObj.browse(partner_id, self._prefetch)
record._message_add_suggested_recipient(
res, partner=partner, reason=_('Cc'))
res, partner=partner, reason=_("Cc")
)
return res
@api.model
def fields_view_get(self, view_id=None, view_type='form', toolbar=False,
submenu=False):
def fields_view_get(
self, view_id=None, view_type="form", toolbar=False, submenu=False
):
"""Add filters for failed messages.
These filters will show up on any form or search views of any
model inheriting from ``mail.thread``.
"""
res = super().fields_view_get(
view_id=view_id, view_type=view_type, toolbar=toolbar,
submenu=submenu)
if view_type not in {'search', 'form'}:
view_id=view_id, view_type=view_type, toolbar=toolbar, submenu=submenu
)
if view_type not in {"search", "form"}:
return res
doc = etree.XML(res['arch'])
if view_type == 'search':
doc = etree.XML(res["arch"])
if view_type == "search":
# Modify view to add new filter element
nodes = doc.xpath("//search")
if nodes:
# Create filter element
new_filter = etree.Element(
'filter', {
'string': _('Failed sent messages'),
'name': "failed_message_ids",
'domain': str([
['failed_message_ids.mail_tracking_ids.state',
'in',
list(
self.env['mail.message'].get_failed_states()
)],
['failed_message_ids.mail_tracking_needs_action',
'=', True]
])
})
nodes[0].append(etree.Element('separator'))
"filter",
{
"string": _("Failed sent messages"),
"name": "failed_message_ids",
"domain": str(
[
[
"failed_message_ids.mail_tracking_ids.state",
"in",
list(self.env["mail.message"].get_failed_states()),
],
[
"failed_message_ids.mail_tracking_needs_action",
"=",
True,
],
]
),
},
)
nodes[0].append(etree.Element("separator"))
nodes[0].append(new_filter)
elif view_type == 'form':
elif view_type == "form":
# Modify view to add new field element
nodes = doc.xpath(
"//field[@name='message_ids' and @widget='mail_thread']")
nodes = doc.xpath("//field[@name='message_ids' and @widget='mail_thread']")
if nodes:
# Create field
field_failed_messages = etree.Element('field', {
'name': 'failed_message_ids',
'widget': 'mail_failed_message',
})
field_failed_messages = etree.Element(
"field",
{"name": "failed_message_ids", "widget": "mail_failed_message"},
)
nodes[0].addprevious(field_failed_messages)
res['arch'] = etree.tostring(doc, encoding='unicode')
res["arch"] = etree.tostring(doc, encoding="unicode")
return res

View File

@ -2,9 +2,9 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
import urllib.parse
import time
import re
import time
import urllib.parse
import uuid
from datetime import datetime
@ -19,9 +19,9 @@ EVENT_CLICK_DELTA = 5 # seconds
class MailTrackingEmail(models.Model):
_name = "mail.tracking.email"
_order = 'time desc'
_rec_name = 'display_name'
_description = 'MailTracking email'
_order = "time desc"
_rec_name = "display_name"
_description = "MailTracking email"
# This table is going to grow fast and to infinite, so we index:
# - name: Search in tree view
@ -30,38 +30,51 @@ class MailTrackingEmail(models.Model):
# - state: Search and group_by in tree view
name = fields.Char(string="Subject", readonly=True, index=True)
display_name = fields.Char(
string="Display name", readonly=True, store=True,
compute="_compute_tracking_display_name")
string="Display name",
readonly=True,
store=True,
compute="_compute_tracking_display_name",
)
timestamp = fields.Float(
string='UTC timestamp', readonly=True,
digits=dp.get_precision('MailTracking Timestamp'))
time = fields.Datetime(string="Time", readonly=True, index=True)
date = fields.Date(
string="Date", readonly=True, compute="_compute_date", store=True)
string="Date", readonly=True, compute="_compute_date", store=True
)
mail_message_id = fields.Many2one(
string="Message", comodel_name='mail.message', readonly=True,
index=True)
mail_id = fields.Many2one(
string="Email", comodel_name='mail.mail', readonly=True)
string="Message", comodel_name="mail.message", readonly=True, index=True
)
mail_id = fields.Many2one(string="Email", comodel_name="mail.mail", readonly=True)
partner_id = fields.Many2one(
string="Partner", comodel_name='res.partner', readonly=True)
recipient = fields.Char(string='Recipient email', readonly=True)
string="Partner", comodel_name="res.partner", readonly=True
)
recipient = fields.Char(string="Recipient email", readonly=True)
recipient_address = fields.Char(
string='Recipient email address', readonly=True, store=True,
compute='_compute_recipient_address', index=True)
sender = fields.Char(string='Sender email', readonly=True)
state = fields.Selection([
('error', 'Error'),
('deferred', 'Deferred'),
('sent', 'Sent'),
('delivered', 'Delivered'),
('opened', 'Opened'),
('rejected', 'Rejected'),
('spam', 'Spam'),
('unsub', 'Unsubscribed'),
('bounced', 'Bounced'),
('soft-bounced', 'Soft bounced'),
], string='State', index=True, readonly=True, default=False,
string="Recipient email address",
readonly=True,
store=True,
compute="_compute_recipient_address",
index=True,
)
sender = fields.Char(string="Sender email", readonly=True)
state = fields.Selection(
[
("error", "Error"),
("deferred", "Deferred"),
("sent", "Sent"),
("delivered", "Delivered"),
("opened", "Opened"),
("rejected", "Rejected"),
("spam", "Spam"),
("unsub", "Unsubscribed"),
("bounced", "Bounced"),
("soft-bounced", "Soft bounced"),
],
string="State",
index=True,
readonly=True,
default=False,
help=" * The 'Error' status indicates that there was an error "
"when trying to sent the email, for example, "
"'No valid recipient'\n"
@ -81,76 +94,89 @@ class MailTrackingEmail(models.Model):
" * The 'Bounced' status indicates that message was bounced "
"by recipient Mail Exchange (MX) server.\n"
" * The 'Soft bounced' status indicates that message was soft "
"bounced by recipient Mail Exchange (MX) server.\n")
error_smtp_server = fields.Char(string='Error SMTP server', readonly=True)
error_type = fields.Char(string='Error type', readonly=True)
error_description = fields.Char(
string='Error description', readonly=True)
bounce_type = fields.Char(string='Bounce type', readonly=True)
bounce_description = fields.Char(
string='Bounce description', readonly=True)
"bounced by recipient Mail Exchange (MX) server.\n",
)
error_smtp_server = fields.Char(string="Error SMTP server", readonly=True)
error_type = fields.Char(string="Error type", readonly=True)
error_description = fields.Char(string="Error description", readonly=True)
bounce_type = fields.Char(string="Bounce type", readonly=True)
bounce_description = fields.Char(string="Bounce description", readonly=True)
tracking_event_ids = fields.One2many(
string="Tracking events", comodel_name='mail.tracking.event',
inverse_name='tracking_email_id', readonly=True)
string="Tracking events",
comodel_name="mail.tracking.event",
inverse_name="tracking_email_id",
readonly=True,
)
# Token isn't generated here to have compatibility with older trackings.
# New trackings have token and older not
token = fields.Char(string="Security Token", readonly=True,
token = fields.Char(
string="Security Token",
readonly=True,
default=lambda s: uuid.uuid4().hex,
groups="base.group_system")
groups="base.group_system",
)
@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
failed_states = self.env['mail.message'].get_failed_states()
records \
.filtered(lambda one: one.state in failed_states) \
.mapped("mail_message_id") \
.write({'mail_tracking_needs_action': True})
failed_states = self.env["mail.message"].get_failed_states()
records.filtered(lambda one: one.state in failed_states).mapped(
"mail_message_id"
).write({"mail_tracking_needs_action": True})
return records
@api.multi
def write(self, vals):
super().write(vals)
state = vals.get('state')
if state and state in self.env['mail.message'].get_failed_states():
self.mapped('mail_message_id').write({
'mail_tracking_needs_action': True,
})
state = vals.get("state")
if state and state in self.env["mail.message"].get_failed_states():
self.mapped("mail_message_id").write({"mail_tracking_needs_action": True})
@api.model
def email_is_bounced(self, email):
if not email:
return False
res = self._email_last_tracking_state(email)
return res and res[0].get('state', '') in {'rejected', 'error',
'spam', 'bounced'}
return res and res[0].get("state", "") in {
"rejected",
"error",
"spam",
"bounced",
}
@api.model
def _email_last_tracking_state(self, email):
return self.search_read([('recipient_address', '=', email.lower())],
['state'], limit=1, order='time DESC')
return self.search_read(
[("recipient_address", "=", email.lower())],
["state"],
limit=1,
order="time DESC",
)
@api.model
def email_score_from_email(self, email):
if not email:
return 0.
data = self.read_group([('recipient_address', '=', email.lower())],
['recipient_address', 'state'], ['state'])
mapped_data = {state['state']: state['state_count'] for state in data}
return 0.0
data = self.read_group(
[("recipient_address", "=", email.lower())],
["recipient_address", "state"],
["state"],
)
mapped_data = {state["state"]: state["state_count"] for state in data}
return self.with_context(mt_states=mapped_data).email_score()
@api.model
def _email_score_weights(self):
"""Default email score weights. Ready to be inherited"""
return {
'error': -50.0,
'rejected': -25.0,
'spam': -25.0,
'bounced': -25.0,
'soft-bounced': -10.0,
'unsub': -10.0,
'delivered': 1.0,
'opened': 5.0,
"error": -50.0,
"rejected": -25.0,
"spam": -25.0,
"bounced": -25.0,
"soft-bounced": -10.0,
"unsub": -10.0,
"delivered": 1.0,
"opened": 5.0,
}
def email_score(self):
@ -163,7 +189,7 @@ class MailTrackingEmail(models.Model):
"""
weights = self._email_score_weights()
score = 50.0
states = self.env.context.get('mt_states', False)
states = self.env.context.get("mt_states", False)
if states:
for state in states.keys():
score += weights.get(state, 0.0) * states[state]
@ -176,13 +202,12 @@ class MailTrackingEmail(models.Model):
score = 0.0
return score
@api.depends('recipient')
@api.depends("recipient")
def _compute_recipient_address(self):
for email in self:
is_empty_recipient = (not email.recipient
or '<False>' in email.recipient)
is_empty_recipient = not email.recipient or "<False>" in email.recipient
if not is_empty_recipient:
matches = re.search(r'<(.*@.*)>', email.recipient)
matches = re.search(r"<(.*@.*)>", email.recipient)
if matches:
email.recipient_address = matches.group(1).lower()
else:
@ -190,46 +215,45 @@ class MailTrackingEmail(models.Model):
else:
email.recipient_address = False
@api.depends('name', 'recipient')
@api.depends("name", "recipient")
def _compute_tracking_display_name(self):
for email in self:
parts = [email.name or '']
parts = [email.name or ""]
if email.recipient:
parts.append(email.recipient)
email.display_name = ' - '.join(parts)
email.display_name = " - ".join(parts)
@api.depends('time')
@api.depends("time")
def _compute_date(self):
for email in self:
email.date = fields.Date.to_string(
fields.Date.from_string(email.time))
email.date = fields.Date.to_string(fields.Date.from_string(email.time))
def _get_mail_tracking_img(self):
m_config = self.env['ir.config_parameter']
base_url = (m_config.get_param('mail_tracking.base.url')
or m_config.get_param('web.base.url'))
m_config = self.env["ir.config_parameter"]
base_url = m_config.get_param("mail_tracking.base.url") or m_config.get_param(
"web.base.url"
)
if self.token:
path_url = (
'mail/tracking/open/%(db)s/%(tracking_email_id)s/%(token)s/'
'blank.gif' % {
'db': self.env.cr.dbname,
'tracking_email_id': self.id,
'token': self.token,
})
"mail/tracking/open/%(db)s/%(tracking_email_id)s/%(token)s/"
"blank.gif"
% {
"db": self.env.cr.dbname,
"tracking_email_id": self.id,
"token": self.token,
}
)
else:
# This is here for compatibility with older records
path_url = (
'mail/tracking/open/%(db)s/%(tracking_email_id)s/blank.gif' % {
'db': self.env.cr.dbname,
'tracking_email_id': self.id,
})
path_url = "mail/tracking/open/{db}/{tracking_email_id}/blank.gif".format(
db=self.env.cr.dbname, tracking_email_id=self.id
)
track_url = urllib.parse.urljoin(base_url, path_url)
return (
'<img src="%(url)s" alt="" '
'data-odoo-tracking-email="%(tracking_email_id)s"/>' % {
'url': track_url,
'tracking_email_id': self.id,
})
'data-odoo-tracking-email="%(tracking_email_id)s"/>'
% {"url": track_url, "tracking_email_id": self.id}
)
@api.multi
def _partners_email_bounced_set(self, reason, event=None):
@ -237,32 +261,35 @@ class MailTrackingEmail(models.Model):
if event and event.recipient_address:
recipients.append(event.recipient_address)
else:
recipients = [x for x in self.mapped('recipient_address') if x]
recipients = [x for x in self.mapped("recipient_address") if x]
for recipient in recipients:
self.env['res.partner'].search([
('email', '=ilike', recipient)
]).email_bounced_set(self, reason, event=event)
self.env["res.partner"].search(
[("email", "=ilike", recipient)]
).email_bounced_set(self, reason, event=event)
@api.multi
def smtp_error(self, mail_server, smtp_server, exception):
values = {
'state': 'error',
values = {"state": "error"}
IrMailServer = self.env["ir.mail_server"]
if (
str(exception) == IrMailServer.NO_VALID_RECIPIENT
and not self.recipient_address
):
values.update(
{
"error_type": "no_recipient",
"error_description": "The partner doesn't have a defined email",
}
IrMailServer = self.env['ir.mail_server']
if str(exception) == IrMailServer.NO_VALID_RECIPIENT \
and not self.recipient_address:
values.update({
'error_type': 'no_recipient',
'error_description':
"The partner doesn't have a defined email",
})
)
else:
values.update({
'error_smtp_server': tools.ustr(smtp_server),
'error_type': exception.__class__.__name__,
'error_description': tools.ustr(exception),
})
self.sudo()._partners_email_bounced_set('error')
values.update(
{
"error_smtp_server": tools.ustr(smtp_server),
"error_type": exception.__class__.__name__,
"error_description": tools.ustr(exception),
}
)
self.sudo()._partners_email_bounced_set("error")
self.sudo().write(values)
@api.multi
@ -270,14 +297,14 @@ class MailTrackingEmail(models.Model):
self.ensure_one()
tracking_url = self._get_mail_tracking_img()
if tracking_url:
content = email.get('body', '')
content = email.get("body", "")
content = re.sub(
r'<img[^>]*data-odoo-tracking-email=["\'][0-9]*["\'][^>]*>',
'', content)
r'<img[^>]*data-odoo-tracking-email=["\'][0-9]*["\'][^>]*>', "", content
)
body = tools.append_content_to_html(
content, tracking_url, plaintext=False,
container_tag='div')
email['body'] = body
content, tracking_url, plaintext=False, container_tag="div"
)
email["body"] = body
return email
def _message_partners_check(self, message, message_id):
@ -294,70 +321,67 @@ class MailTrackingEmail(models.Model):
'needaction_partner_ids': [(4, self.partner_id.id)],
})
else:
mail_message.sudo().write({
'partner_ids': [(4, self.partner_id.id)],
})
mail_message.sudo().write({"partner_ids": [(4, self.partner_id.id)]})
return True
@api.multi
def _tracking_sent_prepare(self, mail_server, smtp_server, message,
message_id):
def _tracking_sent_prepare(self, mail_server, smtp_server, message, message_id):
self.ensure_one()
ts = time.time()
dt = datetime.utcfromtimestamp(ts)
self._message_partners_check(message, message_id)
self.sudo().write({'state': 'sent'})
self.sudo().write({"state": "sent"})
return {
'recipient': message['To'],
'timestamp': '%.6f' % ts,
'time': fields.Datetime.to_string(dt),
'tracking_email_id': self.id,
'event_type': 'sent',
'smtp_server': smtp_server,
"recipient": message["To"],
"timestamp": "%.6f" % ts,
"time": fields.Datetime.to_string(dt),
"tracking_email_id": self.id,
"event_type": "sent",
"smtp_server": smtp_server,
}
def _event_prepare(self, event_type, metadata):
self.ensure_one()
m_event = self.env['mail.tracking.event']
method = getattr(m_event, 'process_' + event_type, None)
if method and hasattr(method, '__call__'):
m_event = self.env["mail.tracking.event"]
method = getattr(m_event, "process_" + event_type, None)
if method and callable(method):
return method(self, metadata)
else: # pragma: no cover
_logger.info('Unknown event type: %s' % event_type)
_logger.info("Unknown event type: %s" % event_type)
return False
def _concurrent_events(self, event_type, metadata):
m_event = self.env['mail.tracking.event']
m_event = self.env["mail.tracking.event"]
self.ensure_one()
concurrent_event_ids = False
if event_type in {'open', 'click'}:
ts = metadata.get('timestamp', time.time())
delta = EVENT_OPEN_DELTA if event_type == 'open' \
else EVENT_CLICK_DELTA
if event_type in {"open", "click"}:
ts = metadata.get("timestamp", time.time())
delta = EVENT_OPEN_DELTA if event_type == "open" else EVENT_CLICK_DELTA
domain = [
('timestamp', '>=', ts - delta),
('timestamp', '<=', ts + delta),
('tracking_email_id', '=', self.id),
('event_type', '=', event_type),
("timestamp", ">=", ts - delta),
("timestamp", "<=", ts + delta),
("tracking_email_id", "=", self.id),
("event_type", "=", event_type),
]
if event_type == 'click':
domain.append(('url', '=', metadata.get('url', False)))
if event_type == "click":
domain.append(("url", "=", metadata.get("url", False)))
concurrent_event_ids = m_event.search(domain)
return concurrent_event_ids
@api.multi
def event_create(self, event_type, metadata):
event_ids = self.env['mail.tracking.event']
event_ids = self.env["mail.tracking.event"]
for tracking_email in self:
other_ids = tracking_email._concurrent_events(event_type, metadata)
if not other_ids:
vals = tracking_email._event_prepare(event_type, metadata)
if vals:
events = event_ids.sudo().create(vals)
if event_type in {'hard_bounce', 'spam', 'reject'}:
if event_type in {"hard_bounce", "spam", "reject"}:
for event in events:
self.sudo()._partners_email_bounced_set(
event_type, event=event)
event_type, event=event
)
event_ids += events
else:
_logger.debug("Concurrent event '%s' discarded", event_type)
@ -369,4 +393,4 @@ class MailTrackingEmail(models.Model):
# - return 'OK' if processed
# - return 'NONE' if this request is not for you
# - return 'ERROR' if any error
return 'NONE' # pragma: no cover
return "NONE" # pragma: no cover

View File

@ -11,55 +11,69 @@ import odoo.addons.decimal_precision as dp
class MailTrackingEvent(models.Model):
_name = "mail.tracking.event"
_order = 'timestamp desc'
_rec_name = 'event_type'
_description = 'MailTracking event'
_order = "timestamp desc"
_rec_name = "event_type"
_description = "MailTracking event"
recipient = fields.Char(string="Recipient", readonly=True)
recipient_address = fields.Char(
string='Recipient email address', readonly=True, store=True,
compute='_compute_recipient_address', index=True)
string="Recipient email address",
readonly=True,
store=True,
compute="_compute_recipient_address",
index=True,
)
timestamp = fields.Float(
string='UTC timestamp', readonly=True,
digits=dp.get_precision('MailTracking Timestamp'))
time = fields.Datetime(string="Time", readonly=True)
date = fields.Date(
string="Date", readonly=True, compute="_compute_date", store=True)
string="Date", readonly=True, compute="_compute_date", store=True
)
tracking_email_id = fields.Many2one(
string='Message', readonly=True, required=True, ondelete='cascade',
comodel_name='mail.tracking.email', index=True)
event_type = fields.Selection(string='Event type', selection=[
('sent', 'Sent'),
('delivered', 'Delivered'),
('deferral', 'Deferral'),
('hard_bounce', 'Hard bounce'),
('soft_bounce', 'Soft bounce'),
('open', 'Open'),
('click', 'Clicked'),
('spam', 'Spam'),
('unsub', 'Unsubscribed'),
('reject', 'Rejected'),
], readonly=True)
smtp_server = fields.Char(string='SMTP server', readonly=True)
url = fields.Char(string='Clicked URL', readonly=True)
ip = fields.Char(string='User IP', readonly=True)
user_agent = fields.Char(string='User agent', readonly=True)
mobile = fields.Boolean(string='Is mobile?', readonly=True)
os_family = fields.Char(string='Operating system family', readonly=True)
ua_family = fields.Char(string='User agent family', readonly=True)
ua_type = fields.Char(string='User agent type', readonly=True)
user_country_id = fields.Many2one(string='User country', readonly=True,
comodel_name='res.country')
error_type = fields.Char(string='Error type', readonly=True)
error_description = fields.Char(string='Error description', readonly=True)
error_details = fields.Text(string='Error details', readonly=True)
string="Message",
readonly=True,
required=True,
ondelete="cascade",
comodel_name="mail.tracking.email",
index=True,
)
event_type = fields.Selection(
string="Event type",
selection=[
("sent", "Sent"),
("delivered", "Delivered"),
("deferral", "Deferral"),
("hard_bounce", "Hard bounce"),
("soft_bounce", "Soft bounce"),
("open", "Open"),
("click", "Clicked"),
("spam", "Spam"),
("unsub", "Unsubscribed"),
("reject", "Rejected"),
],
readonly=True,
)
smtp_server = fields.Char(string="SMTP server", readonly=True)
url = fields.Char(string="Clicked URL", readonly=True)
ip = fields.Char(string="User IP", readonly=True)
user_agent = fields.Char(string="User agent", readonly=True)
mobile = fields.Boolean(string="Is mobile?", readonly=True)
os_family = fields.Char(string="Operating system family", readonly=True)
ua_family = fields.Char(string="User agent family", readonly=True)
ua_type = fields.Char(string="User agent type", readonly=True)
user_country_id = fields.Many2one(
string="User country", readonly=True, comodel_name="res.country"
)
error_type = fields.Char(string="Error type", readonly=True)
error_description = fields.Char(string="Error description", readonly=True)
error_details = fields.Text(string="Error details", readonly=True)
@api.multi
@api.depends('recipient')
@api.depends("recipient")
def _compute_recipient_address(self):
for email in self:
if email.recipient:
matches = re.search(r'<(.*@.*)>', email.recipient)
matches = re.search(r"<(.*@.*)>", email.recipient)
if matches:
email.recipient_address = matches.group(1).lower()
else:
@ -68,85 +82,82 @@ class MailTrackingEvent(models.Model):
email.recipient_address = False
@api.multi
@api.depends('time')
@api.depends("time")
def _compute_date(self):
for email in self:
email.date = fields.Date.to_string(
fields.Date.from_string(email.time))
email.date = fields.Date.to_string(fields.Date.from_string(email.time))
def _process_data(self, tracking_email, metadata, event_type, state):
ts = time.time()
dt = datetime.utcfromtimestamp(ts)
return {
'recipient': metadata.get('recipient', tracking_email.recipient),
'timestamp': metadata.get('timestamp', ts),
'time': metadata.get('time', fields.Datetime.to_string(dt)),
'date': metadata.get('date', fields.Date.to_string(dt)),
'tracking_email_id': tracking_email.id,
'event_type': event_type,
'ip': metadata.get('ip', False),
'url': metadata.get('url', False),
'user_agent': metadata.get('user_agent', False),
'mobile': metadata.get('mobile', False),
'os_family': metadata.get('os_family', False),
'ua_family': metadata.get('ua_family', False),
'ua_type': metadata.get('ua_type', False),
'user_country_id': metadata.get('user_country_id', False),
'error_type': metadata.get('error_type', False),
'error_description': metadata.get('error_description', False),
'error_details': metadata.get('error_details', False),
"recipient": metadata.get("recipient", tracking_email.recipient),
"timestamp": metadata.get("timestamp", ts),
"time": metadata.get("time", fields.Datetime.to_string(dt)),
"date": metadata.get("date", fields.Date.to_string(dt)),
"tracking_email_id": tracking_email.id,
"event_type": event_type,
"ip": metadata.get("ip", False),
"url": metadata.get("url", False),
"user_agent": metadata.get("user_agent", False),
"mobile": metadata.get("mobile", False),
"os_family": metadata.get("os_family", False),
"ua_family": metadata.get("ua_family", False),
"ua_type": metadata.get("ua_type", False),
"user_country_id": metadata.get("user_country_id", False),
"error_type": metadata.get("error_type", False),
"error_description": metadata.get("error_description", False),
"error_details": metadata.get("error_details", False),
}
def _process_status(self, tracking_email, metadata, event_type, state):
tracking_email.sudo().write({'state': state})
tracking_email.sudo().write({"state": state})
return self._process_data(tracking_email, metadata, event_type, state)
def _process_bounce(self, tracking_email, metadata, event_type, state):
tracking_email.sudo().write({
'state': state,
'bounce_type': metadata.get('bounce_type', False),
'bounce_description': metadata.get('bounce_description', False),
})
tracking_email.sudo().write(
{
"state": state,
"bounce_type": metadata.get("bounce_type", False),
"bounce_description": metadata.get("bounce_description", False),
}
)
return self._process_data(tracking_email, metadata, event_type, state)
@api.model
def process_delivered(self, tracking_email, metadata):
return self._process_status(
tracking_email, metadata, 'delivered', 'delivered')
return self._process_status(tracking_email, metadata, "delivered", "delivered")
@api.model
def process_deferral(self, tracking_email, metadata):
return self._process_status(
tracking_email, metadata, 'deferral', 'deferred')
return self._process_status(tracking_email, metadata, "deferral", "deferred")
@api.model
def process_hard_bounce(self, tracking_email, metadata):
return self._process_bounce(
tracking_email, metadata, 'hard_bounce', 'bounced')
return self._process_bounce(tracking_email, metadata, "hard_bounce", "bounced")
@api.model
def process_soft_bounce(self, tracking_email, metadata):
return self._process_bounce(
tracking_email, metadata, 'soft_bounce', 'soft-bounced')
tracking_email, metadata, "soft_bounce", "soft-bounced"
)
@api.model
def process_open(self, tracking_email, metadata):
return self._process_status(tracking_email, metadata, 'open', 'opened')
return self._process_status(tracking_email, metadata, "open", "opened")
@api.model
def process_click(self, tracking_email, metadata):
return self._process_status(
tracking_email, metadata, 'click', 'opened')
return self._process_status(tracking_email, metadata, "click", "opened")
@api.model
def process_spam(self, tracking_email, metadata):
return self._process_status(tracking_email, metadata, 'spam', 'spam')
return self._process_status(tracking_email, metadata, "spam", "spam")
@api.model
def process_unsub(self, tracking_email, metadata):
return self._process_status(tracking_email, metadata, 'unsub', 'unsub')
return self._process_status(tracking_email, metadata, "unsub", "unsub")
@api.model
def process_reject(self, tracking_email, metadata):
return self._process_status(
tracking_email, metadata, 'reject', 'rejected')
return self._process_status(tracking_email, metadata, "reject", "rejected")

View File

@ -1,20 +1,21 @@
# Copyright 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo import models, api, fields
from odoo import api, fields, models
class ResPartner(models.Model):
_name = 'res.partner'
_inherit = ['res.partner', 'mail.bounced.mixin']
_name = "res.partner"
_inherit = ["res.partner", "mail.bounced.mixin"]
# tracking_emails_count and email_score are non-store fields in order
# to improve performance
tracking_emails_count = fields.Integer(
compute='_compute_tracking_emails_count', readonly=True)
email_score = fields.Float(compute='_compute_email_score', readonly=True)
compute="_compute_tracking_emails_count", readonly=True
)
email_score = fields.Float(compute="_compute_email_score", readonly=True)
@api.depends('email')
@api.depends("email")
def _compute_email_score(self):
for partner in self.filtered('email'):
partner.email_score = self.env['mail.tracking.email'].\
@ -26,7 +27,7 @@ class ResPartner(models.Model):
for partner in self:
count = 0
if partner.email:
count = self.env['mail.tracking.email'].search_count([
('recipient_address', '=', partner.email.lower())
])
count = self.env["mail.tracking.email"].search_count(
[("recipient_address", "=", partner.email.lower())]
)
partner.tracking_emails_count = count

View File

@ -1,77 +1,78 @@
# Copyright 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import mock
from odoo.tools import mute_logger
import time
import base64
import time
import mock
import psycopg2
import psycopg2.errorcodes
from odoo import http
from odoo.tests.common import TransactionCase
from ..controllers.main import MailTrackingController, BLANK
from lxml import etree
mock_send_email = ('odoo.addons.base.models.ir_mail_server.'
'IrMailServer.send_email')
from odoo import _, http
from odoo.tests.common import TransactionCase
from odoo.tools import mute_logger
from ..controllers.main import BLANK, MailTrackingController
mock_send_email = "odoo.addons.base.models.ir_mail_server." "IrMailServer.send_email"
class FakeUserAgent(object):
browser = 'Test browser'
platform = 'Test platform'
browser = "Test browser"
platform = "Test platform"
def __str__(self):
"""Return name"""
return 'Test suite'
return "Test suite"
class TestMailTracking(TransactionCase):
def setUp(self, *args, **kwargs):
super(TestMailTracking, self).setUp(*args, **kwargs)
self.sender = self.env['res.partner'].create({
'name': 'Test sender',
'email': 'sender@example.com',
})
self.recipient = self.env['res.partner'].create({
'name': 'Test recipient',
'email': 'recipient@example.com',
})
self.sender = self.env["res.partner"].create(
{"name": "Test sender", "email": "sender@example.com"}
)
self.recipient = self.env["res.partner"].create(
{"name": "Test recipient", "email": "recipient@example.com"}
)
self.last_request = http.request
http.request = type('obj', (object,), {
'env': self.env,
'cr': self.env.cr,
'db': self.env.cr.dbname,
'endpoint': type('obj', (object,), {
'routing': [],
}),
'httprequest': type('obj', (object,), {
'remote_addr': '123.123.123.123',
'user_agent': FakeUserAgent(),
}),
})
http.request = type(
"obj",
(object,),
{
"env": self.env,
"cr": self.env.cr,
"db": self.env.cr.dbname,
"endpoint": type("obj", (object,), {"routing": []}),
"httprequest": type(
"obj",
(object,),
{"remote_addr": "123.123.123.123", "user_agent": FakeUserAgent()},
),
},
)
def tearDown(self, *args, **kwargs):
http.request = self.last_request
return super(TestMailTracking, self).tearDown(*args, **kwargs)
def test_empty_email(self):
self.recipient.write({'email_bounced': True})
self.recipient.write({'email': False})
self.recipient.write({"email_bounced": True})
self.recipient.write({"email": False})
self.assertEqual(False, self.recipient.email)
self.assertEqual(False, self.recipient.email_bounced)
self.recipient.write({'email_bounced': True})
self.recipient.write({'email': ''})
self.recipient.write({"email_bounced": True})
self.recipient.write({"email": ""})
self.assertEqual(False, self.recipient.email_bounced)
self.assertEqual(False, self.env["mail.tracking.email"].email_is_bounced(False))
self.assertEqual(
False,
self.env['mail.tracking.email'].email_is_bounced(False))
self.assertEqual(
0.,
self.env['mail.tracking.email'].email_score_from_email(False))
0.0, self.env["mail.tracking.email"].email_score_from_email(False)
)
def test_recipient_address_compute(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.write({'recipient': False})
tracking.write({"recipient": False})
self.assertEqual(False, tracking.recipient_address)
def test_message_post(self):
@ -88,154 +89,164 @@ class TestMailTracking(TransactionCase):
})
message._notify(message, {}, force_send=True)
# Search tracking created
tracking_email = self.env['mail.tracking.email'].search([
('mail_message_id', '=', message.id),
('partner_id', '=', self.recipient.id),
])
tracking_email = self.env["mail.tracking.email"].search(
[
("mail_message_id", "=", message.id),
("partner_id", "=", self.recipient.id),
]
)
# The tracking email must be sent
self.assertTrue(tracking_email)
self.assertEqual(tracking_email.state, 'sent')
self.assertEqual(tracking_email.state, "sent")
# message_dict read by web interface
message_dict = message.message_format()[0]
self.assertTrue(len(message_dict['partner_ids']) > 0)
self.assertTrue(len(message_dict["partner_ids"]) > 0)
# First partner is recipient
partner_id = message_dict['partner_ids'][0][0]
partner_id = message_dict["partner_ids"][0][0]
self.assertEqual(partner_id, self.recipient.id)
status = message_dict['partner_trackings'][0]
status = message_dict["partner_trackings"][0]
# Tracking status must be sent and
# mail tracking must be the one search before
self.assertEqual(status['status'], 'sent')
self.assertEqual(status['tracking_id'], tracking_email.id)
self.assertEqual(status['recipient'], self.recipient.display_name)
self.assertEqual(status['partner_id'], self.recipient.id)
self.assertEqual(status['isCc'], False)
self.assertEqual(status["status"], "sent")
self.assertEqual(status["tracking_id"], tracking_email.id)
self.assertEqual(status["recipient"], self.recipient.display_name)
self.assertEqual(status["partner_id"], self.recipient.id)
self.assertEqual(status["isCc"], False)
# And now open the email
metadata = {
'ip': '127.0.0.1',
'user_agent': 'Odoo Test/1.0',
'os_family': 'linux',
'ua_family': 'odoo',
"ip": "127.0.0.1",
"user_agent": "Odoo Test/1.0",
"os_family": "linux",
"ua_family": "odoo",
}
tracking_email.event_create('open', metadata)
self.assertEqual(tracking_email.state, 'opened')
tracking_email.event_create("open", metadata)
self.assertEqual(tracking_email.state, "opened")
def test_message_post_partner_no_email(self):
# Create message with recipient without defined email
self.recipient.write({'email': False})
message = self.env['mail.message'].create({
'subject': 'Message test',
'author_id': self.sender.id,
'email_from': self.sender.email,
'message_type': 'comment',
'model': 'res.partner',
'res_id': self.recipient.id,
'partner_ids': [(4, self.recipient.id)],
'body': '<p>This is a test message</p>',
})
self.recipient.write({"email": False})
message = self.env["mail.message"].create(
{
"subject": "Message test",
"author_id": self.sender.id,
"email_from": self.sender.email,
"message_type": "comment",
"model": "res.partner",
"res_id": self.recipient.id,
"partner_ids": [(4, self.recipient.id)],
"body": "<p>This is a test message</p>",
}
)
message._notify(message, {}, force_send=True)
# Search tracking created
tracking_email = self.env['mail.tracking.email'].search([
('mail_message_id', '=', message.id),
('partner_id', '=', self.recipient.id),
])
tracking_email = self.env["mail.tracking.email"].search(
[
("mail_message_id", "=", message.id),
("partner_id", "=", self.recipient.id),
]
)
# No email should generate a error state: no_recipient
self.assertEqual(tracking_email.state, 'error')
self.assertEqual(tracking_email.error_type, 'no_recipient')
self.assertEqual(tracking_email.state, "error")
self.assertEqual(tracking_email.error_type, "no_recipient")
self.assertFalse(self.recipient.email_bounced)
def _check_partner_trackings(self, message):
message_dict = message.message_format()[0]
self.assertEqual(len(message_dict['partner_trackings']), 3)
self.assertEqual(len(message_dict["partner_trackings"]), 3)
# mail cc
foundPartner = False
foundNoPartner = False
for tracking in message_dict['partner_trackings']:
if tracking['partner_id'] == self.sender.id:
for tracking in message_dict["partner_trackings"]:
if tracking["partner_id"] == self.sender.id:
foundPartner = True
self.assertTrue(tracking['isCc'])
elif tracking['recipient'] == 'unnamed@test.com':
self.assertTrue(tracking["isCc"])
elif tracking["recipient"] == "unnamed@test.com":
foundNoPartner = True
self.assertFalse(tracking['partner_id'])
self.assertTrue(tracking['isCc'])
elif tracking['partner_id'] == self.recipient.id:
self.assertFalse(tracking['isCc'])
self.assertFalse(tracking["partner_id"])
self.assertTrue(tracking["isCc"])
elif tracking["partner_id"] == self.recipient.id:
self.assertFalse(tracking["isCc"])
self.assertTrue(foundPartner)
self.assertTrue(foundNoPartner)
def test_email_cc(self):
sender_user = self.env['res.users'].create({
'name': 'Sender User Test',
'partner_id': self.sender.id,
'login': 'sender-test',
})
message = self.recipient.sudo(user=sender_user).message_post(
body='<p>This is a test message</p>',
cc='unnamed@test.com, sender@example.com'
sender_user = self.env["res.users"].create(
{
"name": "Sender User Test",
"partner_id": self.sender.id,
"login": "sender-test",
}
)
message = self.recipient.with_user(sender_user).message_post(
body=_("<p>This is a test message</p>"),
cc="unnamed@test.com, sender@example.com",
)
# suggested recipients
recipients = self.recipient.message_get_suggested_recipients()
suggested_mails = {
email[1] for email in recipients[self.recipient.id]
}
self.assertTrue('unnamed@test.com' in suggested_mails)
recipients = self.recipient._message_get_suggested_recipients()
suggested_mails = {email[1] for email in recipients[self.recipient.id]}
self.assertTrue("unnamed@test.com" in suggested_mails)
self.assertEqual(len(recipients[self.recipient.id][0]), 3)
# Repeated Cc recipients
message = self.env['mail.message'].create({
'subject': 'Message test',
'author_id': self.sender.id,
'email_from': self.sender.email,
'message_type': 'comment',
'model': 'res.partner',
'res_id': self.recipient.id,
'partner_ids': [(4, self.recipient.id)],
'email_cc': 'unnamed@test.com, sender@example.com'
', recipient@example.com',
'body': '<p>This is another test message</p>',
})
message = self.env["mail.message"].create(
{
"subject": "Message test",
"author_id": self.sender.id,
"email_from": self.sender.email,
"message_type": "comment",
"model": "res.partner",
"res_id": self.recipient.id,
"partner_ids": [(4, self.recipient.id)],
"email_cc": "unnamed@test.com, sender@example.com"
", recipient@example.com",
"body": "<p>This is another test message</p>",
}
)
message._notify(message, {}, force_send=True)
recipients = self.recipient.message_get_suggested_recipients()
recipients = self.recipient._message_get_suggested_recipients()
self.assertEqual(len(recipients[self.recipient.id][0]), 3)
self._check_partner_trackings(message)
def test_failed_message(self):
MailMessageObj = self.env['mail.message']
MailMessageObj = self.env["mail.message"]
# Create message
mail, tracking = self.mail_send(self.recipient.email)
self.assertFalse(tracking.mail_message_id.mail_tracking_needs_action)
# Force error state
tracking.state = 'error'
tracking.state = "error"
self.assertTrue(tracking.mail_message_id.mail_tracking_needs_action)
failed_count = MailMessageObj.get_failed_count()
self.assertTrue(failed_count > 0)
values = tracking.mail_message_id.get_failed_messages()
self.assertEqual(values[0]['id'], tracking.mail_message_id.id)
self.assertEqual(values[0]["id"], tracking.mail_message_id.id)
messages = MailMessageObj.search([])
messages_failed = MailMessageObj.search(
MailMessageObj._get_failed_message_domain())
MailMessageObj._get_failed_message_domain()
)
self.assertTrue(messages)
self.assertTrue(messages_failed)
self.assertTrue(len(messages) > len(messages_failed))
tracking.mail_message_id.set_need_action_done()
self.assertFalse(tracking.mail_message_id.mail_tracking_needs_action)
self.assertTrue(
MailMessageObj.get_failed_count() < failed_count)
self.assertTrue(MailMessageObj.get_failed_count() < failed_count)
# No author_id
tracking.mail_message_id.author_id = False
values = tracking.mail_message_id.get_failed_messages()[0]
self.assertEqual(values['author'][0], -1)
self.assertEqual(values["author"][0], -1)
def mail_send(self, recipient):
mail = self.env['mail.mail'].create({
'subject': 'Test subject',
'email_from': 'from@domain.com',
'email_to': recipient,
'body_html': '<p>This is a test message</p>',
})
mail = self.env["mail.mail"].create(
{
"subject": "Test subject",
"email_from": "from@domain.com",
"email_to": recipient,
"body_html": "<p>This is a test message</p>",
}
)
mail.send()
# Search tracking created
tracking_email = self.env['mail.tracking.email'].search([
('mail_id', '=', mail.id),
])
tracking_email = self.env["mail.tracking.email"].search(
[("mail_id", "=", mail.id)]
)
return mail, tracking_email
def test_mail_send(self):
@ -245,10 +256,9 @@ class TestMailTracking(TransactionCase):
mail, tracking = self.mail_send(self.recipient.email)
self.assertEqual(mail.email_to, tracking.recipient)
self.assertEqual(mail.email_from, tracking.sender)
with mock.patch('odoo.http.db_filter') as mock_client:
with mock.patch("odoo.http.db_filter") as mock_client:
mock_client.return_value = True
res = controller.mail_tracking_open(
db, tracking.id, tracking.token)
res = controller.mail_tracking_open(db, tracking.id, tracking.token)
self.assertEqual(image, res.response[0])
# Two events: sent and open
self.assertEqual(2, len(tracking.tracking_event_ids))
@ -261,30 +271,30 @@ class TestMailTracking(TransactionCase):
def test_mail_tracking_open(self):
controller = MailTrackingController()
db = self.env.cr.dbname
with mock.patch('odoo.http.db_filter') as mock_client:
with mock.patch("odoo.http.db_filter") as mock_client:
mock_client.return_value = True
mail, tracking = self.mail_send(self.recipient.email)
# Tracking is in sent or delivered state. But no token give.
# Don't generates tracking event
controller.mail_tracking_open(db, tracking.id)
self.assertEqual(1, len(tracking.tracking_event_ids))
tracking.write({'state': 'opened'})
tracking.write({"state": "opened"})
# Tracking isn't in sent or delivered state.
# Don't generates tracking event
controller.mail_tracking_open(db, tracking.id, tracking.token)
self.assertEqual(1, len(tracking.tracking_event_ids))
tracking.write({'state': 'sent'})
tracking.write({"state": "sent"})
# Tracking is in sent or delivered state and a token is given.
# Generates tracking event
controller.mail_tracking_open(db, tracking.id, tracking.token)
self.assertEqual(2, len(tracking.tracking_event_ids))
# Generate new email due concurrent event filter
mail, tracking = self.mail_send(self.recipient.email)
tracking.write({'token': False})
tracking.write({"token": False})
# Tracking is in sent or delivered state but a token is given for a
# record that doesn't have a token.
# Don't generates tracking event
controller.mail_tracking_open(db, tracking.id, 'tokentest')
controller.mail_tracking_open(db, tracking.id, "tokentest")
self.assertEqual(1, len(tracking.tracking_event_ids))
# Tracking is in sent or delivered state and not token is given for
# a record that doesn't have a token.
@ -296,90 +306,76 @@ class TestMailTracking(TransactionCase):
mail, tracking = self.mail_send(self.recipient.email)
ts = time.time()
metadata = {
'ip': '127.0.0.1',
'user_agent': 'Odoo Test/1.0',
'os_family': 'linux',
'ua_family': 'odoo',
'timestamp': ts,
"ip": "127.0.0.1",
"user_agent": "Odoo Test/1.0",
"os_family": "linux",
"ua_family": "odoo",
"timestamp": ts,
}
# First open event
tracking.event_create('open', metadata)
opens = tracking.tracking_event_ids.filtered(
lambda r: r.event_type == 'open'
)
tracking.event_create("open", metadata)
opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "open")
self.assertEqual(len(opens), 1)
# Concurrent open event
metadata['timestamp'] = ts + 2
tracking.event_create('open', metadata)
opens = tracking.tracking_event_ids.filtered(
lambda r: r.event_type == 'open'
)
metadata["timestamp"] = ts + 2
tracking.event_create("open", metadata)
opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "open")
self.assertEqual(len(opens), 1)
# Second open event
metadata['timestamp'] = ts + 350
tracking.event_create('open', metadata)
opens = tracking.tracking_event_ids.filtered(
lambda r: r.event_type == 'open'
)
metadata["timestamp"] = ts + 350
tracking.event_create("open", metadata)
opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "open")
self.assertEqual(len(opens), 2)
def test_concurrent_click(self):
mail, tracking = self.mail_send(self.recipient.email)
ts = time.time()
metadata = {
'ip': '127.0.0.1',
'user_agent': 'Odoo Test/1.0',
'os_family': 'linux',
'ua_family': 'odoo',
'timestamp': ts,
'url': 'https://www.example.com/route/1',
"ip": "127.0.0.1",
"user_agent": "Odoo Test/1.0",
"os_family": "linux",
"ua_family": "odoo",
"timestamp": ts,
"url": "https://www.example.com/route/1",
}
# First click event (URL 1)
tracking.event_create('click', metadata)
opens = tracking.tracking_event_ids.filtered(
lambda r: r.event_type == 'click'
)
tracking.event_create("click", metadata)
opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click")
self.assertEqual(len(opens), 1)
# Concurrent click event (URL 1)
metadata['timestamp'] = ts + 2
tracking.event_create('click', metadata)
opens = tracking.tracking_event_ids.filtered(
lambda r: r.event_type == 'click'
)
metadata["timestamp"] = ts + 2
tracking.event_create("click", metadata)
opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click")
self.assertEqual(len(opens), 1)
# Second click event (URL 1)
metadata['timestamp'] = ts + 350
tracking.event_create('click', metadata)
opens = tracking.tracking_event_ids.filtered(
lambda r: r.event_type == 'click'
)
metadata["timestamp"] = ts + 350
tracking.event_create("click", metadata)
opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click")
self.assertEqual(len(opens), 2)
# Concurrent click event (URL 2)
metadata['timestamp'] = ts + 2
metadata['url'] = 'https://www.example.com/route/2'
tracking.event_create('click', metadata)
opens = tracking.tracking_event_ids.filtered(
lambda r: r.event_type == 'click'
)
metadata["timestamp"] = ts + 2
metadata["url"] = "https://www.example.com/route/2"
tracking.event_create("click", metadata)
opens = tracking.tracking_event_ids.filtered(lambda r: r.event_type == "click")
self.assertEqual(len(opens), 3)
@mute_logger('odoo.addons.mail.models.mail_mail')
@mute_logger("odoo.addons.mail.models.mail_mail")
def test_smtp_error(self):
with mock.patch(mock_send_email) as mock_func:
mock_func.side_effect = Warning('Test error')
mock_func.side_effect = Warning("Test error")
mail, tracking = self.mail_send(self.recipient.email)
self.assertEqual('error', tracking.state)
self.assertEqual('Warning', tracking.error_type)
self.assertEqual('Test error', tracking.error_description)
self.assertEqual("error", tracking.state)
self.assertEqual("Warning", tracking.error_type)
self.assertEqual("Test error", tracking.error_description)
self.assertTrue(self.recipient.email_bounced)
def test_partner_email_change(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('open', {})
tracking.event_create("open", {})
orig_score = self.recipient.email_score
orig_count = self.recipient.tracking_emails_count
orig_email = self.recipient.email
self.recipient.email = orig_email + '2'
self.recipient.email = orig_email + "2"
self.assertEqual(50.0, self.recipient.email_score)
self.assertEqual(0, self.recipient.tracking_emails_count)
self.recipient.email = orig_email
@ -388,108 +384,106 @@ class TestMailTracking(TransactionCase):
def test_process_hard_bounce(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('hard_bounce', {})
self.assertEqual('bounced', tracking.state)
tracking.event_create("hard_bounce", {})
self.assertEqual("bounced", tracking.state)
self.assertTrue(self.recipient.email_score < 50.0)
def test_process_soft_bounce(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('soft_bounce', {})
self.assertEqual('soft-bounced', tracking.state)
tracking.event_create("soft_bounce", {})
self.assertEqual("soft-bounced", tracking.state)
self.assertTrue(self.recipient.email_score < 50.0)
def test_process_delivered(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('delivered', {})
self.assertEqual('delivered', tracking.state)
tracking.event_create("delivered", {})
self.assertEqual("delivered", tracking.state)
self.assertTrue(self.recipient.email_score > 50.0)
def test_process_deferral(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('deferral', {})
self.assertEqual('deferred', tracking.state)
tracking.event_create("deferral", {})
self.assertEqual("deferred", tracking.state)
def test_process_spam(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('spam', {})
self.assertEqual('spam', tracking.state)
tracking.event_create("spam", {})
self.assertEqual("spam", tracking.state)
self.assertTrue(self.recipient.email_score < 50.0)
def test_process_unsub(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('unsub', {})
self.assertEqual('unsub', tracking.state)
tracking.event_create("unsub", {})
self.assertEqual("unsub", tracking.state)
self.assertTrue(self.recipient.email_score < 50.0)
def test_process_reject(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('reject', {})
self.assertEqual('rejected', tracking.state)
tracking.event_create("reject", {})
self.assertEqual("rejected", tracking.state)
self.assertTrue(self.recipient.email_score < 50.0)
def test_process_open(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('open', {})
self.assertEqual('opened', tracking.state)
tracking.event_create("open", {})
self.assertEqual("opened", tracking.state)
self.assertTrue(self.recipient.email_score > 50.0)
def test_process_click(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('click', {})
self.assertEqual('opened', tracking.state)
tracking.event_create("click", {})
self.assertEqual("opened", tracking.state)
self.assertTrue(self.recipient.email_score > 50.0)
def test_process_several_bounce(self):
for i in range(1, 10):
for _i in range(1, 10):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('hard_bounce', {})
self.assertEqual('bounced', tracking.state)
tracking.event_create("hard_bounce", {})
self.assertEqual("bounced", tracking.state)
self.assertEqual(0.0, self.recipient.email_score)
def test_bounce_new_partner(self):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('hard_bounce', {})
new_partner = self.env['res.partner'].create({
'name': 'Test New Partner',
})
tracking.event_create("hard_bounce", {})
new_partner = self.env["res.partner"].create({"name": "Test New Partner"})
new_partner.email = self.recipient.email
self.assertTrue(new_partner.email_bounced)
def test_recordset_email_score(self):
"""For backwords compatibility sake"""
trackings = self.env['mail.tracking.email']
for i in range(11):
trackings = self.env["mail.tracking.email"]
for _i in range(11):
mail, tracking = self.mail_send(self.recipient.email)
tracking.event_create('click', {})
tracking.event_create("click", {})
trackings |= tracking
self.assertEqual(100.0, trackings.email_score())
def test_db(self):
db = self.env.cr.dbname
controller = MailTrackingController()
with mock.patch('odoo.http.db_filter') as mock_client:
with mock.patch("odoo.http.db_filter") as mock_client:
mock_client.return_value = True
with self.assertRaises(psycopg2.OperationalError):
controller.mail_tracking_event('not_found_db')
controller.mail_tracking_event("not_found_db")
none = controller.mail_tracking_event(db)
self.assertEqual(b'NONE', none.response[0])
none = controller.mail_tracking_event(db, 'open')
self.assertEqual(b'NONE', none.response[0])
self.assertEqual(b"NONE", none.response[0])
none = controller.mail_tracking_event(db, "open")
self.assertEqual(b"NONE", none.response[0])
class TestMailTrackingViews(TransactionCase):
def test_fields_view_get(self):
result = self.env['res.partner'].fields_view_get(
view_id=self.env.ref('base.view_partner_form').id,
view_type='form')
doc = etree.XML(result['arch'])
result = self.env["res.partner"].fields_view_get(
view_id=self.env.ref("base.view_partner_form").id, view_type="form"
)
doc = etree.XML(result["arch"])
nodes = doc.xpath(
"//field[@name='failed_message_ids'"
" and @widget='mail_failed_message']")
"//field[@name='failed_message_ids'" " and @widget='mail_failed_message']"
)
self.assertTrue(nodes)
result = self.env['res.partner'].fields_view_get(
view_id=self.env.ref('base.view_res_partner_filter').id,
view_type='search')
doc = etree.XML(result['arch'])
result = self.env["res.partner"].fields_view_get(
view_id=self.env.ref("base.view_res_partner_filter").id, view_type="search"
)
doc = etree.XML(result["arch"])
nodes = doc.xpath("//filter[@name='failed_message_ids']")
self.assertTrue(nodes)