from otree.api import (
    models,
    widgets,
    BaseConstants,
    BaseSubsession,
    BaseGroup,
    BasePlayer,
    Currency as c,
    currency_range,
)
from django.utils.safestring import mark_safe
from django.template.defaultfilters import safe
import csv
import pandas as pd
import random
import math
import datetime
import time
import django.utils.timezone

author = 'Jan Romann'

doc = """
Studie des Forschungsprojektes »Maße der Bedarfsgerechtigkeit, Expertise und Kohärenz«
"""


class Constants(BaseConstants):
    name_in_url = 'ma_jan'
    players_per_group = None
    num_rounds = 1

    # Countries for which country-specific names/questions may be selected.
    SUPPORTED_COUNTRIES = ["USA", "Sweden", "Germany"]
    DEFAULT_COUNTRY = "USA"

    # CSV tables are loaded once, when the module is imported.
    PAGES_DATA = pd.read_csv('ma_jan/question_data/pages.csv')
    QUESTION_DATA = pd.read_csv('ma_jan/question_data/question_data.csv')
    TREATMENT_DATA = pd.read_csv('ma_jan/question_data/treatment.csv')
    VIGNETTE_DATA = pd.read_csv('ma_jan/question_data/numbers.csv')

    NUMBER_OF_TREATMENTS = 2

    # Labels of the special choices coded as 98 and 99 respectively.
    DEFAULT_DONT_KNOW_STRING = "Don't know"
    DEFAULT_NO_ANSWER_STRING = 'Prefer not to answer'


def get_custom_levels(number_of_levels: int, labels: list, no_answer: bool, dont_know: bool):
    """Build ``[value, caption]`` choices for a numeric scale with labelled ends.

    ``labels[0]`` captions level 1 and ``labels[1]`` the highest level. A
    non-empty ``labels[2]`` captions the middle level when the scale has an odd
    number of levels. All other levels are captioned with their number only.

    Returns an empty list (without the 98/99 extras) when fewer than two levels
    or fewer than two labels are given. Otherwise "Don't know" (98) and
    "Prefer not to answer" (99) are appended when requested.
    """
    if number_of_levels < 2 or len(labels) < 2:
        return []

    # Only an integer for odd scales, so even scales never get a middle label.
    midpoint = (number_of_levels + 1) / 2

    levels = []
    for level in range(1, number_of_levels + 1):
        if level == 1:
            caption = f'1 - {labels[0]}'
        elif level == number_of_levels:
            caption = f'{number_of_levels} - {labels[1]}'
        elif len(labels) > 2 and level == midpoint and labels[2]:
            caption = f'{int(midpoint)} - {labels[2]}'
        else:
            caption = f'{level}'
        levels.append([level, caption])

    if dont_know:
        levels.append([98, Constants.DEFAULT_DONT_KNOW_STRING])
    if no_answer:
        levels.append([99, Constants.DEFAULT_NO_ANSWER_STRING])
    return levels


def get_numbers_for_labels(labels: list, no_answer: bool, dont_know: bool):
    """Number *labels* from 1 upwards and return them as ``[value, label]`` pairs.

    Returns ``None`` for an empty label list. "Don't know" (98) and
    "Prefer not to answer" (99) are appended when requested.
    """
    if not labels:
        return None

    labelled_levels = [[number, label] for number, label in enumerate(labels, start=1)]
    if dont_know:
        labelled_levels.append([98, Constants.DEFAULT_DONT_KNOW_STRING])
    if no_answer:
        labelled_levels.append([99, Constants.DEFAULT_NO_ANSWER_STRING])
    return labelled_levels


def get_likert_levels(levels: int, no_answer: bool, dont_know: bool):
    """Return agree/disagree choices for a Likert scale with *levels* points.

    The scale runs from the strongest disagreement (value 1) to the strongest
    agreement. "Strongly ..." is included for more than 3 levels, "Somewhat ..."
    for more than 5, and a neutral midpoint for odd level counts.
    """
    disagree_side = []
    agree_side = []
    if levels > 3:
        disagree_side.append('Strongly Disagree')
        agree_side.append('Strongly Agree')
    disagree_side.append('Disagree')
    agree_side.append('Agree')
    if levels > 5:
        disagree_side.append('Somewhat Disagree')
        agree_side.append('Somewhat Agree')
    if levels % 2 != 0:
        disagree_side.append('Neither Agree nor Disagree')

    # The agree side is collected strongest-first; reverse it so the combined
    # scale is monotonic and ends with the strongest agreement.
    scale = disagree_side + agree_side[::-1]

    # Keyword arguments so the two flags cannot be swapped by position.
    return get_numbers_for_labels(scale, no_answer=no_answer, dont_know=dont_know)


def randomize_list(list_to_randomize):
    """Return a new list with the elements of *list_to_randomize* in random order."""
    return random.sample(list_to_randomize, len(list_to_randomize))


def get_levels_for_variable(variable_name: str):
    """Return the non-empty ``option_*`` cells of the first question row named *variable_name*."""
    matching_rows = Constants.QUESTION_DATA[Constants.QUESTION_DATA.variable_name == variable_name]
    return matching_rows.iloc[0].filter(regex=r'^option_+').dropna().to_list()


def _read_cell(row, column, cast, default=None):
    """Return ``cast(row[column])``, or *default* when the cell is missing (NaN)."""
    value = row[column]
    return default if pd.isna(value) else cast(value)


def _resolve_country(config):
    """Return the country selected by a session config.

    The configured ``country`` is used only when ``country_specific_names`` is
    enabled and the country is supported; otherwise the default country applies.
    """
    country = Constants.DEFAULT_COUNTRY
    if config.get("country_specific_names", False):
        requested = config.get("country", country)
        if requested in Constants.SUPPORTED_COUNTRIES:
            country = requested
    return country


def create_question(row):
    """Create the model field(s) described by one row of the question table.

    Returns a single field, or a list of ``BooleanField`` objects (one per
    option) for a boolean "CheckBox" question with more than one option.
    Any datatype other than "string", "boolean" or "currency" yields an
    ``IntegerField``.
    """
    datatype = _read_cell(row, 'datatype', str)
    widget_name = _read_cell(row, 'widget', str)
    label = _read_cell(row, 'label', str, '')
    blank = _read_cell(row, 'can_be_blank', bool, False)
    no_answer = _read_cell(row, 'no_answer', bool, False)
    dont_know = _read_cell(row, 'dont_know', bool, False)
    min_value = _read_cell(row, 'min', int)
    max_value = _read_cell(row, 'max', int)
    likert_levels = _read_cell(row, 'likert_levels', int)
    # Read from the 'max_length' column itself (not from 'max').
    max_length = _read_cell(row, 'max_length', int)
    options = row.filter(regex=r'^option_+').dropna().to_list()

    if widget_name == "RadioButton":
        widget = widgets.RadioSelect
    elif widget_name == "CheckBox":
        widget = widgets.CheckboxInput
    elif widget_name == "Textarea":
        widget = widgets.Textarea()
    else:
        widget = None

    if datatype == 'string':
        if max_length is not None:
            return models.StringField(blank=blank, label=label, widget=widget, max_length=max_length)
        return models.StringField(blank=blank, label=label, widget=widget)

    if datatype == "boolean":
        if widget_name == "CheckBox" and len(options) > 1:
            # One independent checkbox per option, each labelled with its option text.
            return [models.BooleanField(blank=blank, label=option, widget=widget) for option in options]
        return models.BooleanField(blank=blank, label=label, widget=widget)

    if datatype == "currency":
        return models.CurrencyField(blank=blank, label=label, min=min_value, max=max_value)

    choices = None
    if options:
        if likert_levels is None:
            choices = get_numbers_for_labels(options, no_answer, dont_know)
        else:
            choices = get_custom_levels(likert_levels, options, no_answer, dont_know)
    elif likert_levels is not None:
        choices = get_likert_levels(likert_levels, no_answer, dont_know)
    return models.IntegerField(blank=blank, label=label, widget=widget, choices=choices,
                               min=min_value, max=max_value)


def _build_vignette_fields():
    """Return ``{field_name: field}`` with an A and a B answer field per vignette row."""
    fields = {}
    for _, vignette in Constants.VIGNETTE_DATA.iterrows():
        for person in ("A", "B"):
            field_name = f"justice_{vignette['scenario']}_{person}_{vignette['situation']}"
            fields[field_name] = models.PositiveIntegerField()
    return fields


def _build_question_fields():
    """Return ``{field_name: field}`` for every included row of the question table.

    Rows with a missing or empty ``variable_name`` and rows whose ``include``
    cell is falsy are skipped. Multi-checkbox questions are expanded to
    ``<variable_name>_<index>`` fields.
    """
    fields = {}
    for _, row in Constants.QUESTION_DATA.iterrows():
        name = row['variable_name']
        # pd.isna is checked explicitly because str(NaN) == 'nan' is truthy.
        if pd.isna(name) or not str(name) or not bool(row['include']):
            continue
        field = create_question(row)
        if isinstance(field, list):
            for position, checkbox in enumerate(field):
                fields[f"{name}_{position}"] = checkbox
        else:
            fields[str(name)] = field
    return fields


class Subsession(BaseSubsession):
    def creating_session(self):
        """Prepare the treatment pool and each player's randomisation state."""
        person_data = pd.read_csv('ma_jan/question_data/names.csv')
        country = _resolve_country(self.session.config)
        person_names = person_data[person_data.country == country]['person_name'].to_list()
        players = self.get_players()

        # Pool of treatment numbers; each appears as often as the session
        # config entry ``num_treatment_<number>`` says (default 0).
        treatment_list = []
        for _, treatment in Constants.TREATMENT_DATA.iterrows():
            number = treatment['treatment_number']
            treatment_list.extend([number] * self.session.config.get(f"num_treatment_{number}", 0))
        self.session.vars['treatments'] = treatment_list

        for p in players:
            p.random_state = random.randint(0, 4294967295)
            p.block_order = random.randint(0, 1)
            p.reversed_order_need_productivity = random.random() < 0.5
            p.switch_a_and_b = random.random() < 0.5

            persons = randomize_list(person_names)
            p.person_a_name = persons.pop()
            p.person_b_name = persons.pop()

            # Shuffle the vignette rows within each scenario.
            numbers_rand = Constants.VIGNETTE_DATA.groupby('scenario').apply(
                lambda x: x.sample(frac=1)).reset_index(drop=True)
            if p.block_order == 1:
                numbers_rand = numbers_rand.iloc[::-1]  # Reverses order of questions

            p.sequence_need_vignette = " ".join(
                str(x) for x in numbers_rand[numbers_rand.scenario == "need"].situation)
            p.sequence_productivity_vignette = " ".join(
                str(x) for x in numbers_rand[numbers_rand.scenario == "productivity"].situation)
            p.participant.vars['vignette_data'] = numbers_rand

            p.randomize_variable("checker_vignette")
            p.participant.vars['order_log_choices'] = p.randomize_choices("checker_logs")
            p.randomize_questions()


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    def start(self):
        """Set the participant's expiry time and assign a treatment.

        A treatment is drawn from the shuffled session pool while it is not
        empty. Otherwise one is chosen at random from the treatment table and
        flagged as randomly assigned.
        """
        timeout_minutes = self.session.config.get("timeout_minutes", 90)
        self.participant.vars['expiry'] = time.time() + timeout_minutes * 60

        pool = self.session.vars['treatments']
        random.shuffle(pool)
        if len(pool) > 0:
            self.treatment_number = pool.pop()
        else:
            self.treatment_number = random.choice(Constants.TREATMENT_DATA['treatment_number'].to_list())
            self.treatment_was_randomly_assigned = True

    def randomize_variable(self, variable_name: str):
        """Store a random order of the option indices under ``order_<variable_name>``."""
        self.participant.vars[f'order_{variable_name}'] = self.randomize_choices(variable_name)

    def is_high_accountability(self):
        """Return True when the player has treatment number 1."""
        return self.treatment_number == 1

    def is_low_accountability(self):
        """Return True when the player has treatment number 2."""
        return self.treatment_number == 2

    def randomize_choices(self, variable_name: str):
        """Return the option indices of *variable_name* in random order."""
        option_count = len(get_levels_for_variable(variable_name))
        return randomize_list(list(range(option_count)))

    def get_current_question(self):
        """Return the vignette row at the player's current ``vignette_index``."""
        return self.get_question(self.vignette_index)

    def get_question(self, index):
        """Return the vignette row at position *index*, or None if out of range."""
        data = self.participant.vars.get('vignette_data')
        if index < 0:
            # Negative positions would wrap around to the end of the table.
            return None
        try:
            return data.iloc[index]
        except IndexError:
            return None

    def get_number_of_questions(self):
        """Return the number of rows in the participant's vignette table."""
        return len(self.participant.vars.get('vignette_data').index)

    def get_current_question_names(self):
        """Return the two answer field names for the current vignette."""
        return self.get_question_names(self.get_question(self.vignette_index))

    def get_question_names(self, question_data):
        """Return the field names holding person A's and person B's answers."""
        scenario = question_data['scenario']
        situation = question_data['situation']
        return f"justice_{scenario}_A_{situation}", f"justice_{scenario}_B_{situation}"

    def drop_out(self):
        """Mark the player as dropped out (once) and release the treatment."""
        if not self.dropout:
            self.dropout = True
            self.reset_treatment()

    def is_quality_fail(self):
        """Return True when the player exceeded ``max_quality_fails``.

        Without that config entry the limit is infinite. On failure the
        player's treatment is released back to the pool.
        """
        fail = self.quality_fails > self.session.config.get("max_quality_fails", math.inf)
        if fail:
            self.reset_treatment()
        return fail

    def reset_treatment(self):
        """Return a pool-assigned treatment to the session pool, at most once."""
        if self.treatment_number is None or self.treatment_reset:
            return
        # Randomly assigned treatments never came from the pool.
        if not self.treatment_was_randomly_assigned:
            self.treatment_reset = True
            self.session.vars['treatments'].append(self.treatment_number)

    def get_timeout_seconds(self):
        """Return the seconds left until the participant's expiry time."""
        return self.participant.vars['expiry'] - time.time()

    def check_checkers(self):
        """Count one quality fail unless only ``checker_vignette_0`` is ticked."""
        wrong_boxes = (f'checker_vignette_{number}' for number in range(1, 7))
        # bool() keeps an unset (None) checkbox from raising a TypeError.
        quality_fail = (not self.checker_vignette_0) or any(
            bool(getattr(self, box)) for box in wrong_boxes)
        if quality_fail:
            self.quality_fails += 1

    def check_scheite(self):
        """Count one quality fail unless ``checker_logs`` equals 1."""
        if self.checker_logs != 1:
            self.quality_fails += 1

    def is_displayed(self):
        """Return True while the player has neither dropped out nor failed quality checks."""
        return not self.dropout and not self.is_quality_fail()

    def get_country_filter(self):
        """Return the ``country`` column values that apply to this session."""
        return ["Both", _resolve_country(self.session.config)]

    def randomize_questions(self):
        """Store a per-page question order in ``participant.vars``.

        For every page flagged ``randomize_questions == 1`` the list of
        variable names is saved under ``<page>_questions``. The order of the
        question groups is shuffled, and questions with ``randomize_order == 1``
        are additionally shuffled within their group. Both shuffles are seeded
        with the player's ``random_state``.
        """
        country_filter = self.get_country_filter()
        page_group = Constants.PAGES_DATA[Constants.PAGES_DATA.randomize_questions == 1]
        for _, page in page_group.iterrows():
            on_page = (Constants.QUESTION_DATA.page_name == page.page) & (
                Constants.QUESTION_DATA.country.isin(country_filter))
            # Missing 'randomize_order' means "randomize"; every other gap gets
            # the placeholder -9999. Reassigned rather than filled in place to
            # avoid pandas' chained-assignment warning on the filtered frame.
            questions_on_page = Constants.QUESTION_DATA[on_page].fillna(
                {'randomize_order': 1}).fillna(-9999)

            question_groups = list(range(0, questions_on_page['question_group'].nunique()))
            random.Random(self.random_state).shuffle(question_groups)

            questions_rand = questions_on_page[questions_on_page.randomize_order == 1].groupby(
                'question_group').apply(
                lambda x: x.sample(frac=1, random_state=self.random_state)).reset_index(drop=True)
            questions_not_rand = questions_on_page[questions_on_page.randomize_order == 0]

            result = pd.concat([questions_not_rand, questions_rand])
            # Map each group's ordinal to its shuffled position.
            result['group'] = result.groupby('question_group').ngroup()
            result['group'] = result['group'].apply(lambda x: question_groups[x])
            self.participant.vars[f'{page.page}_questions'] = \
                result.sort_values('group')['variable_name'].to_list()

    def get_formfields_for_page(self, page_name):
        """Return the variable names of all questions on *page_name* for this session's country."""
        country_filter = self.get_country_filter()
        on_page = (Constants.QUESTION_DATA.page_name == page_name) & (
            Constants.QUESTION_DATA.country.isin(country_filter))
        return Constants.QUESTION_DATA[on_page]["variable_name"].tolist()

    def get_other_list(self, page_name):
        """Return ``[variable_name, other_value, other_variable_name]`` triples for *page_name*.

        Only rows where all three cells are present are included;
        ``other_value`` is converted to ``int``.
        """
        country_filter = self.get_country_filter()
        on_page = (Constants.QUESTION_DATA.page_name == page_name) & (
            Constants.QUESTION_DATA.country.isin(country_filter))
        data = Constants.QUESTION_DATA[on_page]
        other_data = data[~data.variable_name.isna() & ~data.other_value.isna()
                          & ~data.other_variable_name.isna()]

        variable_names = other_data["variable_name"].to_list()
        other_values = [int(value) for value in other_data["other_value"].to_list()]
        other_variable_names = other_data["other_variable_name"].to_list()
        return [list(triple) for triple in zip(variable_names, other_values, other_variable_names)]

    def get_data_for_vignette(self):
        """Collect the values describing the current vignette.

        Returns a dict with the (possibly swapped) names, needs and
        productivities of both persons, their totals, the answer field names,
        the treatment texts, and ``class_list``: a tag for every value that
        differs from the previous vignette.
        """
        vignette_index = self.vignette_index
        number_of_questions = self.get_number_of_questions()
        current_question = self.get_question(vignette_index)
        last_question = self.get_question(self.vignette_index - 1)
        selector_1, selector_2 = self.get_question_names(current_question)

        scenario = current_question['scenario']
        name_1 = name_a = self.person_a_name
        name_2 = name_b = self.person_b_name
        need_1, need_2 = current_question['need_a'], current_question['need_b']
        productivity_1, productivity_2 = current_question['productivity_a'], current_question['productivity_b']
        endowment = int(productivity_1) + int(productivity_2)

        if last_question is None:
            # First vignette: compare against itself so nothing counts as changed.
            old_scenario = scenario
            old_need_1, old_need_2 = need_1, need_2
            old_productivity_1, old_productivity_2 = productivity_1, productivity_2
            old_endowment = endowment
        else:
            old_scenario = last_question['scenario']
            old_need_1, old_need_2 = last_question['need_a'], last_question['need_b']
            old_productivity_1, old_productivity_2 = \
                last_question['productivity_a'], last_question['productivity_b']
            # Same int conversion as for the current endowment.
            old_endowment = int(old_productivity_1) + int(old_productivity_2)

        if self.switch_a_and_b:
            name_1, name_2 = name_2, name_1
            productivity_1, productivity_2 = productivity_2, productivity_1
            old_productivity_1, old_productivity_2 = old_productivity_2, old_productivity_1
            need_1, need_2 = need_2, need_1
            old_need_1, old_need_2 = old_need_2, old_need_1

        comparisons = [
            (scenario, old_scenario, 'new-block'),
            (need_1, old_need_1, 'need_1'),
            (need_2, old_need_2, 'need_2'),
            (productivity_1, old_productivity_1, 'productivity_1'),
            (productivity_2, old_productivity_2, 'productivity_2'),
            (endowment, old_endowment, 'endowment'),
        ]
        class_list = [tag for new_value, old_value, tag in comparisons if new_value != old_value]

        treatment_data = Constants.TREATMENT_DATA[
            Constants.TREATMENT_DATA.treatment_number == self.treatment_number].iloc[0]

        return dict(
            scenario=scenario,
            number_of_questions=number_of_questions,
            current_question_number=vignette_index + 1,
            name_a=name_a,
            name_b=name_b,
            name_1=name_1,
            name_2=name_2,
            productivity_1=productivity_1,
            productivity_2=productivity_2,
            total_productivity=int(productivity_1) + int(productivity_2),
            need_1=need_1,
            need_2=need_2,
            total_need=int(need_1) + int(need_2),
            endowment=endowment,
            selector_1=selector_1,
            selector_2=selector_2,
            class_list=class_list,
            low_productivity_reason=treatment_data['low_productivity_reason'],
            high_need_reason=treatment_data['high_need_reason'],
            types_of_need_variation=treatment_data['types_of_need_variation'],
            types_of_need_consequence=treatment_data['types_of_need_consequence'],
        )

    treatment_number = models.PositiveIntegerField()
    treatment_was_randomly_assigned = models.BooleanField(initial=False)
    block_order = models.IntegerField(initial=0)
    quality_fails = models.IntegerField(initial=0)
    dropout = models.BooleanField(initial=False)
    treatment_reset = models.BooleanField(initial=False)
    person_a_name = models.StringField(initial="")
    person_b_name = models.StringField(initial="")
    vignette_index = models.IntegerField(initial=0)
    random_state = models.IntegerField(initial=0)
    sequence_need_vignette = models.StringField(initial="")
    sequence_productivity_vignette = models.StringField(initial="")
    reversed_order_need_productivity = models.BooleanField()
    switch_a_and_b = models.BooleanField()

    # Data-driven fields are injected into the class namespace. Building them
    # in helper functions leaves no loop variables behind as class attributes
    # and also works when a table has no rows.
    locals().update(_build_vignette_fields())
    locals().update(_build_question_fields())

# TODO: Create custom data export