2018-01-16 06:58:15 +01:00
# -*- coding: utf-8 -*-
2018-01-16 11:34:37 +01:00
# Part of Odoo, Flectra. See LICENSE file for full copyright and licensing details.
2018-01-16 06:58:15 +01:00
import functools
import logging
import json
import werkzeug . urls
import werkzeug . utils
from werkzeug . exceptions import BadRequest
2018-01-16 11:34:37 +01:00
from flectra import api , http , SUPERUSER_ID , _
from flectra . exceptions import AccessDenied
from flectra . http import request
from flectra import registry as registry_get
2018-01-16 06:58:15 +01:00
2018-01-16 11:34:37 +01:00
from flectra . addons . auth_signup . controllers . main import AuthSignupHome as Home
from flectra . addons . web . controllers . main import db_monodb , ensure_db , set_cookie_and_redirect , login_and_redirect
2018-01-16 06:58:15 +01:00
_logger = logging . getLogger ( __name__ )
#----------------------------------------------------------
# helpers
#----------------------------------------------------------
def fragment_to_query_string ( func ) :
@functools.wraps ( func )
def wrapper ( self , * a , * * kw ) :
kw . pop ( ' debug ' , False )
if not kw :
return """ <html><head><script>
var l = window . location ;
var q = l . hash . substring ( 1 ) ;
var r = l . pathname + l . search ;
if ( q . length != = 0 ) {
var s = l . search ? ( l . search == = ' ? ' ? ' ' : ' & ' ) : ' ? ' ;
r = l . pathname + l . search + s + q ;
}
if ( r == l . pathname ) {
r = ' / ' ;
}
window . location = r ;
< / script > < / head > < body > < / body > < / html > """
return func ( self , * a , * * kw )
return wrapper
#----------------------------------------------------------
# Controller
#----------------------------------------------------------
class OAuthLogin ( Home ) :
def list_providers ( self ) :
try :
providers = request . env [ ' auth.oauth.provider ' ] . sudo ( ) . search_read ( [ ( ' enabled ' , ' = ' , True ) ] )
except Exception :
providers = [ ]
for provider in providers :
return_url = request . httprequest . url_root + ' auth_oauth/signin '
state = self . get_state ( provider )
params = dict (
response_type = ' token ' ,
client_id = provider [ ' client_id ' ] ,
redirect_uri = return_url ,
scope = provider [ ' scope ' ] ,
state = json . dumps ( state ) ,
)
provider [ ' auth_link ' ] = " %s ? %s " % ( provider [ ' auth_endpoint ' ] , werkzeug . url_encode ( params ) )
return providers
def get_state ( self , provider ) :
redirect = request . params . get ( ' redirect ' ) or ' web '
if not redirect . startswith ( ( ' // ' , ' http:// ' , ' https:// ' ) ) :
redirect = ' %s %s ' % ( request . httprequest . url_root , redirect [ 1 : ] if redirect [ 0 ] == ' / ' else redirect )
state = dict (
d = request . session . db ,
p = provider [ ' id ' ] ,
r = werkzeug . url_quote_plus ( redirect ) ,
)
token = request . params . get ( ' token ' )
if token :
state [ ' t ' ] = token
return state
@http.route ( )
def web_login ( self , * args , * * kw ) :
ensure_db ( )
if request . httprequest . method == ' GET ' and request . session . uid and request . params . get ( ' redirect ' ) :
# Redirect if already logged in and redirect param is present
return http . redirect_with_hash ( request . params . get ( ' redirect ' ) )
providers = self . list_providers ( )
response = super ( OAuthLogin , self ) . web_login ( * args , * * kw )
if response . is_qweb :
error = request . params . get ( ' oauth_error ' )
if error == ' 1 ' :
error = _ ( " Sign up is not allowed on this database. " )
elif error == ' 2 ' :
error = _ ( " Access Denied " )
elif error == ' 3 ' :
error = _ ( " You do not have access to this database or your invitation has expired. Please ask for an invitation and be sure to follow the link in your invitation email. " )
else :
error = None
response . qcontext [ ' providers ' ] = providers
if error :
response . qcontext [ ' error ' ] = error
return response
@http.route ( )
def web_auth_signup ( self , * args , * * kw ) :
providers = self . list_providers ( )
response = super ( OAuthLogin , self ) . web_auth_signup ( * args , * * kw )
response . qcontext . update ( providers = providers )
return response
@http.route ( )
def web_auth_reset_password ( self , * args , * * kw ) :
providers = self . list_providers ( )
response = super ( OAuthLogin , self ) . web_auth_reset_password ( * args , * * kw )
response . qcontext . update ( providers = providers )
return response
class OAuthController ( http . Controller ) :
@http.route ( ' /auth_oauth/signin ' , type = ' http ' , auth = ' none ' )
@fragment_to_query_string
def signin ( self , * * kw ) :
state = json . loads ( kw [ ' state ' ] )
dbname = state [ ' d ' ]
provider = state [ ' p ' ]
context = state . get ( ' c ' , { } )
registry = registry_get ( dbname )
with registry . cursor ( ) as cr :
try :
env = api . Environment ( cr , SUPERUSER_ID , context )
credentials = env [ ' res.users ' ] . sudo ( ) . auth_oauth ( provider , kw )
cr . commit ( )
action = state . get ( ' a ' )
menu = state . get ( ' m ' )
redirect = werkzeug . url_unquote_plus ( state [ ' r ' ] ) if state . get ( ' r ' ) else False
url = ' /web '
if redirect :
url = redirect
elif action :
url = ' /web#action= %s ' % action
elif menu :
url = ' /web#menu_id= %s ' % menu
resp = login_and_redirect ( * credentials , redirect_url = url )
# Since /web is hardcoded, verify user has right to land on it
if werkzeug . urls . url_parse ( resp . location ) . path == ' /web ' and not request . env . user . has_group ( ' base.group_user ' ) :
resp . location = ' / '
return resp
except AttributeError :
# auth_signup is not installed
_logger . error ( " auth_signup not installed on database %s : oauth sign up cancelled. " % ( dbname , ) )
url = " /web/login?oauth_error=1 "
except AccessDenied :
# oauth credentials not valid, user could be on a temporary session
_logger . info ( ' OAuth2: access denied, redirect to main page in case a valid session exists, without setting cookies ' )
url = " /web/login?oauth_error=3 "
redirect = werkzeug . utils . redirect ( url , 303 )
redirect . autocorrect_location_header = False
return redirect
except Exception as e :
# signup error
_logger . exception ( " OAuth2: %s " % str ( e ) )
url = " /web/login?oauth_error=2 "
return set_cookie_and_redirect ( url )
@http.route ( ' /auth_oauth/oea ' , type = ' http ' , auth = ' none ' )
def oea ( self , * * kw ) :
2018-01-16 11:34:37 +01:00
""" login user via Flectra Account provider """
2018-01-16 06:58:15 +01:00
dbname = kw . pop ( ' db ' , None )
if not dbname :
dbname = db_monodb ( )
if not dbname :
return BadRequest ( )
registry = registry_get ( dbname )
with registry . cursor ( ) as cr :
try :
env = api . Environment ( cr , SUPERUSER_ID , { } )
provider = env . ref ( ' auth_oauth.provider_openerp ' )
except ValueError :
return set_cookie_and_redirect ( ' /web?db= %s ' % dbname )
assert provider . _name == ' auth.oauth.provider '
state = {
' d ' : dbname ,
' p ' : provider . id ,
' c ' : { ' no_user_creation ' : True } ,
}
kw [ ' state ' ] = json . dumps ( state )
return self . signin ( * * kw )