in source/backend/chalicelib/framework.py [0:0]
def put_challenge_frame(challenge_id):
    """Validate an uploaded frame, register it on the challenge and store it.

    Reads ``timestamp`` and ``frameBase64`` from the JSON body of the current
    request, appends a ``{'timestamp', 'key'}`` entry to the challenge's
    ``frames`` list in the DynamoDB table and then uploads the decoded image
    to the S3 bucket under ``<challenge_id>/<timestamp>.jpg``.

    Args:
        challenge_id: Identifier of the challenge the frame belongs to; used
            as the table key and as the prefix of the S3 object key.

    Returns:
        A dict with a single ``message`` entry confirming the frame was saved.

    Raises:
        BadRequestError: If the timestamp is missing or not convertible to an
            integer, or if the frame is missing, is not valid base64, exceeds
            ``_MAX_IMAGE_SIZE`` bytes or is not a JPEG image.
        NotFoundError: If no challenge with ``challenge_id`` exists.
        ClientError: If the DynamoDB update fails for any other reason.
    """
    blueprint.log.debug('put_challenge_frame: %s', challenge_id)
    request = blueprint.current_request.json_body
    # Validating timestamp input
    try:
        timestamp = int(request['timestamp'])
    except (KeyError, TypeError, ValueError, OverflowError) as error:
        # KeyError: field missing; TypeError: no/non-object JSON body or a
        # null/list value; OverflowError: a non-finite float such as Infinity.
        raise BadRequestError('Invalid timestamp') from error
    blueprint.log.debug('timestamp: %s', timestamp)
    # Validating frame input
    try:
        frame = base64.b64decode(request['frameBase64'], validate=True)
    except (KeyError, TypeError, ValueError) as error:
        # binascii.Error (malformed base64) is a ValueError subclass; a plain
        # ValueError is raised for non-ASCII strings, TypeError for
        # non-string values and KeyError when the field is absent.
        raise BadRequestError('Invalid Image') from error
    if len(frame) > _MAX_IMAGE_SIZE:
        raise BadRequestError('Image size too large')
    # NOTE: imghdr is deprecated since Python 3.11 and removed in 3.13; this
    # check needs replacing before the runtime is upgraded past 3.12.
    if imghdr.what(None, h=frame) != 'jpeg':
        raise BadRequestError('Image must be JPEG')
    frame_key = '{}/{}.jpg'.format(challenge_id, timestamp)
    blueprint.log.debug('frame_key: %s', frame_key)
    # Updating challenge on DynamoDB table
    try:
        _table.update_item(
            Key={'id': challenge_id},
            UpdateExpression='set #frames = list_append(if_not_exists(#frames, :empty_list), :frame)',
            # Without this condition update_item would silently create a new
            # item for an unknown challenge_id and the
            # ConditionalCheckFailedException branch below could never run.
            ConditionExpression='attribute_exists(#id)',
            ExpressionAttributeNames={'#frames': 'frames', '#id': 'id'},
            ExpressionAttributeValues={
                ':empty_list': [],
                ':frame': [{
                    'timestamp': timestamp,
                    'key': frame_key
                }]
            },
            ReturnValues='NONE'
        )
    except ClientError as error:
        if error.response['Error']['Code'] == 'ConditionalCheckFailedException':
            blueprint.log.info('Challenge not found: %s', challenge_id)
            raise NotFoundError('Challenge not found') from error
        # Any other failure means the frame was not registered; do not go on
        # to upload it and report success.
        raise
    # Uploading frame to S3 bucket
    _s3.put_object(
        Body=frame,
        Bucket=_BUCKET_NAME,
        Key=frame_key,
        ExpectedBucketOwner=os.getenv('ACCOUNT_ID')  # Bucket Sniping prevention
    )
    return {'message': 'Frame saved successfully'}