in packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-cdk/custom-resources/cognito-trigger/index.py [0:0]
def handler(event, context):
"""Custom resource to implement the functionality to add triggers to existing Cognito user pools.
Because CloudFormation does not provide that functionality.
"""
request_type = event['RequestType']
physical_resource_id = event.get('PhysicalResourceId') or f'{USER_POOL_ID}-triggers'
resource_properties = event['ResourceProperties']
triggers = resource_properties['Triggers']
try:
if request_type == 'Create':
response = cognito.describe_user_pool(UserPoolId=USER_POOL_ID)
attr = response['UserPool']
lambda_config = attr.get('LambdaConfig', {})
update_user_pool_lambda_config(USER_POOL_ID, attr, lambda_config={
**lambda_config,
**triggers,
})
physical_resource_id = f'{USER_POOL_ID}-triggers'
cfnresponse.send(event, context, cfnresponse.SUCCESS, resource_properties, physical_resource_id)
elif request_type == 'Update':
response = cognito.describe_user_pool(UserPoolId=USER_POOL_ID)
attr = response['UserPool']
lambda_config = attr.get('LambdaConfig', {})
old_resource_properties = event['OldResourceProperties']
old_triggers = old_resource_properties['Triggers']
update_user_pool_lambda_config(USER_POOL_ID, attr, lambda_config={
**{ k: v for k, v in lambda_config.items() if k not in old_triggers.keys() },
**triggers,
})
physical_resource_id = f'{USER_POOL_ID}-triggers'
cfnresponse.send(event, context, cfnresponse.SUCCESS, resource_properties, physical_resource_id)
elif request_type == 'Delete':
response = cognito.describe_user_pool(UserPoolId=USER_POOL_ID)
attr = response['UserPool']
lambda_config = attr.get('LambdaConfig', {})
update_user_pool_lambda_config(USER_POOL_ID, attr, lambda_config={
k: v for k, v in lambda_config.items() if k not in triggers.keys()
})
cfnresponse.send(event, context, cfnresponse.SUCCESS, None, physical_resource_id)
except Exception as err:
print(err)
cfnresponse.send(event, context, cfnresponse.FAILED, None, physical_resource_id)