in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java [451:976]
public void process(
ProcessContext c,
final PipelineOptions pipelineOptions,
@Element KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>> element,
@Timestamp org.joda.time.Instant elementTs,
final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
final @StateId("updatedSchema") ValueState<TableSchema> updatedSchema,
@TimerId("idleTimer") Timer idleTimer,
final MultiOutputReceiver o)
throws Exception {
BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
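// Prefetch the updated-schema state so that the read below does not block.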
if (autoUpdateSchema) {
updatedSchema.readLater();
}
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
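// Resolve and memoize the table destination for this sharded key.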
TableDestination tableDestination =
destinations.computeIfAbsent(
element.getKey().getKey(),
dest -> {
TableDestination tableDestination1 = dynamicDestinations.getTable(dest);
checkArgument(
tableDestination1 != null,
"DynamicDestinations.getTable() may not return null, "
+ "but %s returned null for destination %s",
dynamicDestinations,
dest);
return tableDestination1;
});
final String tableId = tableDestination.getTableUrn(bigQueryOptions);
final String shortTableId = tableDestination.getShortTableUrn();
final DatasetService datasetService = getDatasetService(pipelineOptions);
final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
Lineage.getSinks()
.add(
"bigquery",
BigQueryHelpers.dataCatalogSegments(
tableDestination.getTableReference(), bigQueryOptions));
Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder();
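// Creates the destination table on demand, honoring the create disposition. Passed to
// getOrCreateStream and also invoked from the append error path below.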
Callable<Boolean> tryCreateTable =
() -> {
DestinationT dest = element.getKey().getKey();
CreateTableHelpers.possiblyCreateTable(
c.getPipelineOptions().as(BigQueryOptions.class),
tableDestination,
() -> dynamicDestinations.getSchema(dest),
() -> dynamicDestinations.getTableConstraints(dest),
createDisposition,
destinationCoder,
kmsKey,
bqServices,
bigLakeConfiguration);
return true;
};
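// Supplies the write stream for this destination, creating one via the WriteStreamService
// if state does not already hold a stream name.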
Supplier<String> getOrCreateStream =
() ->
getOrCreateStream(
tableId, streamName, streamOffset, idleTimer, writeStreamService, tryCreateTable);
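// Builds the cached AppendClientInfo for this destination: resolves the table schema,
// derives the proto descriptor, and attaches a StreamAppendClient pinned on behalf of the
// cache.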
Callable<AppendClientInfo> getAppendClientInfo =
() -> {
@Nullable TableSchema tableSchema;
DescriptorProtos.DescriptorProto descriptor;
TableSchema updatedSchemaValue = updatedSchema.read();
if (autoUpdateSchema && updatedSchemaValue != null) {
// We've seen an updated schema, so we use that instead of querying the
// MessageConverter.
tableSchema = updatedSchemaValue;
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
} else {
// Start off with the base schema. As we get notified of schema updates, we
// will update the descriptor.
StorageApiDynamicDestinations.MessageConverter<?> converter =
messageConverters.get(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);
if (autoUpdateSchema) {
// A StreamWriter ignores table schema updates that happen prior to its creation.
// So before creating a StreamWriter below, we fetch the table schema to check if we
// missed an update. If so, use the new schema instead of the base schema.
@Nullable
TableSchema streamSchema =
MoreObjects.firstNonNull(
writeStreamService.getWriteStreamSchema(getOrCreateStream.get()),
TableSchema.getDefaultInstance());
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);
if (newSchema.isPresent()) {
tableSchema = newSchema.get();
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
updatedSchema.write(tableSchema);
}
}
}
AppendClientInfo info =
AppendClientInfo.of(
Preconditions.checkStateNotNull(tableSchema),
descriptor,
// Make sure that the client is always closed in a different thread
// to avoid blocking.
client ->
runAsyncIgnoreFailure(
closeWriterExecutor,
() -> {
// Remove the pin that is "owned" by the cache.
client.unpin();
client.close();
}))
.withAppendClient(
writeStreamService,
getOrCreateStream,
false,
defaultMissingValueInterpretation);
// This pin is "owned" by the cache.
Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
return info;
};
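// Fetch the cached append client for this key, creating it if absent.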
AtomicReference<AppendClientInfo> appendClientInfo =
new AtomicReference<>(APPEND_CLIENTS.get(element.getKey(), getAppendClientInfo));
String currentStream = getOrCreateStream.get();
if (!currentStream.equals(appendClientInfo.get().getStreamName())) {
// Cached append client is inconsistent with persisted state. Throw away the cached item
// and force it to be recreated.
APPEND_CLIENTS.invalidate(element.getKey());
appendClientInfo.set(APPEND_CLIENTS.get(element.getKey(), getAppendClientInfo));
}
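// If a schema update has been recorded in state, make sure the cached client info reflects
// the new schema.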
TableSchema updatedSchemaValue = updatedSchema.read();
if (autoUpdateSchema && updatedSchemaValue != null) {
if (appendClientInfo.get().hasSchemaChanged(updatedSchemaValue)) {
appendClientInfo.set(
AppendClientInfo.of(
updatedSchemaValue, appendClientInfo.get().getCloseAppendClient(), false));
APPEND_CLIENTS.invalidate(element.getKey());
APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
}
}
// Each ProtoRows object contains at most 1MB of rows.
// TODO: Push messageFromTableRow up to top level. That way we can skip TableRow entirely
// if the input is already proto or already schema.
Iterable<SplittingIterable.Value> messages =
new SplittingIterable(
element.getValue(),
splitSize,
(fields, ignore) -> appendClientInfo.get().encodeUnknownFields(fields, ignore),
bytes -> appendClientInfo.get().toTableRow(bytes, Predicates.alwaysTrue()),
(failedRow, errorMessage) -> {
o.get(failedRowsTag)
.outputWithTimestamp(
new BigQueryStorageApiInsertError(failedRow.getValue(), errorMessage),
failedRow.getTimestamp());
rowsSentToFailedRowsCollection.inc();
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED,
BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
shortTableId)
.inc(1);
},
autoUpdateSchema,
ignoreUnknownValues,
elementTs);
// Initialize stream names and offsets for all contexts. This will be called initially, but
// will also be called if we roll over to a new stream on a retry.
BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
(contexts, isFailure) -> {
try {
if (isFailure) {
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
appendClientInfo.set(
appendClientInfo
.get()
.withAppendClient(
writeStreamService,
getOrCreateStream,
false,
defaultMissingValueInterpretation));
StreamAppendClient streamAppendClient =
Preconditions.checkArgumentNotNull(
appendClientInfo.get().getStreamAppendClient());
String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
for (AppendRowsContext context : contexts) {
context.streamName = streamNameRead;
streamAppendClient.pin();
context.client = appendClientInfo.get().getStreamAppendClient();
context.offset = currentOffset;
++context.tryIteration;
currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
}
streamOffset.write(currentOffset);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
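// Drop the cached append client for this key and asynchronously unpin the clients held by
// the given contexts so the underlying connections can be closed.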
Consumer<Iterable<AppendRowsContext>> clearClients =
contexts -> {
APPEND_CLIENTS.invalidate(element.getKey());
appendClientInfo.set(appendClientInfo.get().withNoAppendClient());
APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
for (AppendRowsContext context : contexts) {
if (context.client != null) {
// Unpin in a different thread, as it may execute a blocking close.
runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
context.client = null;
}
}
};
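// Issues a single AppendRows call for a context, attaching an append client first if the
// cached info does not have one.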
Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> runOperation =
context -> {
if (context.protoRows.getSerializedRowsCount() == 0) {
// This might happen if all rows in a batch failed and were sent to the failed-rows
// PCollection.
return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
}
try {
appendClientInfo.set(
appendClientInfo
.get()
.withAppendClient(
writeStreamService,
getOrCreateStream,
false,
defaultMissingValueInterpretation));
return Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient())
.appendRows(context.offset, context.protoRows);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
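// Invoked by the RetryManager when an append fails: dead-letters rows that BigQuery
// rejected individually and decides how the remaining operations are retried.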
Function<Iterable<AppendRowsContext>, RetryType> onError =
failedContexts -> {
// The first context is always the one that fails.
AppendRowsContext failedContext =
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
BigQuerySinkMetrics.reportFailedRPCMetrics(
failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId);
String errorCode =
BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
// AppendSerializationError means that BigQuery detected errors on individual rows, e.g.
// a row not conforming to BigQuery invariants. These errors are persistent, so we redirect
// those rows to the failedInserts PCollection and retry with the remaining rows.
if (failedContext.getError() != null
&& failedContext.getError() instanceof Exceptions.AppendSerializationError) {
Exceptions.AppendSerializationError error =
Preconditions.checkArgumentNotNull(
(Exceptions.AppendSerializationError) failedContext.getError());
Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
for (int failedIndex : failedRowIndices) {
// Convert the message to a TableRow and send it to the failedRows collection.
TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex);
if (failedRow == null) {
ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex);
failedRow =
appendClientInfo.get().toTableRow(protoBytes, Predicates.alwaysTrue());
}
org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex);
o.get(failedRowsTag)
.outputWithTimestamp(
new BigQueryStorageApiInsertError(
failedRow, error.getRowIndexToErrorMessage().get(failedIndex)),
timestamp);
}
int failedRows = failedRowIndices.size();
rowsSentToFailedRowsCollection.inc(failedRows);
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId)
.inc(failedRows);
// Remove the failed rows from the payload so that we retry the batch without them.
ProtoRows.Builder retryRows = ProtoRows.newBuilder();
@Nullable List<org.joda.time.Instant> timestamps = Lists.newArrayList();
for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) {
if (!failedRowIndices.contains(i)) {
ByteString rowBytes = failedContext.protoRows.getSerializedRows(i);
retryRows.addSerializedRows(rowBytes);
timestamps.add(failedContext.timestamps.get(i));
}
}
failedContext.protoRows = retryRows.build();
failedContext.timestamps = timestamps;
int retriedRows = failedContext.protoRows.getSerializedRowsCount();
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId)
.inc(retriedRows);
// Since we removed rows, we need to update the insert offsets for all remaining rows.
long offset = failedContext.offset;
for (AppendRowsContext context : failedContexts) {
context.offset = offset;
offset += context.protoRows.getSerializedRowsCount();
}
streamOffset.write(offset);
return RetryType.RETRY_ALL_OPERATIONS;
}
Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
Status.Code statusCode = Status.fromThrowable(error).getCode();
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a bundle
// failure happened after an append but before the worker could checkpoint its
// state. The records that were appended in a failed bundle will be retried,
// meaning that the unflushed tail of the stream must be discarded to prevent
// duplicates.
boolean offsetMismatch =
statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
boolean quotaError = statusCode.equals(Code.RESOURCE_EXHAUSTED);
if (!offsetMismatch) {
// Don't log errors for expected offset mismatch. These will be logged as warnings
// below.
LOG.error(
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
}
try {
// TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces
// them.
tryCreateTable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (!quotaError) {
// This forces us to close and reopen all gRPC connections to Storage API on error,
// which empirically fixes random stuckness issues.
clearClients.accept(failedContexts);
}
appendFailures.inc();
int retriedRows = failedContext.protoRows.getSerializedRowsCount();
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId)
.inc(retriedRows);
boolean explicitStreamFinalized =
failedContext.getError() instanceof StreamFinalizedException;
// This implies that the stream doesn't exist or has already been finalized. In this
// case we have no choice but to create a new stream.
boolean streamDoesNotExist =
explicitStreamFinalized
|| statusCode.equals(Code.INVALID_ARGUMENT)
|| statusCode.equals(Code.NOT_FOUND)
|| statusCode.equals(Code.FAILED_PRECONDITION);
if (offsetMismatch || streamDoesNotExist) {
appendOffsetFailures.inc();
LOG.warn(
"Append to "
+ failedContext
+ " failed with "
+ failedContext.getError()
+ ". Will retry with a new stream.");
// Finalize the stream and clear streamName so a new stream will be created.
o.get(flushTag)
.output(
KV.of(
failedContext.streamName, new Operation(failedContext.offset - 1, true)));
// Reinitialize all contexts with the new stream and new offsets.
initializeContexts.accept(failedContexts, true);
// Offset failures imply that all subsequent parallel appends will also fail.
// Retry them all.
return RetryType.RETRY_ALL_OPERATIONS;
}
return RetryType.RETRY_ALL_OPERATIONS;
};
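// Invoked by the RetryManager when an append succeeds: schedules a flush up to the last
// appended offset, updates metrics, and optionally emits the successful rows.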
Consumer<AppendRowsContext> onSuccess =
context -> {
AppendRowsResponse response = Preconditions.checkStateNotNull(context.getResult());
o.get(flushTag)
.output(
KV.of(
context.streamName,
new Operation(
context.offset + context.protoRows.getSerializedRowsCount() - 1,
false)));
int flushedRows = context.protoRows.getSerializedRowsCount();
flushesScheduled.inc(flushedRows);
BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId);
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId)
.inc(flushedRows);
if (successfulRowsTag != null) {
for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) {
ByteString protoBytes = context.protoRows.getSerializedRows(i);
org.joda.time.Instant timestamp = context.timestamps.get(i);
o.get(successfulRowsTag)
.outputWithTimestamp(
appendClientInfo.get().toTableRow(protoBytes, successfulRowsPredicate),
timestamp);
}
}
};
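// Record the start time so the overall append latency can be reported once all appends
// complete.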
Instant now = Instant.now();
List<AppendRowsContext> contexts = Lists.newArrayList();
RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
new RetryManager<>(
Duration.standardSeconds(1),
Duration.standardSeconds(20),
maxRetries,
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
int numAppends = 0;
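// Split the element's payload into individual append requests and register each one with
// the RetryManager.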
for (SplittingIterable.Value splitValue : messages) {
// Handle the case of a row that is too large.
if (splitValue.getProtoRows().getSerializedSize() >= maxRequestSize) {
if (splitValue.getProtoRows().getSerializedRowsCount() > 1) {
// TODO(reuvenlax): Is it worth trying to handle this case by splitting the protoRows?
// Given that we split the ProtoRows iterable at 2MB and the max request size is 10MB,
// this scenario seems nearly impossible.
LOG.error(
"A request containing more than one row is over the request size limit of "
+ maxRequestSize
+ ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
}
for (int i = 0; i < splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
org.joda.time.Instant timestamp = splitValue.getTimestamps().get(i);
TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
if (failedRow == null) {
ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i);
failedRow = appendClientInfo.get().toTableRow(rowBytes, Predicates.alwaysTrue());
}
o.get(failedRowsTag)
.outputWithTimestamp(
new BigQueryStorageApiInsertError(
failedRow, "Row payload too large. Maximum size " + maxRequestSize),
timestamp);
}
int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount();
rowsSentToFailedRowsCollection.inc(numRowsFailed);
BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED,
BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
shortTableId)
.inc(numRowsFailed);
} else {
++numAppends;
// Register this append with the RetryManager so it is executed (and retried) below.
AppendRowsContext context =
new AppendRowsContext(
element.getKey(),
splitValue.getProtoRows(),
splitValue.getTimestamps(),
splitValue.getFailsafeTableRows());
contexts.add(context);
retryManager.addOperation(runOperation, onError, onSuccess, context);
recordsAppended.inc(splitValue.getProtoRows().getSerializedRowsCount());
appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
}
}
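// Execute all registered appends, retrying as needed, and always release any client pins
// taken while initializing the contexts.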
if (numAppends > 0) {
initializeContexts.accept(contexts, false);
try {
retryManager.run(true);
} finally {
// Make sure that all pins are removed.
for (AppendRowsContext context : contexts) {
if (context.client != null) {
runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
}
}
}
appendSplitDistribution.update(numAppends);
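// If the server reported an updated table schema, rebuild the cached client info around it
// and persist the new schema in state.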
if (autoUpdateSchema) {
@Nullable
StreamAppendClient streamAppendClient = appendClientInfo.get().getStreamAppendClient();
TableSchema originalSchema = appendClientInfo.get().getTableSchema();
@Nullable
TableSchema updatedSchemaReturned =
(streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null;
// Update the table schema and clear the append client.
if (updatedSchemaReturned != null) {
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned);
if (newSchema.isPresent()) {
appendClientInfo.set(
AppendClientInfo.of(
newSchema.get(), appendClientInfo.get().getCloseAppendClient(), false));
APPEND_CLIENTS.invalidate(element.getKey());
APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned);
updatedSchema.write(newSchema.get());
}
}
}
java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
appendLatencyDistribution.update(timeElapsed.toMillis());
}
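// Push the idle timer out by streamIdleTime; it only fires if no further elements arrive
// for this destination within that interval.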
idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
}