in source/idea/idea-sdk/src/ideasdk/context/soca_context.py [0:0]
def __init__(self, options: SocaContextOptions = None):
    """Build the context and every optional component enabled in ``options``.

    The configuration source is picked from the options, in this order:

    1. ``options.is_app_server`` is truthy: ``cluster_name``, ``aws_region``,
       ``module_id`` and ``module_set`` must all be non-empty; a
       ``ClusterConfig`` is created with ``create_subscription=True`` and
       logging / cache are bound to ``options.module_id``.
    2. ``options.config`` is ``None`` and both ``cluster_name`` and
       ``aws_region`` are non-empty: a ``ClusterConfig`` is created without
       the ``create_subscription`` argument.
    3. Otherwise: ``options.config`` (or an empty dict) is wrapped in a
       ``SocaConfig``.

    After that the broadcast pub/sub, the service registry, the locale and
    the components switched on through the ``options.enable_*`` flags are
    created.

    Args:
        options: Context options. When ``None``,
            ``SocaContextOptions.default()`` is used.

    Raises:
        The error built by ``exceptions.invalid_params`` when a required
        app-server option is empty. Any exception raised during
        construction is re-raised after ``stop()`` has been called on the
        distributed lock, leader election and metrics service that were
        already created, and on ``self._config.db`` when the configuration
        is a ``ClusterConfig``.
    """
    # These placeholders are assigned before entering the try block so the
    # cleanup handler below can always read them. Previously a failure in
    # RLock() or SocaContextOptions.default() reached the handler before the
    # attributes existed and surfaced as an AttributeError that hid the
    # original exception.
    self._config: Optional[SocaConfigType] = None
    self._aws: Optional[AwsClientProvider] = None
    self._permission_util: Optional[IamPermissionUtil] = None
    self._instance_metadata_util: Optional[InstanceMetadataUtil] = None
    self._aws_util: Optional[AWSUtil] = None
    self._distributed_lock: Optional[DistributedLock] = None
    self._metrics_service: Optional[MetricsService] = None
    self._leader_election: Optional[LeaderElection] = None

    try:
        self._lock = RLock()

        if options is None:
            options = SocaContextOptions.default()
        self._options = options

        is_app_server = Utils.get_as_bool(options.is_app_server, False)

        if is_app_server:
            # an application server cannot start without a fully qualified module identity
            if Utils.is_empty(options.cluster_name):
                raise exceptions.invalid_params('cluster_name is required')
            if Utils.is_empty(options.aws_region):
                raise exceptions.invalid_params('aws_region is required')
            if Utils.is_empty(options.module_id):
                raise exceptions.invalid_params('module_id is required')
            if Utils.is_empty(options.module_set):
                raise exceptions.invalid_params('module_set is required')

            self._config = ClusterConfig(
                cluster_name=options.cluster_name,
                aws_region=options.aws_region,
                module_id=options.module_id,
                module_set=options.module_set,
                aws_profile=options.aws_profile,
                create_subscription=True
            )
            self._logging = SocaLogging(config=self._config, module_id=options.module_id)
            self._cache_provider = CacheProvider(context=self, module_id=options.module_id)
            # the logger is attached only after logging itself has been built from the config
            self._config.set_logger(self._logging.get_logger('cluster-config'))

        elif options.config is None and Utils.is_not_empty(options.cluster_name) and Utils.is_not_empty(options.aws_region):
            self._config = ClusterConfig(
                cluster_name=options.cluster_name,
                aws_region=options.aws_region,
                module_id=options.module_id,
                module_set=options.module_set,
                aws_profile=options.aws_profile
            )
            self._logging = SocaLogging(default_logging_profile=self._options.default_logging_profile)
            self._cache_provider = CacheProvider(context=self, module_id='default')

        else:
            config = Utils.get_as_dict(options.config, {})
            self._config = SocaConfig(config)
            self._logging = SocaLogging(module_id='default', default_logging_profile=self._options.default_logging_profile)
            self._cache_provider = CacheProvider(context=self, module_id='default')

        # soca sdk defaults
        self._broadcast = SocaPubSub(
            topic=constants.TOPIC_BROADCAST,
            logger=self._logging.get_logger()
        )
        self._service_registry = SocaServiceRegistry(context=self)

        # begin: optional components

        if options.enable_aws_client_provider:
            endpoints = []
            # VPC endpoints are used only when both the caller and the cluster config ask for them
            if Utils.is_true(options.use_vpc_endpoints) and self.config().get_bool('cluster.network.use_vpc_endpoints', False):
                vpc_endpoints = self.config().get_config('cluster.network.vpc_interface_endpoints', {})
                for service_name in vpc_endpoints:
                    endpoint_config = vpc_endpoints[service_name]
                    if not Utils.get_value_as_bool('enabled', endpoint_config, False):
                        continue
                    endpoint_url = Utils.get_value_as_string('endpoint_url', endpoint_config)
                    if Utils.is_empty(endpoint_url):
                        continue
                    endpoints.append(AwsServiceEndpoint(
                        service_name=service_name,
                        endpoint_url=endpoint_url
                    ))

            self._aws = AwsClientProvider(
                options=AWSClientProviderOptions(
                    profile=options.aws_profile,
                    region=options.aws_region,
                    endpoints=endpoints
                )
            )

        # initialize locale: explicit option first, then cluster config, then LC_CTYPE, then 'en_US'
        if Utils.is_not_empty(self._options.locale):
            self._locale = self._options.locale
        else:
            self._locale = self.config().get_string('cluster.locale', EnvironmentUtils.get_environment_variable('LC_CTYPE', default='en_US'))
        locale.init(self._locale)

        # iam permission utils
        if options.enable_iam_permission_util:
            self._permission_util = IamPermissionUtil(context=self)

        # ec2 instance metadata util
        if options.enable_instance_metadata_util:
            self._instance_metadata_util = InstanceMetadataUtil()

        # aws util
        if options.enable_aws_util:
            self._aws_util = AWSUtil(context=self)

        # distributed lock
        if options.enable_distributed_lock:
            self._distributed_lock = DistributedLock(context=self)

        # leader election
        if options.enable_leader_election:
            self._leader_election = LeaderElection(context=self)

        # metrics
        if options.enable_metrics:
            self._metrics_service = MetricsService(context=self, default_namespace=options.metrics_namespace)

    except BaseException:
        # BaseException (not Exception) so cleanup also runs on KeyboardInterrupt / SystemExit.
        # Stop whatever was already created before propagating the failure.
        if self._distributed_lock is not None:
            self._distributed_lock.stop()
        if self._leader_election is not None:
            self._leader_election.stop()
        if self._metrics_service is not None:
            self._metrics_service.stop()
        # isinstance() is False for None, so no separate None check is needed
        if isinstance(self._config, ClusterConfig):
            self._config.db.stop()
        # bare raise re-raises the active exception with its original traceback
        raise