in parquet_flask/cdms_lambda_func/s3_records/s3_2_sqs.py [0:0]
def __is_valid(self):
    """
    Validate the incoming event and gather every nested S3 record.

    The event is checked against ``OUTER_SCHEMA`` first. Each entry of its
    ``Records`` list then has its ``body`` decoded from JSON (only when the
    body is a string), checked against ``S3_RECORD_SCHEMA``, and the entries
    of the decoded body's own ``Records`` list are added to
    ``self.__s3_record`` in the order they are encountered.

    :return: this instance
    :raises ValueError: if the event or any decoded body fails schema
        validation (``json.loads`` also raises a ``ValueError`` subclass
        when a string body is not valid JSON)
    """
    outer_ok, outer_errors = GeneralUtils.is_json_valid(self.__event, self.OUTER_SCHEMA)
    if outer_ok is False:
        raise ValueError(f'invalid OUTER_SCHEMA: {self.__event} vs {self.OUTER_SCHEMA}. errors: {outer_errors}')

    # Start from an empty list on every call; entries accumulate below.
    self.__s3_record = []

    # Walk the SQS messages carried by the lambda event.
    for sqs_message in self.__event['Records']:
        payload = sqs_message['body']
        if isinstance(payload, str):
            # The body may arrive as a JSON string rather than a parsed object.
            payload = json.loads(payload)

        payload_ok, payload_errors = GeneralUtils.is_json_valid(payload, self.S3_RECORD_SCHEMA)
        if payload_ok is False:
            raise ValueError(f'invalid S3_RECORD_SCHEMA: {payload} vs {self.S3_RECORD_SCHEMA}. errors: {payload_errors}')

        # One SQS message may hold several S3 events (original author's
        # belief, not verified here), so keep all of them.
        self.__s3_record.extend(payload['Records'])
    return self