in src/bulk-loader/batch-handler/lib/registrationproviders.py [0:0]
def get_registration_data(self, task:BatchTask)->UserRegistrationInfo:
    '''
    Build the registration metadata for a task from its S3 object tags.

    The tags are fetched with ``self.s3.get_object_tagging`` using the
    task's ``bucket_name`` and ``s3Key``. Tag keys are compared
    case-insensitively:

    * ``userid``     -> stored on ``user_id``
    * ``properties`` -> passed through ``self.get_property_bag`` and
      stored on ``properties``
    * ``indexed`` / ``ignore`` -> skipped
    * any other key leaves the registration untouched

    Args:
        task: The batch task whose S3 object carries the tags.

    Returns:
        A new ``UserRegistrationInfo`` populated from the recognised tags.

    Raises:
        AssertionError: If ``task`` is None (while assertions are enabled).
    '''
    assert task is not None, "No task available."

    tagging = self.s3.get_object_tagging(Bucket=task.bucket_name, Key=task.s3Key)

    # Convert the object tags into registration metadata.
    info = UserRegistrationInfo()
    for entry in tagging['TagSet']:
        normalized = entry['Key'].lower()
        content = entry['Value']

        # Bookkeeping tags carry no registration data.
        if normalized in ('indexed', 'ignore'):
            continue

        if normalized == "userid":
            info.user_id = content
        elif normalized == 'properties':
            info.properties = self.get_property_bag(content)

    return info