in CommonLayerCode/datalake-library/python/datalake_library/configuration/event_configs.py [0:0]
def _fetch_from_event(self):
self._logger.info("Collecting config parameters from S3 write event")
try:
self._region = self._event['Records'][0]['awsRegion']
self._source_bucket = self._event['Records'][0]['s3']['bucket']['name']
self._object_key = parse.unquote_plus(
self._event['Records'][0]['s3']['object']['key'].encode('utf8').decode('utf8'))
if self._source_bucket.split('-')[-1] in ['raw', 'stage', 'analytics']:
self._stage = self._source_bucket.split('-')[-1]
self._team = self._object_key.split('/')[0]
self._dataset = self._object_key.split('/')[1]
else:
self._stage = self._object_key.split('/')[0]
self._team = self._object_key.split('/')[1]
self._dataset = self._object_key.split('/')[2]
self._size = int(self._event['Records'][0]['s3']['object']['size'])
self._landing_time = self._event['Records'][0]['eventTime']
except KeyError:
if self._event['detail'].get('errorCode', None):
msg = 'Event refers to a failed command: ' \
'error_code {}, bucket {}, object {}'.format(self._event['detail']['error_code'],
self._event['detail']['raw_s3_bucket'],
self._event['detail']['file_key'])
raise ValueError(msg)
# Falling back on CloudTrail Event
self._region = self._event['detail']['awsRegion']
self._source_bucket = self._event['detail']['requestParameters']['bucketName']
self._object_key = parse.unquote_plus(
self._event['detail']['requestParameters']['key'].encode('utf8').decode('utf8'))
if self._source_bucket.split('-')[-1] in ['raw', 'stage', 'analytics']:
self._stage = self._source_bucket.split('-')[-1]
self._team = self._object_key.split('/')[0]
self._dataset = self._object_key.split('/')[1]
else:
self._stage = self._object_key.split('/')[0]
self._team = self._object_key.split('/')[1]
self._dataset = self._object_key.split('/')[2]
self._size = int(self._event['detail']['additionalEventData']['bytesTransferredIn'])
self._landing_time = self._event['detail']['eventTime']