long flush()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java [622:913]


      long flush(
          RetryManager<AppendRowsResponse, AppendRowsContext> retryManager,
          OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver,
          @Nullable OutputReceiver<TableRow> successfulRowsReceiver)
          throws Exception {
        if (pendingMessages.isEmpty()) {
          return 0;
        }

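        // Snapshot the buffered rows, timestamps, and failsafe TableRows, then reset the
        // buffers so new appends can accumulate while this batch is flushed.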
        final ProtoRows.Builder insertsBuilder = ProtoRows.newBuilder();
        insertsBuilder.addAllSerializedRows(pendingMessages);
        pendingMessages.clear();
        final ProtoRows inserts = insertsBuilder.build();
        List<org.joda.time.Instant> insertTimestamps = pendingTimestamps;
        List<@Nullable TableRow> failsafeTableRows = pendingFailsafeTableRows;
        pendingTimestamps = Lists.newArrayList();
        pendingFailsafeTableRows = Lists.newArrayList();

        // Handle the case where the request is too large.
        if (inserts.getSerializedSize() >= maxRequestSize) {
          if (inserts.getSerializedRowsCount() > 1) {
            // TODO(reuvenlax): Is it worth trying to handle this case by splitting the protoRows?
            // Given that we split the ProtoRows iterable at 2MB and the max request size is 10MB,
            // this scenario seems nearly impossible.
            LOG.error(
                "A request containing more than one row is over the request size limit of {}. "
                    + "This is unexpected. All rows in the request will be sent to the failed-rows PCollection.",
                maxRequestSize);
          }
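          // Send every row of the oversized request to the failed-rows output, reconstructing a
          // TableRow from the serialized proto bytes when no failsafe TableRow was captured.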
          for (int i = 0; i < inserts.getSerializedRowsCount(); ++i) {
            @Nullable TableRow failedRow = failsafeTableRows.get(i);
            if (failedRow == null) {
              ByteString rowBytes = inserts.getSerializedRows(i);
              failedRow =
                  TableRowToStorageApiProto.tableRowFromMessage(
                      DynamicMessage.parseFrom(
                          TableRowToStorageApiProto.wrapDescriptorProto(
                              getAppendClientInfo(true, null).getDescriptor()),
                          rowBytes),
                      true,
                      successfulRowsPredicate);
            }
            org.joda.time.Instant timestamp = insertTimestamps.get(i);
            failedRowsReceiver.outputWithTimestamp(
                new BigQueryStorageApiInsertError(
                    failedRow, "Row payload too large. Maximum size " + maxRequestSize),
                timestamp);
          }
          int numRowsFailed = inserts.getSerializedRowsCount();
          BigQuerySinkMetrics.appendRowsRowStatusCounter(
                  BigQuerySinkMetrics.RowStatus.FAILED,
                  BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
                  shortTableUrn)
              .inc(numRowsFailed);
          rowsSentToFailedRowsCollection.inc(numRowsFailed);
          return 0;
        }

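        // For application-created streams, reserve an explicit offset range for this batch;
        // the default stream ignores offsets, so the offset stays -1.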
        long offset = -1;
        if (!this.useDefaultStream) {
          getOrCreateStreamName(); // Force creation of the stream before we get offsets.
          offset = this.currentOffset;
          this.currentOffset += inserts.getSerializedRowsCount();
        }
        AppendRowsContext appendRowsContext =
            new AppendRowsContext(offset, inserts, insertTimestamps, failsafeTableRows);

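        // Register this append with the retry manager: the first lambda issues the AppendRows
        // RPC, the second decides how failures are handled and what is retried, and the third
        // runs on success.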
        retryManager.addOperation(
            c -> {
              if (c.protoRows.getSerializedRowsCount() == 0) {
                // This might happen if all rows in a batch failed and were sent to the failed-rows
                // PCollection.
                return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
              }
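              // Issue the AppendRows RPC on the shared StreamAppendClient and record how long
              // the client spent waiting on in-flight request limits.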
              try {
                StreamAppendClient writeStream =
                    Preconditions.checkStateNotNull(
                        getAppendClientInfo(true, null).getStreamAppendClient());
                ApiFuture<AppendRowsResponse> response =
                    writeStream.appendRows(c.offset, c.protoRows);
                inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
                if (!usingMultiplexing) {
                  if (writeStream.getInflightWaitSeconds() > 5) {
                    LOG.warn(
                        "Storage API write delayed by more than {} seconds.",
                        writeStream.getInflightWaitSeconds());
                  }
                }
                return response;
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            },
            contexts -> {
              AppendRowsContext failedContext =
                  Preconditions.checkStateNotNull(Iterables.getFirst(contexts, null));
              BigQuerySinkMetrics.reportFailedRPCMetrics(
                  failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn);
              String errorCode =
                  BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());

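              // Row-level serialization failures: emit the offending rows to the failed-rows
              // output, then retry the batch with those rows removed.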
              if (failedContext.getError() != null
                  && failedContext.getError() instanceof Exceptions.AppendSerializationError) {
                Exceptions.AppendSerializationError error =
                    Preconditions.checkStateNotNull(
                        (Exceptions.AppendSerializationError) failedContext.getError());

                Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
                for (int failedIndex : failedRowIndices) {
                  // Convert the message to a TableRow and send it to the failedRows collection.
                  BigQueryStorageApiInsertError element = null;
                  org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex);
                  try {
                    TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex);
                    if (failedRow == null) {
                      ByteString protoBytes =
                          failedContext.protoRows.getSerializedRows(failedIndex);
                      failedRow =
                          TableRowToStorageApiProto.tableRowFromMessage(
                              DynamicMessage.parseFrom(
                                  TableRowToStorageApiProto.wrapDescriptorProto(
                                      Preconditions.checkStateNotNull(appendClientInfo)
                                          .getDescriptor()),
                                  protoBytes),
                              true,
                              Predicates.alwaysTrue());
                    }
                    element =
                        new BigQueryStorageApiInsertError(
                            failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
                  } catch (Exception e) {
                    LOG.error("Failed to insert row and could not parse the result!", e);
                  }
                  // Output outside the try {} clause so a downstream exception is not suppressed.
                  if (element != null) {
                    failedRowsReceiver.outputWithTimestamp(element, timestamp);
                  }
                }
                int numRowsFailed = failedRowIndices.size();
                rowsSentToFailedRowsCollection.inc(numRowsFailed);
                BigQuerySinkMetrics.appendRowsRowStatusCounter(
                        BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableUrn)
                    .inc(numRowsFailed);

                // Remove the failed rows from the payload so that we retry the batch without
                // them.
                ProtoRows.Builder retryRows = ProtoRows.newBuilder();
                List<org.joda.time.Instant> retryTimestamps = Lists.newArrayList();
                for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) {
                  if (!failedRowIndices.contains(i)) {
                    ByteString rowBytes = failedContext.protoRows.getSerializedRows(i);
                    retryRows.addSerializedRows(rowBytes);
                    retryTimestamps.add(failedContext.timestamps.get(i));
                  }
                }
                failedContext.protoRows = retryRows.build();
                failedContext.timestamps = retryTimestamps;
                int numRowsRetried = failedContext.protoRows.getSerializedRowsCount();
                BigQuerySinkMetrics.appendRowsRowStatusCounter(
                        BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn)
                    .inc(numRowsRetried);

                // Since we removed rows, we need to update the insert offsets for all remaining
                // rows.
                long newOffset = failedContext.offset;
                for (AppendRowsContext context : contexts) {
                  context.offset = newOffset;
                  newOffset += context.protoRows.getSerializedRowsCount();
                }
                this.currentOffset = newOffset;
                return RetryType.RETRY_ALL_OPERATIONS;
              }

              LOG.warn(
                  "Append to stream {} by client #{} failed with error; operations will be retried.\n{}",
                  streamName,
                  clientNumber,
                  retrieveErrorDetails(contexts));
              failedContext.failureCount += 1;

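              // Quota errors (RESOURCE_EXHAUSTED) get a much larger retry budget and do not
              // force the write stream to be recreated; all other errors invalidate the cached
              // client before retrying.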
              boolean quotaError = false;
              Throwable error = failedContext.getError();
              Status.Code statusCode = Status.Code.OK;
              if (error != null) {
                statusCode = Status.fromThrowable(error).getCode();
                quotaError = statusCode.equals(Status.Code.RESOURCE_EXHAUSTED);
              }

              int allowedRetry;

              if (!quotaError) {
                // This forces us to close and reopen all gRPC connections to the Storage API on
                // error, which empirically resolves intermittent stuck-connection issues.
                invalidateWriteStream();
                allowedRetry = 5;
              } else {
                allowedRetry = 35;
              }

              // Maximum number of times we retry before we fail the work item.
              if (failedContext.failureCount > allowedRetry) {
                throw new RuntimeException(
                    String.format(
                        "More than %d attempts to call AppendRows failed.", allowedRetry));
              }

              // The following errors are known to be persistent, so always fail the work item in
              // this case.
              if (statusCode.equals(Status.Code.OUT_OF_RANGE)
                  || statusCode.equals(Status.Code.ALREADY_EXISTS)) {
                throw new RuntimeException(
                    "Append to stream "
                        + this.streamName
                        + " failed with invalid "
                        + "offset of "
                        + failedContext.offset);
              }

              boolean hasPersistentErrors =
                  failedContext.getError() instanceof Exceptions.StreamFinalizedException
                      || statusCode.equals(Status.Code.INVALID_ARGUMENT)
                      || statusCode.equals(Status.Code.NOT_FOUND)
                      || statusCode.equals(Status.Code.FAILED_PRECONDITION);
              if (hasPersistentErrors) {
                throw new RuntimeException(
                    String.format(
                        "Append to stream %s failed with Status Code %s. The stream may not exist.",
                        this.streamName, statusCode),
                    error);
              }
              // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces
              // them.
              try {
                tryCreateTable.call();
              } catch (Exception e) {
                throw new RuntimeException(e);
              }

              int numRowsRetried = failedContext.protoRows.getSerializedRowsCount();
              BigQuerySinkMetrics.appendRowsRowStatusCounter(
                      BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn)
                  .inc(numRowsRetried);

              appendFailures.inc();
              return RetryType.RETRY_ALL_OPERATIONS;
            },
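            // Success callback: update row and RPC metrics and, if a successful-rows receiver is
            // configured, parse the appended protos back into TableRows and emit them.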
            c -> {
              int numRecordsAppended = c.protoRows.getSerializedRowsCount();
              recordsAppended.inc(numRecordsAppended);
              BigQuerySinkMetrics.appendRowsRowStatusCounter(
                      BigQuerySinkMetrics.RowStatus.SUCCESSFUL,
                      BigQuerySinkMetrics.OK,
                      shortTableUrn)
                  .inc(numRecordsAppended);

              BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
                  c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn);

              if (successfulRowsReceiver != null) {
                Descriptor descriptor = null;
                try {
                  descriptor =
                      TableRowToStorageApiProto.wrapDescriptorProto(
                          Preconditions.checkStateNotNull(appendClientInfo).getDescriptor());
                } catch (DescriptorValidationException e) {
                  LOG.warn(
                      "Failure getting proto descriptor. Successful output will not be produced.",
                      e);
                }
                if (descriptor != null) {
                  for (int i = 0; i < c.protoRows.getSerializedRowsCount(); ++i) {
                    ByteString rowBytes = c.protoRows.getSerializedRowsList().get(i);
                    try {
                      TableRow row =
                          TableRowToStorageApiProto.tableRowFromMessage(
                              DynamicMessage.parseFrom(descriptor, rowBytes),
                              true,
                              successfulRowsPredicate);
                      org.joda.time.Instant timestamp = c.timestamps.get(i);
                      successfulRowsReceiver.outputWithTimestamp(row, timestamp);
                    } catch (Exception e) {
                      LOG.warn("Failure parsing TableRow", e);
                    }
                  }
                }
              }
            },
            appendRowsContext);
        maybeTickleCache();
        return inserts.getSerializedRowsCount();
      }
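
For context, a rough sketch of how a caller might drive flush(). The RetryManager construction arguments, the run() call, and the destinationState/receiver variables below are illustrative assumptions based on the surrounding class, not the exact upstream code:

      // Hypothetical caller, loosely modeled on the class's flushAll() path. Backoff constants,
      // the RetryManager constructor arity, and variable names are assumptions for illustration.
      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
          new RetryManager<>(
              org.joda.time.Duration.standardSeconds(1), // initial backoff (assumed)
              org.joda.time.Duration.standardSeconds(20), // max backoff (assumed)
              500); // max retries (assumed)
      long numRowsWritten =
          destinationState.flush(retryManager, failedRowsReceiver, successfulRowsReceiver);
      if (numRowsWritten > 0) {
        // Wait for all registered append operations to complete or permanently fail.
        retryManager.run(true);
      }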