def lambda_handler(event, context)

in lambda-functions/LambdaConfig.py [0:0]


import json
import logging
import os
from time import sleep

import boto3
import cfnresponse  # helper module CloudFormation provides to inline (ZipFile) Lambda functions

# Module prologue reconstructed here (assumed; omitted from the original listing):
# boto3 clients for MSK, MSK Connect and Kinesis Data Analytics
msk = boto3.client('kafka')
connect = boto3.client('kafkaconnect')
kda = boto3.client('kinesisanalyticsv2')


def lambda_handler(event, context):
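    # CloudFormation invokes this function as a custom resource. The event carries a
    # RequestType of Create, Update or Delete and a pre-signed ResponseURL, which
    # cfnresponse.send uses to report SUCCESS or FAILED back to the stack.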
    
    
    stack_name = os.environ["stackname"]

    # Handle stack deletion: remove the MSK Connect connector created by this stack
    
    if event["RequestType"] == 'Delete':

        try:
            # List connectors whose names start with the stack name and take the first match's ARN
            response = connect.list_connectors(connectorNamePrefix=stack_name)
            connector_arn = response['connectors'][0]['connectorArn']

            # Delete the connector
            response = connect.delete_connector(connectorArn=connector_arn)

            # Wait 60 seconds for the connector to be deleted; otherwise this Lambda function
            # could be torn down before the deletion completes (a polling alternative is
            # sketched after the handler)
            sleep(60)
        
        except Exception as e:
            logging.error('Exception: %s' % e, exc_info=True)

        # Always report SUCCESS on delete so CloudFormation can continue deleting the stack
        status = cfnresponse.SUCCESS
        cfnresponse.send(event, context, status, {})
        
    else:
    
        print('Received event: %s' % json.dumps(event))
        status = cfnresponse.SUCCESS

        # Read configuration from environment variables
        msk_cluster_arn = os.environ["mskClusterArn"]

        # Retrieve the TLS bootstrap broker string for the MSK cluster; it is used by both
        # the KDA application and the MSK Connect connector below
        msk_response_bootstrap = msk.get_bootstrap_brokers(ClusterArn=msk_cluster_arn)
        bootstrap_servers = msk_response_bootstrap["BootstrapBrokerStringTls"]
        
        kda_name = os.environ["kdaName"]
        subnet_1 = os.environ["subnet1"]
        subnet_2 = os.environ["subnet2"]
        sg_id = os.environ["sgID"]

        aws_account = os.environ["awsAccount"]
        aws_region = os.environ["awsRegion"]
        opensearch_username = os.environ["opensearchUsername"]
        opensearch_password = os.environ["opensearchPassword"]
        opensearch_endpoint = os.environ["opensearchDomainEndpoint"]
        file_key = os.environ["s3connectorkey"]
        connector_role_arn = os.environ["mskconnectRole"]
        processed_topic = os.environ["topic"]
        mskconnect_log_group = os.environ["logGroupName"]
    
    
    
        # kda_arn = f"arn:aws:kinesisanalytics:{aws_region}:{aws_account}:application/{kda_name}"

        # Get the cluster's current version string, which update_cluster_configuration
        # requires for optimistic concurrency control
        msk_response_describe = msk.describe_cluster(ClusterArn=msk_cluster_arn)
        msk_current_version = msk_response_describe["ClusterInfo"]["CurrentVersion"]
    
    
        try:

            # Create an MSK configuration that enables automatic topic creation
            msk_response_create = msk.create_configuration(
                Name=f'{stack_name}-msk-config',
                Description='The configuration to use on blog MSK clusters.',
                KafkaVersions=['2.2.1'],
                ServerProperties=b"auto.create.topics.enable = true")

            configuration_arn = msk_response_create["Arn"]
            configuration_revision = msk_response_create["LatestRevision"]["Revision"]


            # Point the cluster at the newly created configuration
            msk_response_update = msk.update_cluster_configuration(
                ClusterArn=msk_cluster_arn,
                ConfigurationInfo={
                    'Arn': configuration_arn,
                    'Revision': configuration_revision
                },
                CurrentVersion=msk_current_version)
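
            # The configuration update is applied asynchronously; the response includes a
            # ClusterOperationArn that can be used to track the operation's progress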

            #################################################
            # Kinesis Data Analytics (KDA)

            # Describe the KDA application to get its existing properties and current version ID
            kda_response_describe = kda.describe_application(ApplicationName=kda_name)
            current_application_version_id = kda_response_describe["ApplicationDetail"]["ApplicationVersionId"]


            # Add a VPC configuration to the current application version so the
            # application can reach the MSK cluster
            kda_response_vpc = kda.add_application_vpc_configuration(
                ApplicationName=kda_name,
                CurrentApplicationVersionId=current_application_version_id,
                VpcConfiguration={
                    'SubnetIds': [subnet_1, subnet_2],
                    'SecurityGroupIds': [sg_id]
                })
            
            # Re-describe the application: adding the VPC configuration incremented the
            # application version ID, and the update below needs the latest value
            kda_response_describe = kda.describe_application(ApplicationName=kda_name)
            current_application_version_id = kda_response_describe["ApplicationDetail"]["ApplicationVersionId"]
            s3_bucket_arn = kda_response_describe["ApplicationDetail"]["ApplicationConfigurationDescription"]["ApplicationCodeConfigurationDescription"]["CodeContentDescription"]["S3ApplicationCodeLocationDescription"]["BucketARN"]

            # Inject the bootstrap brokers into the application's runtime properties
            # (this assumes the second property group is the one holding the Kafka settings)
            properties = kda_response_describe["ApplicationDetail"]["ApplicationConfigurationDescription"]["EnvironmentPropertyDescriptions"]["PropertyGroupDescriptions"]
            properties[1]["PropertyMap"]["bootstrap.servers"] = bootstrap_servers
            
            kda_response_bootstrap = kda.update_application(
                ApplicationName=kda_name,
                CurrentApplicationVersionId=current_application_version_id,
                ApplicationConfigurationUpdate={
                    'ApplicationCodeConfigurationUpdate': {
                        'CodeContentTypeUpdate': 'ZIPFILE',
                        'CodeContentUpdate': {
                            'S3ContentLocationUpdate': {
                                'BucketARNUpdate': s3_bucket_arn,
                                'FileKeyUpdate': 'RealTimeFraudPrevention.zip'
                            }
                        }
                    },
                    'EnvironmentPropertyUpdates': {
                        'PropertyGroups': properties
                    }
                })


            # Start the KDA application, skipping snapshot restore on first start
            kda_response_start = kda.start_application(
                ApplicationName=kda_name,
                RunConfiguration={
                    'FlinkRunConfiguration': {
                        'AllowNonRestoredState': True
                    },
                    'ApplicationRestoreConfiguration': {
                        'ApplicationRestoreType': 'SKIP_RESTORE_FROM_SNAPSHOT'
                    }
                })
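
            # start_application returns immediately; the application transitions through
            # STARTING to RUNNING asynchronously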


            ######################################################################
            # MSK Connect


            # Create the custom plugin used by the connector. The plugin is built from the
            # open-source Confluent Elasticsearch sink connector archive uploaded to S3
            plugin_name = f'{stack_name}-osplugin'
            connect_response_create_pl = connect.create_custom_plugin(
                contentType='ZIP',
                location={
                    's3Location': {
                        'bucketArn': s3_bucket_arn,
                        'fileKey': file_key,
                    }
                },
                name=plugin_name)

            # Wait for the custom plugin to become usable (fixed wait; polling
            # describe_custom_plugin would be more robust)
            sleep(45)


            # Create the connector, passing details from the stack:
            #   1- Sink details (OpenSearch domain endpoint, username, password)
            #   2- Bootstrap servers and the topic to read from
            #   3- Networking details - same VPC, subnets and SG as the MSK cluster
            #   4- Logging to the CloudWatch log group
            #   5- The custom plugin created above
            #   6- The IAM role for the MSK Connect cluster
    
            connect_response_create_con = connect.create_connector(
                capacity={
                    'autoScaling': {
                        'maxWorkerCount': 2,
                        'mcuCount': 1,
                        'minWorkerCount': 1,
                        'scaleInPolicy': {
                            'cpuUtilizationPercentage': 20
                        },
                        'scaleOutPolicy': {
                            'cpuUtilizationPercentage': 80
                        }
                    }
                },
                connectorConfiguration={
                    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
                    "connection.username": opensearch_username,
                    "connection.password": opensearch_password,
                    "tasks.max": "1",
                    "topics": processed_topic,
                    "key.ignore": "true",
                    "connection.url": f'https://{opensearch_endpoint}:443',
                    "schema.ignore": "true",
                    "elastic.security.protocol": "SSL",
                    "elastic.https.ssl.protocol": "TLSv1.2",
                    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                    "value.converter.schemas.enable": "false",
                    "name": f'{stack_name}-os-sink'
                },
                connectorDescription='os-sink',
                connectorName=f'{stack_name}-os-sink',
                kafkaCluster={
                    'apacheKafkaCluster': {
                        'bootstrapServers': bootstrap_servers,
                        'vpc': {
                            'securityGroups': [sg_id],
                            'subnets': [subnet_1, subnet_2]
                        }
                    }
                },
                kafkaClusterClientAuthentication={
                    'authenticationType': 'NONE'
                },
                kafkaClusterEncryptionInTransit={
                    'encryptionType': 'TLS'
                },
                kafkaConnectVersion='2.7.1',
                logDelivery={
                    'workerLogDelivery': {
                        'cloudWatchLogs': {
                            'enabled': True,
                            'logGroup': mskconnect_log_group
                        }
                    }
                },
                plugins=[
                    {
                        'customPlugin': {
                            'customPluginArn': connect_response_create_pl["customPluginArn"],
                            'revision': 1
                        }
                    },
                ],
                serviceExecutionRoleArn=connector_role_arn)
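
            # create_connector is also asynchronous; the returned connectorArn can be polled
            # with describe_connector until connectorState reaches RUNNING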

        # On any failure, log the exception and report FAILED so CloudFormation rolls back
    
        except Exception as e:
            logging.error('Exception: %s' % e, exc_info=True)
            status = cfnresponse.FAILED

        # Send response to CFN to continue with Stack creation or rollback
    
        cfnresponse.send(event, context, status, {})
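

# The fixed sleep(60) in the delete path is a simple way to wait for connector deletion.
# A more robust (hypothetical) alternative is to poll describe_connector until the
# connector is gone; the helper below is a sketch of that approach and is not wired
# into the handler above.
def wait_for_connector_deletion(connector_arn, timeout_seconds=300, poll_interval=10):
    waited = 0
    while waited < timeout_seconds:
        try:
            # While the connector exists, describe_connector succeeds and reports its state
            state = connect.describe_connector(connectorArn=connector_arn)["connectorState"]
            logging.info('Connector %s still present in state %s', connector_arn, state)
        except connect.exceptions.NotFoundException:
            # The connector no longer exists, so deletion has completed
            return True
        sleep(poll_interval)
        waited += poll_interval
    return False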