in java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java [430:492]
private void startChildProcess() throws IOException, InterruptedException {
List<String> args = new ArrayList<>(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i",
inPipe.getAbsolutePath(), "-c", protobufToHex(config.toProtobufMessage()), "-k",
protobufToHex(makeSetCredentialsMessage(config.getCredentialsProvider(), false)), "-t"));
AWSCredentialsProvider metricsCreds = config.getMetricsCredentialsProvider();
if (metricsCreds == null) {
metricsCreds = config.getCredentialsProvider();
}
args.add("-w");
args.add(protobufToHex(makeSetCredentialsMessage(metricsCreds, true)));
args.add("-l");
args.add(config.getLogLevel());
log.debug("Starting Native Process: {}", StringUtils.join(args, " "));
final ProcessBuilder pb = new ProcessBuilder(args);
for (Entry<String, String> e : environmentVariables.entrySet()) {
pb.environment().put(e.getKey(), e.getValue());
}
executor.execute(new Runnable() {
@Override
public void run() {
try {
connectToChild();
startLoops();
} catch (IOException e) {
fatalError("Unexpected error connecting to child process", e, false);
}
}
});
try {
process = pb.start();
} catch (Exception e) {
fatalError("Error starting child process", e, false);
}
stdOutReader = new LogInputStreamReader(process.getInputStream(), "StdOut", new LogInputStreamReader.DefaultLoggingFunction() {
@Override
public void apply(Logger logger, String message) {
logger.info(message);
}
});
stdErrReader = new LogInputStreamReader(process.getErrorStream(), "StdErr", new LogInputStreamReader.DefaultLoggingFunction() {
@Override
public void apply(Logger logger, String message) {
logger.warn(message);
}
});
executor.execute(stdOutReader);
executor.execute(stdErrReader);
try {
int code = process.waitFor();
fatalError("Child process exited with code " + code, code != 1);
} finally {
stdOutReader.shutdown();
stdErrReader.shutdown();
deletePipes();
}
}