public static Output getOutput()

in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java [500:532]


  /**
   * Build the {@link Output} for this sink from environment configuration.
   *
   * <p>The primary output is chosen by {@code OutputType.get} from the variables named in
   * {@code OUTPUT_ENV_VARS}. When {@code OUTPUT_MAX_ATTEMPTS} is present in the output environment,
   * or the error environment (built from the same variable names with the {@code "ERROR_"}
   * argument, presumably a name prefix) is not empty, the primary output is combined with an
   * error output via {@code errorsVia}.
   *
   * <p>Both the output and error environments are checked with {@code requireAllVarsUsed()}
   * before returning; the common environment is deliberately not checked here.
   *
   * @return the primary output, or the primary output wrapped with error handling when configured
   * @throws IllegalArgumentException if the configured error output type is not a valid error
   *     output
   */
  public static Output getOutput() {
    final Env commonEnv = new Env(COMMON_ENV_VARS);
    final Env outputEnv = new Env(OUTPUT_ENV_VARS);
    final Env errorEnv = new Env(OUTPUT_ENV_VARS, "ERROR_");

    // A dedicated pool is used for the CompletableFutures created by outputs rather than
    // ForkJoinPool.commonPool(), whose default parallelism is 1 in stage and prod. Batch close
    // operations run on this executor and may perform slow synchronous IO, so parallelism should
    // exceed one per batch key (i.e. output table). As of 2020-04-25 there are 89 tables for
    // telemetry and 118 tables for structured.
    final int parallelism = outputEnv.getInt(OUTPUT_PARALLELISM, 150);
    final Executor executor = new ForkJoinPool(parallelism);

    final OutputType primaryType = OutputType.get(outputEnv);
    final Output primary = primaryType.getOutput(outputEnv, commonEnv, executor);

    // Error handling is opt-in: either a max attempts setting or any error variable enables it.
    final boolean errorHandlingConfigured =
        outputEnv.containsKey(OUTPUT_MAX_ATTEMPTS) || !errorEnv.isEmpty();

    final Output result;
    if (!errorHandlingConfigured) {
      result = primary;
    } else {
      final OutputType errorType = OutputType.get(errorEnv);
      if (!errorType.validErrorOutput()) {
        throw new IllegalArgumentException(
            "Invalid error output type detected: " + errorType.name());
      }
      // The error output is constructed before OUTPUT_MAX_ATTEMPTS is read.
      result = primary.errorsVia(
          errorType.getOutput(errorEnv, commonEnv, executor).write,
          outputEnv.getInt(OUTPUT_MAX_ATTEMPTS, DEFAULT_OUTPUT_MAX_ATTEMPTS));
    }

    // commonEnv is intentionally left unchecked here; getInput may still read from it, so its
    // check is deferred to that method.
    outputEnv.requireAllVarsUsed();
    errorEnv.requireAllVarsUsed();
    return result;
  }