in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java [622:913]
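/**
* Flushes all buffered rows as a single AppendRows request, registering the append with the
* supplied {@link RetryManager}. Rows that cannot be appended are routed to {@code
* failedRowsReceiver}; successfully appended rows are optionally echoed to {@code
* successfulRowsReceiver}. Returns the number of rows sent, or 0 if the buffer was empty or
* the request was rejected as too large.
*/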
long flush(
RetryManager<AppendRowsResponse, AppendRowsContext> retryManager,
OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver,
@Nullable OutputReceiver<TableRow> successfulRowsReceiver)
throws Exception {
if (pendingMessages.isEmpty()) {
return 0;
}
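// Snapshot the pending rows, timestamps, and failsafe TableRows into locals and reset the
// buffers so new elements can accumulate while this batch is in flight.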
final ProtoRows.Builder insertsBuilder = ProtoRows.newBuilder();
insertsBuilder.addAllSerializedRows(pendingMessages);
pendingMessages.clear();
final ProtoRows inserts = insertsBuilder.build();
List<org.joda.time.Instant> insertTimestamps = pendingTimestamps;
List<@Nullable TableRow> failsafeTableRows = pendingFailsafeTableRows;
pendingTimestamps = Lists.newArrayList();
pendingFailsafeTableRows = Lists.newArrayList();
// Handle the case where the request is too large.
if (inserts.getSerializedSize() >= maxRequestSize) {
if (inserts.getSerializedRowsCount() > 1) {
// TODO(reuvenlax): Is it worth trying to handle this case by splitting the protoRows?
// Given that we split the ProtoRows iterable at 2MB and the max request size is 10MB,
// this scenario seems nearly impossible.
LOG.error(
"A request containing more than one row is over the request size limit of {}. "
+ "This is unexpected. All rows in the request will be sent to the failed-rows PCollection.",
maxRequestSize);
}
for (int i = 0; i < inserts.getSerializedRowsCount(); ++i) {
@Nullable TableRow failedRow = failsafeTableRows.get(i);
if (failedRow == null) {
ByteString rowBytes = inserts.getSerializedRows(i);
failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(
TableRowToStorageApiProto.wrapDescriptorProto(
getAppendClientInfo(true, null).getDescriptor()),
rowBytes),
true,
successfulRowsPredicate);
}
org.joda.time.Instant timestamp = insertTimestamps.get(i);
failedRowsReceiver.outputWithTimestamp(
new BigQueryStorageApiInsertError(
failedRow, "Row payload too large. Maximum size " + maxRequestSize),
timestamp);
}
int numRowsFailed = inserts.getSerializedRowsCount();
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED,
BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
shortTableUrn)
.inc(numRowsFailed);
rowsSentToFailedRowsCollection.inc(numRowsFailed);
return 0;
}
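// When writing to an explicit (non-default) stream, reserve a contiguous block of offsets
// for this batch. The default stream is append-only and takes no offset, so -1 is used.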
long offset = -1;
if (!this.useDefaultStream) {
getOrCreateStreamName(); // Force creation of the stream before we get offsets.
offset = this.currentOffset;
this.currentOffset += inserts.getSerializedRowsCount();
}
AppendRowsContext appendRowsContext =
new AppendRowsContext(offset, inserts, insertTimestamps, failsafeTableRows);
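// Register the append with the RetryManager: the first callback issues the AppendRows RPC,
// the second decides how to recover when an append fails, and the third records metrics and
// emits successful rows once an append completes.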
retryManager.addOperation(
c -> {
if (c.protoRows.getSerializedRowsCount() == 0) {
// This might happen if all rows in a batch failed and were sent to the failed-rows
// PCollection.
return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
}
try {
StreamAppendClient writeStream =
Preconditions.checkStateNotNull(
getAppendClientInfo(true, null).getStreamAppendClient());
ApiFuture<AppendRowsResponse> response =
writeStream.appendRows(c.offset, c.protoRows);
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
if (!usingMultiplexing && writeStream.getInflightWaitSeconds() > 5) {
LOG.warn(
"Storage API write delay is more than {} seconds.",
writeStream.getInflightWaitSeconds());
}
return response;
} catch (Exception e) {
throw new RuntimeException(e);
}
},
contexts -> {
AppendRowsContext failedContext =
Preconditions.checkStateNotNull(Iterables.getFirst(contexts, null));
BigQuerySinkMetrics.reportFailedRPCMetrics(
failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn);
String errorCode =
BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
if (failedContext.getError() != null
&& failedContext.getError() instanceof Exceptions.AppendSerializationError) {
Exceptions.AppendSerializationError error =
Preconditions.checkStateNotNull(
(Exceptions.AppendSerializationError) failedContext.getError());
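// Row-level serialization errors: emit only the offending rows to the failed-rows
// receiver, then strip them from the batch and retry the remainder.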
Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
for (int failedIndex : failedRowIndices) {
// Convert the message to a TableRow and send it to the failedRows collection.
BigQueryStorageApiInsertError element = null;
org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex);
try {
TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex);
if (failedRow == null) {
ByteString protoBytes =
failedContext.protoRows.getSerializedRows(failedIndex);
failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(
TableRowToStorageApiProto.wrapDescriptorProto(
Preconditions.checkStateNotNull(appendClientInfo)
.getDescriptor()),
protoBytes),
true,
Predicates.alwaysTrue());
}
element =
new BigQueryStorageApiInsertError(
failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
} catch (Exception e) {
LOG.error("Failed to insert row and could not parse the result!", e);
}
// Output outside the try block so we don't suppress exceptions thrown downstream.
if (element != null) {
failedRowsReceiver.outputWithTimestamp(element, timestamp);
}
}
int numRowsFailed = failedRowIndices.size();
rowsSentToFailedRowsCollection.inc(numRowsFailed);
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableUrn)
.inc(numRowsFailed);
// Remove the failed rows from the payload, so we retry the batch without them. The
// timestamps and failsafe rows are rebuilt in lockstep so that index-based lookups
// stay aligned if this batch fails again.
ProtoRows.Builder retryRows = ProtoRows.newBuilder();
List<org.joda.time.Instant> retryTimestamps = Lists.newArrayList();
List<@Nullable TableRow> retryFailsafeTableRows = Lists.newArrayList();
for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) {
if (!failedRowIndices.contains(i)) {
ByteString rowBytes = failedContext.protoRows.getSerializedRows(i);
retryRows.addSerializedRows(rowBytes);
retryTimestamps.add(failedContext.timestamps.get(i));
retryFailsafeTableRows.add(failedContext.failsafeTableRows.get(i));
}
}
failedContext.protoRows = retryRows.build();
failedContext.timestamps = retryTimestamps;
failedContext.failsafeTableRows = retryFailsafeTableRows;
int numRowsRetried = failedContext.protoRows.getSerializedRowsCount();
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn)
.inc(numRowsRetried);
// Since we removed rows, we need to update the insert offsets for all remaining
// rows.
long newOffset = failedContext.offset;
for (AppendRowsContext context : contexts) {
context.offset = newOffset;
newOffset += context.protoRows.getSerializedRowsCount();
}
this.currentOffset = newOffset;
return RetryType.RETRY_ALL_OPERATIONS;
}
LOG.warn(
"Append to stream {} by client #{} failed; operations will be retried.\n{}",
streamName,
clientNumber,
retrieveErrorDetails(contexts));
failedContext.failureCount += 1;
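// Quota errors (RESOURCE_EXHAUSTED) get a larger retry budget; any other failure forces
// the write stream to be recreated before retrying.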
boolean quotaError = false;
Throwable error = failedContext.getError();
Status.Code statusCode = Status.Code.OK;
if (error != null) {
statusCode = Status.fromThrowable(error).getCode();
quotaError = statusCode.equals(Status.Code.RESOURCE_EXHAUSTED);
}
int allowedRetry;
if (!quotaError) {
// This forces us to close and reopen all gRPC connections to Storage API on error,
// which empirically fixes random stuckness issues.
invalidateWriteStream();
allowedRetry = 5;
} else {
allowedRetry = 35;
}
// Maximum number of times we retry before we fail the work item.
if (failedContext.failureCount > allowedRetry) {
throw new RuntimeException(
String.format(
"More than %d attempts to call AppendRows failed.", allowedRetry));
}
// The following errors are known to be persistent, so always fail the work item in
// this case.
if (statusCode.equals(Status.Code.OUT_OF_RANGE)
|| statusCode.equals(Status.Code.ALREADY_EXISTS)) {
throw new RuntimeException(
"Append to stream "
+ this.streamName
+ " failed with invalid offset of "
+ failedContext.offset);
}
boolean hasPersistentErrors =
failedContext.getError() instanceof Exceptions.StreamFinalizedException
|| statusCode.equals(Status.Code.INVALID_ARGUMENT)
|| statusCode.equals(Status.Code.NOT_FOUND)
|| statusCode.equals(Status.Code.FAILED_PRECONDITION);
if (hasPersistentErrors) {
throw new RuntimeException(
String.format(
"Append to stream %s failed with Status Code %s. The stream may not exist.",
this.streamName, statusCode),
error);
}
// TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces
// them.
try {
tryCreateTable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
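// Treat the failure as transient: count every row in the batch as retried and ask the
// RetryManager to re-run all in-flight operations.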
int numRowsRetried = failedContext.protoRows.getSerializedRowsCount();
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn)
.inc(numRowsRetried);
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
},
c -> {
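// Success callback: record metrics for the appended rows and, if a successful-rows
// receiver is configured, parse the serialized protos back into TableRows to emit them.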
int numRecordsAppended = c.protoRows.getSerializedRowsCount();
recordsAppended.inc(numRecordsAppended);
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.SUCCESSFUL,
BigQuerySinkMetrics.OK,
shortTableUrn)
.inc(numRecordsAppended);
BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn);
if (successfulRowsReceiver != null) {
Descriptor descriptor = null;
try {
descriptor =
TableRowToStorageApiProto.wrapDescriptorProto(
Preconditions.checkStateNotNull(appendClientInfo).getDescriptor());
} catch (DescriptorValidationException e) {
LOG.warn(
"Failure getting proto descriptor. Successful output will not be produced.",
e);
}
if (descriptor != null) {
for (int i = 0; i < c.protoRows.getSerializedRowsCount(); ++i) {
ByteString rowBytes = c.protoRows.getSerializedRows(i);
try {
TableRow row =
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(descriptor, rowBytes),
true,
successfulRowsPredicate);
org.joda.time.Instant timestamp = c.timestamps.get(i);
successfulRowsReceiver.outputWithTimestamp(row, timestamp);
} catch (Exception e) {
LOG.warn("Failure parsing TableRow", e);
}
}
}
}
},
appendRowsContext);
maybeTickleCache();
return inserts.getSerializedRowsCount();
}