public static Input getInput()

in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java [535:587]


  public static Input getInput(Output output) throws IOException {
    final Env env = new Env(INPUT_ENV_VARS);
    final DecompressPayload decompress = getInputCompression(env);
    final Input input;
    if (env.containsKey(INPUT_PIPE)) {
      final String inputPipe = env.getString(INPUT_PIPE);
      final InputStream pipe;
      switch (inputPipe) {
        case "-":
        case "0":
        case "in":
        case "stdin":
        case "/dev/stdin":
          pipe = System.in;
          break;
        default:
          pipe = Files.newInputStream(Paths.get(inputPipe));
      }
      input = Pipe.Read.of(pipe, output.write, decompress);
    } else {
      final String subscription = env.getString(INPUT_SUBSCRIPTION);
      final long messagesOutstanding = output.type.getMaxOutstandingElementCount(env);
      final long bytesOutstanding = output.type.getMaxOutstandingRequestBytes(env);
      // Pub/Sub Lite subscriptions specify a zone, otherwise it is a standard Pub/Sub subscription
      if (subscription.matches(".*/locations/.*")) {
        input = new PubsubLite.Read(subscription, messagesOutstanding, bytesOutstanding,
            output.write, builder -> builder, decompress);
      } else {
        input = new Pubsub.Read(subscription, output, builder -> builder
            .setFlowControlSettings(
                FlowControlSettings.newBuilder().setMaxOutstandingElementCount(messagesOutstanding)
                    .setMaxOutstandingRequestBytes(bytesOutstanding).build())
            // The number of streaming subscriber connections for reading from Pub/Sub.
            // https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L141
            // https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L318-L320
            // The default number of executor threads is max(6, 2*parallelPullCount).
            // https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L566-L568
            // Subscriber connections are expected to be CPU bound until flow control thresholds are
            // reached, so parallelism should be no less than the number of available processors.
            .setParallelPullCount(
                env.getInt(INPUT_PARALLELISM, Runtime.getRuntime().availableProcessors())),
            decompress);
      }
      // Setup OpenCensus stackdriver exporter after all measurement views have been registered,
      // as seen in https://opencensus.io/exporters/supported-exporters/java/stackdriver-stats/
      if (getEnableOpenCensus(output.commonEnv)) {
        StackdriverStatsExporter.createAndRegister();
      }
    }
    env.requireAllVarsUsed();
    output.commonEnv.requireAllVarsUsed();
    return input;
  }