in bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaSetup.java [44:95]
/**
 * Prepares the Kafka directories and writes the Kafka configuration files.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Creates the Kafka data, log and pid directories for the Kafka user/group
 *       with {@code PERMISSION_755}.</li>
 *   <li>Writes {@code server.properties} into the configuration directory from
 *       {@code kafkaBroker()}, passing a parameter map with the
 *       {@code zk_server_list} and {@code host} entries.</li>
 *   <li>Writes {@code kafka-env.sh} and {@code log4j.properties} into the
 *       configuration directory from their template contents.</li>
 *   <li>Writes {@code kafka.conf} under {@code KafkaParams.LIMITS_CONF_DIR},
 *       owned by {@code ROOT_USER}.</li>
 * </ol>
 *
 * @param params component parameters; cast to {@link KafkaParams}, so any other
 *               implementation results in a {@link ClassCastException}
 * @return {@code ShellResult.success()} whenever the method returns normally
 */
public static ShellResult configure(Params params) {
    log.info("Configuring Kafka");
    KafkaParams kafka = (KafkaParams) params;

    String configDir = kafka.confDir();
    String owner = kafka.user();
    String ownerGroup = kafka.group();

    // Working directories, all owned by the Kafka user/group.
    // NOTE(review): the trailing `true` flag's meaning is defined by LinuxFileUtils — confirm there.
    LinuxFileUtils.createDirectories(
            kafka.getKafkaDataDir(), owner, ownerGroup, PERMISSION_755, true);
    LinuxFileUtils.createDirectories(
            kafka.getKafkaLogDir(), owner, ownerGroup, PERMISSION_755, true);
    LinuxFileUtils.createDirectories(
            kafka.getKafkaPidDir(), owner, ownerGroup, PERMISSION_755, true);

    // Extra parameters handed to the server.properties writer.
    Map<String, Object> brokerModel = new HashMap<>();
    brokerModel.put("zk_server_list", LocalSettings.hosts("zookeeper_server"));
    brokerModel.put("host", kafka.hostname());

    String serverPropertiesPath = MessageFormat.format("{0}/server.properties", configDir);
    LinuxFileUtils.toFile(
            ConfigType.PROPERTIES,
            serverPropertiesPath,
            owner,
            ownerGroup,
            PERMISSION_644,
            kafka.kafkaBroker(),
            brokerModel);

    String envScriptPath = MessageFormat.format("{0}/kafka-env.sh", configDir);
    LinuxFileUtils.toFileByTemplate(
            kafka.getKafkaEnvContent(),
            envScriptPath,
            owner,
            ownerGroup,
            PERMISSION_644,
            kafka.getGlobalParamsMap());

    String log4jPath = MessageFormat.format("{0}/log4j.properties", configDir);
    LinuxFileUtils.toFileByTemplate(
            kafka.getKafkaLog4jContent(),
            log4jPath,
            owner,
            ownerGroup,
            PERMISSION_644,
            kafka.getGlobalParamsMap());

    // The limits file lives outside the Kafka conf dir and is owned by root rather than the Kafka user.
    String limitsPath = MessageFormat.format("{0}/kafka.conf", KafkaParams.LIMITS_CONF_DIR);
    LinuxFileUtils.toFileByTemplate(
            kafka.kafkaLimits(),
            limitsPath,
            ROOT_USER,
            ROOT_USER,
            PERMISSION_644,
            kafka.getGlobalParamsMap());

    log.info("Successfully configured Kafka");
    return ShellResult.success();
}