in scripts/s3-extract-tar.py [0:0]
# Module-level imports required by this excerpt.
import os
import sys
import time

import boto3


@classmethod
def __submit_batch_jobs(cls, config=None, total_file_count=None):
    """Submit one AWS Batch job per chunk of files, then wait for them to finish."""
    batch_client = boto3.client('batch')
    jobs = []
    s3_python_script = config['s3_python_script']
    s3_json_config = config['s3_json_config']
    file_path = config['file_path']
    aws_region = os.environ['AWS_DEFAULT_REGION']
    # Timestamp suffix keeps job names unique across runs.
    job_ts = str(time.time()).replace('.', '-')
    # Submit one job per FILE_CHUNK_COUNT-sized slice of the file list.
    for i in range(0, total_file_count, cls.FILE_CHUNK_COUNT):
        start = i
        end = min(i + cls.FILE_CHUNK_COUNT, total_file_count)
        response = batch_client.submit_job(
            jobName=f'extract-{start}-{end}-{job_ts}',
            jobQueue=config['job_queue'],
            jobDefinition=config['job_definition'],
            retryStrategy={'attempts': 2},
            containerOverrides={
                'command': ['--file-path', f'{file_path}',
                            '--start', f'{start}', '--end', f'{end}'],
                'environment': [
                    {'name': 'S3_PYTHON_SCRIPT', 'value': s3_python_script},
                    {'name': 'S3_JSON_CONFIG', 'value': s3_json_config},
                    {'name': 'AWS_DEFAULT_REGION', 'value': aws_region},
                ],
            })
        jobs.append(response['jobId'])
    succeeded = []
    failed = []
    pending = list(jobs)
    # Poll until every submitted job reaches a terminal state.
    while pending:
        still_pending = []
        # describe_jobs accepts at most 100 job IDs per call, so query in batches.
        for i in range(0, len(pending), 100):
            response = batch_client.describe_jobs(jobs=pending[i:i + 100])
            for _job in response['jobs']:
                if _job['status'] == 'SUCCEEDED':
                    succeeded.append(_job['jobId'])
                elif _job['status'] == 'FAILED':
                    failed.append(_job['jobId'])
                else:
                    still_pending.append(_job['jobId'])
        pending = still_pending
        if pending:
            time.sleep(5)
    # Abort with a non-zero exit code if any job failed after its retries.
    if failed:
        sys.exit(f'Failed: batch jobs: {failed}')
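
For context, a minimal sketch of how this method might be invoked. The config keys mirror those read above; the bucket, queue, and job-definition names and the S3TarExtractor class name are illustrative assumptions, not part of the original file.

# Hypothetical driver code; every value below is a placeholder (assumption).
config = {
    's3_python_script': 's3://example-bucket/scripts/s3-extract-tar.py',
    's3_json_config': 's3://example-bucket/config/extract.json',
    'file_path': 's3://example-bucket/archives/input.tar',
    'job_queue': 'example-job-queue',
    'job_definition': 'example-job-definition',
}
# The method is name-mangled (double underscore), so it is presumably called
# from another method on the same class rather than from outside, e.g.:
#     cls.__submit_batch_jobs(config=config, total_file_count=12345)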