from otree.api import * import math import pandas as pd import itertools import numpy as np from json import dumps as json_dumps, loads as json_loads import time class C(BaseConstants): NAME_IN_URL = 'task3' PLAYERS_PER_GROUP = 4 NUM_ROUNDS = 10 DEFAULT_TEMPLATE = 'cpr_defaults/templates/default_.html' A = 2.3 B = 0.025 # SVO_VALUES = pd.read_csv("cpr_defaults/svo_values.csv") class Subsession(BaseSubsession): #group_size = models.IntegerField(initial=4) wait_for_ids = models.LongStringField(initial='[]') arrived_ids = models.LongStringField(initial='[]') def creating_session(subsession): # extractions = itertools.cycle([11, 18]) #subsession.group_size = 4 for group in subsession.get_groups(): group.default_extraction = int(subsession.session.config["default_extraction"]) players = subsession.get_players() for p in players: p.participant.finished_participant = False p.participant.completion_status = "incomplete" class Group(BaseGroup): group_extraction = models.IntegerField(initial=0) default_extraction = models.IntegerField() group_extraction_last_round = models.IntegerField(initial=0) def set_payoffs(self): for player in self.get_players(): self.group_extraction += player.extraction for player in self.get_players(): player.payoff_task3 += np.round(self.payoff_extraction(player.extraction, self.group_extraction), 2) player.participant.payoff_task3 += player.payoff_task3 @staticmethod def payoff_extraction(x, x_total): payoff_player = 0 if x == 0: payoff_player = 0 else: payoff_player = round((x / x_total) * ((C.A * x_total) - (C.B * math.pow(x_total, 2))), 2) return payoff_player wait_for_ids = models.LongStringField(initial='[]') arrived_ids = models.LongStringField(initial='[]') def unarrived_players(group: Group): return set(json_loads(group.wait_for_ids)) - set(json_loads(group.arrived_ids)) def unarrived_players_subsession(subsession: Subsession): return set(json_loads(subsession.wait_for_ids)) - set(json_loads(subsession.arrived_ids)) class Player(BasePlayer): internal_id = models.FloatField() extraction = models.IntegerField(initial=0) payoff_task3 = models.FloatField(initial=0) payoff_last_round = models.FloatField() time_1st_change_button = models.FloatField() time_spent_dialog = models.FloatField() number_changes = models.IntegerField() extraction_last_round = models.IntegerField() time_spent_round = models.FloatField() timeout_last_round = models.BooleanField(initial=False) completed = models.BooleanField(initial=False) other0 = models.FloatField() other1 = models.FloatField() other2 = models.FloatField() waiting = models.BooleanField(initial=False) lost_focus_time = models.FloatField(initial = 0) lost_focus = models.BooleanField(initial = False) number_clicks_sandbox_self = models.IntegerField(initial = 0) number_clicks_sandbox_others = models.IntegerField(initial=0) number_scrolls_table = models.IntegerField(initial=0) def waiting_too_long(player): participant = player.participant import time return time.time() - participant.wait_page_arrival > int(player.subsession.session.config["timeout_minutes_grouping"]) * 60 def lost_focus(player): if player.lost_focus_time > 0: if time.time() - player.lost_focus_time > int(player.subsession.session.config["timeout_focus"]) * 60: player.lost_focus = True print(player.participant.id_in_session) def check_waiting_players(subsession, waiting_players): for p in subsession.get_players(): if p.waiting and p not in waiting_players: if p.lost_focus_time == 0: p.lost_focus_time = time.time() lost_focus(p) else: p.lost_focus_time = 0 def group_by_arrival_time_method(subsession, waiting_players): session = subsession.session check_waiting_players(subsession, waiting_players) if len(waiting_players) == C.PLAYERS_PER_GROUP: for p in waiting_players: p.waiting = False p.lost_focus_time = 0 return waiting_players for player in waiting_players: player.waiting = True if waiting_too_long(player) and not player.lost_focus: player.participant.finished_participant = True player.participant.completion_status = "ungrouped" player.participant.reason_finished = "We were not able to find a group for you. You will be paid for the past completed tasks." return [player] elif player.lost_focus: player.participant.finished_participant = True player.participant.completion_status = "lost_focus" return [player] # FUNCTIONS # PAGES class GroupsWaitPage(WaitPage): template_name = 'cpr_defaults/GroupsWaitPage.html' title_text = "Task 3" body_text = "Please wait until we find a group for you" group_by_arrival_time = True @staticmethod def is_displayed(player): return player.round_number == 1 @staticmethod def app_after_this_page(player, upcoming_apps): if player.participant.finished_participant and player.participant.completion_status == "ungrouped": player.participant.payoff_task3 = 0 return "cpr_results" elif player.lost_focus and player.participant.completion_status == "lost_focus": player.participant.payoff_task3 = 0 player.participant.reason_finished = "We detected you lost focus of the experiment tab." return "finished_experiment" @staticmethod def vars_for_template(player): return dict(time_elapsed = int(np.floor((time.time() - player.participant.wait_page_arrival) / 60))) class Default(Page): timer_text = "" @staticmethod def get_timeout_seconds(player): return int(player.subsession.session.config["timeout_seconds_task3"]) @staticmethod def js_vars(player): return dict( default_extraction=int(player.subsession.session.config["default_extraction"]) ) @staticmethod def vars_for_template(player): if player.round_number >= 2: player.payoff_last_round = player.in_round(player.round_number - 1).payoff_task3 player.extraction_last_round = player.in_round(player.round_number - 1).extraction player.group.group_extraction_last_round = player.in_round( player.round_number - 1).group.group_extraction - player.extraction_last_round else: player.participant.payoff_task3 = 0 player.payoff_last_round = 0 player.group.group_extraction_last_round = 0 player.extraction_last_round = 0 @staticmethod def live_method(player, data): player.extraction = int(data["extraction"]) player.time_1st_change_button = float(data["time_1st_change_button"]) player.time_spent_dialog = float(data["time_spent_dialog"]) player.number_changes = int(data["number_changes"]) player.time_spent_round = float(data["time_spent_round"]) player.number_clicks_sandbox_self = int(data["clicks_sandbox_self"]) player.number_clicks_sandbox_others = int(data["clicks_sandbox_others"]) player.number_scrolls_table = int(data["number_scrolls_table"]) @staticmethod def before_next_page(player, timeout_happened): if timeout_happened: player.extraction = 0 player.time_1st_change_button = 0 player.time_spent_dialog = 0 player.number_changes = 0 player.time_spent_round = 0 player.participant.finished_participant = True player.participant.completion_status = "timeout" player.participant.reason_finished = "As we stated, you had 2 minutes to complete the task. You took longer than the maximum time allowed to make your decision. Therefore, you were removed from the experiment. You will be paid for the past completed tasks." group = player.group for p in group.get_players(): p.participant.group_size_task3 -= 1 arrived_ids_set = set(json_loads(group.arrived_ids)) arrived_ids_set.add(player.id_in_subsession) group.arrived_ids = json_dumps(list(arrived_ids_set)) # wait_page_live_method(player,"") @staticmethod def app_after_this_page(player, upcoming_apps): if player.participant.finished_participant and player.participant.completion_status == "timeout": player.participant.payoff_task3 = 0 return "cpr_results" def wait_page_live_method(player: Player, data): group = player.group arrived_ids_set = set(json_loads(group.arrived_ids)) arrived_ids_set.add(player.id_in_subsession) group.arrived_ids = json_dumps(list(arrived_ids_set)) response = {} if not unarrived_players(group): response = {0: dict(finished=True, remaining=0)} if player.group.group_extraction == 0: group.set_payoffs() else: response = {0: dict(finished=False, remaining=len(unarrived_players(group)))} return response class WaitPageRound(Page): @staticmethod def is_displayed(player: Player): group = player.group # first time if not json_loads(group.wait_for_ids): wait_for_ids = [] for p in group.get_players(): if not p.participant.finished_participant: wait_for_ids.append(p.id_in_subsession) # wait_for_ids = [p.id_in_subsession for p in group.get_players()] group.wait_for_ids = json_dumps(wait_for_ids) return unarrived_players(group) @staticmethod def live_method(player: Player, data): if data.get('type') == 'wait_page': return wait_page_live_method(player, data) @staticmethod def error_message(player: Player, values): group = player.group if unarrived_players(group): return "Wait page not finished" class RoundResultsPage(Page): timeout_seconds = 30 timer_text = "" @staticmethod def vars_for_template(player): response = {} others = player.get_others_in_group() for i, o in enumerate(others): response["other%d" % i] = o.extraction return response @staticmethod def js_vars(player): response = dict() response["remaining"] = 4 others = player.get_others_in_group() response["payoff_t3"] = player.payoff_task3 for i, o in enumerate(others): response["other%d" % i] = o.extraction if o.extraction == 0: response["remaining"] -= 1 return response @staticmethod def app_after_this_page(player, upcoming_apps): if player.participant.group_size_task3 <= 2: player.participant.reason_finished = "Your group was composed of less than 3 participants. You will be paid for the past completed tasks." player.participant.completion_status = "ungrouped" player.participant.finished_participant = True player.participant.payoff_task3 = 0 return "cpr_results" # print(upcoming_apps) # if player.subsession.group_size <= 2: # return upcoming_apps[-1] page_sequence = [GroupsWaitPage, Default, WaitPageRound, RoundResultsPage]