import contextlib import json import logging from collections import defaultdict, namedtuple from dataclasses import dataclass from datetime import datetime from decimal import Decimal from typing import List, Dict, Union, Optional import vanilla from django.conf import settings from django.contrib import messages from django.http import HttpResponseServerError from django.shortcuts import get_object_or_404 from django.shortcuts import redirect from django.template.loader import render_to_string from django.urls import reverse import otree from otree.models import Session, Participant from otree.views.abstract import AdminSessionPageMixin try: import boto3 except ImportError: boto3 = None logger = logging.getLogger('otree') @dataclass class MTurkSettings: keywords: Union[str, list] title: str description: str frame_height: int template: str minutes_allotted_per_assignment: int expiration_hours: float qualification_requirements: List grant_qualification_id: Optional[str] = None def get_mturk_client(*, use_sandbox=True): if use_sandbox: endpoint_url = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com' else: endpoint_url = 'https://mturk-requester.us-east-1.amazonaws.com' return boto3.client( 'mturk', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, endpoint_url=endpoint_url, # if I specify endpoint_url without region_name, it complains region_name='us-east-1', ) @contextlib.contextmanager def MTurkClient(*, use_sandbox=True, request): '''Alternative to get_mturk_client, for when we need exception handling in admin views, we should pass it, so that we can show the user the message without crashing. for participant-facing views and commandline tools, should use get_mturk_client. ''' try: yield get_mturk_client(use_sandbox=use_sandbox) except Exception as exc: logger.error('MTurk error', exc_info=True) messages.error(request, str(exc), extra_tags='safe') def in_public_domain(request): """This method validates if oTree are published on a public domain because mturk need it """ host = request.get_host().lower() if ":" in host: host = host.split(":", 1)[0] if host in ["localhost", '127.0.0.1']: return False # IPy had a compat problem with py 3.8. # in the future, could move some IPy code here. return True class MTurkCreateHIT(AdminSessionPageMixin, vanilla.FormView): # make these class attributes so they can be mocked aws_keys_exist = bool( getattr(settings, 'AWS_ACCESS_KEY_ID', None) and getattr(settings, 'AWS_SECRET_ACCESS_KEY', None) ) boto3_installed = bool(boto3) def get(self, request): session = self.session mturk_settings = session.config['mturk_hit_settings'] is_new_format = 'template' in mturk_settings is_usd = settings.REAL_WORLD_CURRENCY_CODE == 'USD' mturk_ready = ( self.aws_keys_exist and self.boto3_installed and is_new_format and is_usd ) context = self.get_context_data( mturk_settings=mturk_settings, participation_fee=session.config['participation_fee'], mturk_num_workers=session.mturk_num_workers(), mturk_ready=mturk_ready, boto3_installed=self.boto3_installed, aws_keys_exist=self.aws_keys_exist, is_new_format=is_new_format, is_usd=is_usd, ) return self.render_to_response(context) def post(self, request): session = self.session use_sandbox = bool(request.POST.get('use_sandbox')) if not in_public_domain(request) and not use_sandbox: msg = ( '

Error: ' 'oTree must run on a public domain for Mechanical Turk' '

' ) return HttpResponseServerError(msg) mturk_settings = MTurkSettings(**session.config['mturk_hit_settings']) start_url = self.request.build_absolute_uri( reverse('MTurkStart', args=(session.code,)) ) keywords = mturk_settings.keywords if isinstance(keywords, (list, tuple)): keywords = ', '.join(keywords) html_question = render_to_string( 'otree/MTurkHTMLQuestion.html', context=dict( user_template=mturk_settings.template, frame_height=mturk_settings.frame_height, start_url=start_url, ), ) mturk_hit_parameters = { 'Title': mturk_settings.title, 'Description': mturk_settings.description, 'Keywords': keywords, 'MaxAssignments': session.mturk_num_workers(), 'Reward': str(float(session.config['participation_fee'])), 'AssignmentDurationInSeconds': 60 * mturk_settings.minutes_allotted_per_assignment, 'LifetimeInSeconds': int(60 * 60 * mturk_settings.expiration_hours), # prevent duplicate HITs 'UniqueRequestToken': 'otree_{}'.format(session.code), 'Question': html_question, } if not use_sandbox: # drop requirements checks in sandbox mode. mturk_hit_parameters[ 'QualificationRequirements' ] = mturk_settings.qualification_requirements with MTurkClient(use_sandbox=use_sandbox, request=request) as mturk_client: hit = mturk_client.create_hit(**mturk_hit_parameters)['HIT'] session.mturk_HITId = hit['HITId'] session.mturk_HITGroupId = hit['HITGroupId'] session.mturk_use_sandbox = use_sandbox session.mturk_expiration = hit['Expiration'].timestamp() session.mturk_qual_id = mturk_settings.grant_qualification_id or '' session.save() return redirect('MTurkCreateHIT', session.code) Assignment = namedtuple( 'Assignment', ['worker_id', 'assignment_id', 'status', 'answer'] ) def get_all_assignments(mturk_client, hit_id) -> List[Assignment]: # Accumulate all relevant assignments, one page of results at # a time. assignments = [] args = dict( HITId=hit_id, # i think 100 is the max page size MaxResults=100, AssignmentStatuses=['Submitted', 'Approved', 'Rejected'], ) while True: response = mturk_client.list_assignments_for_hit(**args) if not response['Assignments']: break for d in response['Assignments']: assignments.append( Assignment( worker_id=d['WorkerId'], assignment_id=d['AssignmentId'], status=d['AssignmentStatus'], answer=d['Answer'], ) ) args['NextToken'] = response['NextToken'] return assignments def get_workers_by_status( all_assignments: List[Assignment], ) -> Dict[str, List[Assignment]]: workers_by_status = defaultdict(list) for assignment in all_assignments: workers_by_status[assignment.status].append(assignment.worker_id) return workers_by_status class MTurkSessionPayments(AdminSessionPageMixin, vanilla.TemplateView): def vars_for_template(self): session = self.session published = bool(session.mturk_HITId) if not published: return dict(published=False) with MTurkClient( use_sandbox=session.mturk_use_sandbox, request=self.request ) as mturk_client: all_assignments = get_all_assignments(mturk_client, session.mturk_HITId) # auto-reject logic assignment_ids_in_db = session.participant_set.exclude( mturk_assignment_id=None ).values_list('mturk_assignment_id', flat=True) submitted_assignment_ids = [ a.assignment_id for a in all_assignments if a.status == 'Submitted' ] auto_rejects = set(submitted_assignment_ids) - set(assignment_ids_in_db) for assignment_id in auto_rejects: mturk_client.reject_assignment( AssignmentId=assignment_id, RequesterFeedback='Auto-rejecting because this assignment was not found in our database.', ) workers_by_status = get_workers_by_status(all_assignments) participants_approved = session.participant_set.filter( mturk_worker_id__in=workers_by_status['Approved'] ) participants_rejected = session.participant_set.filter( mturk_worker_id__in=workers_by_status['Rejected'] ) submitted_worker_ids = workers_by_status['Submitted'] participants_not_reviewed = session.participant_set.filter( mturk_worker_id__in=submitted_worker_ids ) add_answers(participants_not_reviewed, all_assignments) add_answers(participants_approved, all_assignments) add_answers(participants_rejected, all_assignments) return dict( published=True, participants_approved=participants_approved, participants_rejected=participants_rejected, participants_not_reviewed=participants_not_reviewed, participation_fee=session.config['participation_fee'], auto_rejects=auto_rejects, ) def get_completion_code(xml: str) -> str: if not xml: return '' # move inside function because it adds 0.03s to startup time from xml.etree import ElementTree root = ElementTree.fromstring(xml) for ans in root: if ans[0].text == 'taskAnswers': answer_data = json.loads(ans[1].text) try: return answer_data[0]['completion_code'] except: return '' return '' def add_answers(participants: List[Participant], all_assignments: List[Assignment]): answers = {} for assignment in all_assignments: answers[assignment.worker_id] = assignment.answer for p in participants: p._is_frozen = False p.mturk_answers_formatted = get_completion_code(answers[p.mturk_worker_id]) class PayMTurk(vanilla.View): url_pattern = r'^PayMTurk/(?P[a-z0-9]+)/$' def post(self, request, session_code): session = get_object_or_404(otree.models.Session, code=session_code) successful_payments = 0 failed_payments = 0 mturk_client = get_mturk_client(use_sandbox=session.mturk_use_sandbox) payment_page_response = redirect('MTurkSessionPayments', session.code) # use worker ID instead of assignment ID. Because 2 workers can have # the same assignment (if 1 starts it then returns it). we can't really # block that. # however, we can ensure that 1 worker does not get 2 assignments, # by enforcing that the same worker is always assigned to the same participant. participants = session.participant_set.filter( mturk_worker_id__in=request.POST.getlist('workers') ) for p in participants: # need the try/except so that we try to pay the rest of the participants payoff = p.payoff_in_real_world_currency() try: if payoff > 0: mturk_client.send_bonus( WorkerId=p.mturk_worker_id, AssignmentId=p.mturk_assignment_id, BonusAmount='{0:.2f}'.format(Decimal(payoff)), # prevent duplicate payments UniqueRequestToken='{}_{}'.format( p.mturk_worker_id, p.mturk_assignment_id ), # this field is required. Reason='Thank you', ) # approve assignment should happen AFTER bonus, so that if bonus fails, # the user will still show up in assignments_not_reviewed. # worst case is that bonus succeeds but approval fails. # in that case, exception will be raised on send_bonus because of UniqueRequestToken. # but that's OK, then you can just unselect that participant and pay the others. mturk_client.approve_assignment(AssignmentId=p.mturk_assignment_id) successful_payments += 1 except Exception as e: msg = ( 'Could not pay {} because of an error communicating ' 'with MTurk: {}'.format(p._numeric_label(), str(e)) ) messages.error(request, msg) logger.error(msg) failed_payments += 1 if failed_payments > 10: return payment_page_response msg = 'Successfully made {} payments.'.format(successful_payments) if failed_payments > 0: msg += ' {} payments failed.'.format(failed_payments) messages.warning(request, msg) else: messages.success(request, msg) return payment_page_response class RejectMTurk(vanilla.View): url_pattern = r'^RejectMTurk/(?P[a-z0-9]+)/$' def post(self, request, session_code): session = get_object_or_404(Session, code=session_code) with MTurkClient( use_sandbox=session.mturk_use_sandbox, request=request ) as mturk_client: for p in session.participant_set.filter( mturk_worker_id__in=request.POST.getlist('workers') ): mturk_client.reject_assignment( AssignmentId=p.mturk_assignment_id, # The boto3 docs say this param is optional, but if I omit it, I get: # An error occurred (ValidationException) when calling the RejectAssignment operation: # 1 validation error detected: Value null at 'requesterFeedback' # failed to satisfy constraint: Member must not be null RequesterFeedback='', ) messages.success( request, "You successfully rejected " "selected assignments" ) return redirect('MTurkSessionPayments', session_code) class MTurkExpireHIT(vanilla.View): url_pattern = r'^MTurkExpireHIT/(?P[a-z0-9]+)/$' def post(self, request, session_code): session = get_object_or_404(Session, code=session_code) with MTurkClient( use_sandbox=session.mturk_use_sandbox, request=request ) as mturk_client: expiration = datetime(2015, 1, 1) mturk_client.update_expiration_for_hit( HITId=session.mturk_HITId, # If you update it to a time in the past, # the HIT will be immediately expired. ExpireAt=expiration, ) session.mturk_expiration = expiration.timestamp() session.save() # don't need a message because the MTurkCreateHIT page will # statically say the HIT has expired. return redirect('MTurkCreateHIT', session.code)