decisionai_plugin/common/util/kafka_operator.py [16:37]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Name template for a dead-letter topic: the ``{base_topic}`` placeholder is
# replaced with the base topic name and ``-dl`` is appended
# (presumably filled in via ``str.format(base_topic=...)`` by callers).
DeadLetterTopicFormat = '{base_topic}-dl'


# get config info
def _get_endpoint_with_pattern(name):
    """Resolve the endpoint string configured for the section ``name``.

    The section is looked up in ``endpoints.ini``, whose path is built by
    appending the file name directly to the ``KENSHO2_CONFIG_DIR``
    environment variable (no separator is inserted, so the variable must
    already end with one).

    Resolution order:
      1. If the section has an ``endpoint`` key, its value is returned as-is.
      2. Otherwise, if it has an ``endpoint-pattern`` key, the replica count
         is read via ``get_config_as_str`` from
         ``<config_dir>system/<name>/replicas``; the pattern is %-formatted
         with every index in ``range(replicas)`` and the results are joined
         with commas. The joined value is printed to stdout and returned.

    Raises:
        KeyError: if ``KENSHO2_CONFIG_DIR`` is not set in the environment.
        Exception: if the section has neither ``endpoint`` nor
            ``endpoint-pattern``.
    """
    config_dir = os.environ['KENSHO2_CONFIG_DIR']
    section = Configuration(config_dir + 'endpoints.ini')[name]

    # A literal endpoint takes precedence over a pattern.
    if 'endpoint' in section:
        return section['endpoint']

    if 'endpoint-pattern' not in section:
        raise Exception('missing endpoint for %s' % (name))

    # Replica count comes from a separate config entry keyed by section name.
    replicas_path = ('%ssystem/%s/replicas' % (config_dir, name)).strip()
    replica_count = int(get_config_as_str(replicas_path))

    template = section['endpoint-pattern']
    joined = ','.join(template % index for index in range(replica_count))
    print("kafka-endpoint: " + joined)
    return joined


# Kafka bootstrap server list, resolved once at import time: internal
# deployments read it from the endpoints configuration, all others take it
# from the KAFKA_ENDPOINT environment variable.
if IS_INTERNAL:
    KAFKA_BOOTSTRAP_SERVERS = _get_endpoint_with_pattern('kafka')
else:
    KAFKA_BOOTSTRAP_SERVERS = os.environ['KAFKA_ENDPOINT']
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



decisionai_plugin/common/util/kafka_operator_confluent.py [15:35]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Name template for a dead-letter topic: the ``{base_topic}`` placeholder is
# replaced with the base topic name and ``-dl`` is appended
# (presumably filled in via ``str.format(base_topic=...)`` by callers).
DeadLetterTopicFormat = '{base_topic}-dl'

# get config info
def _get_endpoint_with_pattern(name):
    """Return the endpoint string configured for the section ``name``.

    Reads ``endpoints.ini`` from the directory given by the
    ``KENSHO2_CONFIG_DIR`` environment variable. The file name is appended
    to that value verbatim, so the variable must already end with a path
    separator.

    If the section defines ``endpoint``, that value is returned unchanged.
    Otherwise, if it defines ``endpoint-pattern``, the number of replicas is
    read via ``get_config_as_str`` from
    ``<config_dir>system/<name>/replicas``, the pattern is %-formatted once
    per index in ``range(replicas)``, and the comma-joined result is printed
    to stdout and returned.

    Raises:
        KeyError: if ``KENSHO2_CONFIG_DIR`` is not set in the environment.
        Exception: if the section has neither ``endpoint`` nor
            ``endpoint-pattern``.
    """
    config_dir = os.environ['KENSHO2_CONFIG_DIR']
    section = Configuration(config_dir + 'endpoints.ini')[name]

    # An explicit endpoint wins over a pattern.
    if 'endpoint' in section:
        return section['endpoint']

    if 'endpoint-pattern' not in section:
        raise Exception('missing endpoint for %s' % (name))

    # The replica count lives in its own config entry, keyed by section name.
    replicas_path = ('%ssystem/%s/replicas' % (config_dir, name)).strip()
    replica_count = int(get_config_as_str(replicas_path))

    template = section['endpoint-pattern']
    joined = ','.join(template % index for index in range(replica_count))
    print("kafka-endpoint: " + joined)
    return joined

# Kafka bootstrap server list, resolved once at import time: internal
# deployments read it from the endpoints configuration, all others take it
# from the KAFKA_ENDPOINT environment variable.
if IS_INTERNAL:
    KAFKA_BOOTSTRAP_SERVERS = _get_endpoint_with_pattern('kafka')
else:
    KAFKA_BOOTSTRAP_SERVERS = os.environ['KAFKA_ENDPOINT']
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



