def __init__()

in source/idea/idea-sdk/src/ideasdk/context/soca_context.py [0:0]


    def __init__(self, options: SocaContextOptions = None):
        """
        Build the context: resolve configuration, set up logging and caching,
        then create the optional components requested through ``options``.

        The configuration source is picked in this order:

        1. ``options.is_app_server`` is truthy: a ``ClusterConfig`` is created with
           ``create_subscription=True`` and logging is configured from it.
        2. ``options.config`` is None and both ``cluster_name`` and ``aws_region``
           are set: a ``ClusterConfig`` is created without a subscription.
        3. otherwise: a ``SocaConfig`` wrapping ``options.config`` (or an empty dict).

        :param options: context options. When None, ``SocaContextOptions.default()`` is used.
        :raises: the error built by ``exceptions.invalid_params`` when running as an app
            server and any of cluster_name, aws_region, module_id or module_set is empty.
            Any other failure during initialization is re-raised after the components
            that were already created have been stopped.
        """

        # Every attribute read by the cleanup handler below is defined *before* the try block.
        # Otherwise an early failure (e.g. while resolving default options) would surface as
        # an AttributeError raised from the handler instead of the actual error.
        self._config: Optional[SocaConfigType] = None
        self._aws: Optional[AwsClientProvider] = None
        self._permission_util: Optional[IamPermissionUtil] = None
        self._instance_metadata_util: Optional[InstanceMetadataUtil] = None
        self._aws_util: Optional[AWSUtil] = None
        self._distributed_lock: Optional[DistributedLock] = None
        self._metrics_service: Optional[MetricsService] = None
        self._leader_election: Optional[LeaderElection] = None

        try:

            self._lock = RLock()

            if options is None:
                options = SocaContextOptions.default()

            self._options = options

            is_app_server = Utils.get_as_bool(options.is_app_server, False)
            if is_app_server:

                # an app server cannot start without these; checked in this order so the
                # first missing one is the one reported
                for required in ('cluster_name', 'aws_region', 'module_id', 'module_set'):
                    if Utils.is_empty(getattr(options, required)):
                        raise exceptions.invalid_params(f'{required} is required')

                self._config = ClusterConfig(
                    cluster_name=options.cluster_name,
                    aws_region=options.aws_region,
                    module_id=options.module_id,
                    module_set=options.module_set,
                    aws_profile=options.aws_profile,
                    create_subscription=True
                )

                self._logging = SocaLogging(config=self._config, module_id=options.module_id)
                self._cache_provider = CacheProvider(context=self, module_id=options.module_id)

                # the config object is created before logging exists, so its logger is attached afterwards
                self._config.set_logger(self._logging.get_logger('cluster-config'))

            elif options.config is None and Utils.is_not_empty(options.cluster_name) and Utils.is_not_empty(options.aws_region):

                # no explicit config supplied, but enough information to build a cluster config
                self._config = ClusterConfig(
                    cluster_name=options.cluster_name,
                    aws_region=options.aws_region,
                    module_id=options.module_id,
                    module_set=options.module_set,
                    aws_profile=options.aws_profile
                )

                self._logging = SocaLogging(default_logging_profile=self._options.default_logging_profile)
                self._cache_provider = CacheProvider(context=self, module_id='default')

            else:

                # fall back to the config supplied in options (empty dict when absent)
                config = Utils.get_as_dict(options.config, {})
                self._config = SocaConfig(config)
                self._logging = SocaLogging(module_id='default', default_logging_profile=self._options.default_logging_profile)
                self._cache_provider = CacheProvider(context=self, module_id='default')

            # soca sdk defaults
            self._broadcast = SocaPubSub(
                topic=constants.TOPIC_BROADCAST,
                logger=self._logging.get_logger()
            )
            self._service_registry = SocaServiceRegistry(context=self)

            # begin: optional components
            if options.enable_aws_client_provider:
                endpoints = []
                # vpc endpoints are used only when both the caller and the cluster config ask for them
                if Utils.is_true(options.use_vpc_endpoints) and self.config().get_bool('cluster.network.use_vpc_endpoints', False):
                    vpc_endpoints = self.config().get_config('cluster.network.vpc_interface_endpoints', {})
                    for service_name in vpc_endpoints:
                        endpoint_config = vpc_endpoints[service_name]
                        # skip entries that are disabled or have no endpoint url
                        if not Utils.get_value_as_bool('enabled', endpoint_config, False):
                            continue
                        endpoint_url = Utils.get_value_as_string('endpoint_url', endpoint_config)
                        if Utils.is_empty(endpoint_url):
                            continue
                        endpoints.append(AwsServiceEndpoint(
                            service_name=service_name,
                            endpoint_url=endpoint_url
                        ))

                self._aws = AwsClientProvider(
                    options=AWSClientProviderOptions(
                        profile=options.aws_profile,
                        region=options.aws_region,
                        endpoints=endpoints
                    )
                )

            # initialize locale: explicit option first, then cluster config,
            # then the LC_CTYPE environment variable (default: en_US)
            if Utils.is_not_empty(self._options.locale):
                self._locale = self._options.locale
            else:
                self._locale = self.config().get_string('cluster.locale', EnvironmentUtils.get_environment_variable('LC_CTYPE', default='en_US'))
            locale.init(self._locale)

            # iam permission utils
            if options.enable_iam_permission_util:
                self._permission_util = IamPermissionUtil(context=self)

            # ec2 instance metadata util
            if options.enable_instance_metadata_util:
                self._instance_metadata_util = InstanceMetadataUtil()

            # aws util
            if options.enable_aws_util:
                self._aws_util = AWSUtil(context=self)

            # distributed lock
            if options.enable_distributed_lock:
                self._distributed_lock = DistributedLock(context=self)

            # leader election
            if options.enable_leader_election:
                self._leader_election = LeaderElection(context=self)

            # metrics
            if options.enable_metrics:
                self._metrics_service = MetricsService(context=self, default_namespace=options.metrics_namespace)

        except BaseException:
            # initialization failed part way: stop whatever was already created, then
            # propagate the original error. BaseException is caught so that the cleanup
            # also runs on KeyboardInterrupt / SystemExit.
            if self._distributed_lock is not None:
                self._distributed_lock.stop()
            if self._leader_election is not None:
                self._leader_election.stop()
            if self._metrics_service is not None:
                self._metrics_service.stop()
            # only ClusterConfig is stopped via its db; SocaConfig is left as is
            if self._config is not None and isinstance(self._config, ClusterConfig):
                self._config.db.stop()
            raise