in parquet_flask/cdms_lambda_func/ingest_s3_to_cdms/ingest_s3_to_cdms.py [0:0]
def start(self, event):
    """
    Forward every JSON S3 object referenced by ``event`` to the CDMS ingestion API.

    Each S3 URL in the event is checked for a ``.json`` or ``.json.gz`` suffix
    (case-insensitive). Other URLs are skipped.

    For each JSON file, ``self.__ddb.get_one_item(s3_url)`` is consulted:

    * No record: the file is PUT to ``{cdms_domain}/1.0/ingest_json_s3``.
    * Existing record: the file is PUT to ``{cdms_domain}/1.0/replace_json_s3``,
      with that record's ``uuid`` sent as ``job_id``.

    :param event: raw event handed to the Lambda; parsed by ``S3ToSqs``.
    :return: list of ``{'status_code': ..., 'details': ...}`` dicts, one per JSON
        file that was sent, in event order, taken from the HTTP response.
    :raises ValueError: if at least one JSON file is present but the environment
        variable named by ``LambdaFuncEnv.CDMS_BEARER_TOKEN`` is unset or empty.
    """
    LOGGER.debug(f'for event: {event}')
    s3_records = S3ToSqs(event)
    results = []
    # Built lazily on the first JSON file, so events without JSON files never
    # require the bearer token (same as before), but it is only built once.
    header = None
    for i in range(s3_records.size()):
        s3_url = s3_records.get_s3_url(i)
        if not s3_url.upper().endswith(('.JSON', '.JSON.GZ')):
            LOGGER.debug(f'skipping file: {s3_url}')
            continue
        LOGGER.debug(f'executing file: {s3_url}')
        if header is None:
            # TODO this comes from Secret manager. not directly from env variable
            bearer_token = os.environ.get(LambdaFuncEnv.CDMS_BEARER_TOKEN)
            if not bearer_token:
                raise ValueError(f'missing or empty environment variable: {LambdaFuncEnv.CDMS_BEARER_TOKEN}')
            header = {
                'Authorization': base64.standard_b64encode(bearer_token.encode()).decode(),
                'Content-Type': 'application/json',
            }
        put_body = {
            's3_url': s3_url,
            'sanitize_record': self.__sanitize_records,
            'wait_till_finish': self.__wait_till_finished,
        }
        ddb_record = self.__ddb.get_one_item(s3_url)
        if ddb_record is None:
            put_url = f'{self.__cdms_domain}/1.0/ingest_json_s3'
        else:
            # An existing record means this file was ingested before: replace it
            # under the same job id instead of creating a new job.
            put_url = f'{self.__cdms_domain}/1.0/replace_json_s3'
            put_body['job_id'] = ddb_record['uuid']
        LOGGER.debug(f'putting {put_body} to {put_url}')
        # NOTE(review): TLS certificate verification is disabled and no timeout is
        # set (the call may block until the Lambda itself times out) -- confirm
        # both are intentional for the CDMS endpoint.
        result = requests.put(url=put_url,
                              data=json.dumps(put_body),
                              headers=header,
                              verify=False)
        LOGGER.info(f'ingest result: {result.status_code}')
        LOGGER.debug(f'ingest result details: {result.text}')
        results.append({
            'status_code': result.status_code,
            'details': result.text,
        })
    return results