in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java [320:385]
Output getOutput(Env env, Env commonEnv, Executor executor) {
final com.google.cloud.bigquery.BigQuery bigQuery = getBigQueryService(env);
final Storage storage = getGcsService(env);
final Function<Blob, CompletableFuture<Void>> bigQueryLoad;
if (env.containsKey(OUTPUT_TOPIC)) {
// The BigQuery Load API limits load requests to 1,000 per table per day, so if
// OUTPUT_TOPIC is present, publish the BlobId to Pub/Sub and run load jobs separately
final Function<PubsubMessage, CompletableFuture<Void>> pubsubOutput = pubsub
.getOutput(env, commonEnv, executor);
bigQueryLoad = blob -> pubsubOutput.apply(BlobIdToPubsubMessage.encode(blob.getBlobId()));
} else {
bigQueryLoad = new BigQuery.Load(bigQuery, storage,
env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
env.getDuration(LOAD_MAX_DELAY, DEFAULT_STREAMING_LOAD_MAX_DELAY), executor,
getEnableOpenCensus(commonEnv),
// delete files unconditionally; they will be recreated if not successfully loaded
BigQuery.Load.Delete.always);
}
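// A sketch of the deferred-load flow (assumptions: the relayed message carries
// only enough attributes to reconstruct the BlobId, and a decode counterpart to
// BlobIdToPubsubMessage.encode exists):
//   BlobId blobId = BlobIdToPubsubMessage.decode(message);
//   // a separate sink consumes OUTPUT_TOPIC and submits batched load jobs for
//   // gs://<bucket>/<name>, keeping the job count below the per-table daily quota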
// Combine bigQueryFiles and bigQueryLoad without an intermediate Pub/Sub topic
final Function<PubsubMessage, CompletableFuture<Void>> fileOutput = new Gcs.Write.Ndjson(
storage, env.getLong(BATCH_MAX_BYTES, DEFAULT_BATCH_MAX_BYTES),
env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)), executor,
getEnableOpenCensus(commonEnv), getFormat(env, commonEnv), bigQueryLoad);
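// Example (hypothetical values, not defaults): BATCH_MAX_BYTES=100000000,
// BATCH_MAX_MESSAGES=100000, BATCH_MAX_DELAY=10m would flush an NDJSON file to
// GCS as soon as any one of the three thresholds is crossed, then hand the
// resulting blob to bigQueryLoad.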
// Like bigQueryStreaming, but use STREAMING_-prefixed env vars for batch configuration
final Function<PubsubMessage, CompletableFuture<Void>> streamingOutput = new BigQuery.Write(
bigQuery, env.getLong(STREAMING_BATCH_MAX_BYTES, DEFAULT_STREAMING_BATCH_MAX_BYTES),
env.getInt(STREAMING_BATCH_MAX_MESSAGES, DEFAULT_STREAMING_BATCH_MAX_MESSAGES),
env.getDuration(STREAMING_BATCH_MAX_DELAY, DEFAULT_STREAMING_BATCH_MAX_DELAY),
PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)), executor,
getEnableOpenCensus(commonEnv), getFormat(env, commonEnv));
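// Example (hypothetical template, for illustration only): OUTPUT_TABLE could
// look like "${document_namespace}_stable.${document_type}_v${document_version}";
// the placeholders are filled per message from its attributes by
// PubsubMessageToTemplatedString.forBigQuery.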
// fallbackOutput redirects messages to fileOutput when streamingOutput rejects them for exceeding size limits
Function<PubsubMessage, CompletableFuture<Void>> fallbackOutput = message -> streamingOutput
.apply(message).thenApply(CompletableFuture::completedFuture).exceptionally(t -> {
if (t.getCause() instanceof BigQueryErrors) {
BigQueryErrors cause = (BigQueryErrors) t.getCause();
if (cause.errors.size() == 1 && cause.errors.get(0).getMessage()
.startsWith("Maximum allowed row size exceeded")) {
return fileOutput.apply(message);
}
} else if (t.getCause() instanceof BigQueryException && t.getCause().getMessage()
.startsWith("Request payload size exceeds the limit")) {
// t.getCause() was not a BatchException, so this message exceeded the
// request payload size limit when sent individually.
return fileOutput.apply(message);
}
throw (RuntimeException) t;
}).thenCompose(v -> v);
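// The wrapping above is a pre-Java-12 stand-in for exceptionallyCompose:
// thenApply(CompletableFuture::completedFuture) lifts the result to
// CompletableFuture<CompletableFuture<Void>>, exceptionally() can then return
// either the fallback future or rethrow, and thenCompose(v -> v) flattens the
// nesting back to a CompletableFuture<Void>. A standalone sketch of this
// pattern follows the method.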
// Send messages not matched by STREAMING_DOCTYPES directly to fileOutput
final Function<PubsubMessage, CompletableFuture<Void>> mixedOutput;
if (env.containsKey(STREAMING_DOCTYPES)) {
Predicate<PubsubMessage> streamingDoctypes = DocumentTypePredicate
.of(env.getPattern(STREAMING_DOCTYPES));
mixedOutput = message -> {
if (streamingDoctypes.test(message)) {
return fallbackOutput.apply(message);
}
return fileOutput.apply(message);
};
} else {
mixedOutput = fallbackOutput;
}
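// Example (hypothetical pattern, assuming the predicate matches on the
// document_namespace/document_type attributes): STREAMING_DOCTYPES set to
// "telemetry/(main|event)" would stream only those doctypes and batch
// everything else to GCS via fileOutput.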
return new Output(this, mixedOutput, commonEnv);
}
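Below is a minimal, self-contained sketch of the fallback composition used for fallbackOutput above, with the Mozilla types replaced by plain strings. Here withFallback and SizeLimitException are hypothetical stand-ins for streamingOutput, BigQueryErrors/BigQueryException, and the row/payload size checks, not the real API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class FallbackSketch {

  /** Compose primary with fallback, falling back only on SizeLimitException. */
  static Function<String, CompletableFuture<Void>> withFallback(
      Function<String, CompletableFuture<Void>> primary,
      Function<String, CompletableFuture<Void>> fallback) {
    return message -> primary.apply(message)
        // lift to CompletableFuture<CompletableFuture<Void>> so that
        // exceptionally() may substitute the fallback future
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> {
          // t is a CompletionException wrapping the original failure
          if (t.getCause() instanceof SizeLimitException) {
            return fallback.apply(message);
          }
          // rethrow anything that is not a size-based rejection
          throw new CompletionException(t.getCause());
        })
        // flatten the nested future back to CompletableFuture<Void>
        .thenCompose(v -> v);
  }

  /** Hypothetical stand-in for a size-based rejection such as BigQueryErrors. */
  static class SizeLimitException extends RuntimeException {
    SizeLimitException(String msg) {
      super(msg);
    }
  }

  public static void main(String[] args) {
    // primary rejects "oversized" messages; fallback always succeeds
    Function<String, CompletableFuture<Void>> primary = m -> m.length() > 5
        ? CompletableFuture.failedFuture(new SizeLimitException("too big"))
        : CompletableFuture.completedFuture(null);
    Function<String, CompletableFuture<Void>> fallback = m -> {
      System.out.println("fell back for: " + m);
      return CompletableFuture.completedFuture(null);
    };
    withFallback(primary, fallback).apply("oversized message").join();
  }
}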