in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java [1083:1318]
<T> long insertAll(
TableReference ref,
List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
@Nullable List<String> insertIdList,
BackOff backoff,
FluentBackoff rateLimitBackoffFactory,
final Sleeper sleeper,
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
ErrorContainer<T> errorContainer,
boolean skipInvalidRows,
boolean ignoreUnknownValues,
boolean ignoreInsertIds,
List<ValueInSingleWindow<TableRow>> successfulRows)
throws IOException, InterruptedException {
checkNotNull(ref, "ref");
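// Lazily create the executor used for parallel batch inserts, bounding its parallelism
// to the configured insert bundle parallelism.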
if (executor == null) {
this.executor =
new BoundedExecutorService(
MoreExecutors.listeningDecorator(options.as(GcsOptions.class).getExecutorService()),
options.as(BigQueryOptions.class).getInsertBundleParallelism());
}
if (insertIdList != null && rowList.size() != insertIdList.size()) {
throw new AssertionError(
"If insertIdList is not null it needs to have at least "
+ "as many elements as rowList");
}
StreamingInsertsMetrics streamingInsertsResults =
BigQuerySinkMetrics.streamingInsertsMetrics();
int numFailedRows = 0;
final Set<Integer> failedIndices = new HashSet<>();
long retTotalDataSize = 0;
List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
// These lists contain the rows to publish. Initially they contain the entire list.
// If there are failures, they will contain only the failed rows to be retried.
List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowsToPublish = rowList;
List<String> idsToPublish = null;
if (!ignoreInsertIds) {
idsToPublish = insertIdList;
}
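// Retry loop: each pass inserts the rows currently in rowsToPublish. Rows whose errors
// the retry policy accepts are collected into retryRows and attempted again after a backoff.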
while (true) {
List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : null;
int strideIndex = 0;
// Upload in batches.
List<TableDataInsertAllRequest.Rows> rows = new ArrayList<>();
long dataSize = 0L;
List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
List<Integer> strideIndices = new ArrayList<>();
// Store the longest throttled time across all parallel threads
final AtomicLong maxThrottlingMsec = new AtomicLong();
int rowIndex = 0;
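// Pack rows into batches bounded by payload size and row count, submitting each full
// batch to the executor so the inserts run in parallel.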
while (rowIndex < rowsToPublish.size()) {
TableRow row = rowsToPublish.get(rowIndex).getValue();
long nextRowSize = 0L;
try {
nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
} catch (Exception ex) {
throw new RuntimeException("Failed to convert the row to JSON", ex);
}
// The following scenario must be *extremely* rare.
// If this row's encoding by itself is larger than the maximum row payload, then it's
// impossible to insert into BigQuery, and so we send it out through the dead-letter
// queue.
if (nextRowSize >= MAX_BQ_ROW_PAYLOAD) {
InsertErrors error =
new InsertErrors()
.setErrors(ImmutableList.of(new ErrorProto().setReason("row-too-large")));
// Ask the retry policy whether it would retry this error. A row this large can never be
// inserted, so if the policy says to retry we fail fast instead of retrying forever.
Boolean isRetry = retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error));
if (isRetry) {
throw new RuntimeException(
String.format(
"We have observed a row that is %s bytes in size and exceeded BigQueryIO"
+ " limit of 9MB. While BigQuery supports request sizes up to 10MB,"
+ " BigQueryIO sets the limit at 9MB to leave room for request"
+ " overhead. You may change your retry strategy to unblock this"
+ " pipeline, and the row will be output as a failed insert.",
nextRowSize));
} else {
numFailedRows += 1;
errorContainer.add(failedInserts, error, ref, rowsToPublish.get(rowIndex));
failedIndices.add(rowIndex);
rowIndex++;
continue;
}
}
// If adding the next row would push the request above the BigQuery payload limit, or
// if the current batch already exceeds the targeted request size or row count,
// we immediately issue the insert for the batch accumulated so far.
if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD
|| dataSize >= maxRowBatchSize
|| rows.size() + 1 > maxRowsPerBatch) {
// The row does not fit into the current buffer: submit the buffer for insertion and
// start a new one. The row itself is added to the fresh buffer further below.
// Add a future to insert the current batch into BQ.
futures.add(
executor.submit(
new InsertBatchofRowsCallable(
ref,
skipInvalidRows,
ignoreUnknownValues,
client,
rateLimitBackoffFactory,
rows,
maxThrottlingMsec,
sleeper,
streamingInsertsResults)));
strideIndices.add(strideIndex);
retTotalDataSize += dataSize;
strideIndex = rowIndex;
rows = new ArrayList<>();
dataSize = 0L;
}
// The row fits into the insert buffer, so we add it to the buffer to be inserted
// later and move on to the next row.
TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
if (idsToPublish != null) {
out.setInsertId(idsToPublish.get(rowIndex));
}
out.setJson(row.getUnknownKeys());
rows.add(out);
rowIndex++;
dataSize += nextRowSize;
}
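// Submit the final, partially filled batch (if any).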
if (rows.size() > 0) {
futures.add(
executor.submit(
new InsertBatchofRowsCallable(
ref,
skipInvalidRows,
ignoreUnknownValues,
client,
rateLimitBackoffFactory,
rows,
maxThrottlingMsec,
sleeper,
streamingInsertsResults)));
strideIndices.add(strideIndex);
retTotalDataSize += dataSize;
rows = new ArrayList<>();
}
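// Wait for all parallel inserts to finish. An error's index is relative to its batch, so
// the batch's stride index is added to map it back to a position in rowsToPublish.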
try {
for (int i = 0; i < futures.size(); i++) {
List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
if (errors == null) {
continue;
}
for (TableDataInsertAllResponse.InsertErrors error : errors) {
if (error.getIndex() == null) {
throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
}
int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
failedIndices.add(errorIndex);
if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
allErrors.add(error);
retryRows.add(rowsToPublish.get(errorIndex));
// TODO (https://github.com/apache/beam/issues/20891): Select the retry rows (using
// errorIndex) from the batch of rows which attempted insertion in this call,
// not from the entire set of rows in rowsToPublish.
if (retryIds != null) {
retryIds.add(idsToPublish.get(errorIndex));
}
} else {
numFailedRows += 1;
errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex));
}
}
}
// Accumulate the longest throttled time across all parallel threads
throttlingMsecs.inc(maxThrottlingMsec.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while inserting " + rowsToPublish);
} catch (ExecutionException e) {
streamingInsertsResults.updateStreamingInsertsMetrics(
ref, rowList.size(), rowList.size());
throw new RuntimeException(e.getCause());
}
if (allErrors.isEmpty()) {
break;
}
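// Some inserts must be retried: wait according to the backoff policy, and give up
// once it is exhausted.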
long nextBackoffMillis = backoff.nextBackOffMillis();
if (nextBackoffMillis == BackOff.STOP) {
break;
}
try {
sleeper.sleep(nextBackoffMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
}
rowsToPublish = retryRows;
idsToPublish = retryIds;
streamingInsertsResults.updateRetriedRowsWithStatus(
BigQuerySinkMetrics.INTERNAL, retryRows.size());
// Log the first 5 failures.
int numErrorToLog = Math.min(allErrors.size(), 5);
LOG.info(
"Retrying {} failed inserts to BigQuery. First {} fails: {}",
rowsToPublish.size(),
numErrorToLog,
allErrors.subList(0, numErrorToLog));
allErrors.clear();
}
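// Report every row that was never recorded as failed, preserving its original
// timestamp, window, and pane.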
if (successfulRows != null) {
for (int i = 0; i < rowsToPublish.size(); i++) {
if (!failedIndices.contains(i)) {
successfulRows.add(
ValueInSingleWindow.of(
rowsToPublish.get(i).getValue(),
rowsToPublish.get(i).getTimestamp(),
rowsToPublish.get(i).getWindow(),
rowsToPublish.get(i).getPane()));
}
}
}
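// Any errors still outstanding once the backoff gave up are counted as failures and
// surfaced to the caller.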
numFailedRows += allErrors.size();
streamingInsertsResults.updateStreamingInsertsMetrics(ref, rowList.size(), numFailedRows);
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
} else {
return retTotalDataSize;
}
}