in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java [210:241]
/**
 * Loads every blob in {@code sourceBlobIds} into {@code tableId} with a single BigQuery load
 * job, blocking the calling thread until the job finishes.
 *
 * <p>The job appends to an existing table (it never creates one), parses the sources as JSON,
 * ignores unknown values, and tolerates zero bad records.
 *
 * <p>Whether the load succeeds or fails, the source blobs are deleted afterwards when
 * {@code delete} is {@code Delete.always}; with {@code Delete.onSuccess} they are deleted only
 * after a successful load. Failures of that delete are deliberately ignored.
 *
 * @return an already-completed future when the load job succeeded
 * @throws BigQueryErrors if the finished job reports an error or any execution errors
 * @throws NullPointerException if the load job no longer exists when waiting for it
 * @throws RuntimeException wrapping {@link InterruptedException} if the wait is interrupted;
 *         the thread's interrupt flag is restored before throwing
 */
protected CompletableFuture<Void> close() {
  List<String> sourceUris = sourceBlobIds.stream().map(BlobIdToString::apply)
      .collect(Collectors.toList());
  boolean loadSuccess = false;
  try {
    // waitFor() may hand back null when the job has disappeared; fail with a clear message
    // instead of an anonymous NullPointerException on the getStatus() dereference.
    JobStatus status = java.util.Objects.requireNonNull(
        bigQuery
            .create(JobInfo.of(LoadJobConfiguration.newBuilder(tableId, sourceUris)
                .setCreateDisposition(JobInfo.CreateDisposition.CREATE_NEVER)
                .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
                .setFormatOptions(FormatOptions.json()).setIgnoreUnknownValues(true)
                .setAutodetect(false).setMaxBadRecords(0).build()))
            .waitFor(),
        "BigQuery load job no longer exists; cannot determine load status").getStatus();
    if (status.getError() != null) {
      throw new BigQueryErrors(ImmutableList.of(status.getError()));
    } else if (status.getExecutionErrors() != null
        && !status.getExecutionErrors().isEmpty()) {
      throw new BigQueryErrors(status.getExecutionErrors());
    }
    loadSuccess = true;
    return CompletableFuture.completedFuture(null);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } finally {
    // Runs on both the success and the failure path; loadSuccess is only true when the job
    // completed without reporting errors.
    if (delete == Delete.always || (delete == Delete.onSuccess && loadSuccess)) {
      try {
        storage.delete(sourceBlobIds);
      } catch (RuntimeException ignored) {
        // don't fail batch when delete throws
      }
    }
  }
}