in parquet_flask/cdms_lambda_func/index_to_es/parquet_file_es_indexer.py [0:0]
def start(self, event):
    """
    Walk through every S3 record carried by the triggering event and
    dispatch it to the matching handler.

    For each record the S3 URL is stored on the instance before a handler
    is invoked. URLs containing one of the ignored phrases are skipped.
    Records whose event name starts with ``objectcreated`` go to
    ``ingest_file``; those starting with ``objectremoved`` go to
    ``remove_file``; anything else is logged as an error and skipped.

    :param event: the raw triggering event; it is wrapped in ``S3ToSqs``
        and dumped as JSON to the debug log
    :return: None
    """
    LOGGER.debug(f'Triggering event:\n{json.dumps(event, indent=4)}')
    records = S3ToSqs(event)
    skip_markers = ('spark-staging', '_temporary')
    for record_index in range(records.size()):
        # Stored on the instance before dispatch; the handlers take no
        # arguments, so they presumably read the URL from here.
        self.__s3_url = records.get_s3_url(record_index)
        if any(marker in self.__s3_url for marker in skip_markers):
            LOGGER.debug(f'skipping temp file: {self.__s3_url}')
            continue

        LOGGER.debug(f'executing: {self.__s3_url}')
        # Normalise the event name so the prefix checks below are
        # insensitive to case and surrounding whitespace.
        event_name = records.get_event_name(record_index).strip().lower()

        if event_name.startswith('objectcreated'):
            LOGGER.debug('executing index')
            self.ingest_file()
            continue

        if event_name.startswith('objectremoved'):
            LOGGER.debug('executing to remove index')
            self.remove_file()
            continue

        LOGGER.error(f'invalid s3_event: {event_name}; skipping it')
    return