in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java [500:532]
public static Output getOutput() {
final Env commonEnv = new Env(COMMON_ENV_VARS);
final Env outputEnv = new Env(OUTPUT_ENV_VARS);
final Env errorEnv = new Env(OUTPUT_ENV_VARS, "ERROR_");
// Executor to use for CompletableFutures in outputs instead of ForkJoinPool.commonPool(),
// because the default parallelism is 1 in stage and prod. Parallelism should be more than one
// per batch key (i.e. output table), because this executor is used for batch close operations
// that may be slow synchronous IO. As of 2020-04-25 there are 89 tables for telemetry and 118
// tables for structured.
final Executor executor = new ForkJoinPool(outputEnv.getInt(OUTPUT_PARALLELISM, 150));
final Output output = OutputType.get(outputEnv).getOutput(outputEnv, commonEnv, executor);
// Handle error output if configured.
final Output withErrors;
if (outputEnv.containsKey(OUTPUT_MAX_ATTEMPTS) || !errorEnv.isEmpty()) {
OutputType errorOutputType = OutputType.get(errorEnv);
if (errorOutputType.validErrorOutput()) {
withErrors = output.errorsVia(
errorOutputType.getOutput(errorEnv, commonEnv, executor).write,
outputEnv.getInt(OUTPUT_MAX_ATTEMPTS, DEFAULT_OUTPUT_MAX_ATTEMPTS));
} else {
throw new IllegalArgumentException(
"Invalid error output type detected: " + errorOutputType.name());
}
} else {
withErrors = output;
}
// defer checking commonEnv to getInput in case it's used there
outputEnv.requireAllVarsUsed();
errorEnv.requireAllVarsUsed();
return withErrors;
}