# -*- coding: utf-8 -*- # Part of Odoo, Flectra. See LICENSE file for full copyright and licensing details. import logging import traceback import os import unittest import werkzeug import werkzeug.routing import werkzeug.utils import flectra from flectra import api, models from flectra import SUPERUSER_ID from flectra.http import request from flectra.tools import config from flectra.exceptions import QWebException from flectra.tools.safe_eval import safe_eval from flectra.osv.expression import FALSE_DOMAIN from flectra.addons.http_routing.models.ir_http import ModelConverter, _guess_mimetype logger = logging.getLogger(__name__) def sitemap_qs2dom(qs, route, field='name'): """ Convert a query_string (can contains a path) to a domain""" dom = [] if qs and qs.lower() not in route: needles = qs.strip('/').split('/') # needles will be altered and keep only element which one is not in route # diff(from=['shop', 'product'], to=['shop', 'product', 'product']) => to=['product'] unittest.util.unorderable_list_difference(route.strip('/').split('/'), needles) if len(needles) == 1: dom = [(field, 'ilike', needles[0])] else: dom = FALSE_DOMAIN return dom class Http(models.AbstractModel): _inherit = 'ir.http' @classmethod def _get_converters(cls): """ Get the converters list for custom url pattern werkzeug need to match Rule. This override adds the website ones. """ return dict( super(Http, cls)._get_converters(), model=ModelConverter, ) @classmethod def _auth_method_public(cls): """ If no user logged, set the public user of current website, or default public user as request uid. After this method `request.env` can be called, since the `request.uid` is set. The `env` lazy property of `request` will be correct. """ if not request.session.uid: env = api.Environment(request.cr, SUPERUSER_ID, request.context) website = env['website'].get_current_website() if website and website.user_id: request.uid = website.user_id.id if not request.uid: super(Http, cls)._auth_method_public() @classmethod def _add_dispatch_parameters(cls, func): context = dict(request.context) if not context.get('tz'): context['tz'] = request.session.get('geoip', {}).get('time_zone') request.website = request.env['website'].get_current_website() # can use `request.env` since auth methods are called context['website_id'] = request.website.id # bind modified context request.context = context super(Http, cls)._add_dispatch_parameters(func) if request.routing_iteration == 1: request.website = request.website.with_context(request.context) @classmethod def _get_languages(cls): if getattr(request, 'website', False): return request.website.language_ids return super(Http, cls)._get_languages() @classmethod def _get_language_codes(cls): if getattr(request, 'website', False): return request.website._get_languages() return super(Http, cls)._get_language_codes() @classmethod def _get_default_lang(cls): if getattr(request, 'website', False): return request.website.default_lang_id return super(Http, cls)._get_default_lang() @classmethod def _serve_page(cls): req_page = request.httprequest.path domain = [('url', '=', req_page), '|', ('website_ids', 'in', request.website.id), ('website_ids', '=', False)] pages = request.env['website.page'].search(domain) if not request.website.is_publisher(): pages = pages.filtered('is_visible') mypage = pages[0] if pages else False _, ext = os.path.splitext(req_page) if mypage: return request.render(mypage.view_id.id, { # 'path': req_page[1:], 'deletable': True, 'main_object': mypage, }, mimetype=_guess_mimetype(ext)) return False @classmethod def _serve_404(cls): req_page = request.httprequest.path page404 = 'website.%s' % (request.website.is_publisher() and 'page_404' or '404') return request.render(page404, {'path': req_page[1:]}) @classmethod def _serve_redirect(cls): req_page = request.httprequest.path domain = [ '|', ('website_id', '=', request.website.id), ('website_id', '=', False), ('url_from', '=', req_page) ] return request.env['website.redirect'].search(domain, limit=1) @classmethod def _serve_fallback(cls, exception): # serve attachment before parent = super(Http, cls)._serve_fallback(exception) if parent: # attachment return parent website_page = cls._serve_page() if website_page: return website_page redirect = cls._serve_redirect() if redirect: return request.redirect(redirect.url_to, code=redirect.type) return cls._serve_404() @classmethod def _handle_exception(cls, exception): code = 500 # default code is_website_request = bool(getattr(request, 'is_frontend', False) and getattr(request, 'website', False)) if not is_website_request: # Don't touch non website requests exception handling return super(Http, cls)._handle_exception(exception) else: try: response = super(Http, cls)._handle_exception(exception) if isinstance(response, Exception): exception = response else: # if parent excplicitely returns a plain response, then we don't touch it return response except Exception as e: if 'werkzeug' in config['dev_mode'] and (not isinstance(exception, QWebException) or not exception.qweb.get('cause')): raise exception = e values = dict( exception=exception, traceback=traceback.format_exc(), ) if isinstance(exception, werkzeug.exceptions.HTTPException): if exception.code is None: # Hand-crafted HTTPException likely coming from abort(), # usually for a redirect response -> return it directly return exception else: code = exception.code if isinstance(exception, flectra.exceptions.AccessError): code = 403 if isinstance(exception, QWebException): values.update(qweb_exception=exception) if isinstance(exception.qweb.get('cause'), flectra.exceptions.AccessError): code = 403 if code == 500: logger.error("500 Internal Server Error:\n\n%s", values['traceback']) if 'qweb_exception' in values: view = request.env["ir.ui.view"] views = view._views_get(exception.qweb['template']) to_reset = views.filtered(lambda view: view.model_data_id.noupdate is True and view.arch_fs) values['views'] = to_reset elif code == 403: logger.warn("403 Forbidden:\n\n%s", values['traceback']) values.update( status_message=werkzeug.http.HTTP_STATUS_CODES[code], status_code=code, ) if not request.uid: cls._auth_method_public() try: html = request.env['ir.ui.view'].render_template('website.%s' % code, values) except Exception: html = request.env['ir.ui.view'].render_template('website.http_error', values) return werkzeug.wrappers.Response(html, status=code, content_type='text/html;charset=utf-8') @classmethod def binary_content(cls, xmlid=None, model='ir.attachment', id=None, field='datas', unique=False, filename=None, filename_field='datas_fname', download=False, mimetype=None, default_mimetype='application/octet-stream', access_token=None, env=None): env = env or request.env obj = None if xmlid: obj = env.ref(xmlid, False) elif id and model in env: obj = env[model].browse(int(id)) if obj and 'website_published' in obj._fields: if env[obj._name].sudo().search([('id', '=', obj.id), ('website_published', '=', True)]): env = env(user=SUPERUSER_ID) return super(Http, cls).binary_content( xmlid=xmlid, model=model, id=id, field=field, unique=unique, filename=filename, filename_field=filename_field, download=download, mimetype=mimetype, default_mimetype=default_mimetype, access_token=access_token, env=env) class ModelConverter(ModelConverter): def generate(self, uid, dom=None, args=None): Model = request.env[self.model].sudo(uid) domain = safe_eval(self.domain, (args or {}).copy()) if dom: domain += dom for record in Model.search_read(domain=domain, fields=['write_date', Model._rec_name]): if record.get(Model._rec_name, False): yield {'loc': (record['id'], record[Model._rec_name])}