import math from otree.api import * import json import random import os import numpy as np from scipy import datasets from .calc_best_fit import fit_best_curve from .calc_payment import calc_payment_MSE_data, calc_payment_MSE, heatmaps, heatmaps_data from .clean_annotations import process_svg_path # ------------------- # Constants # ------------------- class C(BaseConstants): NAME_IN_URL = 'annotation' PLAYERS_PER_GROUP = None NUM_ROUNDS = 500 FULLDATA = [] PAYDATA = [] MINS = 0 MAXS = 10 num_datasets = 9 num_draws = [5, 10, 50] for j in range(num_datasets): datalist = [] filename = f'normalizeddatasetwminmax_{j+1}.txt' data_path = os.path.join(os.path.dirname(__file__), filename) with open(data_path, 'r') as file: for line in file: y, x = map(float, line.strip().split(',')) datalist.append([x, y]) FULLDATA.append(datalist) # ------------------- # Group / Player / Subsession # ------------------- class Group(BaseGroup): pass class Player(BasePlayer): dataset_label = models.StringField(blank=True) effectiveround = models.IntegerField(initial=1) datapoints = models.LongStringField() annotations = models.LongStringField(export=False) selected_degree = models.IntegerField() selected_poly = models.StringField() advance_dataset = models.BooleanField(blank=True) score_mse = models.FloatField() indices_9 = models.StringField(blank=True) wta_5 = models.FloatField(min=0, max=10) wta_10 = models.FloatField(min=0, max=10) wta_50 = models.FloatField(min=0, max=10) score_y5 = models.FloatField(blank=True) score_y10 = models.FloatField(blank=True) score_y50 = models.FloatField(blank=True) comp_attempts1 = models.IntegerField(initial=1) comp_answer1a = models.BooleanField( choices=[ [True, 'True'], [False, 'False'], ], widget=widgets.RadioSelect ) comp_answer1b = models.BooleanField( choices=[ [True, 'True'], [False, 'False'],], widget=widgets.RadioSelect ) comp_answer1c = models.BooleanField( choices=[ [True, 'True'], [False, 'False']], widget=widgets.RadioSelect ) comp_attempts2 = models.IntegerField(initial=1) comp_answer2a = models.BooleanField( choices=[ [True, 'True'], [False, 'False'], ], widget=widgets.RadioSelect ) comp_answer2b = models.BooleanField( choices=[ [True, 'True'], [False, 'False'],], widget=widgets.RadioSelect ) comp_answer2c = models.BooleanField( choices=[ [True, 'True'], [False, 'False']], widget=widgets.RadioSelect ) class Subsession(BaseSubsession): pass # ------------------- # Extra Models # ------------------- class Annotation(ExtraModel): player = models.Link(Player) type = models.StringField() path = models.LongStringField(null=True, blank=True) x = models.FloatField(null=True, blank=True) y = models.FloatField(null=True, blank=True) class AnnotationData(ExtraModel): player = models.Link(Player) annotations = models.LongStringField() # raw JSON coordinates = models.LongStringField() # normalized class PolynomialData(ExtraModel): player = models.Link(Player) coeffs = models.LongStringField() # JSON: {degree: coeffs} heatmaps = models.LongStringField() # JSON: {degree: heatmap matrix} # ------------------- # Session creation # ------------------- def creating_session(subsession): test = subsession.session.config.get('test', False) num_datasets = 1 if test else C.num_datasets for p in subsession.get_players(): if p.round_number == 1: p.effectiveround = 1 if subsession.round_number == 1: p.participant.vars['num_tasks'] = num_datasets * len(C.num_draws) * 2 p.participant.vars['divisor'] = subsession.session.config.get('divisor', 20) participant = p.participant datasets = [] paydata = [] dataset_labels = [] for _ in range(2): # two rounds of 27 temp_datasets = [] temp_paydata = [] temp_dataset_labels = [] for j in range(num_datasets): for draw in C.num_draws: sampled = random.sample(C.FULLDATA[j], draw) temp_datasets.append(sampled) complement = [x for x in C.FULLDATA[j] if x not in sampled] temp_paydata.append(complement) temp_dataset_labels.append([j+1, draw]) # Shuffle the order of this batch combined = list(zip(temp_datasets, temp_paydata, temp_dataset_labels)) random.shuffle(combined) batch_datasets, batch_paydata, batch_labels = zip(*combined) datasets.extend(batch_datasets) paydata.extend(batch_paydata) dataset_labels.extend(batch_labels) which_data9 = [] for i in range(len(C.num_draws)): which_data9.append(random.choice([0, 1])) participant.vars["which_data9"] = list(which_data9) # Save as flat lists in participant.vars participant.vars["datasets"] = list(datasets) participant.vars["paydata"] = list(paydata) participant.vars["dataset_labels"] = list(dataset_labels) # print(f"Participant datasets initialized: {len(participant.vars['datasets'])} total") # ------------------- # Pages # ------------------- class Annotate(Page): form_model = 'player' form_fields = ['annotations'] @staticmethod def is_displayed(player: Player): return player.effectiveround <= player.participant.vars['num_tasks'] @staticmethod def vars_for_template(player: Player): all_datasets = player.participant.vars["datasets"] data_list = all_datasets[player.effectiveround - 1] min_x, min_y = C.MINS, C.MINS max_x, max_y = C.MAXS, C.MAXS return { "data_list": data_list, "min_x": min_x, "max_x": max_x, "min_y": min_y, "max_y": max_y, "round_number": player.effectiveround, } @staticmethod def before_next_page(player: Player, timeout_happened): if player.effectiveround == 1: for annotation in Annotation.filter(player=player): annotation.delete() for ad in AnnotationData.filter(player=player): ad.delete() annos = json.loads(player.annotations) for anno in annos: target = anno.get('target', {}) selector = target.get('selector', {}) selector_value = selector.get('value', '') if 'path d="' in selector_value: path_data = selector_value.split('path d="')[1].split('"')[0] Annotation.create(player=player, path=path_data, type='path') elif 'xywh=pixel:' in selector_value: parts = selector_value.split('xywh=pixel:')[1].split(',') if len(parts) >= 2: x_data = float(parts[0]) y_data = float(parts[1]) Annotation.create(player=player, x=x_data, y=y_data, type='point') normalized_coordinates = process_svg_path( player.annotations, C.MINS, C.MAXS, C.MINS, C.MAXS ) AnnotationData.create( player=player, annotations=player.annotations, coordinates=json.dumps(normalized_coordinates), ) # player.coordinates = json.dumps(normalized_coordinates) player.dataset_label = json.dumps(player.participant.vars["dataset_labels"][player.effectiveround - 1]) player.datapoints = json.dumps(player.participant.vars["datasets"][player.effectiveround - 1]) print(f"This is player.dataset_label: {player.dataset_label}") class FitPolynomial(Page): timeout_seconds = 0.1 @staticmethod def is_displayed(player: Player): return player.effectiveround <= player.participant.vars['num_tasks'] @staticmethod def before_next_page(player: Player, timeout_happened): records = AnnotationData.filter(player=player) if not records: return ad = records[-1] coordinates = json.loads(ad.coordinates or "[]") all_coeff_dict = {} heatmap_dict = {} for degree in range(21): coefficients_array = fit_best_curve(coordinates, degree) all_coeff_dict[str(degree)] = coefficients_array.tolist() poly = np.poly1d(coefficients_array) heatmap_dict[str(degree)] = heatmaps(poly, player.participant.vars['divisor']) heatmap_dict[21] = heatmaps_data(coordinates, player.participant.vars['divisor']) PolynomialData.create( player=player, coeffs=json.dumps(all_coeff_dict), heatmaps=json.dumps(heatmap_dict), ) class Plot(Page): form_model = 'player' form_fields = ['selected_degree', 'advance_dataset'] @staticmethod def is_displayed(player: Player): return player.effectiveround <= player.participant.vars['num_tasks'] @staticmethod def vars_for_template(player: Player): all_datasets = player.participant.vars["datasets"] data_list = all_datasets[player.effectiveround - 1] records = AnnotationData.filter(player=player) if not records: return ad = records[-1] records = PolynomialData.filter(player=player) if not records: return pd = records[-1] return { "all_coefficients_json": pd.coeffs, "heatmap_data": pd.heatmaps, "min_x": C.MINS, "max_x": C.MAXS, "min_y": C.MINS, "max_y": C.MAXS, "data_list": data_list, "player_coordinates": ad.coordinates, "round_number": player.effectiveround, } @staticmethod def before_next_page(player: Player, timeout_happened): if player.round_number < C.NUM_ROUNDS: next_player = player.in_round(player.round_number + 1) if player.advance_dataset == 1: if player.effectiveround < player.participant.vars['num_tasks']: next_player.effectiveround = player.effectiveround + 1 else: for i in range(player.round_number + 1, C.NUM_ROUNDS + 1): next_player = player.in_round(i) next_player.effectiveround = player.effectiveround + 1 else: next_player.effectiveround = player.effectiveround records = PolynomialData.filter(player=player) if not records: return pd = records[-1] if player.selected_degree <= 20: player.selected_poly = json.dumps(json.loads(pd.coeffs)[str(player.selected_degree)]) else: player.selected_poly = json.dumps(json.loads(pd.coeffs)[str(20)]) class CalcPayment(Page): timeout_seconds = 0.1 @staticmethod def is_displayed(player: Player): return ( player.field_maybe_none("advance_dataset") == 1 and player.effectiveround <= player.participant.vars['num_tasks'] ) @staticmethod def before_next_page(player: Player, timeout_happened): # get the latest PolynomialData entry for this player records = PolynomialData.filter(player=player) if not records: return pd = records[-1] records = AnnotationData.filter(player=player) if not records: return ad = records[-1] all_coefficients = json.loads(pd.coeffs) selected_degree = player.selected_degree data = player.participant.vars["paydata"][player.effectiveround - 1] if player.selected_degree <= 20: # extract the selected degree's coefficients if player.selected_degree <= 20: selected_coeffs = all_coefficients[str(selected_degree)] polynomial = np.poly1d(selected_coeffs) player.score_mse = float(calc_payment_MSE(polynomial, data, player.participant.vars['divisor'])) else: annotations = ad.coordinates player.score_mse = float(calc_payment_MSE_data(annotations, data, player.participant.vars['divisor'])) current_scores = player.participant.vars.get("scores", []) current_scores.append([player.score_mse, player.effectiveround]) player.participant.vars["scores"] = list(current_scores) # print(f"Participant scores updated: {player.participant.vars['scores']}") class ExtraDecisionInstructions(Page): @staticmethod def is_displayed(player: Player): return ( player.field_maybe_none("advance_dataset") == 1 and player.effectiveround >= player.participant.vars['num_tasks'] ) class CompQ1(Page): form_model = 'player' form_fields = ['comp_answer1a', 'comp_answer1b', 'comp_answer1c'] @staticmethod def is_displayed(player: Player): return ( player.field_maybe_none("advance_dataset") == 1 and player.effectiveround >= player.participant.vars['num_tasks'] ) @staticmethod def error_message(player, values): # If any answer is False, return a single error for the page if not (values.get('comp_answer1a') and values.get('comp_answer1b') and values.get('comp_answer1c')): player.comp_attempts1 += 1 return 'At least one of your answers is incorrect. Please try again.' class CompQ2(Page): form_model = 'player' form_fields = ['comp_answer2a', 'comp_answer2b', 'comp_answer2c'] @staticmethod def is_displayed(player: Player): return ( player.field_maybe_none("advance_dataset") == 1 and player.effectiveround >= player.participant.vars['num_tasks'] ) @staticmethod def error_message(player, values): # If any answer is False, return a single error for the page if not (values.get('comp_answer2a') and values.get('comp_answer2b') and values.get('comp_answer2c')): player.comp_attempts2 += 1 return 'At least one of your answers is incorrect. Please try again.' class ExtraTask(Page): form_model = 'player' form_fields = ['wta_5', 'wta_10', 'wta_50'] @staticmethod def is_displayed(player: Player): return ( player.field_maybe_none("advance_dataset") == 1 and player.effectiveround >= player.participant.vars['num_tasks'] ) @staticmethod def vars_for_template(player: Player): test = player.session.config.get('test', False) records = AnnotationData.filter(player=player) if not records: return data9 = player.participant.vars["which_data9"] dataset_labels = player.participant.vars["dataset_labels"] datasets = player.participant.vars["datasets"] paydata = player.participant.vars["paydata"] data9_datasets = [] data9_bestfit = [] player_polys = [] player_coords = [] indices9 = [] score_y_list = [] # Set values you're looping over (order corresponds to positions in which_data9) set_values = C.num_draws for i, set_val in enumerate(set_values): # Find indices in dataset_labels where (group == 1 and val == set_val) if test == True: indices = [idx for idx, (group, val) in enumerate(dataset_labels) if group == 1 and val == set_val] else: indices = [idx for idx, (group, val) in enumerate(dataset_labels) if group == 9 and val == set_val] if len(indices) < 2: print(f"⚠️ Not enough 9s for set {set_val}: found {len(indices)}") continue # Pick first or second instance based on which_data9 chosen_idx = indices[data9[i]] indices9.append(chosen_idx+1) linear = fit_best_curve(datasets[chosen_idx], 0) polynomial = np.poly1d(linear) score_y_list.append(float(calc_payment_MSE(polynomial, paydata[chosen_idx], player.participant.vars['divisor']))) if isinstance(linear, np.ndarray): linear = linear.tolist() data9_datasets.append(datasets[chosen_idx]) data9_bestfit.append(linear) # Collect matching round objects, not numbers matching_rounds = [ r for r in player.in_all_rounds() if getattr(r, "effectiveround", None) == chosen_idx + 1 ] # Take the last one (most recent) matching_round_player = matching_rounds[-1] if matching_rounds else None # indices9.append(matching_round_player.round_number) # print(f"matching round player is: {matching_round_player}") if matching_round_player: selected_degree = getattr(matching_round_player, "selected_degree", 0) # default 0 if selected_degree < 21: # print(f"selected degree is: {selected_degree} calculating poly") player_polys.append(json.loads(matching_round_player.selected_poly)) player_coords.append([0]) else: # print(f"selected degree is: {selected_degree} appending coordinates") player_polys.append([0]) annotations = AnnotationData.filter(player=matching_round_player) last_annotation = annotations[-1] player_coords.append(json.loads(last_annotation.coordinates)) # print(f"player_polys: {player_polys}, player_coords: {player_coords}") else: # fallback if no matching round found player_polys.append([0]) player_coords.append([0]) player.indices_9 = json.dumps(indices9) print(f"indices9: {indices9}") player.score_y5 = score_y_list[0] player.score_y10 = score_y_list[1] player.score_y50 = score_y_list[2] return { "data9_dataset1": data9_datasets[0], "data9_dataset2": data9_datasets[1], "data9_dataset3": data9_datasets[2], "data9_bestfit1": data9_bestfit[0], "data9_bestfit2": data9_bestfit[1], "data9_bestfit3": data9_bestfit[2], "player_poly1": player_polys[0], "player_poly2": player_polys[1], "player_poly3": player_polys[2], "player_coords1": player_coords[0], "player_coords2": player_coords[1], "player_coords3": player_coords[2], "min_x": C.MINS, "max_x": C.MAXS, "min_y": C.MINS, "max_y": C.MAXS, } class DeterminePayment(Page): timeout_seconds = 0.1 @staticmethod def is_displayed(player: Player): return ( player.field_maybe_none("advance_dataset") == 1 and player.effectiveround >= player.participant.vars['num_tasks'] ) @staticmethod def before_next_page(player: Player,timeout_happened): bdm = random.uniform(0, 10) round_to_pay = random.choice([5, 10, 50]) scores = player.participant.vars["scores"] k = random.randint(1, len(scores)) chosen_player = next((s for s in scores if s[1] == k), None) indices9 = json.loads(player.indices_9) if k in indices9: task_to_pay = "extra_task" wta_field = f"wta_{round_to_pay}" score_y_field = f"score_y{round_to_pay}" wta = getattr(player, wta_field) if wta > bdm: score = chosen_player[0] else: score = getattr(player, score_y_field) else: task_to_pay = "annotation" score = chosen_player[0] wta = 0 player.participant.vars["paid_round"] = chosen_player[1] player.participant.vars["task_to_pay"] = task_to_pay player.participant.vars["bdm"] = round(bdm, 2) player.participant.vars["wta"] = round(wta, 2) player.participant.vars["score"] = round(score, 2) player.participant.vars["round_to_pay"] = round_to_pay class ShowPayment(Page): @staticmethod def is_displayed(player: Player): return ( player.field_maybe_none("advance_dataset") == 1 and player.effectiveround >= player.participant.vars['num_tasks'] ) @staticmethod def vars_for_template(player: Player): if player.participant.vars["task_to_pay"] == "annotation": final_payment = math.ceil(player.participant.vars["score"]+10) else: if player.participant.vars["bdm"] >= player.participant.vars["wta"]: final_payment = math.ceil(player.participant.vars["score"] + 10 + player.participant.vars["bdm"]) else: final_payment = math.ceil(player.participant.vars["score"]+10) player.payoff = final_payment return { "paid_round": player.participant.vars["paid_round"], "task_to_pay": player.participant.vars["task_to_pay"], "bdm": player.participant.vars["bdm"], "wta": player.participant.vars["wta"], "score": player.participant.vars["score"], "round_to_pay": player.participant.vars["round_to_pay"], "final_payment": final_payment, } # ------------------- # Page sequence # ------------------- page_sequence = [Annotate, FitPolynomial, Plot, CalcPayment, ExtraDecisionInstructions, CompQ1, CompQ2, ExtraTask, DeterminePayment, ShowPayment]