from otree.api import * import numpy as np from numpy import random from datetime import datetime doc = """ Four-agent task """ class C(BaseConstants): NAME_IN_URL = 'four_agent' PLAYERS_PER_GROUP = 4 NUM_ROUNDS = 240 SEQ_STORED = 0 # for each group which participant store the condition information N_BLOCK = 8 BLOCK_SIZE = 30 WAITING_TIME_MAX = 60 * 10 # the max time the players should wait, in second. RESPONSE_TIME_LIMIT = 60 * 5000 # 5 seconds for testing, change this when doing formal experiment. ENDOWMENT = 10 BONUS = 5 class Subsession(BaseSubsession): pass def creating_session(subsession): if subsession.round_number == 1: for player in subsession.get_players(): participant = player.participant participant.is_dropout = False class Group(BaseGroup): num_make_choice = models.IntegerField(initial = 0) num_finished = models.IntegerField(initial = 0) class Player(BasePlayer): question_purpose = models.StringField(label="你觉得实验的目的是什么?"); question_task = models.StringField(label="你在任务中的策略是什么?") player_no = models.IntegerField(initial = 0) rewarded_no = models.IntegerField(initial = 0) revealed_no = models.IntegerField(initial = 0) trial_finished = models.IntegerField(initial = 0) chosen_idx = models.IntegerField(initial = -1) payoffcal = models.IntegerField(initial = 0) condition = models.IntegerField(initial = 0) block_no = models.IntegerField(initial = 0) RT = models.FloatField(initial = 0) CT = models.FloatField(initial = 0) # PAGES def waiting_too_long(player): participant = player.participant import time # assumes you set wait_page_arrival in PARTICIPANT_FIELDS. return time.time() - participant.arrival_at_waiting > C.WAITING_TIME_MAX def group_by_arrival_time_method(subsession, waiting_players): if len(waiting_players) >= C.PLAYERS_PER_GROUP: for player in waiting_players: player.participant.is_initialized = False player.participant.no_switched = 0 player.participant.total_payout = 0 player.participant.player_no_list = [1,2,3,4] return waiting_players[:C.PLAYERS_PER_GROUP] for player in waiting_players: if waiting_too_long(player): # make a single-player group. return [player] class WaitingRoom(WaitPage): group_by_arrival_time = True def is_displayed(player): player.participant.payout_round = random.randint(1, C.NUM_ROUNDS+1, size=1) return player.round_number == 1 class BlockDescription(Page): def is_displayed(player): player_list = player.group.get_players() lead_player = player_list[C.SEQ_STORED] if not lead_player.participant.is_initialized: template_seq = [0,1] condition_seq = [] for i in range(C.N_BLOCK): condition_seq.extend(random.permutation(template_seq)) image_matrix = np.random.permutation(C.N_BLOCK*C.PLAYERS_PER_GROUP) + 1 image_matrix = image_matrix.reshape(C.N_BLOCK, C.PLAYERS_PER_GROUP) lead_player.participant.condition_seq = condition_seq lead_player.participant.image_seq = image_matrix lead_player.participant.is_initialized = True print(condition_seq) if lead_player.participant.no_switched < (player.round_number-1)//C.BLOCK_SIZE + 1: player_no_list = lead_player.participant.player_no_list random_list = random.permutation(player_no_list) while(any(a == b for a, b in zip(random_list, player_no_list))): random_list = random.permutation(player_no_list) lead_player.participant.player_no_list = random_list.tolist() lead_player.participant.no_switched += 1 return player.round_number%C.BLOCK_SIZE == 1 def live_method(player: Player,data): # This function is called when the javascripts use 'liveSend' method to send data # to the server. We need to unpack the message and then send information back to # players. # get group and player list group = player.group player_list = group.get_players() lead_player = player_list[C.SEQ_STORED] # lead player store the condition seq information if data['type'] == 'control': # control type message indicate was used for controlling task procedure. if data['msg'] == 'page_loaded': # Initialization for each group. print(player.id_in_group) condition_seq = lead_player.participant.condition_seq condition_seq = [int(x) for x in condition_seq] block_index = (player.round_number-1)//C.BLOCK_SIZE condition = condition_seq[block_index] print(condition) player_no_list = lead_player.participant.player_no_list player_no_list = [int(x) for x in player_no_list] player_no = player_no_list[player.id_in_group-1] player.player_no = player.id_in_group image_seq = [int(x) for x in lead_player.participant.image_seq[block_index]] print(image_seq) trial_finished_list = [] player_no_list = [] for p in player.group.get_players(): player_no_list.extend([p.id_in_group]) trial_finished_list.extend([p.trial_finished]) response = dict(msg_type = 'control', msg = 'initialization', player_id = player.id_in_group, player_no = player.player_no, img_01 = image_seq[0], img_02 = image_seq[1], img_03 = image_seq[2], img_04 = image_seq[3], condition = condition, player_no_list = player_no_list, trial_finished_list = trial_finished_list) print(response) return {player.id_in_group: response} if data['msg'] == 'trial_finished': player.trial_finished = 1 all_finished = True for p in player.group.get_players(): if p.trial_finished != 1: all_finished = False if all_finished: # check if all players completed the trial player.group.num_finished = 0 return {0: dict(msg_type = 'control',msg = 'all_players_finished')} class SendTask(Page): timeout_seconds = C.RESPONSE_TIME_LIMIT # the time limit to respond is 5 minutes def is_displayed(player): return len(player.group.get_players()) == C.PLAYERS_PER_GROUP and not player.participant.is_dropout def before_next_page(player, timeout_happened): if timeout_happened: # you may want to fill a default value for any form fields, # because otherwise they may be left null. player_list = player.group.get_players() for player in player_list: player.participant.is_dropout = True def live_method(player: Player,data): # This function is called when the javascripts use 'liveSend' method to send data # to the server. We need to unpack the message and then send information back to # players. # get group and player list group = player.group player_list = group.get_players() lead_player = player_list[C.SEQ_STORED] # lead player store the condition seq information condition_seq = lead_player.participant.condition_seq condition_seq = [int(x) for x in condition_seq] block_index = (player.round_number-1)//C.BLOCK_SIZE condition = condition_seq[(player.round_number-1)//C.BLOCK_SIZE] if data['type'] == 'control': # control type message indicate was used for controlling task procedure. if data['msg'] == 'page_loaded': # Initialization for each group. condition_seq = lead_player.participant.condition_seq condition_seq = [int(x) for x in condition_seq] condition = condition_seq[block_index] player_no = player.id_in_group player.player_no = player_no player.block_no = block_index + 1 image_seq = [int(x) for x in lead_player.participant.image_seq[block_index]] print(image_seq) player_no_list = [] chosen_player_list = [] revealed_player_list = [] trial_finished_list = [] for p in player.group.get_players(): chosen_player_list.extend([p.rewarded_no]) revealed_player_list.extend([p.revealed_no]) player_no_list.extend([p.player_no]) trial_finished_list.extend([p.trial_finished]) response = dict(msg_type = 'control', msg = 'initialization', player_id = player.id_in_group, player_no = player_no, img_01 = image_seq[0], img_02 = image_seq[1], img_03 = image_seq[2], img_04 = image_seq[3], condition = condition, round_number = player.round_number, revealed_player_list = revealed_player_list, player_no_list = player_no_list, chosen_player_list = chosen_player_list, trial_finished_list = trial_finished_list, ) print(response) return {player.id_in_group: response} if data['msg'] == 'trial_finished': player.CT = data['CT'] player.trial_finished = 2 all_finished = True for p in player.group.get_players(): if p.trial_finished != 2: all_finished = False if all_finished: # check if all players completed the trial player.group.num_finished = 0 return {0: dict(msg_type = 'control',msg = 'all_players_finished')} if data['type'] == 'data': player.rewarded_no = data['choice'] player.revealed_no = data['revealed'] player.RT = data['RT'] player.chosen_idx = data['chosen_index'] all_chosen = True print("player chosen number") for p in player.group.get_players(): print(p.rewarded_no) if p.rewarded_no == 0: all_chosen = False if all_chosen: chosen_player_list = [] player_no_list = [] bonus = 0 if condition == 1: bonus = C.BONUS for p in player.group.get_players(): chosen_player_list.extend([p.revealed_no]) player_no_list.extend([p.player_no]) for i_player in player.group.get_players(): for j_player in player.group.get_players(): if j_player.revealed_no == i_player.player_no: i_player.payoffcal += C.ENDOWMENT if j_player.player_no == i_player.revealed_no: i_player.payoffcal += bonus i_player.payoff = i_player.payoffcal response = dict(msg_type = 'data', stage = 'send', player_no_list = player_no_list, chosen_player_list = chosen_player_list); return {0: response} class WaitingTooLong(Page): def is_displayed(player): is_finished_after_waiting = len(player.group.get_players()) < C.PLAYERS_PER_GROUP return player.round_number == 1 and is_finished_after_waiting class Debrief(Page): form_model = 'player' form_fields = ['question_purpose','question_task'] def is_displayed(player): player.participant.finished = True # otree tells me to define this is_finished_after_waiting = len(player.group.get_players()) < C.PLAYERS_PER_GROUP return player.round_number == C.NUM_ROUNDS or (player.round_number == 1 and is_finished_after_waiting) or player.participant.is_dropout class TaskFinished(Page): def is_displayed(player): player.participant.finished = True # otree tells me to define this is_finished_after_waiting = len(player.group.get_players()) < C.PLAYERS_PER_GROUP return player.round_number == C.NUM_ROUNDS or (player.round_number == 1 and is_finished_after_waiting) or player.participant.is_dropout page_sequence = [ WaitingRoom, BlockDescription, SendTask, Debrief, TaskFinished ]