def start()

in parquet_flask/cdms_lambda_func/ingest_s3_to_cdms/ingest_s3_to_cdms.py [0:0]


    def start(self, event):
        """
        Send every JSON / JSON.GZ S3 object referenced by ``event`` to the CDMS ingest service.

        For each S3 URL extracted by ``S3ToSqs``:
        - URLs not ending in ``.json`` or ``.json.gz`` (case-insensitive) are skipped.
        - The URL is looked up via ``self.__ddb.get_one_item``. If no record is found, the
          file is PUT to ``{cdms_domain}/1.0/ingest_json_s3``. Otherwise it is PUT to
          ``{cdms_domain}/1.0/replace_json_s3`` with the existing record's ``uuid`` sent
          as ``job_id``.

        :param event: the raw event passed to the lambda; parsed by ``S3ToSqs``.
        :return: list of ``{'status_code': ..., 'details': ...}`` dicts, one per file that
                 was sent, in event order. Skipped files contribute nothing.
        :raises ValueError: if the bearer-token environment variable is unset and at least
                            one JSON file has to be sent.
        """
        LOGGER.debug(f'for event: {event}')
        s3_records = S3ToSqs(event)
        results = []
        # The header is the same for every request, so build it once.
        # It is built lazily, so events without JSON files never need the token.
        header = None
        for i in range(s3_records.size()):
            s3_url = s3_records.get_s3_url(i)
            if not s3_url.upper().endswith(('.JSON', '.JSON.GZ')):
                LOGGER.debug(f'skipping file: {s3_url}')
                continue
            LOGGER.debug(f'executing file: {s3_url}')
            put_body = {
                's3_url': s3_url,
                'sanitize_record': self.__sanitize_records,
                'wait_till_finish': self.__wait_till_finished,
            }
            ddb_record = self.__ddb.get_one_item(s3_url)
            if header is None:
                # TODO this comes from Secret manager. not directly from env variable
                bearer_token = os.environ.get(LambdaFuncEnv.CDMS_BEARER_TOKEN)
                if bearer_token is None:
                    # Previously surfaced as an opaque AttributeError on None.encode().
                    raise ValueError(f'missing environment variable: {LambdaFuncEnv.CDMS_BEARER_TOKEN}')
                header = {
                    'Authorization': base64.standard_b64encode(bearer_token.encode()).decode(),
                    'Content-Type': 'application/json'
                }
            if ddb_record is None:
                put_url = f'{self.__cdms_domain}/1.0/ingest_json_s3'
            else:
                # File was ingested before: replace the existing job instead of creating a new one.
                put_url = f'{self.__cdms_domain}/1.0/replace_json_s3'
                put_body['job_id'] = ddb_record['uuid']
            LOGGER.debug(f'putting {put_body} to {put_url}')
            # NOTE(review): verify=False disables TLS certificate verification, and no timeout
            # is set (the call can block until the lambda itself times out). Both are kept
            # as-is for compatibility; confirm they are intended for this deployment.
            result = requests.put(url=put_url,
                                  data=json.dumps(put_body),
                                  headers=header,
                                  verify=False)
            LOGGER.info(f'ingest result: {result.status_code}')
            LOGGER.debug(f'ingest result details: {result.text}')
            results.append({
                'status_code': result.status_code,
                'details': result.text,
            })
        return results