constructor()

in source/lib/ec2-worker-stack.ts [50:282]


    /**
     * Provisions the EC2 worker fleet: a security group, an instance role with
     * CloudWatch agent permissions, the Auto Scaling group itself, a log group
     * with metric filters, and a step-scaling policy driven by the job queue.
     *
     * @param scope - Parent construct.
     * @param id - Construct id, unique within `scope`.
     * @param props - Worker settings. This constructor reads `vpc`, `queue`,
     *   `cliRelease`, `env`, and the optional `maxCapacity` (default 20),
     *   `minCapacity` (default 1) and `desiredCapacity` (default 1).
     */
    constructor(scope: Construct, id: string, props: Ec2WorkerProps) {
        super(scope, id);

        // ARM-based instance type; the machine image below must use the matching CPU type.
        const instanceType = new ec2.InstanceType('t4g.micro')

        const amznLinux = ec2.MachineImage.latestAmazonLinux({
            generation: ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,
            edition: ec2.AmazonLinuxEdition.STANDARD,
            storage: ec2.AmazonLinuxStorage.GENERAL_PURPOSE,
            cpuType: ec2.AmazonLinuxCpuType.ARM_64,
        });


        // No ingress rule is added here; only outbound traffic is allowed.
        const ec2SG = new ec2.SecurityGroup(this, 'S3RepEC2SG', {
            vpc: props.vpc,
            description: 'Security Group for Data Replication Hub EC2 instances',
            allowAllOutbound: true
        });
        // For dev only
        // ec2SG.addIngressRule(ec2.Peer.anyIpv4(), ec2.Port.tcp(22), 'Allow ssh access');

        // Suppress the cfn_nag findings triggered by the open egress rule.
        const cfnSG = ec2SG.node.defaultChild as ec2.CfnSecurityGroup
        addCfnNagSuppressRules(cfnSG, [
            {
                id: 'W5',
                reason: 'Open egress rule is required to access public network'
            },
            {
                id: 'W40',
                reason: 'Open egress rule is required to access public network'
            },
        ])

        // Instance role assumed by the worker EC2 instances.
        const workerAsgRole = new iam.Role(this, 'WorkerAsgRole', {
            assumedBy: new iam.ServicePrincipal('ec2.amazonaws.com')
        })

        // Permissions used by the CloudWatch agent to publish metrics and logs.
        const cwAgentPolicy = new iam.Policy(this, 'CWAgentPolicy', {
            statements: [
                new iam.PolicyStatement({
                    effect: iam.Effect.ALLOW,
                    resources: [
                        '*'
                    ],
                    actions: [
                        'cloudwatch:PutMetricData',
                        'ec2:DescribeVolumes',
                        'ec2:DescribeTags',
                        'logs:CreateLogGroup',
                        'logs:CreateLogStream',
                        'logs:PutLogEvents',
                        'logs:DescribeLogStreams',
                        'logs:DescribeLogGroups',
                    ],
                })
            ]
        });

        // The wildcard resource above triggers cfn_nag W12; suppress it with a reason.
        const cfnCwAgentPolicy = cwAgentPolicy.node.defaultChild as iam.CfnPolicy
        addCfnNagSuppressRules(cfnCwAgentPolicy, [
            {
                id: 'W12',
                reason: 'Publish log streams requires any resources'
            },
        ])

        workerAsgRole.attachInlinePolicy(cwAgentPolicy)


        this.workerAsg = new asg.AutoScalingGroup(this, 'S3RepWorkerASG', {
            autoScalingGroupName: `${Aws.STACK_NAME}-Worker-ASG`,
            vpc: props.vpc,
            instanceType: instanceType,
            machineImage: amznLinux,
            // Use `??` rather than a truthiness test so that an explicit 0
            // (e.g. a fleet that may scale to zero) is honoured instead of
            // being silently replaced by the default.
            maxCapacity: props.maxCapacity ?? 20,
            minCapacity: props.minCapacity ?? 1,
            desiredCapacity: props.desiredCapacity ?? 1,
            // spotPrice: "0.01",
            // healthCheck: autoscaling.HealthCheck.ec2(),
            securityGroup: ec2SG,
            // keyName: 'ad-key',  // dev only
            instanceMonitoring: asg.Monitoring.DETAILED,
            associatePublicIpAddress: true,
            groupMetrics: [new asg.GroupMetrics(asg.GroupMetric.DESIRED_CAPACITY, asg.GroupMetric.IN_SERVICE_INSTANCES)],
            cooldown: Duration.minutes(2),
            role: workerAsgRole,
            signals: asg.Signals.waitForMinCapacity(),
            blockDevices: [
                {
                    deviceName: '/dev/xvda',
                    volume: asg.BlockDeviceVolume.ebs(8, {
                        encrypted: true,
                    }),
                },
            ]
        });

        Tags.of(this.workerAsg).add('Name', `${Aws.STACK_NAME}-Replication-Worker`, {})

        // Log group whose name is substituted into the CloudWatch agent config
        // by the user data below, and which the metric filters below read from.
        const ec2LG = new LogGroup(this, 'S3RepWorkerLogGroup', {
            retention: RetentionDays.TWO_WEEKS,
        });

        const cfnEc2LG = ec2LG.node.defaultChild as CfnLogGroup
        addCfnNagSuppressRules(cfnEc2LG, [
            {
                id: 'W84',
                reason: 'log group is encrypted with the default master key'
            }
        ])


        // Per-partition download location of the solution assets (the CLI tarball).
        const assetTable = new CfnMapping(this, 'AssetTable', {
            mapping: {
                'aws': {
                    assetDomain: 'https://aws-gcr-solutions-assets.s3.amazonaws.com',
                },
                'aws-cn': {
                    assetDomain: 'https://aws-gcr-solutions-assets.s3.cn-northwest-1.amazonaws.com.cn',
                },
            }
        });

        const cliAssetDomain = assetTable.findInMap(Aws.PARTITION, 'assetDomain')

        // Place the CloudWatch agent config template on the instance; the
        // '##log group##' placeholder in it is replaced by the `sed` command below.
        this.workerAsg.applyCloudFormationInit(ec2.CloudFormationInit.fromElements(ec2.InitFile.fromFileInline('/home/ec2-user/cw_agent_config.json', path.join(__dirname, '../config/cw_agent_config.json'))))

        // Boot script. Command order matters: later commands rely on the
        // working directory and on the files written by earlier ones.
        this.workerAsg.userData.addCommands(
            'yum update -y',
            'cd /home/ec2-user/',

            // Enable BBR
            'echo "net.core.default_qdisc = fq" >> /etc/sysctl.conf',
            'echo "net.ipv4.tcp_congestion_control = bbr" >> /etc/sysctl.conf',
            'sysctl -p',
            'echo `sysctl net.ipv4.tcp_congestion_control` > worker.log',

            // Enable Cloudwatch Agent
            'yum install -y amazon-cloudwatch-agent',
            `sed -i  -e "s/##log group##/${ec2LG.logGroupName}/g" cw_agent_config.json`,
            '/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -c file:/home/ec2-user/cw_agent_config.json -s',

            // Get CLI from solution assets
            `curl -LO "${cliAssetDomain}/data-transfer-hub-cli/v${props.cliRelease}/dthcli_${props.cliRelease}_linux_arm64.tar.gz"`,
            `tar zxvf dthcli_${props.cliRelease}_linux_arm64.tar.gz`,

            // Prepare the environment variables
            `echo "export JOB_TABLE_NAME=${props.env.JOB_TABLE_NAME}" >> env.sh`,
            `echo "export JOB_QUEUE_NAME=${props.env.JOB_QUEUE_NAME}" >> env.sh`,

            `echo "export SOURCE_TYPE=${props.env.SOURCE_TYPE}" >> env.sh`,
            `echo "export SRC_BUCKET=${props.env.SRC_BUCKET}" >> env.sh`,
            `echo "export SRC_PREFIX=${props.env.SRC_PREFIX}" >> env.sh`,
            `echo "export SRC_REGION=${props.env.SRC_REGION}" >> env.sh`,
            `echo "export SRC_ENDPOINT=${props.env.SRC_ENDPOINT}" >> env.sh`,
            `echo "export SRC_CREDENTIALS=${props.env.SRC_CREDENTIALS}" >> env.sh`,
            `echo "export SRC_IN_CURRENT_ACCOUNT=${props.env.SRC_IN_CURRENT_ACCOUNT}" >> env.sh`,

            `echo "export DEST_BUCKET=${props.env.DEST_BUCKET}" >> env.sh`,
            `echo "export DEST_PREFIX=${props.env.DEST_PREFIX}" >> env.sh`,
            `echo "export DEST_REGION=${props.env.DEST_REGION}" >> env.sh`,
            `echo "export DEST_CREDENTIALS=${props.env.DEST_CREDENTIALS}" >> env.sh`,
            `echo "export DEST_IN_CURRENT_ACCOUNT=${props.env.DEST_IN_CURRENT_ACCOUNT}" >> env.sh`,
            `echo "export DEST_STORAGE_CLASS=${props.env.DEST_STORAGE_CLASS}" >> env.sh`,
            `echo "export DEST_ACL=${props.env.DEST_ACL}" >> env.sh`,

            // `echo "export MULTIPART_THRESHOLD=${props.env.MULTIPART_THRESHOLD}" >> env.sh`,
            // `echo "export CHUNK_SIZE=${props.env.CHUNK_SIZE}" >> env.sh`,
            `echo "export FINDER_DEPTH=${props.env.FINDER_DEPTH}" >> env.sh`,
            `echo "export FINDER_NUMBER=${props.env.FINDER_NUMBER}" >> env.sh`,
            `echo "export WORKER_NUMBER=${props.env.WORKER_NUMBER}" >> env.sh`,
            `echo "export INCLUDE_METADATA=${props.env.INCLUDE_METADATA}" >> env.sh`,
            `echo "export AWS_DEFAULT_REGION=${Aws.REGION}" >> env.sh`,

            // Create the script
            'echo "source /home/ec2-user/env.sh" >> start-worker.sh',
            'echo "nohup ./dthcli run -t Worker |& tee -a /home/ec2-user/worker.log" >> start-worker.sh',
            // The two lines below are only reached once the worker process exits.
            'echo "echo \'Error occured, trying to terminate instance...\' >> /home/ec2-user/worker.log" >> start-worker.sh',
            'echo "shutdown" >> start-worker.sh', // shutdown will terminate the instance as asg will automatically replace the stopped one

            'chmod +x start-worker.sh',
            // Run the script
            './start-worker.sh',
        )



        // Metric filters over the worker log group. Each pattern matches
        // space-delimited log lines whose third field is the given marker.
        ec2LG.addMetricFilter('CompletedBytes', {
            metricName: 'CompletedBytes',
            metricNamespace: `${Aws.STACK_NAME}`,
            metricValue: '$Bytes',
            filterPattern: FilterPattern.literal('[data, time, p="----->Completed", Bytes, ...]')
        })

        ec2LG.addMetricFilter('Transferred-Objects', {
            metricName: 'TransferredObjects',
            metricNamespace: `${Aws.STACK_NAME}`,
            metricValue: '1',
            filterPattern: FilterPattern.literal('[data, time, p="----->Transferred", ..., s="DONE"]')
        })

        ec2LG.addMetricFilter('Failed-Objects', {
            metricName: 'FailedObjects',
            metricNamespace: `${Aws.STACK_NAME}`,
            metricValue: '1',
            filterPattern: FilterPattern.literal('[data, time, p="----->Transferred", ..., s="ERROR"]')
        })

        // Total queue backlog: messages in flight plus messages waiting.
        const allMsg = new cw.MathExpression({
            expression: "notvisible + visible",
            usingMetrics: {
                notvisible: props.queue.metricApproximateNumberOfMessagesNotVisible(),
                visible: props.queue.metricApproximateNumberOfMessagesVisible(),
            },
            period: Duration.minutes(1),
            label: "# of messages",
        })

        // Step scaling on the backlog: add more instances as the backlog grows,
        // and request a very large decrease when it reaches zero.
        this.workerAsg.scaleOnMetric('ScaleOutSQS', {
            metric: allMsg,
            scalingSteps: [
                { upper: 0, change: -10000 }, // Scale in when no messages to process
                { lower: 100, change: +1 },
                { lower: 500, change: +2 },
                { lower: 2000, change: +5 },
                { lower: 10000, change: +10 },
            ],
            adjustmentType: asg.AdjustmentType.CHANGE_IN_CAPACITY,
        })



    }