in parquet_cli/ingest_s3/__main__.py [0:0]
def start(self):
    """Ingest every JSON / gzipped-JSON object under an S3 prefix into CDMS.

    Parses the command-line options, configures root logging, exports the
    settings used by the ingestion code as environment variables, then walks
    the objects under ``s3://<bucket>/<key_prefix>`` whose key ends with
    ``.json`` or ``.json.gz`` and runs ``IngestS3ToCdms`` on each one.

    A failure on one object is logged with its traceback and the loop moves
    on to the next object, so one bad file does not abort the whole run.
    """
    options = self.__get_args()
    log_level = getattr(options, LambdaFuncEnv.LOG_LEVEL)
    logging.basicConfig(level=int(log_level),
                        format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
    logger = logging.getLogger(__name__)

    # Settings are handed to the ingestion code through os.environ.
    os.environ[LambdaFuncEnv.LOG_LEVEL] = log_level
    os.environ[LambdaFuncEnv.CDMS_DOMAIN] = getattr(options, LambdaFuncEnv.CDMS_DOMAIN)
    # The bearer token is exported base64-encoded (standard alphabet), not raw.
    raw_token = getattr(options, LambdaFuncEnv.CDMS_BEARER_TOKEN)
    os.environ[LambdaFuncEnv.CDMS_BEARER_TOKEN] = base64.standard_b64encode(raw_token.encode()).decode()
    os.environ[LambdaFuncEnv.PARQUET_META_TBL_NAME] = getattr(options, LambdaFuncEnv.PARQUET_META_TBL_NAME)
    bucket_name = getattr(options, self.BUCKET_NAME_KEY)
    key_prefix = getattr(options, self.KEY_PREFIX_KEY)

    # Imports are deferred until after the environment is populated.
    # NOTE(review): presumably these modules read os.environ at import or
    # construction time -- confirm before moving them to the top of the file.
    from parquet_flask.cdms_lambda_func.ingest_s3_to_cdms.ingest_s3_to_cdms import IngestS3ToCdms
    from parquet_flask.aws.aws_s3 import AwsS3

    s3 = AwsS3()
    json_objects = s3.get_child_s3_files(
        bucket_name, key_prefix,
        lambda obj: obj['Key'].endswith(('.json', '.json.gz')))
    for key, _size in json_objects:
        print(f'working on: {key}')
        try:
            # A fresh ingester per object, as in the original per-iteration construction.
            IngestS3ToCdms().start(event={'s3_url': f's3://{bucket_name}/{key}'})
        except Exception:
            # Best-effort: record the failure with its traceback, keep going.
            logger.exception('error while processing: s3://%s/%s', bucket_name, key)