import random import json from json import JSONDecodeError import time from otree import settings from otree.api import * from . import task_matrix from .image_utils import encode_image from datetime import datetime doc = """ Social Proximity Combined """ ### EXTRA MODEL ### class NumberSequence(ExtraModel): question = models.StringField() solution = models.StringField() type = models.BooleanField() class Analogy(ExtraModel): question = models.StringField() optionA = models.StringField() optionB = models.StringField() optionC = models.StringField() optionD = models.StringField() optionE = models.StringField() solution = models.StringField() ### Loading Extra Model ### def load_analogy(filepath='analogy.csv'): rows = read_csv(__name__ + '/' + filepath, Analogy) for i, row in enumerate(rows): row['number'] = i+1 return rows def load_number_sequence(filepath='nums.csv'): rows = read_csv(__name__ + '/' + filepath, NumberSequence) for i, row in enumerate(rows): row['number'] = i+1 return rows def generate_order_for_tasks(player): maximum_length = min(len(C.ANALOGY_LIST), len(C.NUM_SEQUENCE_LIST)) indices_seller = list(range(0, maximum_length)) ## only 37 in the num_sequence file indices_customer = list(range(0, maximum_length)) random.shuffle(indices_customer) random.shuffle(indices_seller) player.participant.social_proximity_order_customer = indices_customer player.participant.social_proximity_order_seller = indices_seller ''' HELPER ''' ## Numberseq & Analogy Order per Player and Type def get_number_for_player(player, customer, stage=1): number = player.round_number*2 - 3 + stage ## stage sollte 1 oder 2 sein if customer: return player.participant.social_proximity_order_customer[number] return player.participant.social_proximity_order_seller[number] ## Matching ## 7er class C(BaseConstants): NAME_IN_URL = 'social_proximity' PLAYERS_PER_GROUP = None ROUNDS_PART1 = 3 ROUNDS_PART2_RANDOM = 2 ROUNDS_PART2_CHOICE = 5 NUM_ROUNDS = ROUNDS_PART1 + ROUNDS_PART2_RANDOM + ROUNDS_PART2_CHOICE ## EASIER TO READ START_PART2 = ROUNDS_PART1 START_PART2_CHOICE = START_PART2 + ROUNDS_PART2_RANDOM ## Same Pages for Analogy/7er & Numbersequence PAGE_ANALOGY = "social_proximity/Analogy.html" PAGE_NUMSEQ = "social_proximity/NumSeq.html" PAGE_7Task = "social_proximity/7Task.html" ## LOAD Models ANALOGY_LIST = load_analogy() NUM_SEQUENCE_LIST = load_number_sequence() ## Controlvariables AMOUNT_NUMBER_GENERATOR = 18 AMOUNT_NUMBERS_COMPLETE = 2 * AMOUNT_NUMBER_GENERATOR ANALOGY_POINTS = 2 NUMBER_SEQ_POINTS = 2 INSTRUCTION_READ_TIME = 120 ##Puzzle specifics TEXT_SIZE = 20 DEVIATION_GROUP = 4 ## Deviation for Group stuff NO_SUBMIT_PENALTY = DEVIATION_GROUP * 5 PAYOFF_PER_PUZZLE = cu(5) PAYOFF_PER_PUZZLE_SP1 = cu(4) PAYOFF_PER_PUZZLE_SP2 = cu(3) TIME_PER_PUZZLE = 120 HEIGHT = 15 WIDTH = 20 IGNORED_CHARS = "012345689" COUNTED_CHARS = "7" AMOUNT_OF_COUNTED_CHARS = 1 AMOUNT_EMPTY_FIELDS = 0 ## Timeout basierte TIMEOUT_GAME = 60 TIMER_TEXT = "Verbleibende Zeit:" class Subsession(BaseSubsession): pass ## Creating Session def creating_session(subsession: Subsession): ## Puzzle Session Dic session = subsession.session defaults = dict( retry_delay=1.0, puzzle_delay=1.0, attempts_per_puzzle=1, second_time_counting=False ) session.params = {} for param in defaults: session.params[param] = session.config.get(param, defaults[param]) ## Order for NumSeq & Analogy for player in subsession.get_players(): generate_order_for_tasks(player) class Group(BaseGroup): ## 7er solution = models.IntegerField(initial=0) response = models.IntegerField(initial=0) is_correct = models.BooleanField(initial=False) solo_play = models.BooleanField(initial=False) class Player(BasePlayer): response_games = models.LongStringField() points_analogy = models.IntegerField(initial=0) points_number_seq = models.IntegerField(initial=0) #7er iteration = models.IntegerField(initial=0) num_trials = models.IntegerField(initial=0) num_correct = models.IntegerField(initial=0) num_failed = models.IntegerField(initial=0) response = models.IntegerField(initial=-1) solution = models.IntegerField(initial=0) is_correct = models.BooleanField(initial=False, doc='Nur für 1x Zählen und Alleine Zählen aussagekräftig') ## nur für solospiele ### Extra Model (linked) class Puzzle(ExtraModel): player = models.Link(Player) iteration = models.IntegerField(initial=0) attempts = models.IntegerField(initial=0) timestamp = models.FloatField(initial=0) text = models.LongStringField() # entw. Text oder Json (wie in Slider) solution = models.LongStringField() # genauso wie text response = models.LongStringField() response_timestamp = models.FloatField() is_correct = models.BooleanField() ## Helper (linked) def check_for_correct_answer(player, game_object, response_games): try: resp = json.loads(response_games) print(resp , game_object) return resp['answer'] == game_object['solution'] except JSONDecodeError: return False def gen_puzzle(player: Player) -> Puzzle: """ generate new puzzle for a player""" ## ignored_chars = player.session.config['ignored_chars'] if 'ignored_chars' in player.session.config else C.IGNORED_CHARS counted_char = player.session.config['counted_char'] if 'counted_char' in player.session.config else C.COUNTED_CHARS width = player.session.config['width'] if 'width' in player.session.config else C.WIDTH height = player.session.config['height'] if 'height' in player.session.config else C.HEIGHT text_size = player.session.config['text_size'] if 'text_size' in player.session.config else C.TEXT_SIZE amount_of_counted_chars = player.session.config['amount_of_counted_chars'] if 'amount_of_counted_chars' in player.session.config else C.AMOUNT_OF_COUNTED_CHARS fields = task_matrix.generate_puzzle_fields(ignored_chars=ignored_chars, counted_char=counted_char, width=width, height=height, text_size= text_size, amount_of_counted_chars=amount_of_counted_chars, amount_empty_fields=C.AMOUNT_EMPTY_FIELDS ) ## Schriftgröße kann hier mitgegeben werden player.iteration += 1 return Puzzle.create( player=player, iteration=player.iteration, timestamp=time.time(), **fields # text & solution ) def get_cur_puzzle(player: Player): puzzles = Puzzle.filter(player=player, iteration=player.iteration) if puzzles: [puzzle] = puzzles return puzzle def enc_puzzle(puzzle: Puzzle): image = task_matrix.render_image(puzzle) return dict(image=encode_image(image)) def get_progess(player:Player): return dict( num_trials= player.num_trials, num_correct = player.num_correct, num_incorrect = player.num_failed, iteration = player.iteration ) def play_game(player: Player, msg: dict): ## siehe slider session = player.session my_id = player.id_in_group params = session.params now = time.time() current = get_cur_puzzle(player) msg_type = msg['type'] if msg_type == 'load': prog = get_progess(player) if current: return { my_id: dict(type='status',progress=prog, puzzle=enc_puzzle(current)) } else: return {my_id: dict(type='status',progress=prog)} if msg_type == 'cheat' and settings.DEBUG: return {my_id: dict(type='solution', solution=current.solution)} if msg_type == 'next': if current is not None: if current.response is None: raise RuntimeError('trying to skip over unsolved puzzle') if now < current.timestamp + params["puzzle_delay"]: raise RuntimeError('retrying too fast') return {my_id: dict(type='status', progress=get_progess(player), iteration_left=0)} puzz = gen_puzzle(player) prog = get_progess(player) return {my_id: dict(type='puzzle', puzzle=enc_puzzle(puzz), progress=prog)} if msg_type == 'answer': if current is None: raise RuntimeError('trying to answer no puzzle') if current.response is not None: # if current.attempts >= params['attempts_per_puzzle']: # raise RuntimeError('no more attempts allowed') # herausgenommen da jeder ja nur einen Versuch hat # if now < current.response_timestamp + params['retry_delay']: # raise RuntimeError('retrying too fast') # herausgenommen, da die Zahl submittet wird, wenn gewünscht dann retry hoch player.num_trials -= 1 if current.is_correct: player.num_correct -= 1 else: player.num_failed -= 1 answer = msg['answer'] if answer == '' or answer is None: raise ValueError('Answer is empty ') current.response = answer current.is_correct = task_matrix.is_correct(answer, current) player.solution = int(current.solution) current.response_timestamp = now current.attempts +=1 if current.is_correct: player.num_correct += 1 else: player.num_failed +=1 player.num_trials += 1 player.response = int(answer) tries_left = params['attempts_per_puzzle'] - current.attempts prog = get_progess(player) return { my_id: dict( type='feedback', is_correct=current.is_correct, retries_left=tries_left, progress=prog, ) } raise RuntimeError("unrecognized message from client") def calculate_group_findings(group: Group): together_test =group.get_players()[0].participant.count_numbers_together if together_test is not None and together_test: group.response = 0 group.solution = 0 for p in group.get_players(): if p.response == -1 and p.solution == 0: ## nicht geantwortet p.response -= C.NO_SUBMIT_PENALTY p.solution = C.NO_SUBMIT_PENALTY group.solution += p.solution group.response += p.response group.is_correct = abs(group.solution - group.response) <= C.DEVIATION_GROUP for p in group.get_players(): p.is_correct = group.is_correct else: ## fürs alleine spielen for p in group.get_players(): if p.response == -1 and p.solution == 0: # nicht geantwortet p.response -= C.NO_SUBMIT_PENALTY p.solution = C.NO_SUBMIT_PENALTY p.is_correct = abs(p.solution - p.response) <= (C.DEVIATION_GROUP/2) #halb so viel Abweichung group.is_correct = False ## Gruppe kann nicht recht haben wenn es keine Gruppe geht, schau auf den Spieler for p in group.get_players(): if p.participant.count_numbers_together: p.payoff = C.PAYOFF_PER_PUZZLE if p.is_correct else cu(0) else: # solo if p.participant.vars['spieler'] is not None: if p.participant.vars['spieler'] =='Spieler1': p.payoff = C.PAYOFF_PER_PUZZLE_SP1 if p.is_correct else cu(0) else: p.payoff = C.PAYOFF_PER_PUZZLE_SP2 if p.is_correct else cu(0) ### PAGES class InstructionsPart2(Page): timeout_seconds = C.INSTRUCTION_READ_TIME timer_text = C.TIMER_TEXT @staticmethod def is_displayed(player: Player): return player.round_number == C.START_PART2 ## Only in the first of them @staticmethod def vars_for_template(player: Player): return dict( baseline= player.subsession.TREATMENT=='BASE' ) ## Mal schauen ob die gebraucht wird class InstructionsPart2_Ende(Page): @staticmethod def is_displayed(player: Player): return player.round_number == C.START_PART2 class GroupMatching(WaitPage): wait_for_all_groups = True @staticmethod def is_displayed(player: Player): return player.round_number >= C.START_PART2 @staticmethod def after_all_players_arrive(subsession: Subsession): if subsession.round_number == C.START_PART2: group_generator(subsession) elif subsession.round_number == C.SWAP_SELLER_ROUND: change_seller_order(subsession.session) class RoleAssignment(Page): ## Kunde & Anbieter zusammen pass class RoleInformation(Page): #Anbieter & Kundeninfo pass class CustomerChooseMatching(Page): ## Matching info Kunde @staticmethod def is_displayed(player: Player): return C.START_PART2_CHOICE == player.round_number class AssignmentCustomerSeller(WaitPage): ## TODO : hier text für warten bis alle kunden sich entschieden haben ### One To One Matching pass class AfterAssignmentInfo(Page): ## Nur ne Info (kann was in aftermatchinginfokunde rein) pass class TransactionInfo(Page): ## todo: Kunde & Anbieter in Subpages ziehen, und dann nur ein If else auf der Hauptseite pass class Similarity(Page): pass class PerceivedSimilarity(Page): pass ## Hier beginnen Spiele class Analogy_Stage1(Page): template_name = C.PAGE_ANALOGY form_model = 'player' form_fields = ['response_games'] #timeout_seconds = C.TIMEOUT_GAME ##todo nach dem testen wieder rein timer_text = C.TIMER_TEXT @staticmethod def is_displayed(player: Player): return True # todo: pcds Hier die richtige Anzeige wählen (runden oder was auch immer) @staticmethod def vars_for_template(player: Player): return dict( DEBUG=settings.DEBUG#False#settings.DEBUG ) @staticmethod def js_vars(player: Player): # analogy = Analogy.filter(player=player, round_number=player.round_number, stage=1)[0] ## todo: pcds nach type customer oder seller hier auswählen aus der participant variante ## für die Kunden ist es round_number*2 - 1 für den Index der ersten Analogy stage & roundnumber*2 für die 2te analogy = C.ANALOGY_LIST[get_number_for_player(player=player, customer=True, stage=1)] return dict( analogy=analogy #analogy=vars(analogy) ) @staticmethod def before_next_page(player: Player, timeout_happened): game_object = C.ANALOGY_LIST[get_number_for_player(player=player, customer=True, stage=1)] answer = check_for_correct_answer(player, game_object, player.response_games) ## todo: Behnud : reicht hier die summe als punkte oder willst du beide print('richtige Antwort ? ', answer) player.points_analogy += answer * C.ANALOGY_POINTS player.response_games = '' # leer machen class CountSevens_Stage1(Page): template_name = C.PAGE_7Task timeout_seconds = C.TIME_PER_PUZZLE live_method = play_game @staticmethod def js_vars(player: Player): return dict(params=player.session.params) @staticmethod def vars_for_template(player: Player): return dict(DEBUG=settings.DEBUG, input_type=task_matrix.INPUT_TYPE, placeholder=task_matrix.INPUT_HINT, together=player.participant.vars['count_numbers_together'] ) @staticmethod def before_next_page(player: Player, timeout_happened): if not timeout_happened and not player.session.params['max_iterations']: raise RuntimeError("malicious page submission") '''if C.PLAYERS_PER_GROUP is not None and C.PLAYERS_PER_GROUP > 1: ### An der Stelle Zusammenrechnen der Gruppenberechnung machen, bzw in der Gruppe speichern current = get_cur_puzzle(player) player.group.solution += int (current.solution) player.group.response += int (current.response) ''' @staticmethod def is_displayed(player: Player): ## todo: Behnud Nicht geprüft ist deine Methode Behnud if player.session.second_time_counting: if player.round_number > 1: return False else: if player.participant.count_numbers_together == False and player.participant.spieler == 'Spieler2': return False else: return True else: return True # return not (player.session.second_time_counting == True and player.round_number > 1) class Numbersequence_Stage1(Page): template_name = C.PAGE_NUMSEQ form_model = 'player' form_fields = ['response_games'] # timeout_seconds = C.TIMEOUT_GAME ##todo nach dem testen wieder rein timer_text = C.TIMER_TEXT @staticmethod def is_displayed(player: Player): return True ##todo: sag wann und was @staticmethod def vars_for_template(player: Player): return dict( DEBUG=settings.DEBUG#False#settings.DEBUG ) @staticmethod def js_vars(player: Player): # analogy = Analogy.filter(player=player, round_number=player.round_number, stage=1)[0] ## todo: pcds nach type customer oder seller hier auswählen aus der participant variante ## für die Kunden ist es round_number*2 - 1 für den Index der ersten Analogy stage & roundnumber*2 für die 2te num_seq = C.NUM_SEQUENCE_LIST[get_number_for_player(player=player, customer=True, stage=1)] return dict( number_sequence=num_seq #analogy=vars(analogy) ) @staticmethod def before_next_page(player: Player, timeout_happened): game_object = C.NUM_SEQUENCE_LIST[get_number_for_player(player=player, customer=True, stage=1)] answer = check_for_correct_answer(player, game_object, player.response_games) ## todo: Behnud : reicht hier die summe als punkte oder willst du beide print('richtige Antwort ? ', answer) player.points_number_seq += answer * C.NUMBER_SEQ_POINTS player.response_games = '' # leer machen class CountSevens_Stage2(Page): template_name = C.PAGE_7Task pass class Analogy_Stage2(Page): template_name = C.PAGE_ANALOGY form_model = 'player' form_fields = ['response_games'] #timeout_seconds = C.TIMEOUT_GAME ##todo nach dem testen wieder rein timer_text = C.TIMER_TEXT @staticmethod def is_displayed(player: Player): return True # todo: pcds Hier die richtige Anzeige wählen (runden oder was auch immer) @staticmethod def vars_for_template(player: Player): return dict( DEBUG=settings.DEBUG#False#settings.DEBUG ) @staticmethod def js_vars(player: Player): # analogy = Analogy.filter(player=player, round_number=player.round_number, stage=1)[0] ## todo: pcds nach type customer oder seller hier auswählen aus der participant variante ## für die Kunden ist es round_number*2 - 1 für den Index der ersten Analogy stage & roundnumber*2 für die 2te analogy = C.ANALOGY_LIST[get_number_for_player(player=player, customer=True, stage=2)] return dict( analogy=analogy #analogy=vars(analogy) ) @staticmethod def before_next_page(player: Player, timeout_happened): game_object = C.ANALOGY_LIST[get_number_for_player(player=player, customer=True, stage=1)] answer = check_for_correct_answer(player, game_object, player.response_games) ## todo: Behnud : reicht hier die summe als punkte oder willst du beide print('richtige Antwort ? ', answer) player.points_analogy += answer * C.ANALOGY_POINTS player.response_games = '' # leer machen class CountSevens_Stage3(Page): template_name = C.PAGE_7Task class Numbersequence_Stage2(Page): template_name = C.PAGE_NUMSEQ form_model = 'player' form_fields = ['response_games'] # timeout_seconds = C.TIMEOUT_GAME ##todo nach dem testen wieder rein timer_text = C.TIMER_TEXT @staticmethod def is_displayed(player: Player): return True ##todo: sag wann und was @staticmethod def vars_for_template(player: Player): return dict( DEBUG=settings.DEBUG#False#settings.DEBUG ) @staticmethod def js_vars(player: Player): # analogy = Analogy.filter(player=player, round_number=player.round_number, stage=1)[0] ## todo: pcds nach type customer oder seller hier auswählen aus der participant variante ## für die Kunden ist es round_number*2 - 1 für den Index der ersten Analogy stage & roundnumber*2 für die 2te num_seq = C.NUM_SEQUENCE_LIST[get_number_for_player(player=player, customer=True, stage=2)] return dict( number_sequence=num_seq #analogy=vars(analogy) ) @staticmethod def before_next_page(player: Player, timeout_happened): game_object = C.NUM_SEQUENCE_LIST[get_number_for_player(player=player, customer=True, stage=1)] answer = check_for_correct_answer(player, game_object, player.response_games) ## todo: Behnud : reicht hier die summe als punkte oder willst du beide print('richtige Antwort ? ', answer) player.points_number_seq += answer * C.NUMBER_SEQ_POINTS player.response_games = '' # leer machen ### SPIELE Ende class RoundResult(Page): pass class ShowPayoff(Page): ## Hier berechnen was in calcPayoff war & auch zeigen pass class RatingGroupMember(Page): ##Sternrating mitnehmen pass class EndRound(Page): pass ### ENDE class FinalResults(Page): pass page_sequence = [ InstructionsPart2, InstructionsPart2_Ende, GroupMatching, ## WaitPage RoleAssignment, RoleInformation, CustomerChooseMatching, AssignmentCustomerSeller, ## WAITPAGE AfterAssignmentInfo, TransactionInfo, Similarity, PerceivedSimilarity, Analogy_Stage1, CountSevens_Stage1, Numbersequence_Stage1, CountSevens_Stage2, Analogy_Stage2, CountSevens_Stage3, Numbersequence_Stage2, RoundResult, ShowPayoff, RatingGroupMember, EndRound, FinalResults ]