def __init__()

in dask-fargate/dask_fargate/dask_fargate_stack.py [0:0]


    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)


        # CONTAINER_IMAGE = 'daskdev/dask:0.19.4'
        # if use_rapids:
        #   CONTAINER_IMAGE = 'rapidsai/rapidsai:latest'

        # if use_notebook:
        #   CONTAINER_IMAGE = 'daskdev/dask-notebook:latest'

        #TODO : Create ECR repository
        #Update: Not required sunce ecs.ContainerImage already creates and pushes using same asset

        #ecr = aws_ecr.Repository(self, 'MyECR', repository_name='dask')
        # not needed if you use an asset like below:

        dockercontainer = ecs.ContainerImage.from_asset(directory = 'dockerstuff', build_args=['-t dask .'])
        
        # Create vpc
        vpc = ec2.Vpc(self, 'MyVpc', max_azs=3)     # default is all AZs in region
        subnets = vpc.private_subnets

        # Create log groups for the scheduler and workers
        s_logs = logs.LogGroup(self, 'SlogGroup', log_group_name='SlogGroup')
        w_logs = logs.LogGroup(self, 'WlogGroup', log_group_name='WlogGroup')

        #Create private namespace
        #nspace = sd.PrivateDnsNamespace(self, 'MyNamespace', vpc=vpc, name='local-dask')

        # #Create role for ECS
        nRole = iam_.Role(self,'ECSExecutionRole',
            assumed_by = iam_.ServicePrincipal('ecs-tasks'))
        
        nPolicy = iam_.Policy(
            self,
            "ECSExecutionPolicy",
            policy_name = "ECSExecutionPolicy",
            statements = [iam_.PolicyStatement(actions = 
                ['ecr:BatchCheckLayerAvailability',
                'ecr:GetDownloadUrlForLayer',
                'ecr:BatchGetImage',
                'ecr:GetAuthorizationToken',
                'logs:CreateLogStream',
                'logs:PutLogEvents','sagemaker:*','s3:*'], resources=['*',]),]).attach_to_role(nRole)


        # Create ECS cluster
        cluster = ecs.Cluster(self, 'DaskCluster', 
            vpc=vpc, cluster_name='Fargate-Dask-Cluster')

        nspace = cluster.add_default_cloud_map_namespace(name='local-dask',type=sd.NamespaceType.DNS_PRIVATE,vpc=vpc)

        #TO DO: Use default namespace for cluster and use cmap options within fargate service
        #Update: done

        # schedulerRegistry = sd.Service(self,'serviceRegistryScheduler', 
        #     namespace=nspace,dns_ttl=core.Duration.seconds(60),
        #     custom_health_check=sd.HealthCheckCustomConfig(failure_threshold=10),
        #     name='Dask-Scheduler')
       
        # # schedulerRegistry.register_ip_instance(id='serviceRegistryScheduler',ipv4='')

        # workerRegistry = sd.Service(self,'workerRegistryScheduler', 
        #     namespace=nspace,dns_ttl=core.Duration.seconds(60),
        #     custom_health_check=sd.HealthCheckCustomConfig(failure_threshold=10),
        #     name='Dask-Worker')
       

        # -------------------- Add scheduler task ------------------------
        schedulerTask = ecs.TaskDefinition(self, 'taskDefinitionScheduler',
            compatibility=ecs.Compatibility.FARGATE,
            cpu='4096', memory_mib='8192',
            network_mode=ecs.NetworkMode.AWS_VPC,
            placement_constraints=None, execution_role=nRole,
            family='Dask-Scheduler', task_role=nRole)

        schedulerTask.add_container('MySchedulerImage', image=dockercontainer,
            command=['dask-scheduler'], cpu=4096, essential=True,
            logging=ecs.LogDriver.aws_logs(stream_prefix='ecs',log_group = s_logs),
            memory_limit_mib=8192, memory_reservation_mib=8192)


        # -------------------- Add worker task -----------------------------
        workerTask = ecs.TaskDefinition(self, 'taskDefinitionWorker',
            compatibility=ecs.Compatibility.FARGATE,
            cpu='4096', memory_mib='8192',
            network_mode=ecs.NetworkMode.AWS_VPC,
            placement_constraints=None, execution_role=nRole,
            family='Dask-Worker', task_role=nRole)

        workerTask.add_container('MyWorkerImage', image=dockercontainer,
            command=['dask-worker','dask-scheduler.local-dask:8786','--memory-limit 1800MB',
            '--worker-port 9000','--nanny-port 9001','--bokeh-port 9002'], 
            cpu=4096, essential=True,
            logging=ecs.LogDriver.aws_logs(stream_prefix='ecs',log_group = s_logs),
            memory_limit_mib=8192, memory_reservation_mib=8192)

        # Task security group
        sg = ec2.SecurityGroup(self, 'MySG',
            vpc=vpc,description='Enable Scheduler ports access',
            security_group_name='DaskSecurityGroup')

        # Ingress rule requires IPeer not Peer
        # TO DO: fix from any ipv4 to SG
        p1 = ec2.Peer().ipv4('0.0.0.0/0'); p2 = ec2.Peer().ipv4('0.0.0.0/0');

        sg.add_ingress_rule(peer = p1, connection = ec2.Port(protocol=ec2.Protocol.TCP,
                string_representation='p1', 
                from_port=8786, to_port=8789))

        sg.add_ingress_rule(peer = p2, connection = ec2.Port(protocol=ec2.Protocol.TCP,
                string_representation='p2', 
                from_port=9000, to_port=9002))

        # ----------------- Add Scheduler Service -----------------------

        # deployconfig = ecs.CfnService.DeploymentConfigurationProperty(maximum_percent=200,minimum_healthy_percent=100)

        # vpcconfig = ecs.CfnService.AwsVpcConfigurationProperty(subnets = subnets,assign_public_ip=True,security_groups=[sg])

        # networkconfig = ecs.CfnService.NetworkConfigurationProperty(awsvpc_configuration=vpcconfig)

        # schedulerService = ecs.CfnService(self, 'DaskSchedulerService', 
        #     task_definition = schedulerTask, deployment_configuration=deployconfig,
        #     cluster=cluster, desired_count=1, enable_ecs_managed_tags=None,
        #     launch_type='FARGATE',network_configuration=networkconfig,
        #     service_registries=schedulerRegistry)

        #ecs.CfnService.ServiceRegistryProperty()

        # Try fargate service? No service registry option available
        #using default cluster namespace
        cmap1 = ecs.CloudMapOptions(dns_ttl=core.Duration.seconds(60), failure_threshold=10, name='Dask-Scheduler')

        schedulerService = ecs.FargateService(self, 'DaskSchedulerService',
            task_definition=schedulerTask,
            assign_public_ip=True,security_group=sg,
            #vpc_subnets=subnets, 
        cluster=cluster,desired_count=1,
            max_healthy_percent=200, min_healthy_percent=100,
            service_name='Dask-Scheduler',cloud_map_options=cmap1)

        # schedulerService.enable_cloud_map(name = 'serviceRegistryScheduler')
        # schedulerRegistry.register_non_ip_instance(self,instance_id='DaskSchedulerService')

        # ----------------- Add Worker Service -----------------------
        #using default cluster namespace
        cmap2 = ecs.CloudMapOptions(dns_ttl=core.Duration.seconds(60), failure_threshold=10, name='Dask-Worker')


        workerService = ecs.FargateService(self, 'DaskWorkerService',
            task_definition=workerTask,
            assign_public_ip=True,security_group=sg,
            #vpc_subnets=subnets, 
        cluster=cluster,desired_count=1,
            max_healthy_percent=200, min_healthy_percent=100,
            service_name='Dask-Worker',cloud_map_options=cmap2)

        # workerService.enable_cloud_map(name = 'workerRegistryScheduler')



        #------------------------------------------------------------------------

        # Very less control with ECS patterns, did not work

        # ecs_patterns.ApplicationLoadBalancedFargateService(self, "DaskFargateStack",
        #     cluster=cluster,            # Required
        #     cpu=512,                    # Default is 256
        #     desired_count=6,            # Default is 1
        #     task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
        #         image=ecs.ContainerImage.from_registry(CONTAINER_IMAGE)),
        #     memory_limit_mib=2048,      # Default is 512
        #     public_load_balancer=True)  # Default is False


        # Start a notebook in the same vpc
        # print(type(sg.security_group_id))
        # print("------------------------------")
        # print(subnets[0].subnet_id)
                #Create role for Notebook instance
        smRole = iam_.Role(
            self,
            "notebookAccessRole",
            assumed_by = iam_.ServicePrincipal('sagemaker'))
        
        smPolicy = iam_.Policy(
            self,
            "notebookAccessPolicy",
            policy_name = "notebookAccessPolicy",
            statements = [iam_.PolicyStatement(actions = ['s3:*','ecs:*'], resources=['*',]),]).attach_to_role(smRole)


        notebook = sagemaker_.CfnNotebookInstance(
                self,
                'DaskNotebook',
                instance_type = 'ml.t2.medium',
                volume_size_in_gb = 50,
                security_group_ids = [sg.security_group_id],
                subnet_id = subnets[0].subnet_id,
                notebook_instance_name = 'DaskNotebook',
                role_arn = smRole.role_arn,
                root_access='Enabled',
                direct_internet_access='Enabled',
                default_code_repository='https://github.com/w601sxs/dask-examples.git')