in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java [535:587]
/**
 * Build the {@link Input} that feeds the given output.
 *
 * <p>When {@code INPUT_PIPE} is present in the input environment, messages are read from a pipe:
 * the values {@code -}, {@code 0}, {@code in}, {@code stdin} and {@code /dev/stdin} select
 * {@link System#in}, and any other value is opened as a file path. Otherwise messages are read
 * from the subscription named by {@code INPUT_SUBSCRIPTION}, using Pub/Sub Lite when the
 * subscription name contains a {@code /locations/} segment and standard Pub/Sub when it does not.
 *
 * <p>Before returning, both the input environment and {@code output.commonEnv} are asked to
 * check that all of their variables were used.
 *
 * @param output the configured output; supplies the write function, the flow control limits
 *     and the common environment
 * @return the configured input
 * @throws IOException if the pipe path cannot be opened for reading
 */
public static Input getInput(Output output) throws IOException {
  final Env env = new Env(INPUT_ENV_VARS);
  final DecompressPayload decompress = getInputCompression(env);
  final Input input;
  if (!env.containsKey(INPUT_PIPE)) {
    final String subscription = env.getString(INPUT_SUBSCRIPTION);
    final long maxMessages = output.type.getMaxOutstandingElementCount(env);
    final long maxBytes = output.type.getMaxOutstandingRequestBytes(env);
    // A location (zone) in the subscription name marks it as Pub/Sub Lite; anything else is
    // treated as a standard Pub/Sub subscription.
    final boolean isLite = subscription.matches(".*/locations/.*");
    if (isLite) {
      input = new PubsubLite.Read(subscription, maxMessages, maxBytes, output.write,
          builder -> builder, decompress);
    } else {
      input = new Pubsub.Read(subscription, output, builder -> {
        final FlowControlSettings flowControl = FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(maxMessages)
            .setMaxOutstandingRequestBytes(maxBytes)
            .build();
        // Parallel pull count is the number of streaming subscriber connections used for
        // reading from Pub/Sub.
        // https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L141
        // https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L318-L320
        // Executor threads default to max(6, 2*parallelPullCount).
        // https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L566-L568
        // Connections are expected to be CPU bound until the flow control limits are hit, so
        // the default parallelism is the number of available processors.
        // INPUT_PARALLELISM is deliberately read here, inside the lambda, so that it is only
        // consumed on the standard Pub/Sub path.
        return builder.setFlowControlSettings(flowControl).setParallelPullCount(
            env.getInt(INPUT_PARALLELISM, Runtime.getRuntime().availableProcessors()));
      }, decompress);
    }
    // The OpenCensus stackdriver exporter is registered only after every measurement view has
    // been registered, following
    // https://opencensus.io/exporters/supported-exporters/java/stackdriver-stats/
    if (getEnableOpenCensus(output.commonEnv)) {
      StackdriverStatsExporter.createAndRegister();
    }
  } else {
    final String pipeSpec = env.getString(INPUT_PIPE);
    // Several conventional spellings all mean "read from standard input".
    final boolean readStdin = pipeSpec.equals("-")
        || pipeSpec.equals("0")
        || pipeSpec.equals("in")
        || pipeSpec.equals("stdin")
        || pipeSpec.equals("/dev/stdin");
    final InputStream stream = readStdin ? System.in : Files.newInputStream(Paths.get(pipeSpec));
    input = Pipe.Read.of(stream, output.write, decompress);
  }
  env.requireAllVarsUsed();
  output.commonEnv.requireAllVarsUsed();
  return input;
}