import json
import os
import random

import numpy as np
from otree.api import *

from .calc_best_fit import fit_best_curve
from .calc_payment import heatmaps, heatmaps_data
from .clean_annotations import process_svg_path


# -------------------
# Constants
# -------------------
def _load_dataset(index):
    """Read one normalized dataset file and return its points as ``[x, y]`` pairs.

    The file ``normalizeddatasetwminmax_<index>.txt`` is looked up next to this
    module. Each non-empty line holds two comma-separated floats stored as
    ``y,x``; they are swapped on load so every returned pair is ``[x, y]``.

    Args:
        index: 1-based dataset number used in the file name.

    Returns:
        A list of ``[x, y]`` float pairs in file order.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a non-empty line is not exactly two comma-separated
            floats.
    """
    filename = f'normalizeddatasetwminmax_{index}.txt'
    data_path = os.path.join(os.path.dirname(__file__), filename)
    points = []
    with open(data_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at end of file)
                # instead of failing in float('').
                continue
            y, x = map(float, line.split(','))
            points.append([x, y])
    return points


class C(BaseConstants):
    """App-wide constants, including the datasets loaded at import time."""

    NAME_IN_URL = 'practice'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 10

    # Axis bounds passed to the templates and to process_svg_path; the same
    # pair is used for both x and y.
    MINS = 0
    MAXS = 10

    num_datasets = 9
    num_tasks = 1

    # Load all 9 normalized datasets: FULLDATA[j] is the point list read from
    # normalizeddatasetwminmax_{j+1}.txt.
    FULLDATA = [_load_dataset(index) for index in range(1, num_datasets + 1)]

    # Kept for backward compatibility: the previous in-class loading loop left
    # this name bound to the last dataset it read.
    datalist = FULLDATA[-1] if FULLDATA else []


# -------------------
# Group / Player / Subsession
# -------------------
class Group(BaseGroup):
    pass


class Player(BasePlayer):
    dataset_label = models.StringField(blank=True)
    # Task counter; starts at 1 and is propagated to later rounds by
    # Plot.before_next_page. Pages are shown while it is <= C.num_tasks.
    effectiveround = models.IntegerField(initial=1)
    # Raw annotation JSON submitted from the Annotate page.
    annotations = models.LongStringField(export=False)
    selected_degree = models.IntegerField()
    selected_poly = models.StringField()
    advance_dataset = models.BooleanField(blank=True)
    score_mse = models.FloatField()
    practice_password = models.StringField(blank=True, label="Password")


class Subsession(BaseSubsession):
    pass


# -------------------
# Extra Models
# -------------------
class Annotation(ExtraModel):
    """One parsed annotation: an SVG path (type 'path') or a point (type 'point')."""

    player = models.Link(Player)
    type = models.StringField()
    path = models.LongStringField(null=True, blank=True)
    x = models.FloatField(null=True, blank=True)
    y = models.FloatField(null=True, blank=True)


class AnnotationData(ExtraModel):
    """A player's submitted annotations together with their processed coordinates."""

    player = models.Link(Player)
    annotations = models.LongStringField()  # raw JSON
    coordinates = models.LongStringField()  # normalized


class PolynomialData(ExtraModel):
    """Fitted polynomial coefficients and heatmaps for a player, stored as JSON."""

    player = models.Link(Player)
    coeffs = models.LongStringField()  # JSON: {degree: coeffs}
    heatmaps = models.LongStringField()  # JSON: {degree: heatmap matrix}
# -------------------
# Session creation
# -------------------
# Fixed seed so the sampled practice dataset is reproducible.
_PRACTICE_SEED = 3345

# Polynomials of degree 0.._MAX_POLY_DEGREE are fitted for every submission.
_MAX_POLY_DEGREE = 20

# Heatmap slot that holds the heatmap built from the player's own coordinates
# (one past the highest polynomial degree). A string, like the degree keys.
_DATA_HEATMAP_KEY = str(_MAX_POLY_DEGREE + 1)


def creating_session(subsession):
    """Initialise participant vars and the session-level practice dataset.

    In round 1 every participant receives ``vars['divisor']`` from the session
    config (default 20). On every call the practice dataset is sampled with a
    private, fixed-seed generator and stored in
    ``session.vars['practice_dataset']``: one random point from each dataset in
    ``C.FULLDATA`` plus one extra point from a randomly chosen dataset.

    A dedicated ``random.Random`` instance is used so the module-level random
    state is never touched, and the dataset is sampled once per call rather
    than once per player, so it does not depend on the number of participants.
    """
    if subsession.round_number == 1:
        divisor = subsession.session.config.get('divisor', 20)
        for p in subsession.get_players():
            p.participant.vars['divisor'] = divisor

    rng = random.Random(_PRACTICE_SEED)

    # Sample 1 point from each of the 9 datasets
    practice_dataset = [rng.choice(C.FULLDATA[j]) for j in range(C.num_datasets)]
    which_extra = rng.randint(0, C.num_datasets - 1)
    practice_dataset.append(rng.choice(C.FULLDATA[which_extra]))

    # store at session level
    subsession.session.vars['practice_dataset'] = practice_dataset


# -------------------
# Pages
# -------------------
def _store_annotation(player, selector_value):
    """Create an Annotation row from one selector string, if it is recognised.

    Two forms are handled: a string containing ``path d="..."`` (stored as type
    'path' with the text between the quotes) and a string containing
    ``xywh=pixel:x,y,...`` (stored as type 'point' with the first two numbers).
    Anything else is ignored.
    """
    if 'path d="' in selector_value:
        path_data = selector_value.split('path d="')[1].split('"')[0]
        Annotation.create(player=player, path=path_data, type='path')
    elif 'xywh=pixel:' in selector_value:
        parts = selector_value.split('xywh=pixel:')[1].split(',')
        if len(parts) >= 2:
            Annotation.create(
                player=player,
                x=float(parts[0]),
                y=float(parts[1]),
                type='point',
            )


class Annotate(Page):
    """Page on which the player annotates the practice dataset."""

    form_model = 'player'
    form_fields = ['annotations']

    @staticmethod
    def is_displayed(player: Player):
        return player.effectiveround <= C.num_tasks

    @staticmethod
    def vars_for_template(player: Player):
        """Expose the practice dataset, axis bounds and task number to the template."""
        return {
            "data_list": player.subsession.session.vars['practice_dataset'],
            "min_x": C.MINS,
            "max_x": C.MAXS,
            "min_y": C.MINS,
            "max_y": C.MAXS,
            "round_number": player.effectiveround,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Persist the submitted annotations and their processed coordinates."""
        if player.effectiveround == 1:
            # Drop rows left by an earlier submission of this player.
            for annotation in Annotation.filter(player=player):
                annotation.delete()
            for ad in AnnotationData.filter(player=player):
                ad.delete()

        for anno in json.loads(player.annotations):
            selector = anno.get('target', {}).get('selector', {})
            _store_annotation(player, selector.get('value', ''))

        normalized_coordinates = process_svg_path(
            player.annotations, C.MINS, C.MAXS, C.MINS, C.MAXS
        )
        AnnotationData.create(
            player=player,
            annotations=player.annotations,
            coordinates=json.dumps(normalized_coordinates),
        )


class FitPolynomial(Page):
    """Auto-advancing page that fits polynomials to the player's coordinates."""

    # Near-zero timeout: the page advances on its own and the work happens in
    # before_next_page.
    timeout_seconds = 0.1

    @staticmethod
    def is_displayed(player: Player):
        return player.effectiveround <= C.num_tasks

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Fit degrees 0.._MAX_POLY_DEGREE and store coefficients and heatmaps.

        Does nothing when the player has no AnnotationData row.
        """
        records = AnnotationData.filter(player=player)
        if not records:
            return
        coordinates = json.loads(records[-1].coordinates or "[]")
        divisor = player.participant.vars['divisor']

        all_coeff_dict = {}
        heatmap_dict = {}
        for degree in range(_MAX_POLY_DEGREE + 1):
            coefficients_array = fit_best_curve(coordinates, degree)
            all_coeff_dict[str(degree)] = coefficients_array.tolist()
            heatmap_dict[str(degree)] = heatmaps(np.poly1d(coefficients_array), divisor)
        # String key, consistent with the degree keys (JSON object keys are
        # strings either way, so the serialized output is unchanged).
        heatmap_dict[_DATA_HEATMAP_KEY] = heatmaps_data(coordinates, divisor)

        PolynomialData.create(
            player=player,
            coeffs=json.dumps(all_coeff_dict),
            heatmaps=json.dumps(heatmap_dict),
        )


class Plot(Page):
    """Page showing the fitted polynomials; the player picks a degree."""

    form_model = 'player'
    form_fields = ['selected_degree', 'advance_dataset']

    @staticmethod
    def is_displayed(player: Player):
        return player.effectiveround <= C.num_tasks

    @staticmethod
    def vars_for_template(player: Player):
        """Return the latest stored fit data for the template.

        Returns None when the player has no AnnotationData or PolynomialData row.
        """
        annotation_records = AnnotationData.filter(player=player)
        if not annotation_records:
            # NOTE(review): the template then receives no variables - confirm
            # this state cannot be reached in practice.
            return None
        polynomial_records = PolynomialData.filter(player=player)
        if not polynomial_records:
            return None
        ad = annotation_records[-1]
        pd = polynomial_records[-1]
        return {
            "all_coefficients_json": pd.coeffs,
            "heatmap_data": pd.heatmaps,
            "min_x": C.MINS,
            "max_x": C.MAXS,
            "min_y": C.MINS,
            "max_y": C.MAXS,
            "data_list": player.subsession.session.vars['practice_dataset'],
            "player_coordinates": ad.coordinates,
            "round_number": player.effectiveround,
        }

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Carry the task counter forward into the following round(s).

        Nothing happens in the last round. Otherwise:
        * the player did not choose to advance: the next round keeps the same
          ``effectiveround``;
        * the player advances and tasks remain: the next round gets
          ``effectiveround + 1``;
        * the player advances past the last task: every remaining round gets
          ``effectiveround + 1``, which is above ``C.num_tasks``, so the task
          pages are no longer displayed.
        """
        if player.round_number >= C.NUM_ROUNDS:
            return

        next_player = player.in_round(player.round_number + 1)
        if not player.advance_dataset:
            next_player.effectiveround = player.effectiveround
            return

        advanced = player.effectiveround + 1
        if player.effectiveround < C.num_tasks:
            next_player.effectiveround = advanced
            return

        for round_number in range(player.round_number + 1, C.NUM_ROUNDS + 1):
            player.in_round(round_number).effectiveround = advanced


class InstructionsEnd(Page):
    """Final page, shown in the last round, gated by a password."""

    form_model = 'player'
    form_fields = ['practice_password']

    @staticmethod
    def is_displayed(player: Player):
        # Tied to the constant instead of a literal 10 so it follows NUM_ROUNDS.
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def error_message(player, values):
        """Reject the form unless the expected code was entered."""
        # NOTE(review): the code is hard-coded in source; consider moving it to
        # the session config.
        if values['practice_password'] != 'econlab':
            return 'Please enter the correct code to proceed'


# -------------------
# Page sequence
# -------------------
page_sequence = [Annotate, FitPolynomial, Plot, InstructionsEnd]