public static ShellResult configure()

in bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaSetup.java [44:95]


    /**
     * Lays down the directories and configuration files for Kafka on this host.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>creates the Kafka data, log and pid directories, owned by the Kafka user and group
     *       with {@code PERMISSION_755};</li>
     *   <li>writes {@code server.properties} into the configuration directory from
     *       {@code kafkaBroker()}, passing {@code zk_server_list} and {@code host} as extra
     *       parameters;</li>
     *   <li>renders {@code kafka-env.sh} and {@code log4j.properties} into the configuration
     *       directory from their templates;</li>
     *   <li>renders {@code kafka.conf} into {@code KafkaParams.LIMITS_CONF_DIR}, owned by
     *       {@code ROOT_USER}.</li>
     * </ol>
     * All files are written with {@code PERMISSION_644}.
     *
     * @param params component parameters; cast to {@link KafkaParams} without a type check, so
     *               any other implementation fails with a {@link ClassCastException}
     * @return {@code ShellResult.success()} once every step has been executed
     */
    public static ShellResult configure(Params params) {
        log.info("Configuring Kafka");
        KafkaParams kafka = (KafkaParams) params;

        String configRoot = kafka.confDir();
        String owner = kafka.user();
        String ownerGroup = kafka.group();

        // Runtime directories belong to the Kafka service account.
        // NOTE(review): the trailing `true` is presumably a "recursive" flag — confirm against LinuxFileUtils.
        LinuxFileUtils.createDirectories(kafka.getKafkaDataDir(), owner, ownerGroup, PERMISSION_755, true);
        LinuxFileUtils.createDirectories(kafka.getKafkaLogDir(), owner, ownerGroup, PERMISSION_755, true);
        LinuxFileUtils.createDirectories(kafka.getKafkaPidDir(), owner, ownerGroup, PERMISSION_755, true);

        // Extra values handed to the server.properties writer alongside the broker configuration.
        Map<String, Object> brokerTemplateVars = new HashMap<>();
        brokerTemplateVars.put("zk_server_list", LocalSettings.hosts("zookeeper_server"));
        brokerTemplateVars.put("host", kafka.hostname());

        String serverPropertiesPath = MessageFormat.format("{0}/server.properties", configRoot);
        LinuxFileUtils.toFile(
                ConfigType.PROPERTIES,
                serverPropertiesPath,
                owner,
                ownerGroup,
                PERMISSION_644,
                kafka.kafkaBroker(),
                brokerTemplateVars);

        String envScriptPath = MessageFormat.format("{0}/kafka-env.sh", configRoot);
        LinuxFileUtils.toFileByTemplate(
                kafka.getKafkaEnvContent(), envScriptPath, owner, ownerGroup, PERMISSION_644, kafka.getGlobalParamsMap());

        String log4jPath = MessageFormat.format("{0}/log4j.properties", configRoot);
        LinuxFileUtils.toFileByTemplate(
                kafka.getKafkaLog4jContent(), log4jPath, owner, ownerGroup, PERMISSION_644, kafka.getGlobalParamsMap());

        // The limits file lives outside the Kafka conf dir and is owned by root (user and group).
        String limitsPath = MessageFormat.format("{0}/kafka.conf", KafkaParams.LIMITS_CONF_DIR);
        LinuxFileUtils.toFileByTemplate(
                kafka.kafkaLimits(), limitsPath, ROOT_USER, ROOT_USER, PERMISSION_644, kafka.getGlobalParamsMap());

        log.info("Successfully configured Kafka");
        return ShellResult.success();
    }