from otree.api import *
import numpy as np
import random
import time


doc = """
non_interactive_aggregation
"""


class C(BaseConstants):
    """App-wide constants."""

    NAME_IN_URL = 'aggregation_tasks_treatmentN'
    PLAYERS_PER_GROUP = 3
    BALLS_PER_URN = 100
    NUM_ROUNDS = 2
    INSTRUCTIONS_TEMPLATE = 'non_interactive_aggregation/instructions.html'


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    """Per-round group state: the two urns, the prior, and the chosen urn."""

    # Urn picked in WaittoReport: "left" or "right".
    computer_choice = models.StringField()
    # game_iteration = models.IntegerField(initial = 1)
    # whose_turn = models.IntegerField(initial = 1)
    # game_over = models.BooleanField(initial = False)
    # Urn contents, stored as comma-separated "black"/"red" strings.
    urn_left = models.StringField()
    urn_right = models.StringField()
    # Number of black balls in each urn.
    black_left = models.IntegerField()
    black_right = models.IntegerField()
    # Weight given to "left" when the computer picks an urn.
    p = models.FloatField()
    # Unix timestamps/durations in whole seconds.
    startTime = models.IntegerField()
    timeSpent = models.IntegerField()

    def group_start(self):
        """Load this round's parameters and fill both urns.

        Reads ``session.config["parameters"][str(round_number)]`` for
        ``p``, ``black_num_left`` and ``black_num_right``. With a fair coin
        toss the two sides are mirrored (``p`` becomes ``1 - p`` and the
        black counts are swapped). Each urn is then stored as a shuffled,
        comma-separated string of ``C.BALLS_PER_URN`` balls.
        """
        params = self.session.config["parameters"][str(self.round_number)]
        self.p = params["p"]
        self.black_left = params["black_num_left"]
        self.black_right = params["black_num_right"]

        toss = random.randint(0, 1)
        if toss == 1:
            # Mirror left and right.
            self.p = round(1 - self.p, 2)
            self.black_left, self.black_right = self.black_right, self.black_left

        # Assign balls in the left urn
        list_left = ["black"] * self.black_left + ["red"] * (C.BALLS_PER_URN - self.black_left)
        list_left = np.random.permutation(list_left).tolist()
        self.urn_left = ",".join(list_left)

        # Assign balls in the right urn
        list_right = ["black"] * self.black_right + ["red"] * (C.BALLS_PER_URN - self.black_right)
        list_right = np.random.permutation(list_right).tolist()
        self.urn_right = ",".join(list_right)

    def get_urn_balls_dict(self, label):
        """Return ``{position: colour}`` for the urn named by ``label``.

        ``label`` must be ``"left"`` or ``"right"``; any other value yields
        an empty dict. Positions are 0-based ints in stored order.
        """
        if label == "left":
            urn = self.urn_left
        elif label == "right":
            urn = self.urn_right
        else:
            # The original rebound the name ``dict`` locally, which made this
            # early return raise UnboundLocalError; return a literal instead.
            return {}
        return dict(enumerate(urn.split(",")))

    def report_history(self):
        """Return all submitted reports keyed by table-cell selector.

        Keys look like ``"#cell_<stage>_<id_in_group>"``; values are the
        ``{"x": ..., "y": ...}`` dicts produced by ``Player.report_dict``.
        Stages a player has not reported yet are omitted.
        """
        stages = self.session.config["stages"]
        history = {}
        for p in self.get_players():
            reports = p.report_dict()
            for i in range(1, stages + 1):
                if i in reports:
                    cell_id = "#" + "cell_" + str(i) + "_" + str(p.id_in_group)
                    history[cell_id] = reports[i]
        return history


class Player(BasePlayer):
    """Per-round player state: the private signal and the staged reports."""

    # Comma-separated per-stage reports; an empty slot means "not reported".
    report_x = models.LongStringField(initial = "")
    report_y = models.LongStringField(initial = "")
    # Colour of the privately drawn ball: "black" or "red".
    signal = models.StringField()
    score = models.FloatField()
    # Round chosen for payment/score display; 0 means "not chosen yet".
    selected_task = models.IntegerField(initial = 0)
    # 1-based index of the next stage this player will report in.
    game_iteration = models.IntegerField(initial = 1)

    def list_init(self, len):
        """Initialise both report lists with ``len`` empty slots.

        The parameter name shadows the builtin ``len``; it is kept so the
        method signature stays unchanged for existing callers.
        """
        report_list = [""] * len
        self.report_x = ",".join(report_list)
        self.report_y = ",".join(report_list)
        return

    def set_list_element(self, i, value_x, value_y):
        """Store ``value_x``/``value_y`` in slot ``i`` (1-based) of the lists."""
        report_list = self.report_x.split(",")
        report_list[i - 1] = str(value_x)
        self.report_x = ",".join(report_list)

        report_list = self.report_y.split(",")
        report_list[i - 1] = str(value_y)
        self.report_y = ",".join(report_list)
        return

    def get_list_element(self, i):
        """Return slot ``i`` (1-based) as an ``(x, y)`` pair of floats.

        Raises ``ValueError`` if the slot is still empty.
        """
        report_list = self.report_x.split(",")
        x = float(report_list[i - 1])
        report_list = self.report_y.split(",")
        y = float(report_list[i - 1])
        return x, y

    def report_dict(self):
        """Return ``{stage: {"x": str, "y": str}}`` for every filled slot.

        Stages are 1-based and capped at ``session.config["stages"]``. Values
        are the raw stored strings. Slots with an empty ``x`` are skipped.
        """
        stages = self.session.config["stages"]
        # Slicing + zip means lists shorter than ``stages`` (e.g. before
        # list_init has run) yield fewer entries instead of an IndexError.
        report_list_x = self.report_x.split(",")[:stages]
        report_list_y = self.report_y.split(",")[:stages]
        value = {}
        for stage, (x, y) in enumerate(zip(report_list_x, report_list_y), start=1):
            if x != "":
                value[stage] = dict(x = x, y = y)
        return value


def _urn_display_vars(group):
    """Return the urn-related js_vars shared by several pages."""
    return dict(
        p = group.p,
        pR = round(1 - group.p, 2),
        black_num = {"Left" : group.black_left, "Right": group.black_right},
        urn_balls = {"Left" : group.get_urn_balls_dict("left"),
                     "Right": group.get_urn_balls_dict("right")},
    )


def _is_probability(value):
    """Return True if ``value`` is a real number within [0, 1]."""
    return isinstance(value, (int, float)) and 0 <= value <= 1


# PAGES
class Ready(Page):
    @staticmethod
    def js_vars(player: Player):
        return dict(num_players = C.PLAYERS_PER_GROUP)


class WaittoStart(WaitPage):
    @staticmethod
    def after_all_players_arrive(group: Group):
        """Set up the round's urns once the whole group has arrived."""
        group.group_start()


class SettingUp(Page):
    @staticmethod
    def js_vars(player: Player):
        return dict(task_number = player.round_number,
                    **_urn_display_vars(player.group))


class WaittoReport(WaitPage):
    @staticmethod
    def after_all_players_arrive(group: Group):
        """Pick the urn, draw each player's signal, and start the clock."""
        p = group.p
        group.computer_choice = random.choices(["left", "right"], [p, 1-p], k = 1)[0]
        # computer_choice is the string "left"/"right". The original compared
        # it with 0, which is never true, so signals always came from the
        # right urn.
        nB = group.black_left if group.computer_choice == "left" else group.black_right
        nR = C.BALLS_PER_URN - nB
        for player in group.get_players():
            # One draw from the chosen urn, weighted by its colour counts.
            player.signal = random.choices(["black", "red"], [nB, nR], k = 1)[0]
            player.list_init(group.session.config["stages"])
        group.startTime = int(time.time())


class Report(Page):
    @staticmethod
    def js_vars(player: Player):
        config = player.session.config
        return dict(my_id = player.id_in_group,
                    task_number = player.round_number,
                    round = config["stages"],
                    game_iteration = player.game_iteration,
                    num_players = C.PLAYERS_PER_GROUP,
                    signal = player.signal,
                    computer_choice = player.group.computer_choice,
                    table_content = player.group.report_history(),
                    paraAlpha = config["para_alpha"],
                    paraBeta = config["para_beta"],
                    **_urn_display_vars(player.group))

    @staticmethod
    def live_method(player: Player, report):
        """Validate and record one ``{"x": ..., "y": ...}`` report.

        A valid report (both values numbers in [0, 1]) is stored in the
        player's current stage and ``game_iteration`` is advanced. Otherwise
        nothing is stored and a hint for the first offending field is
        returned. The reply is addressed only to the submitting player.
        """
        my_id = player.id_in_group
        input_hint = None
        # .get() so a missing field is treated like an empty one instead of
        # raising KeyError on input coming from the browser.
        x, y = report.get("x"), report.get("y")
        box_clear = {'report_x': True, 'report_y': True}  # clear both boxes

        if _is_probability(x) and _is_probability(y):
            player.set_list_element(player.game_iteration, x, y)
            news = dict(iteration = player.game_iteration,
                        player_id = my_id,
                        report = report,
                        cell_id = "#" + "cell_" + str(player.game_iteration) + "_" + str(my_id))
            player.game_iteration = player.game_iteration + 1
        else:
            news = None
            # Report the first problem only, and keep the *other* box's
            # content so the participant does not have to retype it.
            if x is None:
                input_hint = "请输入一阶预测。"
                box_clear["report_y"] = False
            elif not _is_probability(x):
                input_hint = "一阶预测应当是0到1之间的小数。"
                box_clear["report_y"] = False
            elif y is None:
                input_hint = "请输入二阶预测。"
                box_clear["report_x"] = False
            else:
                input_hint = "二阶预测应当是0到1之间的小数。"
                box_clear["report_x"] = False

        return {
            my_id: dict(
                game_iteration = player.game_iteration,
                news = news,
                input_hint = input_hint,
                box_clear = box_clear
            )
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        # Only player 1 writes the group-level duration.
        if player.id_in_group == 1:
            group = player.group
            group.timeSpent = int(time.time()) - group.startTime


class Waiting(WaitPage):
    pass


class Score(Page):
    @staticmethod
    def is_displayed(player: Player):
        """Show only in the final round; pick the scored task on first call."""
        if (player.round_number == C.NUM_ROUNDS):
            if (player.selected_task == 0):
                player.selected_task = random.randrange(0, C.NUM_ROUNDS) + 1
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def js_vars(player: Player):
        """Compute per-stage and total scores for the selected task.

        For each stage, with own reports ``(x, y)`` and ``ave_x`` the mean of
        the other group members' ``x`` (rounded to 2 places)::

            score = alpha * (1 - (x - gold)^2) + beta * (1 - (y - ave_x)^2)

        where ``gold`` is 1 if the computer chose the left urn, else 0.
        """
        mplayer = player.in_round(player.selected_task)
        group = mplayer.group
        config = mplayer.session.config
        alpha = config["para_alpha"]
        beta = config["para_beta"]
        others = mplayer.get_others_in_group()

        gold = 1 if group.computer_choice == "left" else 0
        value = 0
        ave_dict = {}
        score = {}
        for i in range(0, config["stages"]):
            xi, yi = mplayer.get_list_element(i + 1)
            sum_x = 0
            for other in others:
                xj, yj = other.get_list_element(i + 1)
                sum_x = sum_x + xj
            ave_x = round(sum_x / (C.PLAYERS_PER_GROUP - 1), 2)
            ave_dict["#aveCell_" + str(i + 1) + "_" + str(mplayer.id_in_group)] = ave_x

            score_key = "score_" + str(i + 1)
            score[score_key] = round(alpha * round(1 - (xi - gold) ** 2, 4) + beta * round(1 - (yi - ave_x) ** 2, 4), 5)
            #score["score_" + str(i + 1)] = round(round(1 - (xi - gold) ** 2, 4) + mplayer.session.config["para_beta"] * round(mplayer.session.config["para_c"] - (yi - ave_x) ** 2, 4), 5)
            #score["score_" + str(i + 1)] = round(round(1 - (xi - gold) ** 2, 4) + mplayer.session.config["para_c"] - mplayer.session.config["para_beta"] * round((yi - ave_x) ** 2, 4), 5)
            value = round(value + score[score_key], 4)

        return dict(total_score = value,
                    score = score,
                    #paraC = mplayer.session.config["para_c"],
                    paraAlpha = alpha,
                    paraBeta = beta,
                    computer_choice = group.computer_choice,
                    task_number = mplayer.round_number,
                    round = config["stages"],
                    my_id = mplayer.id_in_group,
                    table_content = group.report_history(),
                    ave_dict = ave_dict,
                    **_urn_display_vars(group))


page_sequence = [WaittoStart, SettingUp, WaittoReport, Report, Waiting, Score]