import csv
import json
from re import sub
from otree.api import *
from decimal import *
from settings import SESSION_CONFIG_DEFAULTS, SESSION_CONFIGS, PARTICIPANT_FIELDS

author = "Nathaniel Archer Lawrence, LEMMA, Université Panthéon-Assas - Paris II"

doc = """
Consumption simulation with interface based off of 'Shopping app (online grocery store)' from oTree demos, see: .
"""


def _read_csv_rows(filename):
    """Return every row of ``<app package>/<filename>`` as a list of dicts.

    The file is opened with ``utf-8-sig`` so that a leading byte-order mark
    does not end up inside the first column name, and it is closed as soon
    as the rows have been read.
    """
    with open(__name__ + '/' + filename, encoding='utf-8-sig') as f:
        return list(csv.DictReader(f))


def read_csv():
    """Load the product catalog from ``catalog.csv``.

    Returns:
        A list with one dict per CSV row; ``unit_price`` is converted to
        ``float``, every other column stays a string.
    """
    rows = _read_csv_rows('catalog.csv')
    for row in rows:
        # all values in CSV are string unless you convert them
        row['unit_price'] = float(row['unit_price'])
    return rows


def read_csv_inflation():
    """Load the per-period inflation table from ``animal_spirits.csv``.

    Returns:
        A list with one dict per CSV row; the ``430`` and ``1012`` columns
        are converted to ``float`` and ``period`` to ``int``.
    """
    rows = _read_csv_rows('animal_spirits.csv')
    for row in rows:
        # all values in CSV are string unless you convert them
        # NOTE(review): an earlier revision divided both rates by 12 to
        # convert from the annualized rate; the values are now used as given.
        row['430'] = float(row['430'])
        row['1012'] = float(row['1012'])
        row['period'] = int(row['period'])
    return rows


class C(BaseConstants):
    NAME_IN_URL = 'task'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 120
    TIME_LIMIT = SESSION_CONFIG_DEFAULTS['time_limit']
    # consumption constants
    INITIAL_ENDOWMENT = SESSION_CONFIG_DEFAULTS['initial_endowment']
    INCOME = SESSION_CONFIG_DEFAULTS['income']
    INTEREST_RATE = SESSION_CONFIG_DEFAULTS['interest_rate']
    CONSUMPTION_RATE = 1  # amount of good consumed from stock balance each period
    MONETARY_POLICY = SESSION_CONFIG_DEFAULTS['monetary_policy']
    # inflation parameters from csv
    INFLATION = read_csv_inflation()
    # period = ID in dict
    INFLATION_DICT = {row['period']: row for row in INFLATION}
    # list of products taken from csv file
    PRODUCTS = read_csv()
    # SKU = 'stock keeping unit' = product ID
    PRODUCTS_DICT = {row['sku']: row for row in PRODUCTS}


class Subsession(BaseSubsession):
    pass


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    ## Session indices
    day = models.IntegerField()
    app_sequence = models.IntegerField()
    inf_sequence = models.IntegerField()
    intervention = models.IntegerField()
    # Per-round accounting, written by live_method on every live message
    total_price = models.CurrencyField(initial=0)
    initial_savings = models.CurrencyField(initial=0)
    cashOnHand = models.CurrencyField(initial=0)
    finalSavings = models.CurrencyField(initial=0)
    finalStock = models.IntegerField(initial=0)
    interestEarned = models.CurrencyField(initial=0)
    # JSON string with the cart's SKUs, quantities and unit prices
    decision = models.LongStringField(initial='')
    newPrice = models.FloatField(initial=C.PRODUCTS_DICT['1']['unit_price'])
    realInterest = models.FloatField()
    # Response time
    responseTime = models.FloatField(initial=0)
    # Choice confidence
    current_choiceConfidence = models.IntegerField(
        choices=[
            [1, '1 – Not at all confident to have made the right decision'],
            [2, '2'],
            [3, '3'],
            [4, '4'],
            [5, '5 – Absolutely confident to have made the right decision'],
        ],
        label='How confident are you that you made the right decision this month?',
        widget=widgets.RadioSelect,
    )
    # Inflation estimates
    inf = models.FloatField()


class Item(ExtraModel):
    # One row per product currently in a player's shopping cart
    player = models.Link(Player)
    sku = models.StringField()
    name = models.StringField()
    quantity = models.IntegerField()
    unit_price = models.CurrencyField()
    newPrice = models.FloatField()


# Calculate total price of item with quantity selected
def total_price(item: Item):
    """Return the cost of *item*: its quantity times its unit price."""
    return item.quantity * item.unit_price


# Convert information about items into a dictionary
def to_dict(item: Item):
    """Return the fields of *item* that are sent to the page as a plain dict."""
    return dict(
        sku=item.sku,
        name=item.name,
        quantity=item.quantity,
        total_price=total_price(item),
    )


def _inflation_rate(player: Player, period: int):
    """Return the inflation value stored for *period*.

    The column is selected by the session's ``inflation`` config entry.
    """
    return C.INFLATION_DICT[period][player.session.config['inflation']]


def _update_cart(player: Player, data):
    """Apply one add/remove message (``sku`` and ``delta``) to the cart.

    An existing cart line has ``delta`` added to its quantity and is deleted
    once the quantity drops to zero or below; a new line is only created for
    a positive ``delta`` and is priced at the player's current ``newPrice``.
    """
    sku = data['sku']
    delta = data['delta']
    # Looked up before touching the cart so an unknown SKU fails immediately.
    product = C.PRODUCTS_DICT[sku]
    matches = Item.filter(player=player, sku=sku)
    if matches:
        [item] = matches
        item.quantity += delta
        if item.quantity <= 0:
            item.delete()
    elif delta > 0:
        Item.create(
            player=player,
            quantity=delta,
            sku=sku,
            name=product['name'],
            unit_price=player.newPrice,
        )


# Send info to HTML
def live_method(player: Player, data):
    """Handle a live message from the shopping page and refresh the accounts.

    Args:
        player: the player of the current round.
        data: dict sent by the page; when it contains ``sku`` (together with
            ``delta``) the cart is updated first.

    Returns:
        ``{player.id_in_group: {...}}`` carrying the cart items and the
        recomputed player fields for the page to display.

    In round 1 savings start from ``C.INITIAL_ENDOWMENT``, no interest is
    earned and the price is the catalog price of SKU ``'1'`` after this
    round's inflation. In later rounds the previous round's final savings
    earn ``C.INTEREST_RATE`` plus ``C.MONETARY_POLICY`` times the previous
    round's inflation, the price compounds on the previous round's price and
    the stock carries over from the previous round.
    """
    round_number = player.round_number
    inflation = _inflation_rate(player, round_number)

    print("Round number: ", round_number)
    print("player new price csv: ", player.newPrice)
    print("inflation: ", inflation)
    print('player.session.config[inflation]:', player.session.config['inflation'])

    if round_number == 1:
        previous = None
        policy_adjustment = None
        nominal_rate = C.INTEREST_RATE
        player.newPrice = float(C.PRODUCTS_DICT['1']['unit_price'] * (1 + inflation))
    else:
        previous = player.in_round(round_number - 1)
        policy_adjustment = C.MONETARY_POLICY * _inflation_rate(player, round_number - 1)
        nominal_rate = C.INTEREST_RATE + policy_adjustment
        player.newPrice = previous.newPrice * (1 + inflation)

    # allows subject to add and remove from shopping cart
    if 'sku' in data:
        _update_cart(player, data)

    items = Item.filter(player=player)
    item_dicts = [to_dict(item) for item in items]
    units_in_cart = sum([item.quantity for item in items])
    player.total_price = sum([total_price(item) for item in items])

    if previous is None:
        player.initial_savings = C.INITIAL_ENDOWMENT
        player.interestEarned = 0
        player.finalStock = units_in_cart
        real_interest_label = 'real int 1: '
    else:
        # fetch previous period's final savings
        player.initial_savings = previous.finalSavings * (1 + nominal_rate)
        print("monetary policy: ", policy_adjustment)
        print("inflation rate: ", inflation)
        player.interestEarned = previous.finalSavings * nominal_rate
        # fetch previous period's final stock
        player.finalStock = previous.finalStock + units_in_cart
        real_interest_label = 'real int 2: '

    player.cashOnHand = player.initial_savings + C.INCOME
    player.finalSavings = player.cashOnHand - player.total_price
    # Real rate in percent: nominal rate minus this round's inflation.
    player.realInterest = round(100 * (nominal_rate - inflation), 2)
    print(real_interest_label, player.realInterest)

    player.decision = json.dumps(
        {
            "item": [item.sku for item in items],
            "quantity": [item.quantity for item in items],
            "price": [float(item.unit_price) for item in items],
        }
    )
    print('Current player decision: ', player.decision)
    print('Player new price: ', player.newPrice)

    return {
        player.id_in_group: dict(
            items=item_dicts,
            total_price=player.total_price,
            initial_savings=player.initial_savings,
            interestEarned=player.interestEarned,
            cashOnHand=player.cashOnHand,
            finalSavings=player.finalSavings,
            finalStock=player.finalStock,
            decision=player.decision,
            newPrice=player.newPrice,
            realInterest=player.realInterest,
        )
    }


def _record_periods_survived(player: Player):
    """Store the current round number as the participant's periods survived."""
    participant = player.participant
    participant.periods_survived = player.round_number
    print('periods survived recorded: ', participant.periods_survived)


def _close_shopping_round(player: Player):
    """Record survival, then consume this period's share of the stock."""
    _record_periods_survived(player)
    player.finalStock -= C.CONSUMPTION_RATE
    print('before next Stock: ', player.finalStock)


# Values of the session's 'day' config entry that have a task_<day> field.
_PAYOFF_DAYS = ('1', '2', '3', '4')


def _record_payoff(player: Player):
    """Copy the player's final savings into the participant's ``task_<day>``.

    Nothing is recorded when the session's ``day`` is not one of the strings
    ``'1'`` to ``'4'``.
    """
    day = player.session.config['day']
    if day in _PAYOFF_DAYS:
        field_name = 'task_' + day
        participant = player.participant
        setattr(participant, field_name, player.finalSavings)
        print(field_name + ': ', getattr(participant, field_name))


def _has_failed(player: Player):
    """Return True when the player's stock or savings are negative this round."""
    return player.finalStock < 0 or player.finalSavings < 0


# PAGES
class MyPage_nf(Page):
    # Shopping page shown when the session's intervention is not 'framing'
    live_method = live_method
    form_model = 'player'
    form_fields = ['responseTime']

    @staticmethod
    def is_displayed(player):
        return player.session.config['intervention'] != 'framing'

    @staticmethod
    def error_message(player, value):
        # Block submission while the cart costs more than the cash on hand.
        if player.finalSavings < 0:
            return 'Your do not have sufficient Total Cash to complete this purchase.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        _close_shopping_round(player)


class MyPage_f(Page):
    # Shopping page shown when the session's intervention is 'framing'
    live_method = live_method
    form_model = 'player'
    form_fields = ['responseTime']

    @staticmethod
    def is_displayed(player):
        return player.session.config['intervention'] == 'framing'

    @staticmethod
    def error_message(player, value):
        # Block submission while the cart costs more than the cash on hand.
        if player.finalSavings < 0:
            return 'Your do not have sufficient Total Cash to complete this purchase.'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        _close_shopping_round(player)


### Choice confidence questions about current decision
class Decision(Page):
    form_model = 'player'
    form_fields = ['current_choiceConfidence']

    @staticmethod
    def vars_for_template(player: Player):
        return dict(items=Item.filter(player=player))

    @staticmethod
    def is_displayed(player):
        # Shown in the first and the last-but-one round of every cycle; the
        # cycle is 30 rounds for the '430' inflation setting and 12 otherwise.
        cycle = 30 if player.session.config['inflation'] == '430' else 12
        return player.round_number % cycle in (cycle - 1, 1)


class PriceChanges(Page):
    form_model = 'player'
    form_fields = ['inf']
    timeout_seconds = C.TIME_LIMIT

    @staticmethod
    def is_displayed(player):
        # Shown every 12th round.
        return player.round_number % 12 == 0

    @staticmethod
    def vars_for_template(player: Player):
        return dict(rounds12Before=player.round_number - 11)

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        ## Record number of months survived
        _record_periods_survived(player)
        ## Record payoff
        _record_payoff(player)


class Failed(Page):
    @staticmethod
    def is_displayed(player: Player):
        return _has_failed(player)

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        # A failed participant skips the remaining rounds of this app.
        if _has_failed(player):
            player.participant.periods_survived = player.round_number
            return 'task_questions'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        ## Record payoff
        _record_payoff(player)


class Results(Page):
    @staticmethod
    def is_displayed(player):
        # display final results
        return player.round_number == C.NUM_ROUNDS

    @staticmethod
    def vars_for_template(player: Player):
        return dict(final_results=player.finalSavings)


page_sequence = [MyPage_f, MyPage_nf, Decision, PriceChanges, Failed, Results]