Output getOutput()

in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java [320:385]


      /**
       * Build an output that streams messages to BigQuery and falls back to file loads.
       *
       * <p>Three delivery paths are wired together:
       * <ul>
       * <li>{@code bigQueryLoad} handles a written {@code Blob}: when {@code OUTPUT_TOPIC} is
       * configured the blob id is encoded and sent to the pubsub output, otherwise a
       * {@code BigQuery.Load} is used directly.</li>
       * <li>{@code fileOutput} is a {@code Gcs.Write.Ndjson} configured from the {@code BATCH_*}
       * settings and given {@code bigQueryLoad} as its final constructor argument.</li>
       * <li>{@code streamingOutput} is a {@code BigQuery.Write} configured from the
       * {@code STREAMING_BATCH_*} settings.</li>
       * </ul>
       *
       * <p>Messages are sent to {@code streamingOutput} first and retried via {@code fileOutput}
       * only when the failure is recognized as a size rejection. When {@code STREAMING_DOCTYPES}
       * is configured, messages not matching that pattern skip streaming and go directly to
       * {@code fileOutput}.
       *
       * @param env environment providing the configuration for this output
       * @param commonEnv environment passed through to {@code getEnableOpenCensus},
       *        {@code getFormat}, the pubsub output and the returned {@code Output}
       * @param executor executor handed to every output constructed here
       * @return an {@code Output} wrapping the combined streaming/file function
       */
      Output getOutput(Env env, Env commonEnv, Executor executor) {
        final com.google.cloud.bigquery.BigQuery bigQuery = getBigQueryService(env);
        final Storage storage = getGcsService(env);
        final Function<Blob, CompletableFuture<Void>> bigQueryLoad;
        if (env.containsKey(OUTPUT_TOPIC)) {
          // BigQuery Load API limits maximum load requests per table per day to 1,000 so if
          // OUTPUT_TOPIC is present send blobInfo to pubsub and run load jobs separately
          final Function<PubsubMessage, CompletableFuture<Void>> pubsubOutput = pubsub
              .getOutput(env, commonEnv, executor);
          bigQueryLoad = blob -> pubsubOutput.apply(BlobIdToPubsubMessage.encode(blob.getBlobId()));
        } else {
          bigQueryLoad = new BigQuery.Load(bigQuery, storage,
              env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
              env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
              env.getDuration(LOAD_MAX_DELAY, DEFAULT_STREAMING_LOAD_MAX_DELAY), executor,
              getEnableOpenCensus(commonEnv),
              // files will be recreated if not successfully loaded
              BigQuery.Load.Delete.always);
        }
        // Combine bigQueryFiles and bigQueryLoad without an intermediate PubSub topic
        final Function<PubsubMessage, CompletableFuture<Void>> fileOutput = new Gcs.Write.Ndjson(
            storage, env.getLong(BATCH_MAX_BYTES, DEFAULT_BATCH_MAX_BYTES),
            env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
            env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
            PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)), executor,
            getEnableOpenCensus(commonEnv), getFormat(env, commonEnv), bigQueryLoad);
        // Like bigQueryStreaming, but use STREAMING_ prefix env vars for batch configuration
        final Function<PubsubMessage, CompletableFuture<Void>> streamingOutput = new BigQuery.Write(
            bigQuery, env.getLong(STREAMING_BATCH_MAX_BYTES, DEFAULT_STREAMING_BATCH_MAX_BYTES),
            env.getInt(STREAMING_BATCH_MAX_MESSAGES, DEFAULT_STREAMING_BATCH_MAX_MESSAGES),
            env.getDuration(STREAMING_BATCH_MAX_DELAY, DEFAULT_STREAMING_BATCH_MAX_DELAY),
            PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)), executor,
            getEnableOpenCensus(commonEnv), getFormat(env, commonEnv));
        // fallbackOutput sends messages to fileOutput when rejected by streamingOutput due to size.
        // The success value is wrapped in a completed future so that the exceptionally handler
        // can return the fileOutput future; thenCompose then flattens the nested future.
        Function<PubsubMessage, CompletableFuture<Void>> fallbackOutput = message -> streamingOutput
            .apply(message).thenApply(CompletableFuture::completedFuture).exceptionally(t -> {
              final Throwable cause = t.getCause();
              if (cause instanceof BigQueryErrors) {
                final BigQueryErrors bigQueryErrors = (BigQueryErrors) cause;
                if (bigQueryErrors.errors.size() == 1) {
                  // A missing message must not raise NullPointerException here, because that
                  // would replace the original failure reported by BigQuery.
                  final String errorMessage = bigQueryErrors.errors.get(0).getMessage();
                  if (errorMessage != null
                      && errorMessage.startsWith("Maximum allowed row size exceeded")) {
                    return fileOutput.apply(message);
                  }
                }
              } else if (cause instanceof BigQueryException) {
                final String causeMessage = cause.getMessage();
                if (causeMessage != null
                    && causeMessage.startsWith("Request payload size exceeds the limit")) {
                  // t.getCause() was not a BatchException, so this message exceeded the
                  // request payload size limit when sent individually.
                  return fileOutput.apply(message);
                }
              }
              // Not a recognized size rejection: propagate the original failure unchanged.
              if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
              }
              // Avoid a ClassCastException hiding a non-runtime throwable.
              throw new java.util.concurrent.CompletionException(t);
            }).thenCompose(v -> v);
        // Send messages not matched by STREAMING_DOCTYPES directly to fileOutput
        final Function<PubsubMessage, CompletableFuture<Void>> mixedOutput;
        if (env.containsKey(STREAMING_DOCTYPES)) {
          Predicate<PubsubMessage> streamingDoctypes = DocumentTypePredicate
              .of(env.getPattern(STREAMING_DOCTYPES));
          mixedOutput = message -> {
            if (streamingDoctypes.test(message)) {
              return fallbackOutput.apply(message);
            }
            return fileOutput.apply(message);
          };
        } else {
          mixedOutput = fallbackOutput;
        }
        return new Output(this, mixedOutput, commonEnv);
      }