import time
from datetime import datetime

from numpy import random
from otree.api import *

doc = """
Three-agent task
"""


class C(BaseConstants):
    """App-wide constants for the three-agent task."""

    NAME_IN_URL = 'three_agent'
    PLAYERS_PER_GROUP = 3
    NUM_ROUNDS = 90
    ENDOWMENT = cu(1)
    # Index (into group.get_players()) of the player whose participant
    # record stores the group's actor sequence.
    SEQ_STORED = 0
    # Number of mini blocks; within each mini block the three players take
    # turns being the actor (N_MINI_BLOCK * 3 actor slots in total).
    N_MINI_BLOCK = 30
    # Maximum time a player may wait to be grouped, in seconds.
    WAITING_TIME_MAX = 60 * 10
    # Time limit for SendTask, in seconds (currently 5 minutes; shorten it
    # for testing).
    RESPONSE_TIME_LIMIT = 60 * 5


class Subsession(BaseSubsession):
    pass


def creating_session(subsession):
    """Mark every participant as not dropped out when round 1 is created."""
    if subsession.round_number != 1:
        return
    for player in subsession.get_players():
        player.participant.is_dropout = False


class Group(BaseGroup):
    # id_in_group of the current actor; 0 means "not assigned yet".
    actor = models.IntegerField(initial=0)
    # How many players have reported 'trial_finished' for the current trial.
    num_finished = models.IntegerField(initial=0)
    # The 'choice' value received in the most recent 'data' live message.
    rewarded_no = models.IntegerField(initial=0)


class Player(BasePlayer):
    question_purpose = models.StringField(
        label="What do you think is the purpose of the experiment? \n (Please input NA if you have no clue.\n)"
    )
    question_task = models.StringField(
        label="Did the experiment run smoothly? If not, what happened? \n"
    )


# FUNCTIONS
def waiting_too_long(player):
    """Return True when the player has waited longer than WAITING_TIME_MAX.

    Reads ``participant.arrival_at_waiting`` and compares it against
    ``time.time()``.
    NOTE(review): ``arrival_at_waiting`` is not set anywhere in this file;
    presumably it is declared in PARTICIPANT_FIELDS and written by an
    earlier app as an epoch timestamp -- confirm.
    """
    waited = time.time() - player.participant.arrival_at_waiting
    return waited > C.WAITING_TIME_MAX


def group_by_arrival_time_method(subsession, waiting_players):
    """Decide which waiting players (if any) form the next group.

    Returns the first ``C.PLAYERS_PER_GROUP`` waiting players once enough
    have arrived; otherwise returns a single-player group for the first
    player who has waited too long; otherwise returns None (keep waiting).
    """
    if len(waiting_players) >= C.PLAYERS_PER_GROUP:
        # Reset the flag so SendTask.live_method builds a fresh actor
        # sequence for the new group.
        for waiting in waiting_players:
            waiting.participant.is_initialized = False
        return waiting_players[:C.PLAYERS_PER_GROUP]

    for waiting in waiting_players:
        if waiting_too_long(waiting):
            # Make a single-player group so this participant can leave
            # the wait page.
            return [waiting]
    return None


def _build_actor_sequence(n_mini_blocks):
    """Return a list of ``3 * n_mini_blocks`` actor ids (plain ints).

    Each mini block is a random permutation of [1, 2, 3]. A block is
    re-drawn until its first id differs from the last id of the previous
    block, so the same id never appears twice in a row.
    """
    template_seq = [1, 2, 3]
    # .tolist() converts numpy integer scalars into plain Python ints so
    # that no numpy objects end up stored on the participant.
    actor_seq = random.permutation(template_seq).tolist()
    for _ in range(n_mini_blocks - 1):
        mini_seq = random.permutation(template_seq).tolist()
        while mini_seq[0] == actor_seq[-1]:
            mini_seq = random.permutation(template_seq).tolist()
        actor_seq.extend(mini_seq)
    return actor_seq


def _final_pages_displayed(player):
    """Shared ``is_displayed`` rule for the Debrief and TaskFinished pages.

    True in the last round, in round 1 for a player whose group has fewer
    than ``C.PLAYERS_PER_GROUP`` members, or whenever the participant is
    flagged as a dropout.
    """
    # oTree requires this participant field to be defined, so it is
    # assigned on every call.
    # NOTE(review): this marks the participant as finished even when the
    # page is not shown -- confirm that is intended.
    player.participant.finished = True
    is_finished_after_waiting = (
        len(player.group.get_players()) < C.PLAYERS_PER_GROUP
    )
    return (
        player.round_number == C.NUM_ROUNDS
        or (player.round_number == 1 and is_finished_after_waiting)
        or player.participant.is_dropout
    )


# PAGES
class WaitingRoom(WaitPage):
    """Round-1 wait page that forms groups by arrival time."""

    group_by_arrival_time = True

    @staticmethod
    def is_displayed(player):
        return player.round_number == 1


class SendTask(Page):
    """Main task page; the browser talks to the server via live messages."""

    # Time limit to respond, in seconds (5 minutes).
    timeout_seconds = C.RESPONSE_TIME_LIMIT

    @staticmethod
    def is_displayed(player):
        # Only complete groups whose members have not dropped out play.
        is_full_group = len(player.group.get_players()) == C.PLAYERS_PER_GROUP
        return is_full_group and not player.participant.is_dropout

    @staticmethod
    def before_next_page(player, timeout_happened):
        # A timeout by any member flags the whole group as dropped out,
        # which makes SendTask stop being displayed for all of them.
        if timeout_happened:
            for member in player.group.get_players():
                member.participant.is_dropout = True

    @staticmethod
    def live_method(player: Player, data):
        """Handle a message sent from the page's JavaScript via ``liveSend``.

        Dispatches on ``data['type']``:

        * ``'control'`` / ``'page_loaded'``: on first use in round 1 build
          the group's actor sequence, assign this round's actor if it is
          still unset, and reply to the sender only.
        * ``'control'`` / ``'trial_finished'``: count finished players and,
          once all have finished, reset the counter and notify the group.
        * ``'data'``: record the choice and forward it to the whole group.
        * ``'graphic'`` with shape ``'angle'``: forward the angle to the
          other players in the group.

        Returns a dict mapping id_in_group (0 = whole group) to the payload,
        or None when there is nothing to send.
        """
        group = player.group
        # The lead player's participant record stores the actor sequence.
        lead_player = group.get_players()[C.SEQ_STORED]
        msg_type = data['type']

        if msg_type == 'control':
            # Control messages drive the task procedure.
            if data['msg'] == 'page_loaded':
                lead = lead_player.participant
                # One-time initialization for each group.
                if player.round_number == 1 and not lead.is_initialized:
                    actor_seq = _build_actor_sequence(C.N_MINI_BLOCK)
                    print(actor_seq)
                    lead.actor_seq = actor_seq
                    lead.is_initialized = True
                if group.actor == 0:
                    # First page load of this round: take the next actor.
                    group.actor = int(lead.actor_seq.pop())
                print('The actor is', group.actor, 'Player ', player.id_in_group, ':', 'Ready!')
                response = dict(
                    msg_type='control',
                    msg='initialization',
                    player_id=player.id_in_group,
                    next_actor=group.actor,
                )
                return {player.id_in_group: response}

            if data['msg'] == 'trial_finished':
                group.num_finished += 1
                print('num_finished', group.num_finished)
                # Check whether all players completed the trial.
                if group.num_finished == C.PLAYERS_PER_GROUP:
                    group.num_finished = 0
                    return {0: dict(msg_type='control', msg='all_players_finished')}

        if msg_type == 'data':
            response = dict(
                msg_type='data',
                stage='send',
                chosen_player=data['choice'],
                food_angle_list=data['food_angle_list'],
            )
            group.rewarded_no = data['choice']
            return {0: response}

        if msg_type == 'graphic' and data['shape'] == 'angle':
            response = dict(msg_type='graphic', stage='send', angle=data['angle'])
            return {
                other.id_in_group: response
                for other in player.get_others_in_group()
            }

        return None


class WaitingTooLong(Page):
    """Round-1 page for players whose group is smaller than a full group.

    NOTE(review): this page is not listed in ``page_sequence`` below.
    """

    @staticmethod
    def is_displayed(player):
        is_finished_after_waiting = (
            len(player.group.get_players()) < C.PLAYERS_PER_GROUP
        )
        return player.round_number == 1 and is_finished_after_waiting


class Debrief(Page):
    """Questionnaire page collecting the two free-text Player fields."""

    form_model = 'player'
    form_fields = ['question_purpose', 'question_task']

    @staticmethod
    def is_displayed(player):
        return _final_pages_displayed(player)


class TaskFinished(Page):
    """Final page, displayed under the same conditions as Debrief."""

    @staticmethod
    def is_displayed(player):
        return _final_pages_displayed(player)


page_sequence = [
    WaitingRoom,
    SendTask,
    Debrief,
    TaskFinished
]