381 lines
17 KiB
Python
381 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo, Flectra. See LICENSE file for full copyright and licensing details.
|
|
|
|
import json
|
|
import logging
|
|
import werkzeug
|
|
from datetime import datetime
|
|
from math import ceil
|
|
|
|
from flectra import fields, http, SUPERUSER_ID
|
|
from flectra.http import request
|
|
from flectra.tools import ustr
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WebsiteSurvey(http.Controller):
|
|
# HELPER METHODS #
|
|
|
|
def _check_bad_cases(self, survey, token=None):
|
|
# In case of bad survey, redirect to surveys list
|
|
if not survey.sudo().exists():
|
|
return werkzeug.utils.redirect("/survey/")
|
|
|
|
# In case of auth required, block public user
|
|
if survey.auth_required and request.env.user == request.website.user_id:
|
|
return request.render("survey.auth_required", {'survey': survey, 'token': token})
|
|
|
|
# In case of non open surveys
|
|
if survey.stage_id.closed:
|
|
return request.render("survey.notopen")
|
|
|
|
# If there is no pages
|
|
if not survey.page_ids:
|
|
return request.render("survey.nopages", {'survey': survey})
|
|
|
|
# Everything seems to be ok
|
|
return None
|
|
|
|
def _check_deadline(self, user_input):
|
|
'''Prevent opening of the survey if the deadline has turned out
|
|
|
|
! This will NOT disallow access to users who have already partially filled the survey !'''
|
|
deadline = user_input.deadline
|
|
if deadline:
|
|
dt_deadline = fields.Datetime.from_string(deadline)
|
|
dt_now = datetime.now()
|
|
if dt_now > dt_deadline: # survey is not open anymore
|
|
return request.render("survey.notopen")
|
|
return None
|
|
|
|
## ROUTES HANDLERS ##
|
|
|
|
# Survey start
|
|
@http.route(['/survey/start/<model("survey.survey"):survey>',
|
|
'/survey/start/<model("survey.survey"):survey>/<string:token>'],
|
|
type='http', auth='public', website=True)
|
|
def start_survey(self, survey, token=None, **post):
|
|
UserInput = request.env['survey.user_input']
|
|
|
|
# Test mode
|
|
if token and token == "phantom":
|
|
_logger.info("[survey] Phantom mode")
|
|
user_input = UserInput.create({'survey_id': survey.id, 'test_entry': True})
|
|
data = {'survey': survey, 'page': None, 'token': user_input.token}
|
|
return request.render('survey.survey_init', data)
|
|
# END Test mode
|
|
|
|
# Controls if the survey can be displayed
|
|
errpage = self._check_bad_cases(survey, token=token)
|
|
if errpage:
|
|
return errpage
|
|
|
|
# Manual surveying
|
|
if not token:
|
|
vals = {'survey_id': survey.id}
|
|
if request.website.user_id != request.env.user:
|
|
vals['partner_id'] = request.env.user.partner_id.id
|
|
user_input = UserInput.create(vals)
|
|
else:
|
|
user_input = UserInput.sudo().search([('token', '=', token)], limit=1)
|
|
if not user_input:
|
|
return request.render("website.403")
|
|
|
|
# Do not open expired survey
|
|
errpage = self._check_deadline(user_input)
|
|
if errpage:
|
|
return errpage
|
|
|
|
# Select the right page
|
|
if user_input.state == 'new': # Intro page
|
|
data = {'survey': survey, 'page': None, 'token': user_input.token}
|
|
return request.render('survey.survey_init', data)
|
|
else:
|
|
return request.redirect('/survey/fill/%s/%s' % (survey.id, user_input.token))
|
|
|
|
# Survey displaying
|
|
@http.route(['/survey/fill/<model("survey.survey"):survey>/<string:token>',
|
|
'/survey/fill/<model("survey.survey"):survey>/<string:token>/<string:prev>'],
|
|
type='http', auth='public', website=True)
|
|
def fill_survey(self, survey, token, prev=None, **post):
|
|
'''Display and validates a survey'''
|
|
Survey = request.env['survey.survey']
|
|
UserInput = request.env['survey.user_input']
|
|
|
|
# Controls if the survey can be displayed
|
|
errpage = self._check_bad_cases(survey)
|
|
if errpage:
|
|
return errpage
|
|
|
|
# Load the user_input
|
|
try:
|
|
user_input = UserInput.sudo().search([('token', '=', token)], limit=1)
|
|
except IndexError: # Invalid token
|
|
return request.render("website.403")
|
|
|
|
# Do not display expired survey (even if some pages have already been
|
|
# displayed -- There's a time for everything!)
|
|
errpage = self._check_deadline(user_input)
|
|
if errpage:
|
|
return errpage
|
|
|
|
# Select the right page
|
|
if user_input.state == 'new': # First page
|
|
page, page_nr, last = Survey.next_page(user_input, 0, go_back=False)
|
|
data = {'survey': survey, 'page': page, 'page_nr': page_nr, 'token': user_input.token}
|
|
if last:
|
|
data.update({'last': True})
|
|
return request.render('survey.survey', data)
|
|
elif user_input.state == 'done': # Display success message
|
|
return request.render('survey.sfinished', {'survey': survey,
|
|
'token': token,
|
|
'user_input': user_input})
|
|
elif user_input.state == 'skip':
|
|
flag = (True if prev and prev == 'prev' else False)
|
|
page, page_nr, last = Survey.next_page(user_input, user_input.last_displayed_page_id.id, go_back=flag)
|
|
|
|
#special case if you click "previous" from the last page, then leave the survey, then reopen it from the URL, avoid crash
|
|
if not page:
|
|
page, page_nr, last = Survey.next_page(user_input, user_input.last_displayed_page_id.id, go_back=True)
|
|
|
|
data = {'survey': survey, 'page': page, 'page_nr': page_nr, 'token': user_input.token}
|
|
if last:
|
|
data.update({'last': True})
|
|
return request.render('survey.survey', data)
|
|
else:
|
|
return request.render("website.403")
|
|
|
|
# AJAX prefilling of a survey
|
|
@http.route(['/survey/prefill/<model("survey.survey"):survey>/<string:token>',
|
|
'/survey/prefill/<model("survey.survey"):survey>/<string:token>/<model("survey.page"):page>'],
|
|
type='http', auth='public', website=True)
|
|
def prefill(self, survey, token, page=None, **post):
|
|
UserInputLine = request.env['survey.user_input_line']
|
|
ret = {}
|
|
|
|
# Fetch previous answers
|
|
if page:
|
|
previous_answers = UserInputLine.sudo().search([('user_input_id.token', '=', token), ('page_id', '=', page.id)])
|
|
else:
|
|
previous_answers = UserInputLine.sudo().search([('user_input_id.token', '=', token)])
|
|
|
|
# Return non empty answers in a JSON compatible format
|
|
for answer in previous_answers:
|
|
if not answer.skipped:
|
|
answer_tag = '%s_%s_%s' % (answer.survey_id.id, answer.page_id.id, answer.question_id.id)
|
|
answer_value = None
|
|
if answer.answer_type == 'free_text':
|
|
answer_value = answer.value_free_text
|
|
elif answer.answer_type == 'text' and answer.question_id.type == 'textbox':
|
|
answer_value = answer.value_text
|
|
elif answer.answer_type == 'text' and answer.question_id.type != 'textbox':
|
|
# here come comment answers for matrices, simple choice and multiple choice
|
|
answer_tag = "%s_%s" % (answer_tag, 'comment')
|
|
answer_value = answer.value_text
|
|
elif answer.answer_type == 'number':
|
|
answer_value = str(answer.value_number)
|
|
elif answer.answer_type == 'date':
|
|
answer_value = answer.value_date
|
|
elif answer.answer_type == 'suggestion' and not answer.value_suggested_row:
|
|
answer_value = answer.value_suggested.id
|
|
elif answer.answer_type == 'suggestion' and answer.value_suggested_row:
|
|
answer_tag = "%s_%s" % (answer_tag, answer.value_suggested_row.id)
|
|
answer_value = answer.value_suggested.id
|
|
if answer_value:
|
|
ret.setdefault(answer_tag, []).append(answer_value)
|
|
else:
|
|
_logger.warning("[survey] No answer has been found for question %s marked as non skipped" % answer_tag)
|
|
return json.dumps(ret)
|
|
|
|
# AJAX scores loading for quiz correction mode
|
|
@http.route(['/survey/scores/<model("survey.survey"):survey>/<string:token>'],
|
|
type='http', auth='public', website=True)
|
|
def get_scores(self, survey, token, page=None, **post):
|
|
ret = {}
|
|
|
|
# Fetch answers
|
|
previous_answers = request.env['survey.user_input_line'].sudo().search([('user_input_id.token', '=', token)])
|
|
|
|
# Compute score for each question
|
|
for answer in previous_answers:
|
|
tmp_score = ret.get(answer.question_id.id, 0.0)
|
|
ret.update({answer.question_id.id: tmp_score + answer.quizz_mark})
|
|
return json.dumps(ret)
|
|
|
|
# AJAX submission of a page
|
|
@http.route(['/survey/submit/<model("survey.survey"):survey>'], type='http', methods=['POST'], auth='public', website=True)
|
|
def submit(self, survey, **post):
|
|
_logger.debug('Incoming data: %s', post)
|
|
page_id = int(post['page_id'])
|
|
questions = request.env['survey.question'].search([('page_id', '=', page_id)])
|
|
|
|
# Answer validation
|
|
errors = {}
|
|
for question in questions:
|
|
answer_tag = "%s_%s_%s" % (survey.id, page_id, question.id)
|
|
errors.update(question.validate_question(post, answer_tag))
|
|
|
|
ret = {}
|
|
if len(errors):
|
|
# Return errors messages to webpage
|
|
ret['errors'] = errors
|
|
else:
|
|
# Store answers into database
|
|
try:
|
|
user_input = request.env['survey.user_input'].sudo().search([('token', '=', post['token'])], limit=1)
|
|
except KeyError: # Invalid token
|
|
return request.render("website.403")
|
|
user_id = request.env.user.id if user_input.type != 'link' else SUPERUSER_ID
|
|
|
|
for question in questions:
|
|
answer_tag = "%s_%s_%s" % (survey.id, page_id, question.id)
|
|
request.env['survey.user_input_line'].sudo(user=user_id).save_lines(user_input.id, question, post, answer_tag)
|
|
|
|
go_back = post['button_submit'] == 'previous'
|
|
next_page, _, last = request.env['survey.survey'].next_page(user_input, page_id, go_back=go_back)
|
|
vals = {'last_displayed_page_id': page_id}
|
|
if next_page is None and not go_back:
|
|
vals.update({'state': 'done'})
|
|
else:
|
|
vals.update({'state': 'skip'})
|
|
user_input.sudo(user=user_id).write(vals)
|
|
ret['redirect'] = '/survey/fill/%s/%s' % (survey.id, post['token'])
|
|
if go_back:
|
|
ret['redirect'] += '/prev'
|
|
return json.dumps(ret)
|
|
|
|
# Printing routes
|
|
@http.route(['/survey/print/<model("survey.survey"):survey>',
|
|
'/survey/print/<model("survey.survey"):survey>/<string:token>'],
|
|
type='http', auth='public', website=True)
|
|
def print_survey(self, survey, token=None, **post):
|
|
'''Display an survey in printable view; if <token> is set, it will
|
|
grab the answers of the user_input_id that has <token>.'''
|
|
return request.render('survey.survey_print',
|
|
{'survey': survey,
|
|
'token': token,
|
|
'page_nr': 0,
|
|
'quizz_correction': True if survey.quizz_mode and token else False})
|
|
|
|
@http.route(['/survey/results/<model("survey.survey"):survey>'],
|
|
type='http', auth='user', website=True)
|
|
def survey_reporting(self, survey, token=None, **post):
|
|
'''Display survey Results & Statistics for given survey.'''
|
|
result_template = 'survey.result'
|
|
current_filters = []
|
|
filter_display_data = []
|
|
filter_finish = False
|
|
|
|
if not survey.user_input_ids or not [input_id.id for input_id in survey.user_input_ids if input_id.state != 'new']:
|
|
result_template = 'survey.no_result'
|
|
if 'finished' in post:
|
|
post.pop('finished')
|
|
filter_finish = True
|
|
if post or filter_finish:
|
|
filter_data = self.get_filter_data(post)
|
|
current_filters = survey.filter_input_ids(filter_data, filter_finish)
|
|
filter_display_data = survey.get_filter_display_data(filter_data)
|
|
return request.render(result_template,
|
|
{'survey': survey,
|
|
'survey_dict': self.prepare_result_dict(survey, current_filters),
|
|
'page_range': self.page_range,
|
|
'current_filters': current_filters,
|
|
'filter_display_data': filter_display_data,
|
|
'filter_finish': filter_finish
|
|
})
|
|
# Quick retroengineering of what is injected into the template for now:
|
|
# (TODO: flatten and simplify this)
|
|
#
|
|
# survey: a browse record of the survey
|
|
# survey_dict: very messy dict containing all the info to display answers
|
|
# {'page_ids': [
|
|
#
|
|
# ...
|
|
#
|
|
# {'page': browse record of the page,
|
|
# 'question_ids': [
|
|
#
|
|
# ...
|
|
#
|
|
# {'graph_data': data to be displayed on the graph
|
|
# 'input_summary': number of answered, skipped...
|
|
# 'prepare_result': {
|
|
# answers displayed in the tables
|
|
# }
|
|
# 'question': browse record of the question_ids
|
|
# }
|
|
#
|
|
# ...
|
|
#
|
|
# ]
|
|
# }
|
|
#
|
|
# ...
|
|
#
|
|
# ]
|
|
# }
|
|
#
|
|
# page_range: pager helper function
|
|
# current_filters: a list of ids
|
|
# filter_display_data: [{'labels': ['a', 'b'], question_text} ... ]
|
|
# filter_finish: boolean => only finished surveys or not
|
|
#
|
|
|
|
def prepare_result_dict(self, survey, current_filters=None):
|
|
"""Returns dictionary having values for rendering template"""
|
|
current_filters = current_filters if current_filters else []
|
|
Survey = request.env['survey.survey']
|
|
result = {'page_ids': []}
|
|
for page in survey.page_ids:
|
|
page_dict = {'page': page, 'question_ids': []}
|
|
for question in page.question_ids:
|
|
question_dict = {
|
|
'question': question,
|
|
'input_summary': Survey.get_input_summary(question, current_filters),
|
|
'prepare_result': Survey.prepare_result(question, current_filters),
|
|
'graph_data': self.get_graph_data(question, current_filters),
|
|
}
|
|
|
|
page_dict['question_ids'].append(question_dict)
|
|
result['page_ids'].append(page_dict)
|
|
return result
|
|
|
|
def get_filter_data(self, post):
|
|
"""Returns data used for filtering the result"""
|
|
filters = []
|
|
for ids in post:
|
|
#if user add some random data in query URI, ignore it
|
|
try:
|
|
row_id, answer_id = ids.split(',')
|
|
filters.append({'row_id': int(row_id), 'answer_id': int(answer_id)})
|
|
except:
|
|
return filters
|
|
return filters
|
|
|
|
def page_range(self, total_record, limit):
|
|
'''Returns number of pages required for pagination'''
|
|
total = ceil(total_record / float(limit))
|
|
return range(1, int(total + 1))
|
|
|
|
def get_graph_data(self, question, current_filters=None):
|
|
'''Returns formatted data required by graph library on basis of filter'''
|
|
# TODO refactor this terrible method and merge it with prepare_result_dict
|
|
current_filters = current_filters if current_filters else []
|
|
Survey = request.env['survey.survey']
|
|
result = []
|
|
if question.type == 'multiple_choice':
|
|
result.append({'key': ustr(question.question),
|
|
'values': Survey.prepare_result(question, current_filters)['answers']
|
|
})
|
|
if question.type == 'simple_choice':
|
|
result = Survey.prepare_result(question, current_filters)['answers']
|
|
if question.type == 'matrix':
|
|
data = Survey.prepare_result(question, current_filters)
|
|
for answer in data['answers']:
|
|
values = []
|
|
for row in data['rows']:
|
|
values.append({'text': data['rows'].get(row), 'count': data['result'].get((row, answer))})
|
|
result.append({'key': data['answers'].get(answer), 'values': values})
|
|
return json.dumps(result)
|