in test.py [0:0]
def main():
task_queue_name = None
task_completed_queue_name = None
try:
task_queue_name = os.environ['SQS_TASK_QUEUE']
task_completed_queue_name = os.environ['SQS_TASK_COMPLETED_QUEUE']
except KeyError:
log.error('Please set the environment variables for SQS_TASK_QUEUE and SQS_TASK_COMPLETED_QUEUE')
sys.exit(1)
# Get the instance information
r = requests.get("http://169.254.169.254/latest/dynamic/instance-identity/document")
r.raise_for_status()
response_json = r.json()
region = response_json.get('region')
instance_id = response_json.get('instanceId')
ec2 = boto3.client('ec2', region_name=region)
s3 = boto3.client('s3', region_name=region)
task_queue = boto3.resource('sqs', region_name=region).get_queue_by_name(QueueName=task_queue_name)
task_completed_queue = boto3.resource('sqs', region_name=region).get_queue_by_name(QueueName=task_completed_queue_name)
log.info('Initialized - instance: %s', instance_id)
prepare_queue = queue.Queue()
inference_queue = queue.Queue(maxsize=FRAME_BATCH)
prepare_worker = threading.Thread(target=prepare, args=(prepare_queue, inference_queue,))
prepare_worker.start()
while True:
for message in task_queue.receive_messages(WaitTimeSeconds=10):
try:
log.info('Message received - instance: %s', instance_id)
ec2.modify_instance_attribute(
InstanceId=instance_id,
DisableApiTermination={ 'Value': True },
)
log.info('Termination protection engaged - instance: %s', instance_id)
message.change_visibility(VisibilityTimeout=600)
log.info('Message visibility updated - instance: %s', instance_id)
# Process the message
doc = json.loads(message.body)
log.info('Message body is loaded - instance: %s', instance_id)
s3.download_file(doc['bucket'], doc['object'], TMP_FILE)
log.info('File is downloaded - instance: %s', instance_id)
log.info('Starting predictions - instance: %s', instance_id)
predictions_for_frames = process_video_from_file(TMP_FILE, prepare_queue, inference_queue)
log.info('Predictions completed - instance: %s', instance_id)
log.info(''.join(e for e in predictions_for_frames))
task_completed_queue.send_message(MessageBody=''.join(e for e in predictions_for_frames))
log.info('Task completed msg sent - instance: %s', instance_id)
message.delete()
log.info('Message deleted - instance: %s', instance_id)
ec2.modify_instance_attribute(
InstanceId=instance_id,
DisableApiTermination={ 'Value': False },
)
log.info('Termination protection disengaged - instance: %s', instance_id)
if os.path.exists(TMP_FILE):
os.remove(TMP_FILE)
except:
log.error('Problem processing message: %s - instance: %s', sys.exc_info()[0], instance_id)