import json from datetime import datetime from collections import defaultdict from typing import List, Dict, Union, Optional import logging from xml.etree import ElementTree from django.conf import settings from django.contrib import messages from django.urls import reverse from django.http import HttpResponseServerError from django.shortcuts import get_object_or_404 from django.template.loader import render_to_string import vanilla try: import boto3 except ImportError: boto3 = None import otree from otree.views.abstract import AdminSessionPageMixin from otree.models_concrete import add_time_spent_waiting from otree.models import Session, Participant from decimal import Decimal from django.shortcuts import redirect logger = logging.getLogger('otree') import contextlib from dataclasses import dataclass @dataclass class MTurkSettings: keywords: Union[str, list] title: str description: str frame_height: int template: str minutes_allotted_per_assignment: int expiration_hours: float qualification_requirements: List grant_qualification_id: Optional[str] = None def get_mturk_client(*, use_sandbox=True): if use_sandbox: endpoint_url = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com' else: endpoint_url = 'https://mturk-requester.us-east-1.amazonaws.com' return boto3.client( 'mturk', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, endpoint_url=endpoint_url, # if I specify endpoint_url without region_name, it complains region_name='us-east-1', ) @contextlib.contextmanager def MTurkClient(*, use_sandbox=True, request): '''Alternative to get_mturk_client, for when we need exception handling in admin views, we should pass it, so that we can show the user the message without crashing. for participant-facing views and commandline tools, should use get_mturk_client. ''' try: yield get_mturk_client(use_sandbox=use_sandbox) except Exception as exc: logger.error('MTurk error', exc_info=True) messages.error(request, str(exc), extra_tags='safe') def get_all_assignments(mturk_client, hit_id): # Accumulate all relevant assignments, one page of results at # a time. assignments = [] args = dict( HITId=hit_id, # i think 100 is the max page size MaxResults=100, AssignmentStatuses=['Submitted', 'Approved', 'Rejected'], ) while True: response = mturk_client.list_assignments_for_hit(**args) if not response['Assignments']: break assignments.extend(response['Assignments']) args['NextToken'] = response['NextToken'] return assignments def in_public_domain(request): """This method validates if oTree are published on a public domain because mturk need it """ host = request.get_host().lower() if ":" in host: host = host.split(":", 1)[0] if host in ["localhost", '127.0.0.1']: return False # IPy had a compat problem with py 3.8. # in the future, could move some IPy code here. return True class MTurkCreateHIT(AdminSessionPageMixin, vanilla.FormView): # make these class attributes so they can be mocked aws_keys_exist = bool( getattr(settings, 'AWS_ACCESS_KEY_ID', None) and getattr(settings, 'AWS_SECRET_ACCESS_KEY', None) ) boto3_installed = bool(boto3) def get(self, request): session = self.session mturk_settings = session.config['mturk_hit_settings'] is_new_format = 'template' in mturk_settings is_usd = settings.REAL_WORLD_CURRENCY_CODE == 'USD' mturk_ready = ( self.aws_keys_exist and self.boto3_installed and is_new_format and is_usd ) context = self.get_context_data( mturk_settings=mturk_settings, participation_fee=session.config['participation_fee'], mturk_num_participants=session.mturk_num_participants, mturk_ready=mturk_ready, boto3_installed=self.boto3_installed, aws_keys_exist=self.aws_keys_exist, is_new_format=is_new_format, is_usd=is_usd, ) return self.render_to_response(context) def post(self, request): session = self.session use_sandbox = bool(request.POST.get('use_sandbox')) if not in_public_domain(request) and not use_sandbox: msg = ( '

Error: ' 'oTree must run on a public domain for Mechanical Turk' '

' ) return HttpResponseServerError(msg) mturk_settings = MTurkSettings(**session.config['mturk_hit_settings']) start_url = self.request.build_absolute_uri( reverse('MTurkStart', args=(session.code,)) ) keywords = mturk_settings.keywords if isinstance(keywords, (list, tuple)): keywords = ', '.join(keywords) html_question = render_to_string( 'otree/MTurkHTMLQuestion.html', context=dict( user_template=mturk_settings.template, frame_height=mturk_settings.frame_height, start_url=start_url, ), ) mturk_hit_parameters = { 'Title': mturk_settings.title, 'Description': mturk_settings.description, 'Keywords': keywords, 'MaxAssignments': session.mturk_num_participants, 'Reward': str(float(session.config['participation_fee'])), 'AssignmentDurationInSeconds': 60 * mturk_settings.minutes_allotted_per_assignment, 'LifetimeInSeconds': int(60 * 60 * mturk_settings.expiration_hours), # prevent duplicate HITs 'UniqueRequestToken': 'otree_{}'.format(session.code), 'Question': html_question, } if not use_sandbox: # drop requirements checks in sandbox mode. mturk_hit_parameters[ 'QualificationRequirements' ] = mturk_settings.qualification_requirements with MTurkClient(use_sandbox=use_sandbox, request=request) as mturk_client: hit = mturk_client.create_hit(**mturk_hit_parameters)['HIT'] session.mturk_HITId = hit['HITId'] session.mturk_HITGroupId = hit['HITGroupId'] session.mturk_use_sandbox = use_sandbox session.mturk_expiration = hit['Expiration'].timestamp() session.save() return redirect('MTurkCreateHIT', session.code) class MTurkSessionPayments(AdminSessionPageMixin, vanilla.TemplateView): def vars_for_template(self): session = self.session published = bool(session.mturk_HITId) if not published: return dict(published=False) with MTurkClient( use_sandbox=session.mturk_use_sandbox, request=self.request ) as mturk_client: all_assignments = get_all_assignments(mturk_client, session.mturk_HITId) workers_by_status = get_workers_by_status(all_assignments) participants_not_reviewed = session.participant_set.filter( mturk_worker_id__in=workers_by_status['Submitted'] ) participants_approved = session.participant_set.filter( mturk_worker_id__in=workers_by_status['Approved'] ) participants_rejected = session.participant_set.filter( mturk_worker_id__in=workers_by_status['Rejected'] ) add_time_spent_waiting(participants_not_reviewed) add_answers(participants_not_reviewed, all_assignments) return dict( published=True, participants_approved=participants_approved, participants_rejected=participants_rejected, participants_not_reviewed=participants_not_reviewed, participation_fee=session.config['participation_fee'], ) def get_workers_by_status(all_assignments) -> Dict[str, List[str]]: workers_by_status = defaultdict(list) for assignment in all_assignments: workers_by_status[assignment['AssignmentStatus']].append(assignment['WorkerId']) return workers_by_status def get_completion_code(xml: str) -> str: if not xml: return '' root = ElementTree.fromstring(xml) for ans in root: if ans[0].text == 'taskAnswers': answer_data = json.loads(ans[1].text) try: return answer_data[0]['completion_code'] except: return '' return '' def add_answers(participants: List[Participant], all_assignments: List[dict]): answers = {} for assignment in all_assignments: answers[assignment['WorkerId']] = assignment['Answer'] for p in participants: p.mturk_answers_formatted = get_completion_code(answers[p.mturk_worker_id]) class PayMTurk(vanilla.View): url_pattern = r'^PayMTurk/(?P[a-z0-9]+)/$' def post(self, request, session_code): session = get_object_or_404(otree.models.Session, code=session_code) successful_payments = 0 failed_payments = 0 mturk_client = get_mturk_client(use_sandbox=session.mturk_use_sandbox) payment_page_response = redirect('MTurkSessionPayments', session.code) # use worker ID instead of assignment ID. Because 2 workers can have # the same assignment (if 1 starts it then returns it). we can't really # block that. # however, we can ensure that 1 worker does not get 2 assignments, # by enforcing that the same worker is always assigned to the same participant. participants = session.participant_set.filter( mturk_worker_id__in=request.POST.getlist('workers') ) # we require only that there's enough for paying the bonuses, # because the participation fee (reward) is already deducted from # available balance and held in escrow. (see forum post from 2019-06-19) # The 1.2 is because of the 20% surcharge to bonuses, as described here: # https://requester.mturk.com/pricing required_balance = Decimal( sum(p.payoff_in_real_world_currency() for p in participants) * 1.2 ) available_balance = Decimal( mturk_client.get_account_balance()['AvailableBalance'] ) if available_balance < required_balance: msg = ( f'Insufficient balance: you have ${available_balance:.2f}, ' f'but paying the selected participants costs ${required_balance:.2f}.' ) messages.error(request, msg) return payment_page_response for p in participants: # need the try/except so that we try to pay the rest of the participants payoff = p.payoff_in_real_world_currency() try: # approve assignment mturk_client.approve_assignment(AssignmentId=p.mturk_assignment_id) if payoff > 0: mturk_client.send_bonus( WorkerId=p.mturk_worker_id, AssignmentId=p.mturk_assignment_id, BonusAmount='{0:.2f}'.format(Decimal(payoff)), # prevent duplicate payments UniqueRequestToken='{}_{}'.format( p.mturk_worker_id, p.mturk_assignment_id ), # although the Boto documentation doesn't say so, # this field is required. A user reported: # "Value null at 'reason' failed to satisfy constraint: # Member must not be null." Reason='Thank you', ) successful_payments += 1 except Exception as e: msg = ( 'Could not pay {} because of an error communicating ' 'with MTurk: {}'.format(p._id_in_session(), str(e)) ) messages.error(request, msg) logger.error(msg) failed_payments += 1 msg = 'Successfully made {} payments.'.format(successful_payments) if failed_payments > 0: msg += ' {} payments failed.'.format(failed_payments) messages.warning(request, msg) else: messages.success(request, msg) return payment_page_response class RejectMTurk(vanilla.View): url_pattern = r'^RejectMTurk/(?P[a-z0-9]+)/$' def post(self, request, session_code): session = get_object_or_404(Session, code=session_code) with MTurkClient( use_sandbox=session.mturk_use_sandbox, request=request ) as mturk_client: for p in session.participant_set.filter( mturk_worker_id__in=request.POST.getlist('workers') ): mturk_client.reject_assignment( AssignmentId=p.mturk_assignment_id, # The boto3 docs say this param is optional, but if I omit it, I get: # An error occurred (ValidationException) when calling the RejectAssignment operation: # 1 validation error detected: Value null at 'requesterFeedback' # failed to satisfy constraint: Member must not be null RequesterFeedback='', ) messages.success( request, "You successfully rejected " "selected assignments" ) return redirect('MTurkSessionPayments', session_code) class MTurkExpireHIT(vanilla.View): url_pattern = r'^MTurkExpireHIT/(?P[a-z0-9]+)/$' def post(self, request, session_code): session = get_object_or_404(Session, code=session_code) with MTurkClient( use_sandbox=session.mturk_use_sandbox, request=request ) as mturk_client: expiration = datetime(2015, 1, 1) mturk_client.update_expiration_for_hit( HITId=session.mturk_HITId, # If you update it to a time in the past, # the HIT will be immediately expired. ExpireAt=expiration, ) session.mturk_expiration = expiration.timestamp() session.save() # don't need a message because the MTurkCreateHIT page will # statically say the HIT has expired. return redirect('MTurkCreateHIT', session.code)